Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Works both in RL and SG. Those processes are as stupid as lemmings. They we blocking...
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006 Ahmed Harbaoui.                                       */
5 /* Copyright (c) 2006 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras.h"
12 #include "xbt/matrix.h"
13 #define PROC_MATRIX_SIZE 2
14 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
15 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
16
17 #define DATA_MATRIX_SIZE 8
18 const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
19
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
21
22 /* struct for recovering results */
23 GRAS_DEFINE_TYPE(s_result,struct s_result {
24   int linepos;
25   int rowpos;
26   xbt_matrix_t C GRAS_ANNOTE(subtype,double);
27 });
28 typedef struct s_result result_t;
29
30 /* struct to send initial data to slave */
31 GRAS_DEFINE_TYPE(s_assignment,struct s_assignment {
32   int linepos;
33   int rowpos;
34   xbt_host_t line[NEIGHBOR_COUNT];
35   xbt_host_t row[NEIGHBOR_COUNT];
36   xbt_matrix_t A GRAS_ANNOTE(subtype,double);
37   xbt_matrix_t B GRAS_ANNOTE(subtype,double);
38 });
39 typedef struct s_assignment s_assignment_t;
40
41 /* register messages which may be sent (common to client and server) */
42 static void register_messages(void) {
43   gras_datadesc_type_t result_type;
44   gras_datadesc_type_t assignment_type;
45
46   gras_datadesc_set_const("NEIGHBOR_COUNT",NEIGHBOR_COUNT);
47   result_type=gras_datadesc_by_symbol(s_result);
48   assignment_type=gras_datadesc_by_symbol(s_assignment);
49         
50   /* receive a final result from slave */
51   gras_msgtype_declare("result", result_type);
52
53   /* send from master to slave to assign a position and some data */
54   gras_msgtype_declare("assignment", assignment_type);
55
56   /* send data between slaves */
57   gras_msgtype_declare("dataA", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
58   gras_msgtype_declare("dataB", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL));
59 }
60
61 /* Function prototypes */
62 int slave (int argc,char *argv[]);
63 int master (int argc,char *argv[]);
64
65
66 /* **********************************************************************
67  * master code
68  * **********************************************************************/
69
70 /* Global private data */
71 typedef struct {
72   int nbr_row,nbr_line;
73   int remaining_step;
74   int remaining_ack;
75 } master_data_t;
76
77
78 /***  Function Scatter Sequentiel ***/
79
80 static void scatter(){
81
82 }/* end_of_Scatter */
83
84 /***  Function: Scatter // ***/
85
86 static void scatter_parl(){
87
88 }/* end_of_Scatter // */
89
90 /***  Function: multiplication ***/
91
92 static void multiplication(){
93
94 }/* end_of_multiplication */
95
96 /***  Function: gather ***/
97
98 static void gather(){
99
100 }/* end_of_gather */
101
102 int master (int argc,char *argv[]) {
103
104   int i,port;
105
106   xbt_matrix_t A,B,C;
107   result_t result;
108
109   gras_socket_t from;
110
111
112   xbt_host_t grid[SLAVE_COUNT]; /* The slaves */
113   gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */
114
115   /*  Init the GRAS's infrastructure */
116   gras_init(&argc, argv);
117   register_messages();
118       
119   /*  Initialize Matrices */
120   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
121   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
122   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
123         
124   /*  Get arguments and create sockets */
125   port=atoi(argv[1]);
126   //scatter();
127   //scatter_parl();
128   //multiplication();
129   //gather();
130   /************************* Init Data Send *********************************/
131   gras_os_sleep(2);
132
133   for( i=1;i<argc && i<=SLAVE_COUNT;i++){
134     grid[i-1]=xbt_host_from_string(argv[i]);
135     socket[i-1]=gras_socket_client(grid[i-1]->name,grid[i-1]->port);
136       
137     INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port);
138   }
139   xbt_assert2(i-1==SLAVE_COUNT,
140               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
141               i-1,SLAVE_COUNT);
142   /* FIXME: let the surnumerous slave die properly */
143  
144   int row=0, line=0;
145   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
146   for(i=0 ; i<SLAVE_COUNT; i++){
147     s_assignment_t assignment;
148     int j,k;
149
150     assignment.linepos=line; // assigned line
151     assignment.rowpos=row;   // assigned row
152
153     /* Neiborhood */
154     for (j=0,k=0; j<PROC_MATRIX_SIZE; j++) {
155       if (i != j*PROC_MATRIX_SIZE+(row)) {          
156          assignment.row[k] = grid[ j*PROC_MATRIX_SIZE+(row) ] ;
157          k++;
158       }
159     }
160     for (j=0,k=0; j<PROC_MATRIX_SIZE; j++) {
161       if (i != (line)*PROC_MATRIX_SIZE+j) {         
162          assignment.line[k] =  grid[ (line)*PROC_MATRIX_SIZE+j ] ;
163          k++;
164       }
165     }
166
167     assignment.A=xbt_matrix_new_sub(A,
168                                     submatrix_size,submatrix_size,
169                                     submatrix_size*line,submatrix_size*row,
170                                     NULL);
171     assignment.B=xbt_matrix_new_sub(B,
172                                     submatrix_size,submatrix_size,
173                                     submatrix_size*line,submatrix_size*row,
174                                     NULL);
175     row++;
176     if (row >= PROC_MATRIX_SIZE) {
177       row=0;
178       line++;
179     }
180                 
181     gras_msg_send(socket[i],gras_msgtype_by_name("assignment"),&assignment);
182     xbt_matrix_free(assignment.A);
183     xbt_matrix_free(assignment.B);
184   }
185   // end assignment
186
187   /******************************* multiplication ********************************/
188   /* wait for results */
189   for( i=0;i< SLAVE_COUNT;i++){
190     gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result);
191     xbt_matrix_copy_values(C,result.C,   submatrix_size,submatrix_size,
192                            submatrix_size*result.linepos,
193                            submatrix_size*result.rowpos,
194                            0,0,NULL);
195     xbt_matrix_free(result.C);
196   }
197   /*    end of gather   */
198   if (DATA_MATRIX_SIZE < 50) {
199      INFO0 ("The Result of Multiplication is :");
200      xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double);
201   } else {
202      INFO1("Matrix size too big (%d>50) to be displayed here",DATA_MATRIX_SIZE);
203   }
204
205   for(i=0; i<SLAVE_COUNT; i++) {
206      gras_socket_close(socket[i]);
207      xbt_host_free(grid[i]);
208   }
209    
210   xbt_matrix_free(A);
211   xbt_matrix_free(B);
212   xbt_matrix_free(C);
213   gras_exit();
214   return 0;
215 } /* end_of_master */
216
217 /* **********************************************************************
218  * slave code
219  * **********************************************************************/
220
221 int slave(int argc,char *argv[]) {
222
223   xbt_ex_t e; 
224
225   int step,l;
226   xbt_matrix_t bA=xbt_matrix_new(submatrix_size,submatrix_size,
227                                  sizeof(double),NULL);
228   xbt_matrix_t bB=xbt_matrix_new(submatrix_size,submatrix_size,
229                                  sizeof(double),NULL);
230
231   int myline,myrow;
232   xbt_matrix_t mydataA,mydataB;
233   xbt_matrix_t bC=xbt_matrix_double_new_zeros(submatrix_size,submatrix_size);
234   
235   result_t result;
236  
237   gras_socket_t from,sock;  /* to exchange data with my neighbor */
238   gras_socket_t master;     /* for the barrier */
239
240   /* sockets for brodcast to other slave */
241   gras_socket_t socket_line[PROC_MATRIX_SIZE-1];
242   gras_socket_t socket_row[PROC_MATRIX_SIZE-1];
243   memset(socket_line,0,sizeof(socket_line));
244   memset(socket_row,0,sizeof(socket_row));
245    
246   /* Init the GRAS's infrastructure */
247   gras_init(&argc, argv);
248
249   /*  Create my master socket */
250   sock = gras_socket_server(atoi(argv[1]));
251   int i;
252
253   /*  Register the known messages */
254   register_messages();
255
256   /* Recover my initialized Data and My Position*/
257   s_assignment_t assignment;
258   INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],gras_os_myport());
259   TRY {
260     gras_msg_wait(600,gras_msgtype_by_name("assignment"),&master,&assignment);
261   } CATCH(e) {
262     RETHROW0("Can't get my assignment from master : %s");
263   }
264   myline  = assignment.linepos;
265   myrow   = assignment.rowpos;
266   mydataA = assignment.A;
267   mydataB = assignment.B;
268
269   INFO2("Receive my pos (%d,%d) and assignment",myline,myrow);
270
271   /* Get my neighborhood from the assignment message (skipping myself) */
272   for (i=0 ; i<PROC_MATRIX_SIZE-1 ; i++){
273     socket_line[i]=gras_socket_client(assignment.line[i]->name,
274                                       assignment.line[i]->port);
275     xbt_host_free(assignment.line[i]);
276   }
277   for (i=0 ; i<PROC_MATRIX_SIZE-1 ; i++){
278     socket_row[i]=gras_socket_client(assignment.row[i]->name,
279                                      assignment.row[i]->port);
280     xbt_host_free(assignment.row[i]);    
281   }
282
283   for (step=0; step<PROC_MATRIX_SIZE;step++) {
284         
285     /* a line brodcast */
286     if(myline==step){
287        INFO3("LINE: step(%d) = Myline(%d). Broadcast my data (myport=%d).",
288              step,myline,gras_os_myport());
289        for (l=0;l < PROC_MATRIX_SIZE-1 ;l++) {
290           INFO2("LINE:   Send to %s:%d",
291                 gras_socket_peer_name(socket_row[l]),
292                 gras_socket_peer_port(socket_row[l]));
293          gras_msg_send(socket_row[l], 
294                        gras_msgtype_by_name("dataB"), 
295                        &mydataB);
296        }
297        
298         
299        xbt_matrix_free(bB);
300        bB = xbt_matrix_new_sub(mydataB,
301                                submatrix_size,submatrix_size,
302                                0,0,NULL);       
303     } else {
304       TRY {
305         xbt_matrix_free(bB);
306         gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
307       } CATCH(e) {
308         RETHROW0("Can't get a data message from line : %s");
309       }
310       INFO4("LINE: step(%d) <> Myline(%d). Receive data from %s:%d",step,myline,
311             gras_socket_peer_name(from), gras_socket_peer_port(from));
312     }
313
314     /* a row brodcast */
315     if (myrow==step) { 
316        INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow);
317        for (l=1;l < PROC_MATRIX_SIZE ; l++) {
318           INFO2("ROW:   Send to %s:%d",
319                 gras_socket_peer_name(socket_line[l-1]),
320                 gras_socket_peer_port(socket_line[l-1]));
321           gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
322        }
323        xbt_matrix_free(bA);
324        bA = xbt_matrix_new_sub(mydataA,
325                                submatrix_size,submatrix_size,
326                                0,0,NULL);
327     } else {
328       TRY {
329         xbt_matrix_free(bA);
330         gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
331       } CATCH(e) {
332         RETHROW0("Can't get a data message from row : %s");
333       }
334       INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s",step,myrow,
335             gras_socket_peer_name(from));
336     }
337     xbt_matrix_double_addmult(bA,bB,bC);
338
339   };
340  
341   /* send Result to master */  
342   result.C=bC;
343   result.linepos=myline;
344   result.rowpos=myrow;
345
346   TRY {
347     gras_msg_send(master, gras_msgtype_by_name("result"),&result);
348   } CATCH(e) {
349     RETHROW0("Failed to send PING to server: %s");
350   }
351   INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
352         gras_socket_peer_name(master),gras_socket_peer_port(master));
353   /*  Free the allocated resources, and shut GRAS down */
354
355   xbt_matrix_free(bA);
356   xbt_matrix_free(bB);
357   xbt_matrix_free(bC);
358
359   xbt_matrix_free(mydataA);
360   xbt_matrix_free(mydataB);
361   gras_socket_close(sock);
362   gras_socket_close(master);
363   gras_socket_close(from);
364   /* FIXME: Some of these sockets are "not known", no idea why *
365   for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
366      if (socket_line[l])
367        gras_socket_close(socket_line[l]);
368      if (socket_row[l])
369        gras_socket_close(socket_row[l]); 
370   }*/
371    
372
373   gras_exit();
374   INFO0("Done.");
375   return 0;
376 } /* end_of_slave */