Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Cosmetics: rename the initial message to 'assignment' since it's kinda what it is...
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved.                 */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #define PROC_MATRIX_SIZE 3
11 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
12
13 #define DATA_MATRIX_SIZE 3
14
15 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
16
17 GRAS_DEFINE_TYPE(s_matrix,struct s_matrix {
18   int lines;
19   int rows;
20   double *data GRAS_ANNOTE(size, lines*rows);
21 };)
22 typedef struct s_matrix matrix_t;
23
24 /* struct for recovering results */
25 GRAS_DEFINE_TYPE(s_result,struct s_result {
26   int i;
27   int j;
28   double value;
29 });
30 typedef struct s_result result_t;
31
32 /* struct to send initial data to slave */
33 GRAS_DEFINE_TYPE(s_assignment,struct s_assignment {
34   int linepos;
35   int rowpos;
36   xbt_host_t line[PROC_MATRIX_SIZE];
37   xbt_host_t row[PROC_MATRIX_SIZE];
38   double a;
39   double b;
40 });
41 typedef struct s_assignment assignment_t;
42
43 /* register messages which may be sent (common to client and server) */
44 static void register_messages(void) {
45   gras_datadesc_type_t result_type;
46   gras_datadesc_type_t assignment_type;
47   gras_datadesc_set_const("PROC_MATRIX_SIZE",PROC_MATRIX_SIZE);
48   result_type=gras_datadesc_by_symbol(s_result);
49   assignment_type=gras_datadesc_by_symbol(s_assignment);
50         
51   /* receive a final result from slave */
52   gras_msgtype_declare("result", result_type);
53
54   /* send from master to slave to assign a position and some data */
55   gras_msgtype_declare("assignment", assignment_type);
56
57   /* send from master to slave to ask a final result */
58   gras_msgtype_declare("ask_result", gras_datadesc_by_name("int")); 
59
60   /* send from master to slave to indicate the begining of step */
61   gras_msgtype_declare("step", gras_datadesc_by_name("int"));
62   /* send from slave to master to indicate the end of the current step */
63   gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));
64
65   /* send data between slave */
66   gras_msgtype_declare("dataA", gras_datadesc_by_name("double"));
67   /* send data between slave */
68   gras_msgtype_declare("dataB", gras_datadesc_by_name("double"));
69 }
70
71 /* Function prototypes */
72 int slave (int argc,char *argv[]);
73 int master (int argc,char *argv[]);
74
75
76 /* **********************************************************************
77  * master code
78  * **********************************************************************/
79
80 /* Global private data */
81 typedef struct {
82   int nbr_row,nbr_line;
83   int remaining_step;
84   int remaining_ack;
85 } master_data_t;
86
87
88 /***  Function initilaze matrixs ***/
89
90 static void initmatrix(matrix_t *X){
91   int i;
92
93   for(i=0 ; i<(X->lines)*(X->rows); i++)
94     X->data[i]=1.0;//*rand()/(RAND_MAX+1.0);
95 } /* end_of_initmatrixs */
96
97 /***  Function Scatter Sequentiel ***/
98
99 static void scatter(){
100
101 }/* end_of_Scatter */
102
103 /***  Function: Scatter // ***/
104
105 static void scatter_parl(){
106
107 }/* end_of_Scatter // */
108
109 /***  Function: multiplication ***/
110
111 static void multiplication(){
112
113 }/* end_of_multiplication */
114
115 /***  Function: gather ***/
116
117 static void gather(){
118
119 }/* end_of_gather */
120
121 /***  Function: Display Matrix ***/
122
123 static void display(matrix_t X){
124         
125   int i,j,t=0;
126
127   printf("      ");
128   for(j=0;j<X.rows;j++)
129     printf("%.3d ",j);
130   printf("\n");
131   printf("    __");
132   for(j=0;j<X.rows;j++)
133     printf("____");
134   printf("_\n");
135
136   for(i=0;i<X.lines;i++){
137     printf("%.3d | ",i);
138     for(j=0;j<X.rows;j++)
139       printf("%.3g ",X.data[t++]);
140     printf("|\n");
141   }
142   printf("    --");
143   for(j=0;j<X.rows;j++)
144     printf("----");
145   printf("-\n");
146
147 }/* end_of_display */
148
149 int master (int argc,char *argv[]) {
150
151   xbt_ex_t e;
152
153   int i,port,ask_result,step;
154
155   matrix_t A,B,C;
156   result_t result;
157
158   gras_socket_t from;
159
160   /*  Init the GRAS's infrastructure */
161   gras_init(&argc, argv);
162
163   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
164   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
165
166   /*  Initialize Matrixs */
167
168   A.lines=A.rows=DATA_MATRIX_SIZE;
169   B.lines=B.rows=DATA_MATRIX_SIZE;
170   C.lines=C.rows=DATA_MATRIX_SIZE;
171         
172   A.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
173   B.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
174   C.data=xbt_malloc0(sizeof(double)*DATA_MATRIX_SIZE*DATA_MATRIX_SIZE);
175         
176   initmatrix(&A);
177   initmatrix(&B);
178         
179   /*  Get arguments and create sockets */
180   port=atoi(argv[1]);
181   //scatter();
182   //scatter_parl();
183   //multiplication();
184   //gather();
185   //display(A);
186   /************************* Init Data Send *********************************/
187   int step_ack;
188   gras_os_sleep(60);      // MODIFIER LES TEMPS D'ATTENTE 60 c trop normalement
189
190   for( i=1;i< argc;i++){
191     grid[i-1]=xbt_host_from_string(argv[i]);
192     socket[i-1]=gras_socket_client(grid[i-1]->name,grid[i-1]->port);
193       
194     INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port);
195   }
196
197   int row=1, line=1;
198   for(i=0 ; i<SLAVE_COUNT; i++){
199     assignment_t assignment;
200     int j;
201
202     assignment.linepos=line;  // My line
203     assignment.rowpos=row;  // My row
204
205
206     /* Neiborhood */
207     for (j=0; j<PROC_MATRIX_SIZE; j++) {
208       assignment.row[j] = grid[ j*PROC_MATRIX_SIZE+(row-1) ] ;
209       assignment.line[j] =  grid[ (line-1)*PROC_MATRIX_SIZE+j ] ;
210     }
211
212     row++;
213     if (row > PROC_MATRIX_SIZE) {
214       row=1;
215       line++;
216     }
217                 
218     assignment.a=A.data[(line-1)*PROC_MATRIX_SIZE+(row-1)];
219     assignment.b=B.data[(line-1)*PROC_MATRIX_SIZE+(row-1)];;
220                 
221     gras_msg_send(socket[i],gras_msgtype_by_name("assignment"),&assignment);
222     //    INFO3("Send assignment to %s : data A= %.3g & data B= %.3g",
223     //    gras_socket_peer_name(socket[i]),mydata.a,mydata.b);
224
225   }
226   // end assignment
227
228   /******************************* multiplication ********************************/
229   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
230         
231   for (step=1; step <= PROC_MATRIX_SIZE; step++){
232     //    gras_os_sleep(50);
233     for (i=0; i< SLAVE_COUNT; i++){
234       TRY {
235         gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step);
236       } CATCH(e) {
237         gras_socket_close(socket[i]);
238         RETHROW0("Unable to send the msg : %s");
239       }
240     }
241     INFO1("send to slave to begin a %d th step",step);
242     /* wait for computing and slave messages exchange */
243     i=0;
244         
245     while  ( i< SLAVE_COUNT){
246       TRY {
247         gras_msg_wait(1300,gras_msgtype_by_name("step_ack"),&from,&step_ack);
248       } CATCH(e) {
249         RETHROW0("I Can't get a Ack step message from slave : %s");
250       }
251       i++;
252       INFO3("Receive Ack step ack from %s (got %d of %d)",
253             gras_socket_peer_name(from),
254             i, SLAVE_COUNT);
255     }
256   }
257   /*********************************  gather ***************************************/
258
259   ask_result=0;
260   for( i=1;i< argc;i++){
261     gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
262     INFO1("Send (Ask Result) message to %s",gras_socket_peer_name(socket[i]));
263   }
264   /* wait for results */
265   for( i=1;i< argc;i++){
266     gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
267     C.data[(result.i-1)*DATA_MATRIX_SIZE+(result.j-1)]=result.value;
268   }
269   /*    end of gather   */
270   INFO0 ("The Result of Multiplication is :");
271   display(C);
272
273   return 0;
274 } /* end_of_master */
275
276 /* **********************************************************************
277  * slave code
278  * **********************************************************************/
279
280 int slave(int argc,char *argv[]) {
281
282   xbt_ex_t e; 
283
284   int step,port,l,result_ack=0; 
285   double bA,bB;
286
287   int myline,myrow;
288   double mydataA,mydataB;
289   double bC=0;
290   
291   result_t result;
292  
293   gras_socket_t from,sock;  /* to receive from server for steps */
294
295   /* sockets for brodcast to other slave */
296   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
297   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
298
299   /* Init the GRAS's infrastructure */
300
301   gras_init(&argc, argv);
302
303   /* Get arguments and create sockets */
304
305   port=atoi(argv[1]);
306   
307   /*  Create my master socket */
308   sock = gras_socket_server(port);
309   int i;
310
311   /*  Register the known messages */
312   register_messages();
313
314   /* Recover my initialized Data and My Position*/
315   assignment_t assignment;
316   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],port);
317   TRY {
318     gras_msg_wait(600,gras_msgtype_by_name("assignment"),&from,&assignment);
319   } CATCH(e) {
320     RETHROW0("Can't get my assignment from master : %s");
321   }
322   myline  = assignment.linepos;
323   myrow   = assignment.rowpos;
324   mydataA = assignment.a;
325   mydataB = assignment.b;
326   INFO4("Receive my pos (%d,%d) and assignment ( A=%.3g | B=%.3g )",
327         myline,myrow,mydataA,mydataB);
328
329   /* Get my neighborhood from the assignment message */
330   int j=0;
331   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
332     if (strcmp(gras_os_myname(),assignment.line[i]->name)) {
333       socket_line[j]=gras_socket_client(assignment.line[i]->name,
334                                         assignment.line[i]->port);
335       j++;
336       //INFO3("Line neighbour %d: %s:%d",j,mydata.line[i]->name,mydata.line[i]->port);
337     }
338     xbt_host_free(assignment.line[i]);
339   }
340   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
341     if (strcmp(gras_os_myname(),assignment.row[i]->name)) {
342       socket_row[j]=gras_socket_client(assignment.row[i]->name,
343                                        assignment.row[i]->port);
344       //INFO3("Row neighbour %d : %s:%d",j,mydata.row[i]->name,mydata.row[i]->port);
345       j++;
346     }
347     xbt_host_free(assignment.row[i]);    
348   }
349
350   step=1;
351   
352   do {  //repeat until compute Cb
353     step=PROC_MATRIX_SIZE+1;  // just intilization for loop
354         
355     TRY {
356       gras_msg_wait(200,gras_msgtype_by_name("step"),&from,&step);
357     } CATCH(e) {
358       RETHROW0("Can't get a Next Step message from master : %s");
359     }
360     INFO1("Receive a step message from master: step = %d ",step);
361
362     if (step < PROC_MATRIX_SIZE ){
363       /* a line brodcast */
364       gras_os_sleep(3);  // IL FAUT EXPRIMER LE TEMPS D'ATTENTE EN FONCTION DE "SLAVE_COUNT"
365       if(myline==step){
366         INFO2("step(%d) = Myline(%d)",step,myline);
367         for (l=1;l < PROC_MATRIX_SIZE ;l++){
368           gras_msg_send(socket_row[l-1], gras_msgtype_by_name("dataB"), &mydataB);
369           bB=mydataB;
370           INFO1("send my data B (%.3g) to my (vertical) neighbors",bB);  
371         }
372       }
373       if(myline != step){ 
374         INFO2("step(%d) <> Myline(%d)",step,myline);
375         TRY {
376           gras_msg_wait(600,gras_msgtype_by_name("dataB"),
377                         &from,&bB);
378         } CATCH(e) {
379           RETHROW0("I Can't get a data message from line : %s");
380         }
381         INFO2("Receive data B (%.3g) from my neighbor: %s",bB,gras_socket_peer_name(from));
382       }
383       /* a row brodcast */
384       if(myrow==step){
385         for (l=1;l < PROC_MATRIX_SIZE ;l++){
386           gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
387           bA=mydataA;
388           INFO1("send my data A (%.3g) to my (horizontal) neighbors",bA);
389         }
390       }
391
392       if(myrow != step){
393         TRY {
394           gras_msg_wait(1200,gras_msgtype_by_name("dataA"),
395                         &from,&bA);
396         } CATCH(e) {
397           RETHROW0("I Can't get a data message from row : %s");
398         }
399         INFO2("Receive data A (%.3g) from my neighbor : %s ",bA,gras_socket_peer_name(from));
400       }
401       bC+=bA*bB;
402       INFO1(">>>>>>>> My BC = %.3g",bC);
403
404       /* send a ack msg to master */
405         
406       gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step);
407         
408       INFO1("Send ack to master for to end %d th step",step);
409     }
410     if(step==PROC_MATRIX_SIZE-1) break;
411         
412   } while (step < PROC_MATRIX_SIZE);
413   /*  wait Message from master to send the result */
414  
415   result.value=bC;
416   result.i=myline;
417   result.j=myrow;
418  
419   TRY {
420     gras_msg_wait(600,gras_msgtype_by_name("ask_result"),
421                   &from,&result_ack);
422   } CATCH(e) {
423     RETHROW0("I Can't get a data message from line : %s");
424   }
425   /* send Result to master */
426   TRY {
427     gras_msg_send(from, gras_msgtype_by_name("result"),&result);
428   } CATCH(e) {
429     // gras_socket_close(from);
430     RETHROW0("Failed to send PING to server: %s");
431   }
432   INFO3(">>>>>>>> Result: %.3f sent to %s:%d <<<<<<<<",
433         bC,
434         gras_socket_peer_name(from),gras_socket_peer_port(from));
435   /*  Free the allocated resources, and shut GRAS down */
436   gras_socket_close(from);
437   gras_exit();
438   INFO0("Done.");
439   return 0;
440 } /* end_of_slave */