Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dc7da5bd152ff9bf722036d8c20831bcd55f7b1e
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006 Ahmed Harbaoui.                                       */
5 /* Copyright (c) 2006 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras.h"
12 #include "xbt/matrix.h"
13 #include "amok/hostmanagement.h"
14
15 #define PROC_MATRIX_SIZE 3
16 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
17 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
18
19 #define DATA_MATRIX_SIZE 9
20 const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
21
22 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
23
24 /* struct for recovering results */
25 GRAS_DEFINE_TYPE(s_result,struct s_result {
26   int linepos;
27   int rowpos;
28   xbt_matrix_t C GRAS_ANNOTE(subtype,double);
29 });
30 typedef struct s_result result_t;
31
32 /* struct to send initial data to slave */
33 GRAS_DEFINE_TYPE(s_pmm_assignment,struct s_pmm_assignment {
34   int linepos;
35   int rowpos;
36   xbt_host_t line[NEIGHBOR_COUNT];
37   xbt_host_t row[NEIGHBOR_COUNT];
38   xbt_matrix_t A GRAS_ANNOTE(subtype,double);
39   xbt_matrix_t B GRAS_ANNOTE(subtype,double);
40 });
41 typedef struct s_pmm_assignment s_pmm_assignment_t;
42
43 /* register messages which may be sent (common to client and server) */
44 static void register_messages(void) {
45   gras_datadesc_type_t result_type;
46   gras_datadesc_type_t pmm_assignment_type;
47
48   gras_datadesc_set_const("NEIGHBOR_COUNT",NEIGHBOR_COUNT);
49   result_type=gras_datadesc_by_symbol(s_result);
50   pmm_assignment_type=gras_datadesc_by_symbol(s_pmm_assignment);
51         
52   /* receive a final result from slave */
53   gras_msgtype_declare("result", result_type);
54
55   /* send from master to slave to assign a position and some data */
56   gras_msgtype_declare("pmm_slave", pmm_assignment_type);
57
58   /* send data between slaves */
59   gras_msgtype_declare("dataA", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
60   gras_msgtype_declare("dataB", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
61 }
62
63 /* Function prototypes */
64 int slave (int argc,char *argv[]);
65 int master (int argc,char *argv[]);
66
67
68 /* **********************************************************************
69  * master code
70  * **********************************************************************/
71
72 /* Global private data */
73 typedef struct {
74   int nbr_row,nbr_line;
75   int remaining_step;
76   int remaining_ack;
77 } master_data_t;
78
79
80 int master (int argc,char *argv[]) {
81
82   int i;
83
84   xbt_matrix_t A,B,C;
85   result_t result;
86
87   gras_socket_t from;
88
89   xbt_dynar_t hosts; /* group of slaves */
90   xbt_host_t grid[SLAVE_COUNT]; /* The slaves as an array */
91   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
92
93   /* Init the GRAS's infrastructure */
94   gras_init(&argc, argv);
95   amok_hm_init();
96   register_messages();
97       
98   /* Initialize data matrices */
99   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
100   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
101   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
102         
103   /* Create the connexions */
104   gras_socket_server(atoi(argv[1]));
105   hosts=amok_hm_group_new("pmm");
106   INFO0("Wait for peers for 10 sec");
107   gras_msg_handleall(10); /* friends, we're ready. Come and play */
108   INFO1("Got %ld pals",xbt_dynar_length(hosts));
109
110   for (i=0;
111        i<xbt_dynar_length(hosts) && i<SLAVE_COUNT;
112        i++) {
113
114     xbt_dynar_get_cpy(hosts,i,&grid[i]);
115     socket[i]=gras_socket_client(grid[i]->name,grid[i]->port);
116     INFO2("Connected to %s:%d.",grid[i]->name,grid[i]->port);
117   }
118   xbt_assert2(i==SLAVE_COUNT,
119               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
120               i,SLAVE_COUNT);
121
122   /* Kill surnumerous slaves */
123   for (i=SLAVE_COUNT; i<xbt_dynar_length(hosts); ) {
124     xbt_host_t h;
125
126     xbt_dynar_get_cpy(hosts,i,&h);
127     amok_hm_kill_hp(h->name,h->port);
128     free(h);
129   }
130
131
132   /* Assign job to slaves */
133   int row=0, line=0;
134   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
135   for(i=0 ; i<SLAVE_COUNT; i++){
136     s_pmm_assignment_t assignment;
137     int j,k;
138
139     assignment.linepos=line; // assigned line
140     assignment.rowpos=row;   // assigned row
141
142     /* Neiborhood */
143     for (j=0,k=0; j<PROC_MATRIX_SIZE; j++) {
144       if (i != j*PROC_MATRIX_SIZE+(row)) {          
145          assignment.row[k] = grid[ j*PROC_MATRIX_SIZE+(row) ] ;
146          k++;
147       }
148     }
149     for (j=0,k=0; j<PROC_MATRIX_SIZE; j++) {
150       if (i != (line)*PROC_MATRIX_SIZE+j) {         
151          assignment.line[k] =  grid[ (line)*PROC_MATRIX_SIZE+j ] ;
152          k++;
153       }
154     }
155
156     assignment.A=xbt_matrix_new_sub(A,
157                                     submatrix_size,submatrix_size,
158                                     submatrix_size*line,submatrix_size*row,
159                                     NULL);
160     assignment.B=xbt_matrix_new_sub(B,
161                                     submatrix_size,submatrix_size,
162                                     submatrix_size*line,submatrix_size*row,
163                                     NULL);
164     row++;
165     if (row >= PROC_MATRIX_SIZE) {
166       row=0;
167       line++;
168     }
169                 
170     gras_msg_send(socket[i],gras_msgtype_by_name("pmm_slave"),&assignment);
171     xbt_matrix_free(assignment.A);
172     xbt_matrix_free(assignment.B);
173   }
174
175   /* (have a rest while the slave perform the multiplication) */
176
177   /* Retrieve the results */
178   for( i=0;i< SLAVE_COUNT;i++){
179     gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result);
180     VERB2("%d slaves are done already. Waiting for %d",i+1, SLAVE_COUNT);
181     xbt_matrix_copy_values(C,result.C,   submatrix_size,submatrix_size,
182                            submatrix_size*result.linepos,
183                            submatrix_size*result.rowpos,
184                            0,0,NULL);
185     xbt_matrix_free(result.C);
186   }
187   /*    end of gather   */
188
189   if (DATA_MATRIX_SIZE < 30) {
190      INFO0 ("The Result of Multiplication is :");
191      xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double);
192   } else {
193      INFO1("Matrix size too big (%d>30) to be displayed here",DATA_MATRIX_SIZE);
194   }
195
196   amok_hm_group_shutdown ("pmm");   /* Ok, we're out of here */
197
198   for(i=0; i<SLAVE_COUNT; i++) {
199      gras_socket_close(socket[i]);
200   }
201    
202   xbt_matrix_free(A);
203   xbt_matrix_free(B);
204   xbt_matrix_free(C);
205   gras_exit();
206   return 0;
207 } /* end_of_master */
208
209 /* **********************************************************************
210  * slave code
211  * **********************************************************************/
212
213 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) {
214   /* Recover my initialized Data and My Position*/
215   s_pmm_assignment_t assignment = *(s_pmm_assignment_t*)payload;
216   gras_socket_t master = gras_msg_cb_ctx_from(ctx);
217
218   xbt_ex_t e; 
219
220   int step,l;
221   xbt_matrix_t bA=xbt_matrix_new(submatrix_size,submatrix_size,
222                                  sizeof(double),NULL);
223   xbt_matrix_t bB=xbt_matrix_new(submatrix_size,submatrix_size,
224                                  sizeof(double),NULL);
225
226   int myline,myrow;
227   xbt_matrix_t mydataA,mydataB;
228   xbt_matrix_t bC=xbt_matrix_double_new_zeros(submatrix_size,submatrix_size);
229   
230   result_t result;
231  
232   gras_socket_t from;  /* to exchange data with my neighbor */
233
234   /* sockets for brodcast to other slave */
235   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
236   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
237   memset(socket_line,0,sizeof(socket_line));
238   memset(socket_row,0,sizeof(socket_row));
239    
240   int i;
241
242   gras_os_sleep(1); /* wait for my pals */
243
244   myline  = assignment.linepos;
245   myrow   = assignment.rowpos;
246   mydataA = assignment.A;
247   mydataB = assignment.B;
248
249   INFO2("Receive my pos (%d,%d) and assignment",myline,myrow);
250
251   /* Get my neighborhood from the assignment message (skipping myself) */
252   for (i=0 ; i<PROC_MATRIX_SIZE-1 ; i++){
253     socket_line[i]=gras_socket_client(assignment.line[i]->name,
254                                       assignment.line[i]->port);
255     xbt_host_free(assignment.line[i]);
256   }
257   for (i=0 ; i<PROC_MATRIX_SIZE-1 ; i++){
258     socket_row[i]=gras_socket_client(assignment.row[i]->name,
259                                      assignment.row[i]->port);
260     xbt_host_free(assignment.row[i]);    
261   }
262
263   for (step=0; step<PROC_MATRIX_SIZE;step++) {
264         
265     /* a line brodcast */
266     if(myline==step){
267        INFO3("LINE: step(%d) = Myline(%d). Broadcast my data (myport=%d).",
268              step,myline,gras_os_myport());
269        for (l=0;l < PROC_MATRIX_SIZE-1 ;l++) {
270           INFO2("LINE:   Send to %s:%d",
271                 gras_socket_peer_name(socket_row[l]),
272                 gras_socket_peer_port(socket_row[l]));
273          gras_msg_send(socket_row[l], 
274                        gras_msgtype_by_name("dataB"), 
275                        &mydataB);
276        }
277        
278         
279        xbt_matrix_free(bB);
280        bB = xbt_matrix_new_sub(mydataB,
281                                submatrix_size,submatrix_size,
282                                0,0,NULL);       
283     } else {
284       TRY {
285         xbt_matrix_free(bB);
286         gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
287       } CATCH(e) {
288         RETHROW0("Can't get a data message from line : %s");
289       }
290       INFO4("LINE: step(%d) <> Myline(%d). Receive data from %s:%d",step,myline,
291             gras_socket_peer_name(from), gras_socket_peer_port(from));
292     }
293
294     /* a row brodcast */
295     if (myrow==step) { 
296        INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow);
297        for (l=1;l < PROC_MATRIX_SIZE ; l++) {
298           INFO2("ROW:   Send to %s:%d",
299                 gras_socket_peer_name(socket_line[l-1]),
300                 gras_socket_peer_port(socket_line[l-1]));
301           gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
302        }
303        xbt_matrix_free(bA);
304        bA = xbt_matrix_new_sub(mydataA,
305                                submatrix_size,submatrix_size,
306                                0,0,NULL);
307     } else {
308       TRY {
309         xbt_matrix_free(bA);
310         gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
311       } CATCH(e) {
312         RETHROW0("Can't get a data message from row : %s");
313       }
314       INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s",step,myrow,
315             gras_socket_peer_name(from));
316     }
317     xbt_matrix_double_addmult(bA,bB,bC);
318
319   };
320  
321   /* send Result to master */  
322   result.C=bC;
323   result.linepos=myline;
324   result.rowpos=myrow;
325
326   TRY {
327     gras_msg_send(master, gras_msgtype_by_name("result"),&result);
328   } CATCH(e) {
329     RETHROW0("Failed to send answer to server: %s");
330   }
331   INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
332         gras_socket_peer_name(master),gras_socket_peer_port(master));
333   /*  Free the allocated resources, and shut GRAS down */
334
335   xbt_matrix_free(bA);
336   xbt_matrix_free(bB);
337   xbt_matrix_free(bC);
338
339   xbt_matrix_free(mydataA);
340   xbt_matrix_free(mydataB);
341   gras_socket_close(master);
342   gras_socket_close(from);
343   /* FIXME: some are said to be unknown 
344   for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
345      if (socket_line[l])
346        gras_socket_close(socket_line[l]);
347      if (socket_row[l])
348        gras_socket_close(socket_row[l]); 
349   }*/
350
351   return 1;
352 }
353
354 int slave(int argc,char *argv[]) {
355   gras_socket_t mysock;
356   gras_socket_t master;
357
358   /* Init the GRAS's infrastructure */
359   gras_init(&argc, argv);
360   amok_hm_init();
361
362   /*  Register the known messages and my callback */
363   register_messages();
364   gras_cb_register(gras_msgtype_by_name("pmm_slave"),pmm_worker_cb);
365
366   /* Create the connexions */
367   mysock = gras_socket_server_range(3000,9999,0,0);
368   INFO1("Sensor starting (on port %d)",gras_os_myport());
369   gras_os_sleep(2); /* let the master get ready */
370   master = gras_socket_client_from_string(argv[1]);
371                                 
372   /* Join and run the group */
373   amok_hm_group_join(master,"pmm");
374   amok_hm_mainloop(600);
375
376   /* housekeeping */
377   gras_socket_close(mysock);
378   //  gras_socket_close(master); Unknown
379   gras_exit();
380   return 0;
381 } /* end_of_slave */