Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
be2cdbf49268ce8dd2524fcfb14d9a4031768aa2
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved.                 */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #define PROC_MATRIX_SIZE 3
11 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
12
13 #define DATA_MATRIX_SIZE 3
14
15 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
16
17 GRAS_DEFINE_TYPE(s_matrix,struct s_matrix {
18   int lines;
19   int rows;
20   double *data GRAS_ANNOTE(size, lines*rows);
21 };)
22 typedef struct s_matrix matrix_t;
23
24 /* struct for recovering results */
25 GRAS_DEFINE_TYPE(s_result,struct s_result {
26   int i;
27   int j;
28   double value;
29 });
30 typedef struct s_result result_t;
31
32 /* struct to send initial data to slave */
33 GRAS_DEFINE_TYPE(s_init_data,struct s_init_data {
34   int linepos;
35   int rowpos;
36   xbt_host_t line[PROC_MATRIX_SIZE];
37   xbt_host_t row[PROC_MATRIX_SIZE];
38   double a;
39   double b;
40 });
41 typedef struct s_init_data init_data_t;
42
43 /* register messages which may be sent (common to client and server) */
44 static void register_messages(void) {
45   gras_datadesc_type_t result_type;
46   gras_datadesc_type_t init_data_type;
47   gras_datadesc_set_const("PROC_MATRIX_SIZE",PROC_MATRIX_SIZE);
48   result_type=gras_datadesc_by_symbol(s_result);
49   init_data_type=gras_datadesc_by_symbol(s_init_data);
50         
51   gras_msgtype_declare("result", result_type);  // receive a final result from slave
52   gras_msgtype_declare("init_data", init_data_type);  // send from master to slave to initialize data bA,bB
53
54   gras_msgtype_declare("ask_result", gras_datadesc_by_name("int")); // send from master to slave to ask a final result  
55   gras_msgtype_declare("step", gras_datadesc_by_name("int"));// send from master to slave to indicate the begining of step 
56   gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));//send from slave to master to indicate the end of the current step
57   gras_msgtype_declare("dataA", gras_datadesc_by_name("double"));// send data between slave
58   gras_msgtype_declare("dataB", gras_datadesc_by_name("double"));// send data between slave
59 }
60
61 /* Function prototypes */
62 int slave (int argc,char *argv[]);
63 int master (int argc,char *argv[]);
64
65
66 /* **********************************************************************
67  * master code
68  * **********************************************************************/
69
70 /* Global private data */
71 typedef struct {
72   int nbr_row,nbr_line;
73   int remaining_step;
74   int remaining_ack;
75 } master_data_t;
76
77
78 /***  Function initilaze matrixs ***/
79
80 static void initmatrix(matrix_t *X){
81   int i;
82
83   for(i=0 ; i<(X->lines)*(X->rows); i++)
84     X->data[i]=1.0;//*rand()/(RAND_MAX+1.0);
85 } /* end_of_initmatrixs */
86
87 /***  Function Scatter Sequentiel ***/
88
89 static void scatter(){
90
91 }/* end_of_Scatter */
92
93 /***  Function: Scatter // ***/
94
95 static void scatter_parl(){
96
97 }/* end_of_Scatter // */
98
99 /***  Function: multiplication ***/
100
101 static void multiplication(){
102
103 }/* end_of_multiplication */
104
105 /***  Function: gather ***/
106
107 static void gather(){
108
109 }/* end_of_gather */
110
111 /***  Function: Display Matrix ***/
112
113 static void display(matrix_t X){
114         
115   int i,j,t=0;
116
117   printf("      ");
118   for(j=0;j<X.rows;j++)
119     printf("%.3d ",j);
120   printf("\n");
121   printf("    __");
122   for(j=0;j<X.rows;j++)
123     printf("____");
124   printf("_\n");
125
126   for(i=0;i<X.lines;i++){
127     printf("%.3d | ",i);
128     for(j=0;j<X.rows;j++)
129       printf("%.3g ",X.data[t++]);
130     printf("|\n");
131   }
132   printf("    --");
133   for(j=0;j<X.rows;j++)
134     printf("----");
135   printf("-\n");
136
137 }/* end_of_display */
138
139 int master (int argc,char *argv[]) {
140
141   xbt_ex_t e;
142
143   int i,port,ask_result,step;
144
145   matrix_t A,B,C;
146   result_t result;
147
148   gras_socket_t from;
149
150   /*  Init the GRAS's infrastructure */
151   gras_init(&argc, argv);
152
153   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
154   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
155
156   /*  Initialize Matrixs */
157
158   A.lines=A.rows=DATA_MATRIX_SIZE;
159   B.lines=B.rows=DATA_MATRIX_SIZE;
160   C.lines=C.rows=DATA_MATRIX_SIZE;
161         
162   A.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
163   B.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
164   C.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
165         
166   initmatrix(&A);
167   initmatrix(&B);
168         
169   /*  Get arguments and create sockets */
170   port=atoi(argv[1]);
171   //scatter();
172   //scatter_parl();
173   //multiplication();
174   //gather();
175   //display(A);
176   /************************* Init Data Send *********************************/
177   int step_ack;
178   gras_os_sleep(60);      // MODIFIER LES TEMPS D'ATTENTE 60 c trop normalement
179
180   for( i=1;i< argc;i++){
181     grid[i-1]=xbt_host_from_string(argv[i]);
182     socket[i-1]=gras_socket_client(grid[i-1]->name,grid[i-1]->port);
183       
184     INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port);
185   }
186
187   int row=1, line=1,j;
188   for(i=0 ; i<SLAVE_COUNT; i++){
189     init_data_t mydata;
190     mydata.linepos=line;  // My line
191     mydata.rowpos=row;  // My row
192
193
194     /* Neiborhood */
195     for (j=0; j<PROC_MATRIX_SIZE; j++) {
196       mydata.row[j] = xbt_host_copy( grid[ j*PROC_MATRIX_SIZE+(row-1) ] );
197       mydata.line[j] =  xbt_host_copy( grid[ (line-1)*PROC_MATRIX_SIZE+j ] );
198     }
199
200     row++;
201     if (row > PROC_MATRIX_SIZE) {
202       row=1;
203       line++;
204     }
205                 
206     mydata.a=A.data[(line-1)*PROC_MATRIX_SIZE+(row-1)];
207     mydata.b=B.data[(line-1)*PROC_MATRIX_SIZE+(row-1)];;
208                 
209     gras_msg_send(socket[i],gras_msgtype_by_name("init_data"),&mydata);
210     INFO3("Send Init Data to %s : data A= %.3g & data B= %.3g",
211           gras_socket_peer_name(socket[i]),mydata.a,mydata.b);
212
213   }
214     // end init Data Send
215
216   /******************************* multiplication ********************************/
217   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
218         
219   for (step=1; step <= PROC_MATRIX_SIZE; step++){
220     //    gras_os_sleep(50);
221     for (i=0; i< SLAVE_COUNT; i++){
222       TRY {
223         gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step);
224       } CATCH(e) {
225         gras_socket_close(socket[i]);
226         RETHROW0("Unable to send the msg : %s");
227       }
228     }
229     INFO1("send to slave to begin a %d th step",step);
230     /* wait for computing and slave messages exchange */
231     i=0;
232         
233     while  ( i< SLAVE_COUNT){
234       TRY {
235         gras_msg_wait(1300,gras_msgtype_by_name("step_ack"),&from,&step_ack);
236       } CATCH(e) {
237         RETHROW0("I Can't get a Ack step message from slave : %s");
238       }
239       i++;
240       INFO3("Receive Ack step ack from %s (got %d of %d)",
241             gras_socket_peer_name(from),
242             i, SLAVE_COUNT);
243     }
244   }
245   /*********************************  gather ***************************************/
246
247   ask_result=0;
248   for( i=1;i< argc;i++){
249     gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
250     INFO1("Send (Ask Result) message to %s",gras_socket_peer_name(socket[i]));
251   }
252   /* wait for results */
253   for( i=1;i< argc;i++){
254     gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
255     C.data[(result.i-1)*DATA_MATRIX_SIZE+(result.j-1)]=result.value;
256   }
257   /*    end of gather   */
258   INFO0 ("The Result of Multiplication is :");
259   display(C);
260
261   return 0;
262 } /* end_of_master */
263
264 /* **********************************************************************
265  * slave code
266  * **********************************************************************/
267
268 int slave(int argc,char *argv[]) {
269
270   xbt_ex_t e; 
271
272   int step,port,l,result_ack=0; 
273   double bA,bB;
274
275   int myline,myrow;
276   double mydataA,mydataB;
277   double bC=0;
278   
279   //  static end_step;
280
281   result_t result;
282  
283   gras_socket_t from,sock;  /* to receive from server for steps */
284
285   /* sockets for brodcast to other slave */
286   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
287   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
288
289   /* Init the GRAS's infrastructure */
290
291   gras_init(&argc, argv);
292
293   /* Get arguments and create sockets */
294
295   port=atoi(argv[1]);
296   
297   /*  Create my master socket */
298   sock = gras_socket_server(port);
299   int i;
300
301   /*  Register the known messages */
302   register_messages();
303
304   /* Recover my initialized Data and My Position*/
305   init_data_t mydata;
306   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],port);
307   TRY {
308     gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
309   } CATCH(e) {
310     RETHROW0("Can't get a init Data message from master : %s");
311   }
312   myline=mydata.linepos;
313   myrow=mydata.rowpos;
314   mydataA=mydata.a;
315   mydataB=mydata.b;
316   INFO4("Receive MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )",
317         myline,myrow,mydataA,mydataB);
318
319   /* Get my neighborhood from the enrollment message */
320   int j=0;
321   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
322     if (strcmp(gras_os_myname(),mydata.line[i]->name)) {
323       socket_line[j]=gras_socket_client(mydata.line[i]->name,mydata.line[i]->port);
324       j++;
325       //INFO3("Line neighbour %d: %s:%d",j,mydata.line[i]->name,mydata.line[i]->port);
326     }
327     xbt_host_free(mydata.line[i]);
328   }
329   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
330     if (strcmp(gras_os_myname(),mydata.row[i]->name)) {
331       socket_row[j]=gras_socket_client(mydata.row[i]->name,mydata.row[i]->port);
332       //INFO3("Row neighbour %d : %s:%d",j,mydata.row[i]->name,mydata.row[i]->port);
333       j++;
334     }
335     xbt_host_free(mydata.row[i]);    
336   }
337
338   step=1;
339   
340   do {  //repeat until compute Cb
341     step=PROC_MATRIX_SIZE+1;  // just intilization for loop
342         
343     TRY {
344       gras_msg_wait(200,gras_msgtype_by_name("step"),&from,&step);
345     } CATCH(e) {
346       RETHROW0("Can't get a Next Step message from master : %s");
347     }
348     INFO1("Receive a step message from master: step = %d ",step);
349
350     if (step < PROC_MATRIX_SIZE ){
351       /* a line brodcast */
352       gras_os_sleep(3);  // IL FAUT EXPRIMER LE TEMPS D'ATTENTE EN FONCTION DE "SLAVE_COUNT"
353       if(myline==step){
354         INFO2("step(%d) = Myline(%d)",step,myline);
355         for (l=1;l < PROC_MATRIX_SIZE ;l++){
356           gras_msg_send(socket_row[l-1], gras_msgtype_by_name("dataB"), &mydataB);
357           bB=mydataB;
358           INFO1("send my data B (%.3g) to my (vertical) neighbors",bB);  
359         }
360       }
361       if(myline != step){ 
362         INFO2("step(%d) <> Myline(%d)",step,myline);
363         TRY {
364           gras_msg_wait(600,gras_msgtype_by_name("dataB"),
365                         &from,&bB);
366         } CATCH(e) {
367           RETHROW0("I Can't get a data message from line : %s");
368         }
369         INFO2("Receive data B (%.3g) from my neighbor: %s",bB,gras_socket_peer_name(from));
370       }
371       /* a row brodcast */
372       if(myrow==step){
373         for (l=1;l < PROC_MATRIX_SIZE ;l++){
374           gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
375           bA=mydataA;
376           INFO1("send my data A (%.3g) to my (horizontal) neighbors",bA);
377         }
378       }
379
380       if(myrow != step){
381         TRY {
382           gras_msg_wait(1200,gras_msgtype_by_name("dataA"),
383                         &from,&bA);
384         } CATCH(e) {
385           RETHROW0("I Can't get a data message from row : %s");
386         }
387         INFO2("Receive data A (%.3g) from my neighbor : %s ",bA,gras_socket_peer_name(from));
388       }
389       bC+=bA*bB;
390       INFO1(">>>>>>>> My BC = %.3g",bC);
391
392       /* send a ack msg to master */
393         
394       gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step);
395         
396       INFO1("Send ack to master for to end %d th step",step);
397     }
398     if(step==PROC_MATRIX_SIZE-1) break;
399         
400   } while (step < PROC_MATRIX_SIZE);
401   /*  wait Message from master to send the result */
402  
403   result.value=bC;
404   result.i=myline;
405   result.j=myrow;
406  
407   TRY {
408     gras_msg_wait(600,gras_msgtype_by_name("ask_result"),
409                   &from,&result_ack);
410   } CATCH(e) {
411     RETHROW0("I Can't get a data message from line : %s");
412   }
413   /* send Result to master */
414   TRY {
415     gras_msg_send(from, gras_msgtype_by_name("result"),&result);
416   } CATCH(e) {
417     // gras_socket_close(from);
418     RETHROW0("Failed to send PING to server: %s");
419   }
420   INFO3(">>>>>>>> Result: %.3f sent to %s:%d <<<<<<<<",
421         bC,
422         gras_socket_peer_name(from),gras_socket_peer_port(from));
423   /*  Free the allocated resources, and shut GRAS down */
424   gras_socket_close(from);
425   gras_exit();
426   INFO0("Done.");
427   return 0;
428 } /* end_of_slave */