Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix test pmm_rl by synchronizing the slaves [Arnaud Giersch]
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* pmm - parallel matrix multiplication "double diffusion"                  */
2
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #include "xbt/matrix.h"
11 #include "amok/peermanagement.h"
12
13 #define PROC_MATRIX_SIZE 3
14 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
15 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
16
17 #define DATA_MATRIX_SIZE 18
18 const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
19
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
21
22 /* struct for recovering results */
23 GRAS_DEFINE_TYPE(s_result, struct s_result {
24                  int linepos;
25                  int rowpos; xbt_matrix_t C GRAS_ANNOTE(subtype, double);});
26
27 typedef struct s_result result_t;
28
29 /* struct to send initial data to slave */
30 GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
31                  int linepos;
32                  int rowpos;
33                  xbt_peer_t line[NEIGHBOR_COUNT];
34                  xbt_peer_t row[NEIGHBOR_COUNT];
35                  xbt_matrix_t A GRAS_ANNOTE(subtype, double);
36                  xbt_matrix_t B GRAS_ANNOTE(subtype, double);});
37
38 typedef struct s_pmm_assignment s_pmm_assignment_t;
39
40 /* register messages which may be sent (common to client and server) */
41 static void register_messages(void)
42 {
43   gras_datadesc_type_t result_type;
44   gras_datadesc_type_t pmm_assignment_type;
45
46   gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
47   result_type = gras_datadesc_by_symbol(s_result);
48   pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
49
50   /* receive a final result from slave */
51   gras_msgtype_declare("result", result_type);
52
53   /* send from master to slave to assign a position and some data */
54   gras_msgtype_declare("pmm_slave", pmm_assignment_type);
55
56   /* send data between slaves */
57   gras_msgtype_declare("dataA",
58                        gras_datadesc_matrix(gras_datadesc_by_name("double"),
59                                             NULL));
60   gras_msgtype_declare("dataB",
61                        gras_datadesc_matrix(gras_datadesc_by_name("double"),
62                                             NULL));
63
64   /* synchronization message */
65   gras_msgtype_declare("pmm_sync", 0);
66 }
67
68 /* Function prototypes */
69 int slave(int argc, char *argv[]);
70 int master(int argc, char *argv[]);
71
72
73 /* **********************************************************************
74  * master code
75  * **********************************************************************/
76
77 /* Global private data */
78 typedef struct {
79   int nbr_row, nbr_line;
80   int remaining_step;
81   int remaining_ack;
82 } master_data_t;
83
84 int master(int argc, char *argv[])
85 {
86
87   int i;
88
89   xbt_matrix_t A, B, C;
90   result_t result;
91
92   gras_socket_t from;
93
94   xbt_dynar_t peers;            /* group of slaves */
95   xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
96   gras_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
97
98   /* Init the GRAS's infrastructure */
99   gras_init(&argc, argv);
100   amok_pm_init();
101   register_messages();
102
103   /* Initialize data matrices */
104   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
105   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
106   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
107
108   /* Create the connexions */
109   xbt_assert0(argc > 1, "Usage: master <port>");
110   gras_socket_server(atoi(argv[1]));
111   peers = amok_pm_group_new("pmm");
112
113   /* friends, we're ready. Come and play */
114   INFO0("Wait for peers for 2 sec");
115   gras_msg_handleall(2);
116   while (xbt_dynar_length(peers)<9) {
117     INFO1("Got only %ld pals. Wait 2 more seconds", xbt_dynar_length(peers));
118     gras_msg_handleall(2);
119   }
120   INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
121
122   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
123     xbt_dynar_get_cpy(peers, i, &grid[i]);
124     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
125   }
126   xbt_assert2(i == SLAVE_COUNT,
127               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
128               i, SLAVE_COUNT);
129
130   /* Kill surnumerous slaves */
131   for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
132     xbt_peer_t h;
133
134     xbt_dynar_remove_at(peers, i, &h);
135     INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
136     amok_pm_kill_hp(h->name, h->port);
137     free(h);
138   }
139
140
141   /* Assign job to slaves */
142   int row = 0, line = 0;
143   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
144   for (i = 0; i < SLAVE_COUNT; i++) {
145     s_pmm_assignment_t assignment;
146     int j, k;
147
148     assignment.linepos = line;  // assigned line
149     assignment.rowpos = row;    // assigned row
150
151     /* Neiborhood */
152     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
153       if (i != j * PROC_MATRIX_SIZE + (row)) {
154         assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
155         k++;
156       }
157     }
158     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
159       if (i != (line) * PROC_MATRIX_SIZE + j) {
160         assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
161         k++;
162       }
163     }
164
165     assignment.A = xbt_matrix_new_sub(A,
166                                       submatrix_size, submatrix_size,
167                                       submatrix_size * line,
168                                       submatrix_size * row, NULL);
169     assignment.B =
170       xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
171                          submatrix_size * line, submatrix_size * row, NULL);
172     row++;
173     if (row >= PROC_MATRIX_SIZE) {
174       row = 0;
175       line++;
176     }
177
178     gras_msg_send(socket[i], "pmm_slave", &assignment);
179     xbt_matrix_free(assignment.A);
180     xbt_matrix_free(assignment.B);
181   }
182
183   /* synchronize slaves */
184   for (i = 0 ; i < PROC_MATRIX_SIZE ; i++) {
185     int j;
186     for (j = 0 ; j < SLAVE_COUNT ; j++)
187       gras_msg_wait(600, "pmm_sync", NULL, NULL);
188     for (j = 0 ; j < SLAVE_COUNT ; j++)
189       gras_msg_send(socket[j], "pmm_sync", NULL);
190   }
191
192   /* Retrieve the results */
193   for (i = 0; i < SLAVE_COUNT; i++) {
194     gras_msg_wait(6000, "result", &from, &result);
195     VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT);
196     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
197                            submatrix_size * result.linepos,
198                            submatrix_size * result.rowpos, 0, 0, NULL);
199     xbt_matrix_free(result.C);
200   }
201   /*    end of gather   */
202
203   if (xbt_matrix_double_is_seq(C))
204     INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
205   else {
206     WARN0("the result seems wrong");
207   if (DATA_MATRIX_SIZE < 30) {
208     INFO0("The Result of Multiplication is :");
209     xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
210   } else {
211     INFO1("Matrix size too big (%d>30) to be displayed here",
212           DATA_MATRIX_SIZE);
213   }
214   }
215
216   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
217
218   for (i = 0; i < SLAVE_COUNT; i++)
219     gras_socket_close(socket[i]);
220
221   xbt_matrix_free(A);
222   xbt_matrix_free(B);
223   xbt_matrix_free(C);
224   gras_exit();
225   return 0;
226 }                               /* end_of_master */
227
228 /* **********************************************************************
229  * slave code
230  * **********************************************************************/
231
232 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
233 {
234   /* Recover my initialized Data and My Position */
235   s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
236   gras_socket_t master = gras_msg_cb_ctx_from(ctx);
237
238   xbt_ex_t e;
239
240   int step, l;
241   xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
242                                    sizeof(double), NULL);
243   xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
244                                    sizeof(double), NULL);
245
246   int myline, myrow;
247   xbt_matrix_t mydataA, mydataB;
248   xbt_matrix_t bC =
249     xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
250
251   result_t result;
252
253   gras_socket_t from;           /* to exchange data with my neighbor */
254
255   /* sockets for brodcast to other slave */
256   gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
257   gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
258   memset(socket_line, 0, sizeof(socket_line));
259   memset(socket_row, 0, sizeof(socket_row));
260
261   int i;
262
263   gras_os_sleep(1);             /* wait for my pals */
264
265   myline = assignment.linepos;
266   myrow = assignment.rowpos;
267   mydataA = assignment.A;
268   mydataB = assignment.B;
269
270   if (gras_if_RL())
271     INFO0("Receive my pos and assignment");
272   else
273     INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
274
275   /* Get my neighborhood from the assignment message (skipping myself) */
276   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
277     socket_line[i] = gras_socket_client(assignment.line[i]->name,
278                                         assignment.line[i]->port);
279     xbt_peer_free(assignment.line[i]);
280   }
281   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
282     socket_row[i] = gras_socket_client(assignment.row[i]->name,
283                                        assignment.row[i]->port);
284     xbt_peer_free(assignment.row[i]);
285   }
286
287   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
288     gras_msg_send(master, "pmm_sync", NULL);
289     gras_msg_wait(600, "pmm_sync", NULL, NULL);
290
291     /* a line brodcast */
292     if (myline == step) {
293       VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline);
294       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
295         VERB1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
296         gras_msg_send(socket_row[l], "dataB", &mydataB);
297       }
298
299
300       xbt_matrix_free(bB);
301       bB = xbt_matrix_new_sub(mydataB,
302                               submatrix_size, submatrix_size, 0, 0, NULL);
303     } else {
304       TRY {
305         xbt_matrix_free(bB);
306         gras_msg_wait(600, "dataB", &from, &bB);
307       }
308       CATCH(e) {
309         RETHROW0("Can't get a data message from line : %s");
310       }
311       VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
312             myline, gras_socket_peer_name(from));
313     }
314
315     /* a row brodcast */
316     if (myrow == step) {
317       VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
318       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
319         VERB1("ROW:   Send to %s", gras_socket_peer_name(socket_line[l - 1]));
320         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
321       }
322       xbt_matrix_free(bA);
323       bA = xbt_matrix_new_sub(mydataA,
324                               submatrix_size, submatrix_size, 0, 0, NULL);
325     } else {
326       TRY {
327         xbt_matrix_free(bA);
328         gras_msg_wait(1200, "dataA", &from, &bA);
329       }
330       CATCH(e) {
331         RETHROW0("Can't get a data message from row : %s");
332       }
333       VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
334             gras_socket_peer_name(from));
335     }
336     xbt_matrix_double_addmult(bA, bB, bC);
337
338   };
339
340   /* send Result to master */
341   result.C = bC;
342   result.linepos = myline;
343   result.rowpos = myrow;
344
345   TRY {
346     gras_msg_send(master, "result", &result);
347   }
348   CATCH(e) {
349     RETHROW0("Failed to send answer to server: %s");
350   }
351   VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
352         gras_socket_peer_name(master), gras_socket_peer_port(master));
353   /*  Free the allocated resources, and shut GRAS down */
354
355   xbt_matrix_free(bA);
356   xbt_matrix_free(bB);
357   xbt_matrix_free(bC);
358
359   xbt_matrix_free(mydataA);
360   xbt_matrix_free(mydataB);
361   /* FIXME: some are said to be unknown 
362      gras_socket_close(master);
363      gras_socket_close(from);
364      for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
365      if (socket_line[l])
366      gras_socket_close(socket_line[l]);
367      if (socket_row[l])
368      gras_socket_close(socket_row[l]); 
369      } */
370
371   return 0;
372 }
373
374 int slave(int argc, char *argv[])
375 {
376   gras_socket_t mysock;
377   gras_socket_t master = NULL;
378   int connected = 0;
379   int rank;
380
381   /* Init the GRAS's infrastructure */
382   gras_init(&argc, argv);
383   amok_pm_init();
384   if (argc != 3 && argc != 2)
385     xbt_die("Usage: slave masterhost:masterport [rank]");
386   if (argc == 2)
387     rank = -1;
388   else
389     rank = atoi(argv[2]);
390
391   /*  Register the known messages and my callback */
392   register_messages();
393   gras_cb_register("pmm_slave", pmm_worker_cb);
394
395   /* Create the connexions */
396   mysock = gras_socket_server_range(3000, 9999, 0, 0);
397   INFO1("Sensor %d starting", rank);
398   while (!connected) {
399     xbt_ex_t e;
400     TRY {
401       master = gras_socket_client_from_string(argv[1]);
402       connected = 1;
403     }
404     CATCH(e) {
405       if (e.category != system_error)
406         RETHROW;
407       xbt_ex_free(e);
408       gras_os_sleep(0.5);
409     }
410   }
411
412   /* Join and run the group */
413   rank = amok_pm_group_join(master, "pmm");
414   amok_pm_mainloop(600);
415
416   /* housekeeping */
417   gras_socket_close(mysock);
418   //  gras_socket_close(master); Unknown
419   gras_exit();
420   return 0;
421 }                               /* end_of_slave */