Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
example of parallel matrix multiplication : I modified the pmm.c by adding messages...
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* pmm - paralel matrix multiplication "double diffusion"                       */
2
3 /* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved.                  */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "gras.h"
9 #define MATRIX_SIZE 3
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Messages specific to this example");
12
13 GRAS_DEFINE_TYPE(s_matrix,struct s_matrix {
14         int rows;
15         int cols;
16         double *data GRAS_ANNOTE(size, rows*cols);
17 };)
18 typedef struct s_matrix matrix_t;
19
20 /* struct for recovering results */
21 GRAS_DEFINE_TYPE(s_result,struct s_result {
22         int i;
23         int j;
24         double value;
25 });
26 typedef struct s_result result_t;
27
28 /* struct to send initial data to sensor */
29 GRAS_DEFINE_TYPE(s_init_data,struct s_init_data {
30         int myrow;
31         int mycol;
32         double a;
33         double b;
34 });
35 typedef struct s_init_data init_data_t;
36
37 /* register messages which may be sent (common to client and server) */
38 static void register_messages(void) {
39         gras_datadesc_type_t result_type;
40         gras_datadesc_type_t init_data_type;
41         result_type=gras_datadesc_by_symbol(s_result);
42         init_data_type=gras_datadesc_by_symbol(s_init_data);
43         
44         gras_msgtype_declare("result", result_type);
45         gras_msgtype_declare("init_data", init_data_type);
46
47         gras_msgtype_declare("ask_result", gras_datadesc_by_name("int"));       
48         gras_msgtype_declare("step", gras_datadesc_by_name("int"));
49         gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));
50         gras_msgtype_declare("data", gras_datadesc_by_name("int"));
51 }
52
53 /* Function prototypes */
54 int maestro (int argc,char *argv[]);
55 int sensor (int argc,char *argv[]);
56
57 /* **********************************************************************
58  * Maestro code
59  * **********************************************************************/
60
61 /* Global private data */
62 typedef struct {
63   int nbr_col,nbr_row;
64   int remaining_step;
65   int remaining_ack;
66 } maestro_data_t;
67
68
69 static int maestro_cb_data_handler(gras_msg_cb_ctx_t ctx, void *payload) {
70
71   xbt_ex_t e;
72   /* 1. Get the payload into the msg variable */
73   int msg=*(int*)payload_data;
74
75   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
76
77   /*code of callback */
78    
79   /* 8. Make sure we don't leak sockets */
80   gras_socket_close(expeditor);
81    
82   /* 9. Tell GRAS that we consummed this message */
83   return 1;
84 } /* end_of_maestro_cb_data_handler */
85
86 /***  Function initilaze matrixs ***/
87
88 static void initmatrix(matrix_t *X){
89         int i;
90         for(i=0 ; i<X.rows*X.cols; i++)
91                 X.data[i]=1.0;//1.0*rand()/(RAND_MAX+1.0);
92
93 } /* end_of_initmatrixs */
94
95 /***  Function Scatter Sequentiel ***/
96
97 static void scatter(){
98
99 }/* end_of_Scatter */
100
101 /***  Function: Scatter // ***/
102
103 static void scatter_parl(){
104
105 }/* end_of_Scatter // */
106
107 /***  Function: multiplication ***/
108
109 static void multiplication(){
110         
111   int step,i;
112         
113   for (step=1; step <= MATRIX_SIZE; step++){
114     for (i=0; i< nbr_sensor; i++){
115        TRY {
116                gras_msg_send(proc[(i/3)+1][(i%3)+1], gras_msgtype_by_name("step"), &step);  /* initialize Mycol, MyRow, mydataA,mydataB*/
117   
118                myrow,mycol,mydataA,mydataB
119        } CATCH(e) {
120         gras_socket_close(proc[(i/3)+1][(i%3)+1]);
121         RETHROW0("Unable to send the msg : %s");
122        }
123     }
124     /* wait for computing and sensor messages exchange */
125     TRY {
126             gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
127     } CATCH(e) {
128             RETHROW0("I Can't get a init Data message from Maestro : %s");
129     }
130   }
131
132 }/* end_of_multiplication */
133
134 /***  Function: gather ***/
135
136 static void gather(){
137
138         
139 }/* end_of_gather */
140
141 /***  Function: Display Matrix ***/
142
143 static void display(matrix_t X){
144         
145 int i,j,t=0;
146
147   printf("      ");
148   for(j=0;j<X.cols;j++)
149     printf("%.3d ",j);
150     printf("\n");
151     printf("    __");
152     for(j=0;j<X.cols;j++)
153         printf("____");
154         printf("_\n");
155
156         for(i=0;i<X.rows;i++){
157           printf("%.3d | ",i);
158           for(j=0;j<X.cols;j++)
159             printf("%.3g ",X.data[t++]);
160           printf("|\n");
161         }
162         printf("    --");
163         for(j=0;j<X.cols;j++)
164                 printf("----");
165         printf("-\n");
166
167 }/* end_of_display */
168
169 int maestro (int argc,char *argv[]) {
170
171 xbt_ex_t e;
172 int i,ask_result,step;
173 result_t result;
174 matrix_t A,B,C;
175 gras_socket_t from;
176 result_t result;
177
178   gras_socket_t socket[MATRIX_SIZE*MATRIX_SIZE]; /* sockets for brodcast to other sensor */
179         
180
181   /*  Initialize Matrixs */
182
183         A.rows=A.cols=MATRIX_SIZE;
184         B.rows=B.cols=MATRIX_SIZE;
185         C.rows=C.cols=MATRIX_SIZE;
186         
187         A.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
188         B.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
189         C.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
190         
191         initmatrix(&A);
192         initmatrix(&B);
193         
194         /*  Init the GRAS's infrastructure */
195         gras_init(&argc, argv);
196         /*  Get arguments and create sockets */
197         port=atoi(argv[1]);
198         //scatter();multiplication();gather();
199         //scatter_parl();
200         /****************************** Init Data Send *********************************/
201         int j=0;
202         init_data_t mydata;
203         for( i=2;i< argc;i+=3){
204                 
205                 TRY {
206                         socket[j]=gras_socket_client(argv[i],port);
207                         
208                 } CATCH(e) {
209                         RETHROW0("Unable to connect to the server: %s");
210                 }
211                 INFO2("Connected to %s:%d.",argv[i],port);
212                 
213                 mydata.myrow=argv[i+1];  // My rank of row
214                 mydata.mycol=argv[i+2];  // My rank of column
215                 mydata.a=A.data[(mydata.myrow-1)*MATRIX_SIZE+(mydata.mycol-1)];
216                 mydata.b=B.data[(mydata.myrow-1)*MATRIX_SIZE+(mydata.mycol-1)];;
217                 
218                 gras_msg_send(socket[j],gras_msgtype_by_name("init_data"),&mydata);
219                 j++;
220         } // end init Data Send
221
222         /******************************* multiplication ********************************/
223
224         for (step=1; step <= MATRIX_SIZE; step++){
225                 for (i=0; i< nbr_sensor; i++){
226                 TRY {
227                 gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step);  /* initialize Mycol, MyRow, mydataA,mydataB*/
228   
229                 myrow,mycol,mydataA,mydataB
230                     } CATCH(e) {
231                 gras_socket_close(socket[i]);
232                 RETHROW0("Unable to send the msg : %s");
233                 }
234         }
235         INFO0("send to sensor to begin a next step");
236         /* wait for computing and sensor messages exchange */
237         i=0;
238         while  ( i< nbr_sensor){
239                 TRY {
240                 gras_msg_wait(600,gras_msgtype_by_name("step_ack"),&from,&mydata);
241                 } CATCH(e) {
242                 RETHROW0("I Can't get a init Data message from Maestro : %s");
243                 }
244                 
245                 i++;
246                 INFO1("Recive step ack from %s",gras_socket_peer_name(from));
247         }
248
249         }
250         /*********************************  gather ***************************************/
251         
252         int ask_result=0;
253         for( i=1;i< argc;i++){
254                 gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
255         }
256         /* wait for results */
257         for( i=1;i< argc;i++){
258                 gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
259                 C.data[(result.i-1)*MATRIX_SIZE+(result.j-1)]=result.value;
260         }
261         /*    end of gather   */
262         display(C);
263
264 return 0;
265 } /* end_of_maestro */
266
267 /* **********************************************************************
268  * Sensor code
269  * **********************************************************************/
270
271 int sensor(int argc,char *argv[]) {
272
273   xbt_ex_t e; 
274
275   static int bC=0;
276   static int myrow,mycol;
277   static double mydataA,mydataB;
278   int bA,bB;
279   int step,l,result_ack=0;
280
281   gras_socket_t from;  /* to recive from server for steps */
282
283   gras_socket_t socket_row[2],socket_column[2]; /* sockets for brodcast to other sensor */
284
285   /* Init the GRAS's infrastructure */
286
287   gras_init(&argc, argv);
288
289   /* Get arguments and create sockets */
290
291   port=atoi(argv[1]);
292   int i;
293   for (i=1;i<MATRIX_SIZE;i++){
294   socket_row[i]=gras_socket_client(argv[i+1],port);
295   socket_column[i]=gras_socket_client(argv[i+MATRIX_SIZE],port);
296   }
297   INFO2("Launch %s (port=%d)",argv[0],port);
298
299   /*  Create my master socket */
300   sock = gras_socket_server(port);
301
302   /*  Register the known messages */
303   register_messages();
304
305   /* Recover my initialized Data and My Position*/
306   init_data_t mydata;
307
308   TRY {
309           gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
310   } CATCH(e) {
311         RETHROW0("I Can't get a init Data message from Maestro : %s");
312   }
313   myrow=mydata.myrow;
314   mycol=mydata.mycol;
315   mydataA=mydata.a;
316   mydataB=mydata.b;
317
318   INFO4("Recover MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )",
319         myrow,mycol,mydataA,mydataB);
320
321
322   do {  //repeat until compute Cb
323         step=MATRIX_SIZE+1;  // juste intilization for loop
324
325   TRY {
326         gras_msg_wait(600,gras_msgtype_by_name("step"),&from,&step);
327   } CATCH(e) {
328           RETHROW0("I Can't get a Next Step message from Maestro : %s");
329   }
330
331   /*  Wait for sensors startup */
332   gras_os_sleep(1);
333
334   if (step < MATRIX_SIZE){
335           /* a row brodcast */
336           if(myrow==step){
337                   for (l=1;l < MATRIX_SIZE ;l++){
338                           gras_msg_send(socket_row[l], gras_msgtype_by_name("data"), &mydataB);
339                           bB=mydataB;
340                   }
341           }
342           else
343           {
344                   TRY {
345                           gras_msg_wait(600,gras_msgtype_by_name("data"),
346                                         &from,&bB);
347                   } CATCH(e) {
348                           RETHROW0("I Can't get a data message from row : %s");
349                   }
350           }
351           /* a column brodcast */       
352           if(mycol==step){
353                   for (l=1;l < MATRIX_SIZE ;l++){
354                           gras_msg_send(socket_column[l],gras_msgtype_by_name("data"), &mydataA);
355                           bA=mydataA;
356                   }
357           }
358           else
359           {
360                   TRY {
361                           gras_msg_wait(600,gras_msgtype_by_name("data"),
362                                         &from,&bA);
363                   } CATCH(e) {
364                           RETHROW0("I Can't get a data message from column : %s");
365                   }
366           }
367           bC+=bA*bB;
368           }
369           /* send a ack msg to Maestro */
370         
371           gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step);
372         
373           INFO1("Send ack to maestro for to end %d th step",step);
374         
375           if(step==MATRIX_SIZE-1) break;
376
377   } while (step < MATRIX_SIZE);
378     /*  wait Message from maestro to send the result */
379  
380         result.value=bC;
381         result.i=myrow;
382         result.j=mycol;
383  
384           TRY {
385                   gras_msg_wait(600,gras_msgtype_by_name("ask_result"),
386                                 &from,&result_ack);
387           } CATCH(e) {
388                   RETHROW0("I Can't get a data message from row : %s");
389           }
390           /* 5. send Result to Maestro */
391           TRY {
392                   gras_msg_send(from, gras_msgtype_by_name("result"),&result);
393           } CATCH(e) {
394                  // gras_socket_close(from);
395                   RETHROW0("Failed to send PING to server: %s");
396           }
397           INFO3(">>>>>>>> Result: %d sent to %s:%d <<<<<<<<",
398                 bC,
399                 gras_socket_peer_name(from),gras_socket_peer_port(from));
400     /*  Free the allocated resources, and shut GRAS down */
401           gras_socket_close(from);
402           gras_exit();
403           INFO0("Done.");
404           return 0;
405 } /* end_of_sensor */