Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
More work on this. Looks better now, but still doesn't run properly
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006 Ahmed Harbaoui.                                       */
5 /* Copyright (c) 2006 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras.h"
12 #include "xbt/matrix.h"
13 #define PROC_MATRIX_SIZE 2
14 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
15
16 #define DATA_MATRIX_SIZE 4
17 const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
20
21 /* struct for recovering results */
22 GRAS_DEFINE_TYPE(s_result,struct s_result {
23   int linepos;
24   int rowpos;
25   xbt_matrix_t C GRAS_ANNOTE(subtype,double);
26 });
27 typedef struct s_result result_t;
28
29 /* struct to send initial data to slave */
30 GRAS_DEFINE_TYPE(s_assignment,struct s_assignment {
31   int linepos;
32   int rowpos;
33   xbt_host_t line[PROC_MATRIX_SIZE];
34   xbt_host_t row[PROC_MATRIX_SIZE];
35   xbt_matrix_t A GRAS_ANNOTE(subtype,double);
36   xbt_matrix_t B GRAS_ANNOTE(subtype,double);
37 });
38 typedef struct s_assignment s_assignment_t;
39
40 /* register messages which may be sent (common to client and server) */
41 static void register_messages(void) {
42   gras_datadesc_type_t result_type;
43   gras_datadesc_type_t assignment_type;
44
45   gras_datadesc_set_const("PROC_MATRIX_SIZE",PROC_MATRIX_SIZE);
46   result_type=gras_datadesc_by_symbol(s_result);
47   assignment_type=gras_datadesc_by_symbol(s_assignment);
48         
49   /* receive a final result from slave */
50   gras_msgtype_declare("result", result_type);
51
52   /* send from master to slave to assign a position and some data */
53   gras_msgtype_declare("assignment", assignment_type);
54
55   /* send from master to slave to ask a final result */
56   gras_msgtype_declare("ask_result", gras_datadesc_by_name("int")); 
57
58   /* send from master to slave to indicate the begining of step */
59   gras_msgtype_declare("step", gras_datadesc_by_name("int"));
60   /* send from slave to master to indicate the end of the current step */
61   gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));
62
63   /* send data between slave */
64   gras_msgtype_declare("dataA", gras_datadesc_by_name("double"));
65   /* send data between slave */
66   gras_msgtype_declare("dataB", gras_datadesc_by_name("double"));
67 }
68
69 /* Function prototypes */
70 int slave (int argc,char *argv[]);
71 int master (int argc,char *argv[]);
72
73
74 /* **********************************************************************
75  * master code
76  * **********************************************************************/
77
78 /* Global private data */
79 typedef struct {
80   int nbr_row,nbr_line;
81   int remaining_step;
82   int remaining_ack;
83 } master_data_t;
84
85
86 /***  Function Scatter Sequentiel ***/
87
88 static void scatter(){
89
90 }/* end_of_Scatter */
91
92 /***  Function: Scatter // ***/
93
94 static void scatter_parl(){
95
96 }/* end_of_Scatter // */
97
98 /***  Function: multiplication ***/
99
100 static void multiplication(){
101
102 }/* end_of_multiplication */
103
104 /***  Function: gather ***/
105
106 static void gather(){
107
108 }/* end_of_gather */
109
110 int master (int argc,char *argv[]) {
111
112   xbt_ex_t e;
113
114   int i,port,ask_result,step;
115
116   xbt_matrix_t A,B,C;
117   result_t result;
118
119   gras_socket_t from;
120
121   /*  Init the GRAS's infrastructure */
122   gras_init(&argc, argv);
123
124   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
125   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
126
127   /*  Initialize Matrices */
128
129   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
130   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
131   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
132         
133   //xbt_matrix_dump(B,"B:seq",0,xbt_matrix_dump_display_double);
134
135
136   /*  Get arguments and create sockets */
137   port=atoi(argv[1]);
138   //scatter();
139   //scatter_parl();
140   //multiplication();
141   //gather();
142   //display(A);
143   /************************* Init Data Send *********************************/
144   int step_ack;
145   gras_os_sleep(5);
146
147   for( i=1;i<argc && i<=SLAVE_COUNT;i++){
148     grid[i-1]=xbt_host_from_string(argv[i]);
149     socket[i-1]=gras_socket_client(grid[i-1]->name,grid[i-1]->port);
150       
151     INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port);
152   }
153   /* FIXME: let the surnumerous slave die properly */
154
155   int row=0, line=0;
156   for(i=0 ; i<SLAVE_COUNT; i++){
157     s_assignment_t assignment;
158     int j;
159
160     assignment.linepos=line; // assigned line
161     assignment.rowpos=row;   // assigned row
162
163     /* Neiborhood */
164     for (j=0; j<PROC_MATRIX_SIZE; j++) {
165       assignment.row[j] = grid[ j*PROC_MATRIX_SIZE+(row) ] ;
166       assignment.line[j] =  grid[ (line)*PROC_MATRIX_SIZE+j ] ;
167     }
168
169     assignment.A=xbt_matrix_new_sub(A,
170                                     submatrix_size,submatrix_size,
171                                     submatrix_size*line,submatrix_size*row,
172                                     NULL);
173     assignment.B=xbt_matrix_new_sub(B,
174                                     submatrix_size,submatrix_size,
175                                     submatrix_size*line,submatrix_size*row,
176                                     NULL);
177     //xbt_matrix_dump(assignment.B,"assignment.B",0,xbt_matrix_dump_display_double);
178     row++;
179     if (row >= PROC_MATRIX_SIZE) {
180       row=0;
181       line++;
182     }
183                 
184     gras_msg_send(socket[i],gras_msgtype_by_name("assignment"),&assignment);
185     //    INFO3("Send assignment to %s : data A= %.3g & data B= %.3g",
186     //    gras_socket_peer_name(socket[i]),mydata.a,mydata.b);
187
188   }
189   // end assignment
190
191   /******************************* multiplication ********************************/
192   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
193         
194   for (step=0; step < PROC_MATRIX_SIZE; step++){
195     for (i=0; i< SLAVE_COUNT; i++){
196       TRY {
197         gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step);
198       } CATCH(e) {
199         gras_socket_close(socket[i]);
200         RETHROW0("Unable to send the msg : %s");
201       }
202     }
203     INFO1("XXXXXX Next step (%d)",step);
204
205     /* wait for computing and slave messages exchange */
206
207     i=0;        
208     while  ( i< SLAVE_COUNT) {
209       TRY {
210         gras_msg_wait(1300,gras_msgtype_by_name("step_ack"),&from,&step_ack);
211       } CATCH(e) {
212         RETHROW0("Can't get a Ack step message from slave : %s");
213       }
214       i++;
215       DEBUG3("Got step ack from %s (got %d of %d)",
216             gras_socket_peer_name(from), i, SLAVE_COUNT);
217     }
218   }
219   /*********************************  gather ***************************************/
220
221   ask_result=0;
222   for( i=1;i< argc;i++){
223     gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
224     INFO1("Send (Ask Result) message to %s",gras_socket_peer_name(socket[i]));
225   }
226   /* wait for results */
227   for( i=1;i< argc;i++){
228     gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
229     xbt_matrix_copy_values(result.C,C,   submatrix_size,submatrix_size,
230                            submatrix_size*result.linepos,
231                            submatrix_size*result.rowpos,
232                            0,0,NULL);
233   }
234   /*    end of gather   */
235   INFO0 ("The Result of Multiplication is :");
236   xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double);
237
238   return 0;
239 } /* end_of_master */
240
241 /* **********************************************************************
242  * slave code
243  * **********************************************************************/
244
245 int slave(int argc,char *argv[]) {
246
247   xbt_ex_t e; 
248
249   int step,port,l,result_ack=0; 
250   xbt_matrix_t bA=xbt_matrix_new(submatrix_size,submatrix_size,
251                                  sizeof(double),NULL);
252   xbt_matrix_t bB=xbt_matrix_new(submatrix_size,submatrix_size,
253                                  sizeof(double),NULL);
254
255   int myline,myrow;
256   xbt_matrix_t mydataA,mydataB;
257   xbt_matrix_t bC=xbt_matrix_double_new_zeros(submatrix_size,submatrix_size);
258   
259   result_t result;
260  
261   gras_socket_t from,sock;  /* to exchange data with my neighbor */
262   gras_socket_t master;     /* for the barrier */
263
264   /* sockets for brodcast to other slave */
265   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
266   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
267
268   /* Init the GRAS's infrastructure */
269
270   gras_init(&argc, argv);
271
272   /* Get arguments and create sockets */
273
274   port=atoi(argv[1]);
275   
276   /*  Create my master socket */
277   sock = gras_socket_server(port);
278   int i;
279
280   /*  Register the known messages */
281   register_messages();
282
283   /* Recover my initialized Data and My Position*/
284   s_assignment_t assignment;
285   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],port);
286   TRY {
287     gras_msg_wait(600,gras_msgtype_by_name("assignment"),&master,&assignment);
288   } CATCH(e) {
289     RETHROW0("Can't get my assignment from master : %s");
290   }
291   myline  = assignment.linepos;
292   myrow   = assignment.rowpos;
293   mydataA = assignment.A;
294   mydataB = assignment.B;
295   INFO1("mydataB=%p",mydataB);
296
297   INFO2("Receive my pos (%d,%d) and assignment",myline,myrow);
298
299   //  xbt_matrix_dump(mydataB,"myB",0,xbt_matrix_dump_display_double);
300
301   /* Get my neighborhood from the assignment message (skipping myself) */
302   int j=0;
303   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
304     if (strcmp(gras_os_myname(),assignment.line[i]->name)) {
305       socket_line[j]=gras_socket_client(assignment.line[i]->name,
306                                         assignment.line[i]->port);
307       j++;
308     }
309     xbt_host_free(assignment.line[i]);
310   }
311   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
312     if (strcmp(gras_os_myname(),assignment.row[i]->name)) {
313       socket_row[j]=gras_socket_client(assignment.row[i]->name,
314                                        assignment.row[i]->port);
315       j++;
316     }
317     xbt_host_free(assignment.row[i]);    
318   }
319
320   
321   do {  //repeat until compute Cb
322         
323     TRY {
324       gras_msg_wait(200,gras_msgtype_by_name("step"),NULL,&step);
325     } CATCH(e) {
326       RETHROW0("Can't get a Next Step message from master : %s");
327     }
328     INFO1("Receive a step message from master: step = %d ",step);
329
330     /* a line brodcast */
331     gras_os_sleep(3);  // IL FAUT EXPRIMER LE TEMPS D'ATTENTE EN FONCTION DE "SLAVE_COUNT"
332
333     if(myline==step){
334       INFO2("step(%d) = Myline(%d)",step,myline);
335       for (l=0;l < PROC_MATRIX_SIZE-1 ;l++){
336         INFO1("mydataB=%p",mydataB);
337         gras_msg_send(socket_row[l], 
338                       gras_msgtype_by_name("dataB"), 
339                       &mydataB);
340         INFO1("mydataB=%p",mydataB);
341         
342         xbt_matrix_free(bB);
343         INFO1("mydataB=%p",mydataB);
344         xbt_matrix_dump(mydataB,"myB",0,xbt_matrix_dump_display_double);
345         bB = xbt_matrix_new_sub(mydataB,
346                                 submatrix_size,submatrix_size,
347                                 0,0,NULL);
348         
349         INFO0("send my data B to my (vertical) neighbors");
350       }
351     } else {
352       INFO2("step(%d) <> Myline(%d)",step,myline);
353       TRY {
354         INFO1("mydataB=%p",mydataB);
355         xbt_matrix_free(bB);
356         INFO1("mydataB=%p",mydataB);
357         gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
358         INFO1("mydataB=%p",mydataB);
359       } CATCH(e) {
360         RETHROW0("Can't get a data message from line : %s");
361       }
362       INFO1("Receive data B from my neighbor: %s",
363             gras_socket_peer_name(from));
364     }
365
366     /* a row brodcast */
367     if (myrow==step) {
368       for (l=1;l < PROC_MATRIX_SIZE ;l++){
369         gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
370         xbt_matrix_free(bA);
371         bA = xbt_matrix_new_sub(mydataA,
372                                 submatrix_size,submatrix_size,
373                                 0,0,NULL);
374         
375         INFO0("send my data A to my (horizontal) neighbors");
376       }
377     } else {
378       TRY {
379         xbt_matrix_free(bA);
380         gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
381       } CATCH(e) {
382         RETHROW0("Can't get a data message from row : %s");
383       }
384       INFO1("Receive data A from my neighbor : %s ",
385             gras_socket_peer_name(from));
386     }
387     xbt_matrix_double_addmult(bA,bB,bC);
388
389     /* send a ack msg to master */
390         
391     gras_msg_send(master,gras_msgtype_by_name("step_ack"),&step);
392     
393     INFO1("Send ack to master for to end %d th step",step);
394         
395   } while (step < PROC_MATRIX_SIZE);
396   /*  wait Message from master to send the result */
397  
398   result.C=bC;
399   result.linepos=myline;
400   result.rowpos=myrow;
401  
402   TRY {
403     gras_msg_wait(600,gras_msgtype_by_name("ask_result"),
404                   &master,&result_ack);
405   } CATCH(e) {
406     RETHROW0("Can't get a data message from line : %s");
407   }
408   /* send Result to master */
409   TRY {
410     gras_msg_send(master, gras_msgtype_by_name("result"),&result);
411   } CATCH(e) {
412     // gras_socket_close(from);
413     RETHROW0("Failed to send PING to server: %s");
414   }
415   INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
416         gras_socket_peer_name(master),gras_socket_peer_port(master));
417   /*  Free the allocated resources, and shut GRAS down */
418   gras_socket_close(master);
419   gras_socket_close(from);
420   gras_exit();
421   INFO0("Done.");
422   return 0;
423 } /* end_of_slave */