Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
*** empty log message ***
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* pmm - paralel matrix multiplication "double diffusion"                       */
2
3 /* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved.                  */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "gras.h"
9 #define MATRIX_SIZE 3
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Messages specific to this example");
12
13 GRAS_DEFINE_TYPE(s_matrix,struct s_matrix {
14         int rows;
15         int cols;
16         double *data GRAS_ANNOTE(size, rows*cols);
17 };)
18 typedef struct s_matrix matrix_t;
19
20 /* struct for recovering results */
21 GRAS_DEFINE_TYPE(s_result,struct s_result {
22         int i;
23         int j;
24         double value;
25 });
26 typedef struct s_result result_t;
27
28 /* struct to send initial data to sensor */
29 GRAS_DEFINE_TYPE(s_init_data,struct s_init_data {
30         int myrow;
31         int mycol;
32         double a;
33         double b;
34 });
35 typedef struct s_init_data init_data_t;
36
37 /* register messages which may be sent (common to client and server) */
38 static void register_messages(void) {
39         gras_datadesc_type_t result_type;
40         gras_datadesc_type_t init_data_type;
41         result_type=gras_datadesc_by_symbol(s_result);
42         init_data_type=gras_datadesc_by_symbol(s_init_data);
43         
44         gras_msgtype_declare("result", result_type);
45         gras_msgtype_declare("init_data", init_data_type);
46
47         gras_msgtype_declare("ask_result", gras_datadesc_by_name("int"));       
48         gras_msgtype_declare("step", gras_datadesc_by_name("int"));
49         gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));
50         gras_msgtype_declare("data", gras_datadesc_by_name("int"));
51 }
52
53 /* Function prototypes */
54 int maestro (int argc,char *argv[]);
55 int sensor (int argc,char *argv[]);
56
57 /* **********************************************************************
58  * Maestro code
59  * **********************************************************************/
60
61 /* Global private data */
62 typedef struct {
63   int nbr_col,nbr_row;
64   int remaining_step;
65   int remaining_ack;
66 } maestro_data_t;
67
68
69 static int maestro_cb_data_handler(gras_msg_cb_ctx_t ctx, void *payload) {
70
71   xbt_ex_t e;
72   /* 1. Get the payload into the msg variable */
73   int msg=*(int*)payload_data;
74
75   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
76
77   /*code of callback */
78    
79   /* 8. Make sure we don't leak sockets */
80   gras_socket_close(expeditor);
81    
82   /* 9. Tell GRAS that we consummed this message */
83   return 1;
84 } /* end_of_maestro_cb_data_handler */
85
86 /***  Function initilaze matrixs ***/
87
88 static void initmatrix(matrix_t *X){
89         int i;
90         for(i=0 ; i<X.rows*X.cols; i++)
91                 X.data[i]=1.0;//1.0*rand()/(RAND_MAX+1.0);
92
93 } /* end_of_initmatrixs */
94
95 /***  Function Scatter Sequentiel ***/
96
97 static void scatter(){
98
99 }/* end_of_Scatter */
100
101 /***  Function: Scatter // ***/
102
103 static void scatter_parl(){
104
105 }/* end_of_Scatter // */
106
107 /***  Function: multiplication ***/
108
109 static void multiplication(){
110         
111   int step,i;
112         
113   for (step=1; step <= MATRIX_SIZE; step++){
114     for (i=0; i< nbr_sensor; i++){
115        TRY {
116                gras_msg_send(proc[(i/3)+1][(i%3)+1], gras_msgtype_by_name("step"), &step);  /* initialize Mycol, MyRow, mydataA,mydataB*/
117   
118                myrow,mycol,mydataA,mydataB
119        } CATCH(e) {
120         gras_socket_close(proc[(i/3)+1][(i%3)+1]);
121         RETHROW0("Unable to send the msg : %s");
122        }
123     }
124     /* wait for computing and sensor messages exchange */
125     TRY {
126             gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
127     } CATCH(e) {
128             RETHROW0("I Can't get a init Data message from Maestro : %s");
129     }
130   }
131
132 }/* end_of_multiplication */
133
134 /***  Function: gather ***/
135
136 static void gather(){
137
138         
139 }/* end_of_gather */
140
141 /***  Function: Display Matrix ***/
142
143 static void display(matrix_t X){
144         
145 int i,j,t=0;
146
147   printf("      ");
148   for(j=0;j<X.cols;j++)
149     printf("%.3d ",j);
150     printf("\n");
151     printf("    __");
152     for(j=0;j<X.cols;j++)
153         printf("____");
154         printf("_\n");
155
156         for(i=0;i<X.rows;i++){
157           printf("%.3d | ",i);
158           for(j=0;j<X.cols;j++)
159             printf("%.3g ",X.data[t++]);
160           printf("|\n");
161         }
162         printf("    --");
163         for(j=0;j<X.cols;j++)
164                 printf("----");
165         printf("-\n");
166
167 }/* end_of_display */
168
169 int maestro (int argc,char *argv[]) {
170
171 xbt_ex_t e;
172 int i,ask_result,step;
173 result_t result;
174 matrix_t A,B,C;
175
176   gras_socket_t socket[MATRIX_SIZE*MATRIX_SIZE]; /* sockets for brodcast to other sensor */
177         
178
179   /*  Initialize Matrixs */
180
181         A.rows=A.cols=MATRIX_SIZE;
182         B.rows=B.cols=MATRIX_SIZE;
183         C.rows=C.cols=MATRIX_SIZE;
184         
185         A.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
186         B.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
187         C.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
188         
189         initmatrix(&A);
190         initmatrix(&B);
191         
192         /*  Init the GRAS's infrastructure */
193         gras_init(&argc, argv);
194         /*  Get arguments and create sockets */
195         port=atoi(argv[1]);
196         //scatter();multiplication();gather();
197         //scatter_parl();
198         /****************************** Init Data Send *********************************/
199         int j=0;
200         init_data_t mydata;
201         for( i=2;i< argc;i+=3){
202                 
203                 TRY {
204                         socket[j]=gras_socket_client(argv[i],port);
205                         
206                 } CATCH(e) {
207                         RETHROW0("Unable to connect to the server: %s");
208                 }
209                 INFO2("Connected to %s:%d.",argv[i],port);
210                 
211                 mydata.myrow=argv[i+1];  // My rank of row
212                 mydata.mycol=argv[i+2];  // My rank of column
213                 mydata.a=A.data[(mydata.myrow-1)*MATRIX_SIZE+(mydata.mycol-1)];
214                 mydata.b=B.data[(mydata.myrow-1)*MATRIX_SIZE+(mydata.mycol-1)];;
215                 
216                 gras_msg_send(socket[j],gras_msgtype_by_name("init_data"),&mydata);
217                 j++;
218         } // end init Data Send
219
220         /******************************* multiplication ********************************/
221
222         for (step=1; step <= MATRIX_SIZE; step++){
223                 for (i=0; i< nbr_sensor; i++){
224                 TRY {
225                 gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step);  /* initialize Mycol, MyRow, mydataA,mydataB*/
226   
227                 myrow,mycol,mydataA,mydataB
228                     } CATCH(e) {
229                 gras_socket_close(socket[i]);
230                 RETHROW0("Unable to send the msg : %s");
231                 }
232         }
233         
234         /* wait for computing and sensor messages exchange */
235         for (i=0; i< nbr_sensor; i++){
236                 TRY {
237                 gras_msg_wait(600,gras_msgtype_by_name(""),&from,&mydata);
238                 } CATCH(e) {
239                 RETHROW0("I Can't get a init Data message from Maestro : %s");
240                 }
241         }
242         }
243         /*********************************  gather ***************************************/
244         
245         int ask_result=0;
246         for( i=1;i< argc;i++){
247                 gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
248         }
249         /* wait for results */
250         for( i=1;i< argc;i++){
251                 gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
252                 C.data[(result.i-1)*MATRIX_SIZE+(result.j-1)]=result.value;
253         }
254         /*    end of gather   */
255         display(C);
256
257 return 0;
258 } /* end_of_maestro */
259
260 /* **********************************************************************
261  * Sensor code
262  * **********************************************************************/
263
264 int sensor(int argc,char *argv[]) {
265
266   xbt_ex_t e; 
267
268   static int bC=0;
269   static int myrow,mycol;
270   static double mydataA,mydataB;
271   int bA,bB;
272   int step,l,result=0;
273
274   gras_socket_t from;  /* to recive from server for steps */
275
276   gras_socket_t socket_row[2],socket_column[2]; /* sockets for brodcast to other sensor */
277
278   /* Init the GRAS's infrastructure */
279
280   gras_init(&argc, argv);
281
282   /* Get arguments and create sockets */
283
284   port=atoi(argv[1]);
285   int i;
286   for (i=1;i<MATRIX_SIZE;i++){
287   socket_row[i]=gras_socket_client(argv[i+1],port);
288   socket_column[i]=gras_socket_client(argv[i+MATRIX_SIZE],port);
289   }
290   INFO2("Launch %s (port=%d)",argv[0],port);
291
292   /*  Create my master socket */
293   sock = gras_socket_server(port);
294
295   /*  Register the known messages */
296   register_messages();
297
298   /* Recover my initialized Data and My Position*/
299   init_data_t mydata;
300
301   TRY {
302           gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
303   } CATCH(e) {
304         RETHROW0("I Can't get a init Data message from Maestro : %s");
305   }
306   myrow=mydata.myrow;
307   mycol=mydata.mycol;
308   mydataA=mydata.a;
309   mydataB=mydata.b;
310
311   INFO4("Recover MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )",
312         myrow,mycol,mydataA,mydataB);
313
314
315   do {  //repeat until compute Cb
316         step=MATRIX_SIZE+1;  // juste intilization for loop
317
318   TRY {
319         gras_msg_wait(600,gras_msgtype_by_name("step"),&from,&step);
320   } CATCH(e) {
321           RETHROW0("I Can't get a Next Step message from Maestro : %s");
322   }
323
324   /*  Wait for sensors startup */
325   gras_os_sleep(1);
326
327   if (step < MATRIX_SIZE){
328           /* a row brodcast */
329           if(myrow==step){
330                   for (l=1;l < MATRIX_SIZE ;l++){
331                           gras_msg_send(socket_row[l], gras_msgtype_by_name("data"), &mydataB);
332                           bB=mydataB;
333                   }
334           }
335           else
336           {
337                   TRY {
338                           gras_msg_wait(600,gras_msgtype_by_name("data"),
339                                         &from,&bB);
340                   } CATCH(e) {
341                           RETHROW0("I Can't get a data message from row : %s");
342                   }
343           }
344           /* a column brodcast */       
345           if(mycol==step){
346                   for (l=1;l < MATRIX_SIZE ;l++){
347                           gras_msg_send(socket_column[l],gras_msgtype_by_name("data"), &mydataA);
348                           bA=mydataA;
349                   }
350           }
351           else
352           {
353                   TRY {
354                           gras_msg_wait(600,gras_msgtype_by_name("data"),
355                                         &from,&bA);
356                   } CATCH(e) {
357                           RETHROW0("I Can't get a data message from column : %s");
358                   }
359           }
360           bC+=bA*bB;
361           }
362           /* send a ack msg to Maestro */
363         
364           gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step);
365         
366           INFO1("Send ack to maestro for to end %d th step",step);
367         
368           if(step==MATRIX_SIZE-1) break;
369
370   } while (step < MATRIX_SIZE);
371     /*  wait Message from maestro to send the result */
372             /*after finished the bC computing */
373           TRY {
374                   gras_msg_wait(600,gras_msgtype_by_name("result"),
375                                 &from,&result);
376           } CATCH(e) {
377                   RETHROW0("I Can't get a data message from row : %s");
378           }
379           /* 5. send Result to the Maestro */
380           TRY {
381                   gras_msg_send(from, gras_msgtype_by_name("result"),&bC);
382           } CATCH(e) {
383                   gras_socket_close(from);
384                   RETHROW0("Failed to send PING to server: %s");
385           }
386           INFO3(">>>>>>>> Result: %d sent to %s:%d <<<<<<<<",
387                 bC,
388                 gras_socket_peer_name(from),gras_socket_peer_port(from));
389     /*  Free the allocated resources, and shut GRAS down */
390           gras_socket_close(from);
391           gras_exit();
392           INFO0("Done.");
393           return 0;
394 } /* end_of_sensor */