Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
temporarly reduce the example size until it gets debugged in SG
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* pmm - parallel matrix multiplication "double diffusion"                  */
2
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #include "xbt/matrix.h"
11 #include "amok/peermanagement.h"
12
13 #define PROC_MATRIX_SIZE 2
14 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
15 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
16
17 #define DATA_MATRIX_SIZE 18
18 const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
19
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
21
22 /* struct for recovering results */
23 GRAS_DEFINE_TYPE(s_result, struct s_result {
24                  int linepos; int rowpos;
25                  xbt_matrix_t C GRAS_ANNOTE(subtype, double);
26                  });
27
28 typedef struct s_result result_t;
29
30 /* struct to send initial data to slave */
31 GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
32                  int linepos;
33                  int rowpos;
34                  xbt_peer_t line[NEIGHBOR_COUNT];
35                  xbt_peer_t row[NEIGHBOR_COUNT];
36                  xbt_matrix_t A GRAS_ANNOTE(subtype, double);
37                  xbt_matrix_t B GRAS_ANNOTE(subtype, double);
38                  });
39
40 typedef struct s_pmm_assignment s_pmm_assignment_t;
41
42 /* register messages which may be sent (common to client and server) */
43 static void register_messages(void)
44 {
45   gras_datadesc_type_t result_type;
46   gras_datadesc_type_t pmm_assignment_type;
47
48   gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
49   result_type = gras_datadesc_by_symbol(s_result);
50   pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
51
52   /* receive a final result from slave */
53   gras_msgtype_declare("result", result_type);
54
55   /* send from master to slave to assign a position and some data */
56   gras_msgtype_declare("pmm_slave", pmm_assignment_type);
57
58   /* send data between slaves */
59   gras_msgtype_declare("dataA",
60                        gras_datadesc_matrix(gras_datadesc_by_name
61                                             ("double"), NULL));
62   gras_msgtype_declare("dataB",
63                        gras_datadesc_matrix(gras_datadesc_by_name
64                                             ("double"), NULL));
65
66   /* synchronization message */
67   gras_msgtype_declare("pmm_sync", 0);
68 }
69
70 /* Function prototypes */
71 int slave(int argc, char *argv[]);
72 int master(int argc, char *argv[]);
73
74
75 /* **********************************************************************
76  * master code
77  * **********************************************************************/
78
79 /* Global private data */
80 typedef struct {
81   int nbr_row, nbr_line;
82   int remaining_step;
83   int remaining_ack;
84 } master_data_t;
85
86 int master(int argc, char *argv[])
87 {
88
89   int i;
90
91   xbt_matrix_t A, B, C;
92   result_t result;
93
94   gras_socket_t from;
95
96   xbt_dynar_t peers;            /* group of slaves */
97   xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
98   gras_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
99
100   /* Init the GRAS's infrastructure */
101   gras_init(&argc, argv);
102   amok_pm_init();
103   register_messages();
104
105   /* Initialize data matrices */
106   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
107   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
108   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
109
110   /* Create the connexions */
111   xbt_assert0(argc > 1, "Usage: master <port>");
112   gras_socket_server(atoi(argv[1]));
113   peers = amok_pm_group_new("pmm");
114
115   /* friends, we're ready. Come and play */
116   INFO0("Wait for peers for 2 sec");
117   gras_msg_handleall(2);
118   while (xbt_dynar_length(peers) < 9) {
119     INFO1("Got only %ld pals. Wait 2 more seconds",
120           xbt_dynar_length(peers));
121     gras_msg_handleall(2);
122   }
123   INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
124
125   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
126     xbt_dynar_get_cpy(peers, i, &grid[i]);
127     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
128   }
129   xbt_assert2(i == SLAVE_COUNT,
130               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
131               i, SLAVE_COUNT);
132
133   /* Kill surnumerous slaves */
134   for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
135     xbt_peer_t h;
136
137     xbt_dynar_remove_at(peers, i, &h);
138     INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
139     amok_pm_kill_hp(h->name, h->port);
140     free(h);
141   }
142
143
144   /* Assign job to slaves */
145   int row = 0, line = 0;
146   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
147   for (i = 0; i < SLAVE_COUNT; i++) {
148     s_pmm_assignment_t assignment;
149     int j, k;
150
151     assignment.linepos = line;  // assigned line
152     assignment.rowpos = row;    // assigned row
153
154     /* Neiborhood */
155     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
156       if (i != j * PROC_MATRIX_SIZE + (row)) {
157         assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
158         k++;
159       }
160     }
161     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
162       if (i != (line) * PROC_MATRIX_SIZE + j) {
163         assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
164         k++;
165       }
166     }
167
168     assignment.A = xbt_matrix_new_sub(A,
169                                       submatrix_size, submatrix_size,
170                                       submatrix_size * line,
171                                       submatrix_size * row, NULL);
172     assignment.B =
173         xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
174                            submatrix_size * line, submatrix_size * row,
175                            NULL);
176     row++;
177     if (row >= PROC_MATRIX_SIZE) {
178       row = 0;
179       line++;
180     }
181
182     gras_msg_send(socket[i], "pmm_slave", &assignment);
183     xbt_matrix_free(assignment.A);
184     xbt_matrix_free(assignment.B);
185   }
186
187   /* synchronize slaves */
188   for (i = 0; i < PROC_MATRIX_SIZE; i++) {
189     int j;
190     for (j = 0; j < SLAVE_COUNT; j++)
191       gras_msg_wait(600, "pmm_sync", NULL, NULL);
192     for (j = 0; j < SLAVE_COUNT; j++)
193       gras_msg_send(socket[j], "pmm_sync", NULL);
194   }
195
196   /* Retrieve the results */
197   for (i = 0; i < SLAVE_COUNT; i++) {
198     gras_msg_wait(6000, "result", &from, &result);
199     VERB2("%d slaves are done already. Waiting for %d", i + 1,
200           SLAVE_COUNT);
201     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
202                            submatrix_size * result.linepos,
203                            submatrix_size * result.rowpos, 0, 0, NULL);
204     xbt_matrix_free(result.C);
205   }
206   /*    end of gather   */
207
208   if (xbt_matrix_double_is_seq(C))
209     INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
210   else {
211     WARN0("the result seems wrong");
212     if (DATA_MATRIX_SIZE < 30) {
213       INFO0("The Result of Multiplication is :");
214       xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
215     } else {
216       INFO1("Matrix size too big (%d>30) to be displayed here",
217             DATA_MATRIX_SIZE);
218     }
219   }
220
221   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
222
223   for (i = 0; i < SLAVE_COUNT; i++)
224     gras_socket_close(socket[i]);
225
226   xbt_matrix_free(A);
227   xbt_matrix_free(B);
228   xbt_matrix_free(C);
229   gras_exit();
230   return 0;
231 }                               /* end_of_master */
232
233 /* **********************************************************************
234  * slave code
235  * **********************************************************************/
236
237 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
238 {
239   /* Recover my initialized Data and My Position */
240   s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
241   gras_socket_t master = gras_msg_cb_ctx_from(ctx);
242
243   xbt_ex_t e;
244
245   int step, l;
246   xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
247                                    sizeof(double), NULL);
248   xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
249                                    sizeof(double), NULL);
250
251   int myline, myrow;
252   xbt_matrix_t mydataA, mydataB;
253   xbt_matrix_t bC =
254       xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
255
256   result_t result;
257
258   gras_socket_t from;           /* to exchange data with my neighbor */
259
260   /* sockets for brodcast to other slave */
261   gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
262   gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
263   memset(socket_line, 0, sizeof(socket_line));
264   memset(socket_row, 0, sizeof(socket_row));
265
266   int i;
267
268   gras_os_sleep(1);             /* wait for my pals */
269
270   myline = assignment.linepos;
271   myrow = assignment.rowpos;
272   mydataA = assignment.A;
273   mydataB = assignment.B;
274
275   if (gras_if_RL())
276     INFO0("Receive my pos and assignment");
277   else
278     INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
279
280   /* Get my neighborhood from the assignment message (skipping myself) */
281   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
282     socket_line[i] = gras_socket_client(assignment.line[i]->name,
283                                         assignment.line[i]->port);
284     xbt_peer_free(assignment.line[i]);
285   }
286   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
287     socket_row[i] = gras_socket_client(assignment.row[i]->name,
288                                        assignment.row[i]->port);
289     xbt_peer_free(assignment.row[i]);
290   }
291
292   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
293     gras_msg_send(master, "pmm_sync", NULL);
294     gras_msg_wait(600, "pmm_sync", NULL, NULL);
295
296     /* a line brodcast */
297     if (myline == step) {
298       VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step,
299             myline);
300       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
301         VERB1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
302         gras_msg_send(socket_row[l], "dataB", &mydataB);
303       }
304
305
306       xbt_matrix_free(bB);
307       bB = xbt_matrix_new_sub(mydataB,
308                               submatrix_size, submatrix_size, 0, 0, NULL);
309     } else {
310       TRY {
311         xbt_matrix_free(bB);
312         gras_msg_wait(600, "dataB", &from, &bB);
313       }
314       CATCH(e) {
315         RETHROW0("Can't get a data message from line : %s");
316       }
317       VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
318             myline, gras_socket_peer_name(from));
319     }
320
321     /* a row brodcast */
322     if (myrow == step) {
323       VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
324       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
325         VERB1("ROW:   Send to %s",
326               gras_socket_peer_name(socket_line[l - 1]));
327         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
328       }
329       xbt_matrix_free(bA);
330       bA = xbt_matrix_new_sub(mydataA,
331                               submatrix_size, submatrix_size, 0, 0, NULL);
332     } else {
333       TRY {
334         xbt_matrix_free(bA);
335         gras_msg_wait(1200, "dataA", &from, &bA);
336       }
337       CATCH(e) {
338         RETHROW0("Can't get a data message from row : %s");
339       }
340       VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
341             gras_socket_peer_name(from));
342     }
343     xbt_matrix_double_addmult(bA, bB, bC);
344
345   };
346
347   /* send Result to master */
348   result.C = bC;
349   result.linepos = myline;
350   result.rowpos = myrow;
351
352   TRY {
353     gras_msg_send(master, "result", &result);
354   }
355   CATCH(e) {
356     RETHROW0("Failed to send answer to server: %s");
357   }
358   VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
359         gras_socket_peer_name(master), gras_socket_peer_port(master));
360   /*  Free the allocated resources, and shut GRAS down */
361
362   xbt_matrix_free(bA);
363   xbt_matrix_free(bB);
364   xbt_matrix_free(bC);
365
366   xbt_matrix_free(mydataA);
367   xbt_matrix_free(mydataB);
368   /* FIXME: some are said to be unknown 
369      gras_socket_close(master);
370      gras_socket_close(from);
371      for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
372      if (socket_line[l])
373      gras_socket_close(socket_line[l]);
374      if (socket_row[l])
375      gras_socket_close(socket_row[l]); 
376      } */
377
378   return 0;
379 }
380
381 int slave(int argc, char *argv[])
382 {
383   gras_socket_t mysock;
384   gras_socket_t master = NULL;
385   int connected = 0;
386   int rank;
387
388   /* Init the GRAS's infrastructure */
389   gras_init(&argc, argv);
390   amok_pm_init();
391   if (argc != 3 && argc != 2)
392     xbt_die("Usage: slave masterhost:masterport [rank]");
393   if (argc == 2)
394     rank = -1;
395   else
396     rank = atoi(argv[2]);
397
398   /*  Register the known messages and my callback */
399   register_messages();
400   gras_cb_register("pmm_slave", pmm_worker_cb);
401
402   /* Create the connexions */
403   mysock = gras_socket_server_range(3000, 9999, 0, 0);
404   INFO1("Sensor %d starting", rank);
405   while (!connected) {
406     xbt_ex_t e;
407     TRY {
408       master = gras_socket_client_from_string(argv[1]);
409       connected = 1;
410     }
411     CATCH(e) {
412       if (e.category != system_error)
413         RETHROW;
414       xbt_ex_free(e);
415       gras_os_sleep(0.5);
416     }
417   }
418   INFO2("Connected to master: %s:%d",gras_socket_peer_name(master),gras_socket_peer_port(master));
419
420   /* Join and run the group */
421   rank = amok_pm_group_join(master, "pmm");
422   amok_pm_mainloop(600);
423
424   /* housekeeping */
425   gras_socket_close(mysock);
426   //  gras_socket_close(master); Unknown
427   gras_exit();
428   return 0;
429 }                               /* end_of_slave */