Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cleanups and fixups. Seems to work now (but computation time are not reported into...
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006 Ahmed Harbaoui.                                       */
5 /* Copyright (c) 2006 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras.h"
12 #include "xbt/matrix.h"
13 #define PROC_MATRIX_SIZE 6
14 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
15
16 #define DATA_MATRIX_SIZE 600
17 const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
20
21 /* struct for recovering results */
22 GRAS_DEFINE_TYPE(s_result,struct s_result {
23   int linepos;
24   int rowpos;
25   xbt_matrix_t C GRAS_ANNOTE(subtype,double);
26 });
27 typedef struct s_result result_t;
28
29 /* struct to send initial data to slave */
30 GRAS_DEFINE_TYPE(s_assignment,struct s_assignment {
31   int linepos;
32   int rowpos;
33   xbt_host_t line[PROC_MATRIX_SIZE];
34   xbt_host_t row[PROC_MATRIX_SIZE];
35   xbt_matrix_t A GRAS_ANNOTE(subtype,double);
36   xbt_matrix_t B GRAS_ANNOTE(subtype,double);
37 });
38 typedef struct s_assignment s_assignment_t;
39
40 /* register messages which may be sent (common to client and server) */
41 static void register_messages(void) {
42   gras_datadesc_type_t result_type;
43   gras_datadesc_type_t assignment_type;
44
45   gras_datadesc_set_const("PROC_MATRIX_SIZE",PROC_MATRIX_SIZE);
46   result_type=gras_datadesc_by_symbol(s_result);
47   assignment_type=gras_datadesc_by_symbol(s_assignment);
48         
49   /* receive a final result from slave */
50   gras_msgtype_declare("result", result_type);
51
52   /* send from master to slave to assign a position and some data */
53   gras_msgtype_declare("assignment", assignment_type);
54
55   /* send data between slaves */
56   gras_msgtype_declare("dataA", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
57   gras_msgtype_declare("dataB", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
58 }
59
60 /* Function prototypes */
61 int slave (int argc,char *argv[]);
62 int master (int argc,char *argv[]);
63
64
65 /* **********************************************************************
66  * master code
67  * **********************************************************************/
68
69 /* Global private data */
70 typedef struct {
71   int nbr_row,nbr_line;
72   int remaining_step;
73   int remaining_ack;
74 } master_data_t;
75
76
77 /***  Function Scatter Sequentiel ***/
78
79 static void scatter(){
80
81 }/* end_of_Scatter */
82
83 /***  Function: Scatter // ***/
84
85 static void scatter_parl(){
86
87 }/* end_of_Scatter // */
88
89 /***  Function: multiplication ***/
90
91 static void multiplication(){
92
93 }/* end_of_multiplication */
94
95 /***  Function: gather ***/
96
97 static void gather(){
98
99 }/* end_of_gather */
100
101 int master (int argc,char *argv[]) {
102
103   int i,port;
104
105   xbt_matrix_t A,B,C;
106   result_t result;
107
108   gras_socket_t from;
109
110   /*  Init the GRAS's infrastructure */
111   gras_init(&argc, argv);
112
113   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
114   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
115
116   /*  Initialize Matrices */
117
118   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
119   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
120   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
121         
122   /*  Get arguments and create sockets */
123   port=atoi(argv[1]);
124   //scatter();
125   //scatter_parl();
126   //multiplication();
127   //gather();
128   /************************* Init Data Send *********************************/
129   gras_os_sleep(5);
130
131   for( i=1;i<argc && i<=SLAVE_COUNT;i++){
132     grid[i-1]=xbt_host_from_string(argv[i]);
133     socket[i-1]=gras_socket_client(grid[i-1]->name,grid[i-1]->port);
134       
135     INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port);
136   }
137   xbt_assert2(i-1==SLAVE_COUNT,
138               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
139               i-1,SLAVE_COUNT);
140   /* FIXME: let the surnumerous slave die properly */
141  
142   int row=0, line=0;
143   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
144   for(i=0 ; i<SLAVE_COUNT; i++){
145     s_assignment_t assignment;
146     int j;
147
148     assignment.linepos=line; // assigned line
149     assignment.rowpos=row;   // assigned row
150
151     /* Neiborhood */
152     for (j=0; j<PROC_MATRIX_SIZE; j++) {
153       assignment.row[j] = grid[ j*PROC_MATRIX_SIZE+(row) ] ;
154       assignment.line[j] =  grid[ (line)*PROC_MATRIX_SIZE+j ] ;
155     }
156
157     assignment.A=xbt_matrix_new_sub(A,
158                                     submatrix_size,submatrix_size,
159                                     submatrix_size*line,submatrix_size*row,
160                                     NULL);
161     assignment.B=xbt_matrix_new_sub(B,
162                                     submatrix_size,submatrix_size,
163                                     submatrix_size*line,submatrix_size*row,
164                                     NULL);
165     row++;
166     if (row >= PROC_MATRIX_SIZE) {
167       row=0;
168       line++;
169     }
170                 
171     gras_msg_send(socket[i],gras_msgtype_by_name("assignment"),&assignment);
172     xbt_matrix_free(assignment.A);
173     xbt_matrix_free(assignment.B);
174   }
175   // end assignment
176
177   /******************************* multiplication ********************************/
178   /* wait for results */
179   for( i=0;i< SLAVE_COUNT;i++){
180     gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result);
181     xbt_matrix_copy_values(C,result.C,   submatrix_size,submatrix_size,
182                            submatrix_size*result.linepos,
183                            submatrix_size*result.rowpos,
184                            0,0,NULL);
185     xbt_matrix_free(result.C);
186   }
187   /*    end of gather   */
188   INFO0 ("The Result of Multiplication is :");
189   xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double);
190    
191   for(i=0; i<SLAVE_COUNT; i++) {
192      gras_socket_close(socket[i]);
193      xbt_host_free(grid[i]);
194   }
195    
196   xbt_matrix_free(A);
197   xbt_matrix_free(B);
198   xbt_matrix_free(C);
199   gras_exit();
200   return 0;
201 } /* end_of_master */
202
203 /* **********************************************************************
204  * slave code
205  * **********************************************************************/
206
207 int slave(int argc,char *argv[]) {
208
209   xbt_ex_t e; 
210
211   int step,l;
212   xbt_matrix_t bA=xbt_matrix_new(submatrix_size,submatrix_size,
213                                  sizeof(double),NULL);
214   xbt_matrix_t bB=xbt_matrix_new(submatrix_size,submatrix_size,
215                                  sizeof(double),NULL);
216
217   int myline,myrow;
218   xbt_matrix_t mydataA,mydataB;
219   xbt_matrix_t bC=xbt_matrix_double_new_zeros(submatrix_size,submatrix_size);
220   
221   result_t result;
222  
223   gras_socket_t from,sock;  /* to exchange data with my neighbor */
224   gras_socket_t master;     /* for the barrier */
225
226   /* sockets for brodcast to other slave */
227   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
228   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
229   memset(socket_line,0,sizeof(socket_line));
230   memset(socket_row,0,sizeof(socket_row));
231    
232   /* Init the GRAS's infrastructure */
233   gras_init(&argc, argv);
234
235   /*  Create my master socket */
236   sock = gras_socket_server(atoi(argv[1]));
237   int i;
238
239   /*  Register the known messages */
240   register_messages();
241
242   /* Recover my initialized Data and My Position*/
243   s_assignment_t assignment;
244   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],gras_os_myport());
245   TRY {
246     gras_msg_wait(600,gras_msgtype_by_name("assignment"),&master,&assignment);
247   } CATCH(e) {
248     RETHROW0("Can't get my assignment from master : %s");
249   }
250   myline  = assignment.linepos;
251   myrow   = assignment.rowpos;
252   mydataA = assignment.A;
253   mydataB = assignment.B;
254
255   INFO2("Receive my pos (%d,%d) and assignment",myline,myrow);
256
257   /* Get my neighborhood from the assignment message (skipping myself) */
258   int j=0;
259   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
260     if (strcmp(gras_os_myname(),assignment.line[i]->name)) {
261       socket_line[j]=gras_socket_client(assignment.line[i]->name,
262                                         assignment.line[i]->port);
263       j++;
264     }
265     xbt_host_free(assignment.line[i]);
266   }
267   for (i=0,j=0 ; i<PROC_MATRIX_SIZE ; i++){
268     if (strcmp(gras_os_myname(),assignment.row[i]->name)) {
269       socket_row[j]=gras_socket_client(assignment.row[i]->name,
270                                        assignment.row[i]->port);
271       j++;
272     }
273     xbt_host_free(assignment.row[i]);    
274   }
275
276   for (step=0; step<PROC_MATRIX_SIZE;step++) {
277         
278     /* a line brodcast */
279     if(myline==step){
280        INFO2("LINE: step(%d) = Myline(%d). Broadcast my data.",step,myline);
281        for (l=0;l < PROC_MATRIX_SIZE-1 ;l++)
282          gras_msg_send(socket_row[l], 
283                        gras_msgtype_by_name("dataB"), 
284                        &mydataB);
285         
286        xbt_matrix_free(bB);
287        bB = xbt_matrix_new_sub(mydataB,
288                                submatrix_size,submatrix_size,
289                                0,0,NULL);       
290     } else {
291       TRY {
292         xbt_matrix_free(bB);
293         gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
294       } CATCH(e) {
295         RETHROW0("Can't get a data message from line : %s");
296       }
297       INFO3("LINE: step(%d) <> Myline(%d). Receive data from %s",step,myline,
298             gras_socket_peer_name(from));
299     }
300
301     /* a row brodcast */
302     if (myrow==step) { 
303        INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow);
304        for (l=1;l < PROC_MATRIX_SIZE ;l++)
305          gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
306        xbt_matrix_free(bA);
307        bA = xbt_matrix_new_sub(mydataA,
308                                submatrix_size,submatrix_size,
309                                0,0,NULL);
310     } else {
311       TRY {
312         xbt_matrix_free(bA);
313         gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
314       } CATCH(e) {
315         RETHROW0("Can't get a data message from row : %s");
316       }
317       INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s",step,myrow,
318             gras_socket_peer_name(from));
319     }
320     xbt_matrix_double_addmult(bA,bB,bC);
321
322   };
323  
324   /* send Result to master */  
325   result.C=bC;
326   result.linepos=myline;
327   result.rowpos=myrow;
328
329   TRY {
330     gras_msg_send(master, gras_msgtype_by_name("result"),&result);
331   } CATCH(e) {
332     RETHROW0("Failed to send PING to server: %s");
333   }
334   INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
335         gras_socket_peer_name(master),gras_socket_peer_port(master));
336   /*  Free the allocated resources, and shut GRAS down */
337
338   xbt_matrix_free(bA);
339   xbt_matrix_free(bB);
340   xbt_matrix_free(bC);
341
342   xbt_matrix_free(mydataA);
343   xbt_matrix_free(mydataB);
344   gras_socket_close(sock);
345   gras_socket_close(master);
346   gras_socket_close(from);
347   /* FIXME: Some of these sockets are "not known", no idea why *
348   for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
349      if (socket_line[l])
350        gras_socket_close(socket_line[l]);
351      if (socket_row[l])
352        gras_socket_close(socket_row[l]); 
353   }*/
354    
355
356   gras_exit();
357   INFO0("Done.");
358   return 0;
359 } /* end_of_slave */