Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
efbb6b06045eb2291b8e20c670d17e7c759ac8fe
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006 Ahmed Harbaoui.                                       */
5 /* Copyright (c) 2006 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras.h"
12 #include "xbt/matrix.h"
13 #define PROC_MATRIX_SIZE 3
14 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
15
16 #define DATA_MATRIX_SIZE 600
17 const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
20
21 /* struct for recovering results */
22 GRAS_DEFINE_TYPE(s_result,struct s_result {
23   int linepos;
24   int rowpos;
25   xbt_matrix_t C GRAS_ANNOTE(subtype,double);
26 });
27 typedef struct s_result result_t;
28
29 /* struct to send initial data to slave */
30 GRAS_DEFINE_TYPE(s_assignment,struct s_assignment {
31   int linepos;
32   int rowpos;
33   xbt_host_t line[PROC_MATRIX_SIZE];
34   xbt_host_t row[PROC_MATRIX_SIZE];
35   xbt_matrix_t A GRAS_ANNOTE(subtype,double);
36   xbt_matrix_t B GRAS_ANNOTE(subtype,double);
37 });
38 typedef struct s_assignment s_assignment_t;
39
40 /* register messages which may be sent (common to client and server) */
41 static void register_messages(void) {
42   gras_datadesc_type_t result_type;
43   gras_datadesc_type_t assignment_type;
44
45   gras_datadesc_set_const("PROC_MATRIX_SIZE",PROC_MATRIX_SIZE);
46   result_type=gras_datadesc_by_symbol(s_result);
47   assignment_type=gras_datadesc_by_symbol(s_assignment);
48         
49   /* receive a final result from slave */
50   gras_msgtype_declare("result", result_type);
51
52   /* send from master to slave to assign a position and some data */
53   gras_msgtype_declare("assignment", assignment_type);
54
55   /* send data between slaves */
56   gras_msgtype_declare("dataA", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
57   gras_msgtype_declare("dataB", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
58 }
59
60 /* Function prototypes */
61 int slave (int argc,char *argv[]);
62 int master (int argc,char *argv[]);
63
64
65 /* **********************************************************************
66  * master code
67  * **********************************************************************/
68
69 /* Global private data */
70 typedef struct {
71   int nbr_row,nbr_line;
72   int remaining_step;
73   int remaining_ack;
74 } master_data_t;
75
76
77 /***  Function Scatter Sequentiel ***/
78
79 static void scatter(){
80
81 }/* end_of_Scatter */
82
83 /***  Function: Scatter // ***/
84
85 static void scatter_parl(){
86
87 }/* end_of_Scatter // */
88
89 /***  Function: multiplication ***/
90
91 static void multiplication(){
92
93 }/* end_of_multiplication */
94
95 /***  Function: gather ***/
96
97 static void gather(){
98
99 }/* end_of_gather */
100
101 int master (int argc,char *argv[]) {
102
103   int i,port;
104
105   xbt_matrix_t A,B,C;
106   result_t result;
107
108   gras_socket_t from;
109
110
111   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
112   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
113
114   /*  Init the GRAS's infrastructure */
115   gras_init(&argc, argv);
116   register_messages();
117       
118   /*  Initialize Matrices */
119   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
120   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
121   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
122         
123   /*  Get arguments and create sockets */
124   port=atoi(argv[1]);
125   //scatter();
126   //scatter_parl();
127   //multiplication();
128   //gather();
129   /************************* Init Data Send *********************************/
130   gras_os_sleep(2);
131
132   for( i=1;i<argc && i<=SLAVE_COUNT;i++){
133     grid[i-1]=xbt_host_from_string(argv[i]);
134     socket[i-1]=gras_socket_client(grid[i-1]->name,grid[i-1]->port);
135       
136     INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port);
137   }
138   xbt_assert2(i-1==SLAVE_COUNT,
139               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
140               i-1,SLAVE_COUNT);
141   /* FIXME: let the surnumerous slave die properly */
142  
143   int row=0, line=0;
144   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
145   for(i=0 ; i<SLAVE_COUNT; i++){
146     s_assignment_t assignment;
147     int j;
148
149     assignment.linepos=line; // assigned line
150     assignment.rowpos=row;   // assigned row
151
152     /* Neiborhood */
153     for (j=0; j<PROC_MATRIX_SIZE; j++) {
154       assignment.row[j] = grid[ j*PROC_MATRIX_SIZE+(row) ] ;
155       assignment.line[j] =  grid[ (line)*PROC_MATRIX_SIZE+j ] ;
156     }
157
158     assignment.A=xbt_matrix_new_sub(A,
159                                     submatrix_size,submatrix_size,
160                                     submatrix_size*line,submatrix_size*row,
161                                     NULL);
162     assignment.B=xbt_matrix_new_sub(B,
163                                     submatrix_size,submatrix_size,
164                                     submatrix_size*line,submatrix_size*row,
165                                     NULL);
166     row++;
167     if (row >= PROC_MATRIX_SIZE) {
168       row=0;
169       line++;
170     }
171                 
172     gras_msg_send(socket[i],gras_msgtype_by_name("assignment"),&assignment);
173     xbt_matrix_free(assignment.A);
174     xbt_matrix_free(assignment.B);
175   }
176   // end assignment
177
178   /******************************* multiplication ********************************/
179   /* wait for results */
180   for( i=0;i< SLAVE_COUNT;i++){
181     gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result);
182     xbt_matrix_copy_values(C,result.C,   submatrix_size,submatrix_size,
183                            submatrix_size*result.linepos,
184                            submatrix_size*result.rowpos,
185                            0,0,NULL);
186     xbt_matrix_free(result.C);
187   }
188   /*    end of gather   */
189   if (DATA_MATRIX_SIZE < 50) {
190      INFO0 ("The Result of Multiplication is :");
191      xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double);
192   } else {
193      INFO1("Matrix size too big (%d>50) to be displayed here",DATA_MATRIX_SIZE);
194   }
195
196   for(i=0; i<SLAVE_COUNT; i++) {
197      gras_socket_close(socket[i]);
198      xbt_host_free(grid[i]);
199   }
200    
201   xbt_matrix_free(A);
202   xbt_matrix_free(B);
203   xbt_matrix_free(C);
204   gras_exit();
205   return 0;
206 } /* end_of_master */
207
208 /* **********************************************************************
209  * slave code
210  * **********************************************************************/
211
212 int slave(int argc,char *argv[]) {
213
214   xbt_ex_t e; 
215
216   int step,l;
217   xbt_matrix_t bA=xbt_matrix_new(submatrix_size,submatrix_size,
218                                  sizeof(double),NULL);
219   xbt_matrix_t bB=xbt_matrix_new(submatrix_size,submatrix_size,
220                                  sizeof(double),NULL);
221
222   int myline,myrow;
223   xbt_matrix_t mydataA,mydataB;
224   xbt_matrix_t bC=xbt_matrix_double_new_zeros(submatrix_size,submatrix_size);
225   
226   result_t result;
227  
228   gras_socket_t from,sock;  /* to exchange data with my neighbor */
229   gras_socket_t master;     /* for the barrier */
230
231   /* sockets for brodcast to other slave */
232   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
233   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
234   memset(socket_line,0,sizeof(socket_line));
235   memset(socket_row,0,sizeof(socket_row));
236    
237   /* Init the GRAS's infrastructure */
238   gras_init(&argc, argv);
239
240   /*  Create my master socket */
241   sock = gras_socket_server(atoi(argv[1]));
242   int i;
243
244   /*  Register the known messages */
245   register_messages();
246
247   /* Recover my initialized Data and My Position*/
248   s_assignment_t assignment;
249   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],gras_os_myport());
250   TRY {
251     gras_msg_wait(600,gras_msgtype_by_name("assignment"),&master,&assignment);
252   } CATCH(e) {
253     RETHROW0("Can't get my assignment from master : %s");
254   }
255   myline  = assignment.linepos;
256   myrow   = assignment.rowpos;
257   mydataA = assignment.A;
258   mydataB = assignment.B;
259
260   INFO2("Receive my pos (%d,%d) and assignment",myline,myrow);
261
262   /* Get my neighborhood from the assignment message (skipping myself) */
263   int j=0;
264   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
265     if (strcmp(gras_os_myname(),assignment.line[i]->name)) {
266       socket_line[j]=gras_socket_client(assignment.line[i]->name,
267                                         assignment.line[i]->port);
268       j++;
269     }
270     xbt_host_free(assignment.line[i]);
271   }
272   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
273     if (strcmp(gras_os_myname(),assignment.row[i]->name)) {
274       socket_row[j]=gras_socket_client(assignment.row[i]->name,
275                                        assignment.row[i]->port);
276       j++;
277     }
278     xbt_host_free(assignment.row[i]);    
279   }
280
281   for (step=0; step<PROC_MATRIX_SIZE;step++) {
282         
283     /* a line brodcast */
284     if(myline==step){
285        INFO2("LINE: step(%d) = Myline(%d). Broadcast my data.",step,myline);
286        for (l=0;l < PROC_MATRIX_SIZE-1 ;l++)
287          gras_msg_send(socket_row[l], 
288                        gras_msgtype_by_name("dataB"), 
289                        &mydataB);
290         
291        xbt_matrix_free(bB);
292        bB = xbt_matrix_new_sub(mydataB,
293                                submatrix_size,submatrix_size,
294                                0,0,NULL);       
295     } else {
296       TRY {
297         xbt_matrix_free(bB);
298         gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
299       } CATCH(e) {
300         RETHROW0("Can't get a data message from line : %s");
301       }
302       INFO3("LINE: step(%d) <> Myline(%d). Receive data from %s",step,myline,
303             gras_socket_peer_name(from));
304     }
305
306     /* a row brodcast */
307     if (myrow==step) { 
308        INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow);
309        for (l=1;l < PROC_MATRIX_SIZE ;l++)
310          gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
311        xbt_matrix_free(bA);
312        bA = xbt_matrix_new_sub(mydataA,
313                                submatrix_size,submatrix_size,
314                                0,0,NULL);
315     } else {
316       TRY {
317         xbt_matrix_free(bA);
318         gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
319       } CATCH(e) {
320         RETHROW0("Can't get a data message from row : %s");
321       }
322       INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s",step,myrow,
323             gras_socket_peer_name(from));
324     }
325     xbt_matrix_double_addmult(bA,bB,bC);
326
327   };
328  
329   /* send Result to master */  
330   result.C=bC;
331   result.linepos=myline;
332   result.rowpos=myrow;
333
334   TRY {
335     gras_msg_send(master, gras_msgtype_by_name("result"),&result);
336   } CATCH(e) {
337     RETHROW0("Failed to send PING to server: %s");
338   }
339   INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
340         gras_socket_peer_name(master),gras_socket_peer_port(master));
341   /*  Free the allocated resources, and shut GRAS down */
342
343   xbt_matrix_free(bA);
344   xbt_matrix_free(bB);
345   xbt_matrix_free(bC);
346
347   xbt_matrix_free(mydataA);
348   xbt_matrix_free(mydataB);
349   gras_socket_close(sock);
350   gras_socket_close(master);
351   gras_socket_close(from);
352   /* FIXME: Some of these sockets are "not known", no idea why *
353   for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
354      if (socket_line[l])
355        gras_socket_close(socket_line[l]);
356      if (socket_row[l])
357        gras_socket_close(socket_row[l]); 
358   }*/
359    
360
361   gras_exit();
362   INFO0("Done.");
363   return 0;
364 } /* end_of_slave */