Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
stupid me. col means row in french, so use line/row and not row/col, which means...
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                       */
3
4 /* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved.                  */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #define PROC_MATRIX_SIZE 3
11 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
12
13 #define DATA_MATRIX_SIZE 3
14
15 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
16
17 GRAS_DEFINE_TYPE(s_matrix,struct s_matrix {
18   int rows;
19   int cols;
20   double *data GRAS_ANNOTE(size, rows*cols);
21 };)
22 typedef struct s_matrix matrix_t;
23
24 /* struct for recovering results */
25 GRAS_DEFINE_TYPE(s_result,struct s_result {
26   int i;
27   int j;
28   double value;
29 });
30 typedef struct s_result result_t;
31
32 /* struct to send initial data to slave */
33 GRAS_DEFINE_TYPE(s_init_data,struct s_init_data {
34   int myrow;
35   int mycol;
36   double a;
37   double b;
38 });
39 typedef struct s_init_data init_data_t;
40
41 /* register messages which may be sent (common to client and server) */
42 static void register_messages(void) {
43   gras_datadesc_type_t result_type;
44   gras_datadesc_type_t init_data_type;
45   result_type=gras_datadesc_by_symbol(s_result);
46   init_data_type=gras_datadesc_by_symbol(s_init_data);
47         
48   gras_msgtype_declare("result", result_type);  // receive a final result from slave
49   gras_msgtype_declare("init_data", init_data_type);  // send from master to slave to initialize data bA,bB
50
51   gras_msgtype_declare("ask_result", gras_datadesc_by_name("int")); // send from master to slave to ask a final result  
52   gras_msgtype_declare("step", gras_datadesc_by_name("int"));// send from master to slave to indicate the begining of step 
53   gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));//send from slave to master to indicate the end of the current step
54   gras_msgtype_declare("dataA", gras_datadesc_by_name("double"));// send data between slave
55   gras_msgtype_declare("dataB", gras_datadesc_by_name("double"));// send data between slave
56 }
57
58 /* Function prototypes */
59 int slave (int argc,char *argv[]);
60 int master (int argc,char *argv[]);
61
62
63 /* **********************************************************************
64  * master code
65  * **********************************************************************/
66
67 /* Global private data */
68 typedef struct {
69   int nbr_col,nbr_row;
70   int remaining_step;
71   int remaining_ack;
72 } master_data_t;
73
74
75 /***  Function initilaze matrixs ***/
76
77 static void initmatrix(matrix_t *X){
78   int i;
79
80   for(i=0 ; i<(X->rows)*(X->cols); i++)
81     X->data[i]=1.0;//*rand()/(RAND_MAX+1.0);
82 } /* end_of_initmatrixs */
83
84 /***  Function Scatter Sequentiel ***/
85
86 static void scatter(){
87
88 }/* end_of_Scatter */
89
90 /***  Function: Scatter // ***/
91
92 static void scatter_parl(){
93
94 }/* end_of_Scatter // */
95
96 /***  Function: multiplication ***/
97
98 static void multiplication(){
99
100 }/* end_of_multiplication */
101
102 /***  Function: gather ***/
103
104 static void gather(){
105
106 }/* end_of_gather */
107
108 /***  Function: Display Matrix ***/
109
110 static void display(matrix_t X){
111         
112   int i,j,t=0;
113
114   printf("      ");
115   for(j=0;j<X.cols;j++)
116     printf("%.3d ",j);
117   printf("\n");
118   printf("    __");
119   for(j=0;j<X.cols;j++)
120     printf("____");
121   printf("_\n");
122
123   for(i=0;i<X.rows;i++){
124     printf("%.3d | ",i);
125     for(j=0;j<X.cols;j++)
126       printf("%.3g ",X.data[t++]);
127     printf("|\n");
128   }
129   printf("    --");
130   for(j=0;j<X.cols;j++)
131     printf("----");
132   printf("-\n");
133
134 }/* end_of_display */
135
136 int master (int argc,char *argv[]) {
137
138   xbt_ex_t e;
139
140   int i,port,ask_result,step;
141
142   matrix_t A,B,C;
143   result_t result;
144
145   gras_socket_t from;
146
147   /*  Init the GRAS's infrastructure */
148   gras_init(&argc, argv);
149
150   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
151
152   /*  Initialize Matrixs */
153
154   A.rows=A.cols=DATA_MATRIX_SIZE;
155   B.rows=B.cols=DATA_MATRIX_SIZE;
156   C.rows=C.cols=DATA_MATRIX_SIZE;
157         
158   A.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
159   B.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
160   C.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
161         
162   initmatrix(&A);
163   initmatrix(&B);
164         
165   /*  Get arguments and create sockets */
166   port=atoi(argv[1]);
167   //scatter();
168   //scatter_parl();
169   //multiplication();
170   //gather();
171   //display(A);
172   /************************* Init Data Send *********************************/
173   int step_ack,j=0;
174   init_data_t mydata;
175   gras_os_sleep(60);      // MODIFIER LES TEMPS D'ATTENTE 60 c trop normalement
176
177   int row=1, col=1;
178   for( i=2;i< argc;i++){
179     TRY {
180       socket[j]=gras_socket_client(argv[i],port);
181     } CATCH(e) {
182       RETHROW0("Unable to connect to the server: %s");
183     }
184     INFO2("Connected to %s:%d.",argv[i],port);
185                 
186     mydata.myrow=row;  // My row
187     mydata.mycol=col;  // My column
188     row++;
189     if (row > PROC_MATRIX_SIZE) {
190       row=1;
191       col++;
192     }
193                 
194     mydata.a=A.data[(mydata.myrow-1)*PROC_MATRIX_SIZE+(mydata.mycol-1)];
195     mydata.b=B.data[(mydata.myrow-1)*PROC_MATRIX_SIZE+(mydata.mycol-1)];;
196                 
197     gras_msg_send(socket[j],gras_msgtype_by_name("init_data"),&mydata);
198     INFO3("Send Init Data to %s : data A= %.3g & data B= %.3g",
199           gras_socket_peer_name(socket[j]),mydata.a,mydata.b);
200     j++;
201   }
202     // end init Data Send
203
204   /******************************* multiplication ********************************/
205   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
206         
207   for (step=1; step <= PROC_MATRIX_SIZE; step++){
208     //    gras_os_sleep(50);
209     for (i=0; i< SLAVE_COUNT; i++){
210       TRY {
211         gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step);
212       } CATCH(e) {
213         gras_socket_close(socket[i]);
214         RETHROW0("Unable to send the msg : %s");
215       }
216     }
217     INFO1("send to slave to begin a %d th step",step);
218     /* wait for computing and slave messages exchange */
219     i=0;
220         
221     while  ( i< SLAVE_COUNT){
222       TRY {
223         gras_msg_wait(1300,gras_msgtype_by_name("step_ack"),&from,&step_ack);
224       } CATCH(e) {
225         RETHROW0("I Can't get a Ack step message from slave : %s");
226       }
227       i++;
228       INFO3("Receive Ack step ack from %s (got %d of %d)",
229             gras_socket_peer_name(from),
230             i, SLAVE_COUNT);
231     }
232   }
233   /*********************************  gather ***************************************/
234
235   ask_result=0;
236   for( i=1;i< argc;i++){
237     gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
238     INFO1("Send (Ask Result) message to %s",gras_socket_peer_name(socket[i]));
239   }
240   /* wait for results */
241   for( i=1;i< argc;i++){
242     gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
243     C.data[(result.i-1)*DATA_MATRIX_SIZE+(result.j-1)]=result.value;
244   }
245   /*    end of gather   */
246   INFO0 ("The Result of Multiplication is :");
247   display(C);
248
249   return 0;
250 } /* end_of_master */
251
252 /* **********************************************************************
253  * slave code
254  * **********************************************************************/
255
256 int slave(int argc,char *argv[]) {
257
258   xbt_ex_t e; 
259
260   int step,port,l,result_ack=0; 
261   double bA,bB;
262
263   int myrow,mycol;
264   double mydataA,mydataB;
265   double bC=0;
266   
267   //  static end_step;
268
269   result_t result;
270  
271   gras_socket_t from,sock;  /* to receive from server for steps */
272
273   /* sockets for brodcast to other slave */
274   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
275   gras_socket_t socket_column[PROC_MATRIX_SIZE-1];
276
277   /* Init the GRAS's infrastructure */
278
279   gras_init(&argc, argv);
280
281   /* Get arguments and create sockets */
282
283   port=atoi(argv[1]);
284   
285   /*  Create my master socket */
286   sock = gras_socket_server(port);
287   INFO2("Launch %s (port=%d)",argv[0],port);
288   gras_os_sleep(1); //wait to start all slaves 
289
290   int i;
291   for (i=1;i<PROC_MATRIX_SIZE;i++){
292     socket_row[i-1]=gras_socket_client(argv[i+1],port);
293     socket_column[i-1]=gras_socket_client(argv[i+PROC_MATRIX_SIZE],port);
294   }
295
296   /*  Register the known messages */
297   register_messages();
298
299   /* Recover my initialized Data and My Position*/
300   init_data_t mydata;
301   INFO0("wait for init Data");
302   TRY {
303     gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
304   } CATCH(e) {
305     RETHROW0("I Can't get a init Data message from master : %s");
306   }
307   myrow=mydata.myrow;
308   mycol=mydata.mycol;
309   mydataA=mydata.a;
310   mydataB=mydata.b;
311   INFO4("Receive MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )",
312         myrow,mycol,mydataA,mydataB);
313   step=1;
314   
315   do {  //repeat until compute Cb
316     step=PROC_MATRIX_SIZE+1;  // just intilization for loop
317         
318     TRY {
319       gras_msg_wait(200,gras_msgtype_by_name("step"),&from,&step);
320     } CATCH(e) {
321       RETHROW0("I Can't get a Next Step message from master : %s");
322     }
323     INFO1("Receive a step message from master: step = %d ",step);
324
325     if (step < PROC_MATRIX_SIZE ){
326       /* a row brodcast */
327       gras_os_sleep(3);  // IL FAUT EXPRIMER LE TEMPS D'ATTENTE EN FONCTION DE "SLAVE_COUNT"
328       if(myrow==step){
329         INFO2("step(%d) = Myrow(%d)",step,myrow);
330         for (l=1;l < PROC_MATRIX_SIZE ;l++){
331           gras_msg_send(socket_column[l-1], gras_msgtype_by_name("dataB"), &mydataB);
332           bB=mydataB;
333           INFO1("send my data B (%.3g) to my (vertical) neighbors",bB);  
334         }
335       }
336       if(myrow != step){ 
337         INFO2("step(%d) <> Myrow(%d)",step,myrow);
338         TRY {
339           gras_msg_wait(600,gras_msgtype_by_name("dataB"),
340                         &from,&bB);
341         } CATCH(e) {
342           RETHROW0("I Can't get a data message from row : %s");
343         }
344         INFO2("Receive data B (%.3g) from my neighbor: %s",bB,gras_socket_peer_name(from));
345       }
346       /* a column brodcast */
347       if(mycol==step){
348         for (l=1;l < PROC_MATRIX_SIZE ;l++){
349           gras_msg_send(socket_row[l-1],gras_msgtype_by_name("dataA"), &mydataA);
350           bA=mydataA;
351           INFO1("send my data A (%.3g) to my (horizontal) neighbors",bA);
352         }
353       }
354
355       if(mycol != step){
356         TRY {
357           gras_msg_wait(1200,gras_msgtype_by_name("dataA"),
358                         &from,&bA);
359         } CATCH(e) {
360           RETHROW0("I Can't get a data message from column : %s");
361         }
362         INFO2("Receive data A (%.3g) from my neighbor : %s ",bA,gras_socket_peer_name(from));
363       }
364       bC+=bA*bB;
365       INFO1(">>>>>>>> My BC = %.3g",bC);
366
367       /* send a ack msg to master */
368         
369       gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step);
370         
371       INFO1("Send ack to master for to end %d th step",step);
372     }
373     if(step==PROC_MATRIX_SIZE-1) break;
374         
375   } while (step < PROC_MATRIX_SIZE);
376   /*  wait Message from master to send the result */
377  
378   result.value=bC;
379   result.i=myrow;
380   result.j=mycol;
381  
382   TRY {
383     gras_msg_wait(600,gras_msgtype_by_name("ask_result"),
384                   &from,&result_ack);
385   } CATCH(e) {
386     RETHROW0("I Can't get a data message from row : %s");
387   }
388   /* send Result to master */
389   TRY {
390     gras_msg_send(from, gras_msgtype_by_name("result"),&result);
391   } CATCH(e) {
392     // gras_socket_close(from);
393     RETHROW0("Failed to send PING to server: %s");
394   }
395   INFO3(">>>>>>>> Result: %.3f sent to %s:%d <<<<<<<<",
396         bC,
397         gras_socket_peer_name(from),gras_socket_peer_port(from));
398   /*  Free the allocated resources, and shut GRAS down */
399   gras_socket_close(from);
400   gras_exit();
401   INFO0("Done.");
402   return 0;
403 } /* end_of_slave */