Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix warnings about clobbered variables in gras/pmm example.
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* pmm - parallel matrix multiplication "double diffusion"                  */
2
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #include "xbt/matrix.h"
11 #include "amok/peermanagement.h"
12
13 #define PROC_MATRIX_SIZE 3
14 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
15 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
16
17 #define DATA_MATRIX_SIZE 18
18 const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
19
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
21
22 /* struct for recovering results */
23 GRAS_DEFINE_TYPE(s_result, struct s_result {
24                  int linepos; int rowpos;
25                  xbt_matrix_t C GRAS_ANNOTE(subtype, double);
26                  });
27
28 typedef struct s_result result_t;
29
30 /* struct to send initial data to slave */
31 GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
32                  int linepos;
33                  int rowpos;
34                  xbt_peer_t line[NEIGHBOR_COUNT];
35                  xbt_peer_t row[NEIGHBOR_COUNT];
36                  xbt_matrix_t A GRAS_ANNOTE(subtype, double);
37                  xbt_matrix_t B GRAS_ANNOTE(subtype, double);
38                  });
39
40 typedef struct s_pmm_assignment s_pmm_assignment_t;
41
42 /* register messages which may be sent (common to client and server) */
43 static void register_messages(void)
44 {
45   gras_datadesc_type_t result_type;
46   gras_datadesc_type_t pmm_assignment_type;
47
48   gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
49   result_type = gras_datadesc_by_symbol(s_result);
50   pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
51
52   /* receive a final result from slave */
53   gras_msgtype_declare("result", result_type);
54
55   /* send from master to slave to assign a position and some data */
56   gras_msgtype_declare("pmm_slave", pmm_assignment_type);
57
58   /* send data between slaves */
59   gras_msgtype_declare("dataA",
60                        gras_datadesc_matrix(gras_datadesc_by_name
61                                             ("double"), NULL));
62   gras_msgtype_declare("dataB",
63                        gras_datadesc_matrix(gras_datadesc_by_name
64                                             ("double"), NULL));
65
66   /* synchronization message */
67   gras_msgtype_declare("pmm_sync", 0);
68 }
69
70 static gras_socket_t try_gras_socket_client_from_string(const char *host)
71 {
72   volatile gras_socket_t sock = NULL;
73   xbt_ex_t e;
74   TRY {
75     sock = gras_socket_client_from_string(host);
76   }
77   CATCH(e) {
78     if (e.category != system_error)
79       /* dunno what happened, let the exception go through */
80       RETHROWF("Unable to connect to the server: %s");
81     xbt_ex_free(e);
82   }
83   return sock;
84 }
85
86 static void my_gras_msg_wait(double timeout, const char* msgt_want,
87                              gras_socket_t* expeditor, void *payload,
88                              const char *error_msg)
89 {
90   TRY {
91     gras_msg_wait(timeout, msgt_want, expeditor, payload);
92   }
93   CATCH_ANONYMOUS {
94     RETHROWF("%s: %s", error_msg);
95   }
96 }
97
98 /* Function prototypes */
99 int slave(int argc, char *argv[]);
100 int master(int argc, char *argv[]);
101
102
103 /* **********************************************************************
104  * master code
105  * **********************************************************************/
106
107 /* Global private data */
108 typedef struct {
109   int nbr_row, nbr_line;
110   int remaining_step;
111   int remaining_ack;
112 } master_data_t;
113
114 int master(int argc, char *argv[])
115 {
116
117   int i;
118
119   xbt_matrix_t A, B, C;
120   result_t result;
121
122   gras_socket_t from;
123
124   xbt_dynar_t peers;            /* group of slaves */
125   xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
126   gras_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
127
128   /* Init the GRAS's infrastructure */
129   gras_init(&argc, argv);
130   amok_pm_init();
131   register_messages();
132
133   /* Initialize data matrices */
134   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
135   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
136   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
137
138   /* Create the connexions */
139   xbt_assert(argc > 1, "Usage: master <port>");
140   gras_socket_server(atoi(argv[1]));
141   peers = amok_pm_group_new("pmm");
142
143   /* friends, we're ready. Come and play */
144   XBT_INFO("Wait for peers for 2 sec");
145   gras_msg_handleall(2);
146   while (xbt_dynar_length(peers) < SLAVE_COUNT) {
147     XBT_INFO("Got only %ld pals (of %d). Wait 2 more seconds",
148         xbt_dynar_length(peers),SLAVE_COUNT);
149     gras_msg_handleall(2);
150   }
151   XBT_INFO("Good. Got %ld pals", xbt_dynar_length(peers));
152
153   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
154     xbt_dynar_get_cpy(peers, i, &grid[i]);
155     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
156   }
157   xbt_assert(i == SLAVE_COUNT,
158               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
159               i, SLAVE_COUNT);
160
161   /* Kill surnumerous slaves */
162   for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
163     xbt_peer_t h;
164
165     xbt_dynar_remove_at(peers, i, &h);
166     XBT_INFO("Too much slaves. Killing %s:%d", h->name, h->port);
167     amok_pm_kill_hp(h->name, h->port);
168     free(h);
169   }
170
171
172   /* Assign job to slaves */
173   int row = 0, line = 0;
174   XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
175   for (i = 0; i < SLAVE_COUNT; i++) {
176     s_pmm_assignment_t assignment;
177     int j, k;
178
179     assignment.linepos = line;  // assigned line
180     assignment.rowpos = row;    // assigned row
181
182     /* Neiborhood */
183     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
184       if (i != j * PROC_MATRIX_SIZE + (row)) {
185         assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
186         k++;
187       }
188     }
189     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
190       if (i != (line) * PROC_MATRIX_SIZE + j) {
191         assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
192         k++;
193       }
194     }
195
196     assignment.A = xbt_matrix_new_sub(A,
197                                       submatrix_size, submatrix_size,
198                                       submatrix_size * line,
199                                       submatrix_size * row, NULL);
200     assignment.B =
201         xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
202                            submatrix_size * line, submatrix_size * row,
203                            NULL);
204     row++;
205     if (row >= PROC_MATRIX_SIZE) {
206       row = 0;
207       line++;
208     }
209
210     gras_msg_send(socket[i], "pmm_slave", &assignment);
211     xbt_matrix_free(assignment.A);
212     xbt_matrix_free(assignment.B);
213   }
214
215   /* synchronize slaves */
216   for (i = 0; i < PROC_MATRIX_SIZE; i++) {
217     int j;
218     for (j = 0; j < SLAVE_COUNT; j++)
219       gras_msg_wait(600, "pmm_sync", NULL, NULL);
220     for (j = 0; j < SLAVE_COUNT; j++)
221       gras_msg_send(socket[j], "pmm_sync", NULL);
222   }
223
224   /* Retrieve the results */
225   for (i = 0; i < SLAVE_COUNT; i++) {
226     gras_msg_wait(6000, "result", &from, &result);
227     XBT_VERB("%d slaves are done already. Waiting for %d", i + 1,
228           SLAVE_COUNT);
229     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
230                            submatrix_size * result.linepos,
231                            submatrix_size * result.rowpos, 0, 0, NULL);
232     xbt_matrix_free(result.C);
233   }
234   /*    end of gather   */
235
236   if (xbt_matrix_double_is_seq(C))
237     XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
238   else {
239     XBT_WARN("the result seems wrong");
240     if (DATA_MATRIX_SIZE < 30) {
241       XBT_INFO("The Result of Multiplication is :");
242       xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
243     } else {
244       XBT_INFO("Matrix size too big (%d>30) to be displayed here",
245             DATA_MATRIX_SIZE);
246     }
247   }
248
249   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
250
251   for (i = 0; i < SLAVE_COUNT; i++)
252     gras_socket_close(socket[i]);
253
254   xbt_matrix_free(A);
255   xbt_matrix_free(B);
256   xbt_matrix_free(C);
257   gras_exit();
258   return 0;
259 }                               /* end_of_master */
260
261 /* **********************************************************************
262  * slave code
263  * **********************************************************************/
264
265 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
266 {
267   /* Recover my initialized Data and My Position */
268   s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
269   gras_socket_t master = gras_msg_cb_ctx_from(ctx);
270
271   int step, l;
272   xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
273                                    sizeof(double), NULL);
274   xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
275                                    sizeof(double), NULL);
276
277   int myline, myrow;
278   xbt_matrix_t mydataA, mydataB;
279   xbt_matrix_t bC =
280       xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
281
282   result_t result;
283
284   gras_socket_t from;           /* to exchange data with my neighbor */
285
286   /* sockets for brodcast to other slave */
287   gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
288   gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
289   memset(socket_line, 0, sizeof(socket_line));
290   memset(socket_row, 0, sizeof(socket_row));
291
292   int i;
293
294   gras_os_sleep(1);             /* wait for my pals */
295
296   myline = assignment.linepos;
297   myrow = assignment.rowpos;
298   mydataA = assignment.A;
299   mydataB = assignment.B;
300
301   if (gras_if_RL())
302     XBT_INFO("Receive my pos and assignment");
303   else
304     XBT_INFO("Receive my pos (%d,%d) and assignment", myline, myrow);
305
306   /* Get my neighborhood from the assignment message (skipping myself) */
307   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
308     socket_line[i] = gras_socket_client(assignment.line[i]->name,
309                                         assignment.line[i]->port);
310     xbt_peer_free(assignment.line[i]);
311   }
312   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
313     socket_row[i] = gras_socket_client(assignment.row[i]->name,
314                                        assignment.row[i]->port);
315     xbt_peer_free(assignment.row[i]);
316   }
317
318   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
319     gras_msg_send(master, "pmm_sync", NULL);
320     gras_msg_wait(600, "pmm_sync", NULL, NULL);
321
322     /* a line brodcast */
323     if (myline == step) {
324       XBT_VERB("LINE: step(%d) = Myline(%d). Broadcast my data.", step,
325             myline);
326       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
327         XBT_VERB("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
328         gras_msg_send(socket_row[l], "dataB", &mydataB);
329       }
330
331
332       xbt_matrix_free(bB);
333       bB = xbt_matrix_new_sub(mydataB,
334                               submatrix_size, submatrix_size, 0, 0, NULL);
335     } else {
336       xbt_matrix_free(bB);
337       my_gras_msg_wait(600, "dataB", &from, &bB,
338                        "Can't get a data message from line");
339       XBT_VERB("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
340             myline, gras_socket_peer_name(from));
341     }
342
343     /* a row brodcast */
344     if (myrow == step) {
345       XBT_VERB("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
346       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
347         XBT_VERB("ROW:   Send to %s",
348               gras_socket_peer_name(socket_line[l - 1]));
349         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
350       }
351       xbt_matrix_free(bA);
352       bA = xbt_matrix_new_sub(mydataA,
353                               submatrix_size, submatrix_size, 0, 0, NULL);
354     } else {
355       xbt_matrix_free(bA);
356       my_gras_msg_wait(1200, "dataA", &from, &bA,
357                        "Can't get a data message from row");
358       XBT_VERB("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
359             gras_socket_peer_name(from));
360     }
361     xbt_matrix_double_addmult(bA, bB, bC);
362
363   }
364
365   /* send Result to master */
366   result.C = bC;
367   result.linepos = myline;
368   result.rowpos = myrow;
369
370   TRY {
371     gras_msg_send(master, "result", &result);
372   }
373   CATCH_ANONYMOUS {
374     RETHROWF("Failed to send answer to server: %s");
375   }
376   XBT_VERB(">>>>>>>> Result sent to %s:%d <<<<<<<<",
377         gras_socket_peer_name(master), gras_socket_peer_port(master));
378   /*  Free the allocated resources, and shut GRAS down */
379
380   xbt_matrix_free(bA);
381   xbt_matrix_free(bB);
382   xbt_matrix_free(bC);
383
384   xbt_matrix_free(mydataA);
385   xbt_matrix_free(mydataB);
386   /* FIXME: some are said to be unknown 
387      gras_socket_close(master);
388      gras_socket_close(from);
389      for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
390      if (socket_line[l])
391      gras_socket_close(socket_line[l]);
392      if (socket_row[l])
393      gras_socket_close(socket_row[l]); 
394      } */
395
396   return 0;
397 }
398
399 int slave(int argc, char *argv[])
400 {
401   gras_socket_t mysock;
402   gras_socket_t master = NULL;
403   int rank;
404
405   /* Init the GRAS's infrastructure */
406   gras_init(&argc, argv);
407   amok_pm_init();
408   if (argc != 3 && argc != 2)
409     xbt_die("Usage: slave masterhost:masterport [rank]");
410   if (argc == 2)
411     rank = -1;
412   else
413     rank = atoi(argv[2]);
414
415   /*  Register the known messages and my callback */
416   register_messages();
417   gras_cb_register("pmm_slave", pmm_worker_cb);
418
419   /* Create the connexions */
420   mysock = gras_socket_server_range(3000, 9999, 0, 0);
421   XBT_INFO("Sensor %d starting", rank);
422   while (!(master = try_gras_socket_client_from_string(argv[1])))
423     gras_os_sleep(0.5);
424
425   /* Join and run the group */
426   rank = amok_pm_group_join(master, "pmm");
427   amok_pm_mainloop(600);
428
429   /* housekeeping */
430   gras_socket_close(mysock);
431   //  gras_socket_close(master); Unknown
432   gras_exit();
433   return 0;
434 }                               /* end_of_slave */