Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a61618b3ec8f777b65b274ea3e206a681be8290e
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006 Ahmed Harbaoui.                                       */
5 /* Copyright (c) 2006 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras.h"
12 #include "xbt/matrix.h"
13 #define PROC_MATRIX_SIZE 3
14 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
15 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
16
17 #define DATA_MATRIX_SIZE 9
18 const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
19
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
21
22 /* struct for recovering results */
23 GRAS_DEFINE_TYPE(s_result,struct s_result {
24   int linepos;
25   int rowpos;
26   xbt_matrix_t C GRAS_ANNOTE(subtype,double);
27 });
28 typedef struct s_result result_t;
29
30 /* struct to send initial data to slave */
31 GRAS_DEFINE_TYPE(s_assignment,struct s_assignment {
32   int linepos;
33   int rowpos;
34   xbt_host_t line[NEIGHBOR_COUNT];
35   xbt_host_t row[NEIGHBOR_COUNT];
36   xbt_matrix_t A GRAS_ANNOTE(subtype,double);
37   xbt_matrix_t B GRAS_ANNOTE(subtype,double);
38 });
39 typedef struct s_assignment s_assignment_t;
40
41 /* register messages which may be sent (common to client and server) */
42 static void register_messages(void) {
43   gras_datadesc_type_t result_type;
44   gras_datadesc_type_t assignment_type;
45
46   gras_datadesc_set_const("NEIGHBOR_COUNT",NEIGHBOR_COUNT);
47   result_type=gras_datadesc_by_symbol(s_result);
48   assignment_type=gras_datadesc_by_symbol(s_assignment);
49         
50   /* receive a final result from slave */
51   gras_msgtype_declare("result", result_type);
52
53   /* send from master to slave to assign a position and some data */
54   gras_msgtype_declare("assignment", assignment_type);
55
56   /* send data between slaves */
57   gras_msgtype_declare("dataA", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
58   gras_msgtype_declare("dataB", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
59 }
60
61 /* Function prototypes */
62 int slave (int argc,char *argv[]);
63 int master (int argc,char *argv[]);
64
65
66 /* **********************************************************************
67  * master code
68  * **********************************************************************/
69
70 /* Global private data */
71 typedef struct {
72   int nbr_row,nbr_line;
73   int remaining_step;
74   int remaining_ack;
75 } master_data_t;
76
77
78 /***  Function Scatter Sequentiel ***/
79
80 static void scatter(){
81
82 }/* end_of_Scatter */
83
84 /***  Function: Scatter // ***/
85
86 static void scatter_parl(){
87
88 }/* end_of_Scatter // */
89
90 /***  Function: multiplication ***/
91
92 static void multiplication(){
93
94 }/* end_of_multiplication */
95
96 /***  Function: gather ***/
97
98 static void gather(){
99
100 }/* end_of_gather */
101
102 int master (int argc,char *argv[]) {
103
104   int i,port;
105
106   xbt_matrix_t A,B,C;
107   result_t result;
108
109   gras_socket_t from;
110
111
112   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
113   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
114
115   /*  Init the GRAS's infrastructure */
116   gras_init(&argc, argv);
117   register_messages();
118       
119   /*  Initialize Matrices */
120   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
121   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
122   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
123         
124   /*  Get arguments and create sockets */
125   port=atoi(argv[1]);
126   //scatter();
127   //scatter_parl();
128   //multiplication();
129   //gather();
130   /************************* Init Data Send *********************************/
131   gras_os_sleep(2);
132
133   for( i=0; i+1<argc && i<SLAVE_COUNT;i++){
134     grid[i]=xbt_host_from_string(argv[i+1]);
135     socket[i]=gras_socket_client(grid[i]->name,grid[i]->port);
136       
137     INFO2("Connected to %s:%d.",grid[i]->name,grid[i]->port);
138   }
139   xbt_assert2(i==SLAVE_COUNT,
140               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
141               i,SLAVE_COUNT);
142   /* FIXME: let the surnumerous slave die properly */
143  
144   int row=0, line=0;
145   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
146   for(i=0 ; i<SLAVE_COUNT; i++){
147     s_assignment_t assignment;
148     int j,k;
149
150     assignment.linepos=line; // assigned line
151     assignment.rowpos=row;   // assigned row
152
153     /* Neiborhood */
154     for (j=0,k=0; j<PROC_MATRIX_SIZE; j++) {
155       if (i != j*PROC_MATRIX_SIZE+(row)) {          
156          assignment.row[k] = grid[ j*PROC_MATRIX_SIZE+(row) ] ;
157          k++;
158       }
159     }
160     for (j=0,k=0; j<PROC_MATRIX_SIZE; j++) {
161       if (i != (line)*PROC_MATRIX_SIZE+j) {         
162          assignment.line[k] =  grid[ (line)*PROC_MATRIX_SIZE+j ] ;
163          k++;
164       }
165     }
166
167     assignment.A=xbt_matrix_new_sub(A,
168                                     submatrix_size,submatrix_size,
169                                     submatrix_size*line,submatrix_size*row,
170                                     NULL);
171     assignment.B=xbt_matrix_new_sub(B,
172                                     submatrix_size,submatrix_size,
173                                     submatrix_size*line,submatrix_size*row,
174                                     NULL);
175     row++;
176     if (row >= PROC_MATRIX_SIZE) {
177       row=0;
178       line++;
179     }
180                 
181     gras_msg_send(socket[i],gras_msgtype_by_name("assignment"),&assignment);
182     xbt_matrix_free(assignment.A);
183     xbt_matrix_free(assignment.B);
184   }
185   // end assignment
186
187   /******************************* multiplication ********************************/
188   /* wait for results */
189   for( i=0;i< SLAVE_COUNT;i++){
190     gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result);
191     VERB2("%d slaves are done already. Waiting for %d",i+1, SLAVE_COUNT);
192     xbt_matrix_copy_values(C,result.C,   submatrix_size,submatrix_size,
193                            submatrix_size*result.linepos,
194                            submatrix_size*result.rowpos,
195                            0,0,NULL);
196     xbt_matrix_free(result.C);
197   }
198   /*    end of gather   */
199   if (DATA_MATRIX_SIZE < 30) {
200      INFO0 ("The Result of Multiplication is :");
201      xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double);
202   } else {
203      INFO1("Matrix size too big (%d>30) to be displayed here",DATA_MATRIX_SIZE);
204   }
205
206   for(i=0; i<SLAVE_COUNT; i++) {
207      gras_socket_close(socket[i]);
208      xbt_host_free(grid[i]);
209   }
210    
211   xbt_matrix_free(A);
212   xbt_matrix_free(B);
213   xbt_matrix_free(C);
214   gras_exit();
215   return 0;
216 } /* end_of_master */
217
218 /* **********************************************************************
219  * slave code
220  * **********************************************************************/
221
222 int slave(int argc,char *argv[]) {
223
224   xbt_ex_t e; 
225
226   int step,l;
227   xbt_matrix_t bA=xbt_matrix_new(submatrix_size,submatrix_size,
228                                  sizeof(double),NULL);
229   xbt_matrix_t bB=xbt_matrix_new(submatrix_size,submatrix_size,
230                                  sizeof(double),NULL);
231
232   int myline,myrow;
233   xbt_matrix_t mydataA,mydataB;
234   xbt_matrix_t bC=xbt_matrix_double_new_zeros(submatrix_size,submatrix_size);
235   
236   result_t result;
237  
238   gras_socket_t from,sock;  /* to exchange data with my neighbor */
239   gras_socket_t master;     /* for the barrier */
240
241   /* sockets for brodcast to other slave */
242   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
243   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
244   memset(socket_line,0,sizeof(socket_line));
245   memset(socket_row,0,sizeof(socket_row));
246    
247   /* Init the GRAS's infrastructure */
248   gras_init(&argc, argv);
249
250   /*  Create my master socket */
251   sock = gras_socket_server(atoi(argv[1]));
252   int i;
253
254   /*  Register the known messages */
255   register_messages();
256
257   /* Recover my initialized Data and My Position*/
258   s_assignment_t assignment;
259   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],gras_os_myport());
260   TRY {
261     gras_msg_wait(600,gras_msgtype_by_name("assignment"),&master,&assignment);
262   } CATCH(e) {
263     RETHROW0("Can't get my assignment from master : %s");
264   }
265   myline  = assignment.linepos;
266   myrow   = assignment.rowpos;
267   mydataA = assignment.A;
268   mydataB = assignment.B;
269
270   INFO2("Receive my pos (%d,%d) and assignment",myline,myrow);
271
272   /* Get my neighborhood from the assignment message (skipping myself) */
273   for (i=0 ; i<PROC_MATRIX_SIZE-1 ; i++){
274     socket_line[i]=gras_socket_client(assignment.line[i]->name,
275                                       assignment.line[i]->port);
276     xbt_host_free(assignment.line[i]);
277   }
278   for (i=0 ; i<PROC_MATRIX_SIZE-1 ; i++){
279     socket_row[i]=gras_socket_client(assignment.row[i]->name,
280                                      assignment.row[i]->port);
281     xbt_host_free(assignment.row[i]);    
282   }
283
284   for (step=0; step<PROC_MATRIX_SIZE;step++) {
285         
286     /* a line brodcast */
287     if(myline==step){
288        INFO3("LINE: step(%d) = Myline(%d). Broadcast my data (myport=%d).",
289              step,myline,gras_os_myport());
290        for (l=0;l < PROC_MATRIX_SIZE-1 ;l++) {
291           INFO2("LINE:   Send to %s:%d",
292                 gras_socket_peer_name(socket_row[l]),
293                 gras_socket_peer_port(socket_row[l]));
294          gras_msg_send(socket_row[l], 
295                        gras_msgtype_by_name("dataB"), 
296                        &mydataB);
297        }
298        
299         
300        xbt_matrix_free(bB);
301        bB = xbt_matrix_new_sub(mydataB,
302                                submatrix_size,submatrix_size,
303                                0,0,NULL);       
304     } else {
305       TRY {
306         xbt_matrix_free(bB);
307         gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
308       } CATCH(e) {
309         RETHROW0("Can't get a data message from line : %s");
310       }
311       INFO4("LINE: step(%d) <> Myline(%d). Receive data from %s:%d",step,myline,
312             gras_socket_peer_name(from), gras_socket_peer_port(from));
313     }
314
315     /* a row brodcast */
316     if (myrow==step) { 
317        INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow);
318        for (l=1;l < PROC_MATRIX_SIZE ; l++) {
319           INFO2("ROW:   Send to %s:%d",
320                 gras_socket_peer_name(socket_line[l-1]),
321                 gras_socket_peer_port(socket_line[l-1]));
322           gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
323        }
324        xbt_matrix_free(bA);
325        bA = xbt_matrix_new_sub(mydataA,
326                                submatrix_size,submatrix_size,
327                                0,0,NULL);
328     } else {
329       TRY {
330         xbt_matrix_free(bA);
331         gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
332       } CATCH(e) {
333         RETHROW0("Can't get a data message from row : %s");
334       }
335       INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s",step,myrow,
336             gras_socket_peer_name(from));
337     }
338     xbt_matrix_double_addmult(bA,bB,bC);
339
340   };
341  
342   /* send Result to master */  
343   result.C=bC;
344   result.linepos=myline;
345   result.rowpos=myrow;
346
347   TRY {
348     gras_msg_send(master, gras_msgtype_by_name("result"),&result);
349   } CATCH(e) {
350     RETHROW0("Failed to send PING to server: %s");
351   }
352   INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
353         gras_socket_peer_name(master),gras_socket_peer_port(master));
354   /*  Free the allocated resources, and shut GRAS down */
355
356   xbt_matrix_free(bA);
357   xbt_matrix_free(bB);
358   xbt_matrix_free(bC);
359
360   xbt_matrix_free(mydataA);
361   xbt_matrix_free(mydataB);
362   gras_socket_close(sock);
363   gras_socket_close(master);
364   gras_socket_close(from);
365    /* FIXME: some are said to be unknown
366   for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
367      if (socket_line[l])
368        gras_socket_close(socket_line[l]);
369      if (socket_row[l])
370        gras_socket_close(socket_row[l]); 
371   }*/
372    
373   gras_exit();
374   INFO0("Done.");
375   return 0;
376 } /* end_of_slave */