Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
28671fe054502f0b65add39159dcdadc9e40a675
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* $Id$ */
2 /* pmm - parallel matrix multiplication "double diffusion"                  */
3
4 /* Copyright (c) 2006-2008 The SimGrid team. All rights reserved.           */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "gras.h"
10 #include "xbt/matrix.h"
11 #include "amok/peermanagement.h"
12
13 #define PROC_MATRIX_SIZE 3
14 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
15 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
16
17 #define DATA_MATRIX_SIZE 18
18 const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
19
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
21
22 /* struct for recovering results */
23 GRAS_DEFINE_TYPE(s_result, struct s_result {
24                  int linepos;
25                  int rowpos; xbt_matrix_t C GRAS_ANNOTE(subtype, double);});
26
27 typedef struct s_result result_t;
28
29 /* struct to send initial data to slave */
30 GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
31                  int linepos;
32                  int rowpos;
33                  xbt_peer_t line[NEIGHBOR_COUNT];
34                  xbt_peer_t row[NEIGHBOR_COUNT];
35                  xbt_matrix_t A GRAS_ANNOTE(subtype, double);
36                  xbt_matrix_t B GRAS_ANNOTE(subtype, double);});
37
38 typedef struct s_pmm_assignment s_pmm_assignment_t;
39
40 /* register messages which may be sent (common to client and server) */
41 static void register_messages(void)
42 {
43   gras_datadesc_type_t result_type;
44   gras_datadesc_type_t pmm_assignment_type;
45
46   gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
47   result_type = gras_datadesc_by_symbol(s_result);
48   pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
49
50   /* receive a final result from slave */
51   gras_msgtype_declare("result", result_type);
52
53   /* send from master to slave to assign a position and some data */
54   gras_msgtype_declare("pmm_slave", pmm_assignment_type);
55
56   /* send data between slaves */
57   gras_msgtype_declare("dataA",
58                        gras_datadesc_matrix(gras_datadesc_by_name("double"),
59                                             NULL));
60   gras_msgtype_declare("dataB",
61                        gras_datadesc_matrix(gras_datadesc_by_name("double"),
62                                             NULL));
63 }
64
65 /* Function prototypes */
66 int slave(int argc, char *argv[]);
67 int master(int argc, char *argv[]);
68
69
70 /* **********************************************************************
71  * master code
72  * **********************************************************************/
73
74 /* Global private data */
75 typedef struct {
76   int nbr_row, nbr_line;
77   int remaining_step;
78   int remaining_ack;
79 } master_data_t;
80
81 int master(int argc, char *argv[])
82 {
83
84   int i;
85
86   xbt_matrix_t A, B, C;
87   result_t result;
88
89   gras_socket_t from;
90
91   xbt_dynar_t peers;            /* group of slaves */
92   xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
93   gras_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
94
95   /* Init the GRAS's infrastructure */
96   gras_init(&argc, argv);
97   amok_pm_init();
98   register_messages();
99
100   /* Initialize data matrices */
101   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
102   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
103   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
104
105   /* Create the connexions */
106   xbt_assert0(argc > 1, "Usage: master <port>");
107   gras_socket_server(atoi(argv[1]));
108   peers = amok_pm_group_new("pmm");
109
110   /* friends, we're ready. Come and play */
111   INFO0("Wait for peers for 2 sec");
112   gras_msg_handleall(2);
113   while (xbt_dynar_length(peers)<9) {
114     INFO1("Got only %ld pals. Wait 2 more seconds", xbt_dynar_length(peers));
115     gras_msg_handleall(2);
116   }
117   INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
118
119   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
120     xbt_dynar_get_cpy(peers, i, &grid[i]);
121     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
122   }
123   xbt_assert2(i == SLAVE_COUNT,
124               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
125               i, SLAVE_COUNT);
126
127   /* Kill surnumerous slaves */
128   for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
129     xbt_peer_t h;
130
131     xbt_dynar_remove_at(peers, i, &h);
132     INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
133     amok_pm_kill_hp(h->name, h->port);
134     free(h);
135   }
136
137
138   /* Assign job to slaves */
139   int row = 0, line = 0;
140   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
141   for (i = 0; i < SLAVE_COUNT; i++) {
142     s_pmm_assignment_t assignment;
143     int j, k;
144
145     assignment.linepos = line;  // assigned line
146     assignment.rowpos = row;    // assigned row
147
148     /* Neiborhood */
149     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
150       if (i != j * PROC_MATRIX_SIZE + (row)) {
151         assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
152         k++;
153       }
154     }
155     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
156       if (i != (line) * PROC_MATRIX_SIZE + j) {
157         assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
158         k++;
159       }
160     }
161
162     assignment.A = xbt_matrix_new_sub(A,
163                                       submatrix_size, submatrix_size,
164                                       submatrix_size * line,
165                                       submatrix_size * row, NULL);
166     assignment.B =
167       xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
168                          submatrix_size * line, submatrix_size * row, NULL);
169     row++;
170     if (row >= PROC_MATRIX_SIZE) {
171       row = 0;
172       line++;
173     }
174
175     gras_msg_send(socket[i], "pmm_slave", &assignment);
176     xbt_matrix_free(assignment.A);
177     xbt_matrix_free(assignment.B);
178   }
179
180   /* (have a rest while the slave perform the multiplication) */
181
182   /* Retrieve the results */
183   for (i = 0; i < SLAVE_COUNT; i++) {
184     gras_msg_wait(6000, "result", &from, &result);
185     VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT);
186     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
187                            submatrix_size * result.linepos,
188                            submatrix_size * result.rowpos, 0, 0, NULL);
189     xbt_matrix_free(result.C);
190   }
191   /*    end of gather   */
192
193   if (xbt_matrix_double_is_seq(C))
194     INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
195   else {
196     WARN0("the result seems wrong");
197   if (DATA_MATRIX_SIZE < 30) {
198     INFO0("The Result of Multiplication is :");
199     xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
200   } else {
201     INFO1("Matrix size too big (%d>30) to be displayed here",
202           DATA_MATRIX_SIZE);
203   }
204   }
205
206   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
207
208   for (i = 0; i < SLAVE_COUNT; i++)
209     gras_socket_close(socket[i]);
210
211   xbt_matrix_free(A);
212   xbt_matrix_free(B);
213   xbt_matrix_free(C);
214   gras_exit();
215   return 0;
216 }                               /* end_of_master */
217
218 /* **********************************************************************
219  * slave code
220  * **********************************************************************/
221
222 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
223 {
224   /* Recover my initialized Data and My Position */
225   s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
226   gras_socket_t master = gras_msg_cb_ctx_from(ctx);
227
228   xbt_ex_t e;
229
230   int step, l;
231   xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
232                                    sizeof(double), NULL);
233   xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
234                                    sizeof(double), NULL);
235
236   int myline, myrow;
237   xbt_matrix_t mydataA, mydataB;
238   xbt_matrix_t bC =
239     xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
240
241   result_t result;
242
243   gras_socket_t from;           /* to exchange data with my neighbor */
244
245   /* sockets for brodcast to other slave */
246   gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
247   gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
248   memset(socket_line, 0, sizeof(socket_line));
249   memset(socket_row, 0, sizeof(socket_row));
250
251   int i;
252
253   gras_os_sleep(1);             /* wait for my pals */
254
255   myline = assignment.linepos;
256   myrow = assignment.rowpos;
257   mydataA = assignment.A;
258   mydataB = assignment.B;
259
260   if (gras_if_RL())
261     INFO0("Receive my pos and assignment");
262   else
263     INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
264
265   /* Get my neighborhood from the assignment message (skipping myself) */
266   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
267     socket_line[i] = gras_socket_client(assignment.line[i]->name,
268                                         assignment.line[i]->port);
269     xbt_peer_free(assignment.line[i]);
270   }
271   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
272     socket_row[i] = gras_socket_client(assignment.row[i]->name,
273                                        assignment.row[i]->port);
274     xbt_peer_free(assignment.row[i]);
275   }
276
277   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
278
279     /* a line brodcast */
280     if (myline == step) {
281       VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline);
282       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
283         VERB1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
284         gras_msg_send(socket_row[l], "dataB", &mydataB);
285       }
286
287
288       xbt_matrix_free(bB);
289       bB = xbt_matrix_new_sub(mydataB,
290                               submatrix_size, submatrix_size, 0, 0, NULL);
291     } else {
292       TRY {
293         xbt_matrix_free(bB);
294         gras_msg_wait(600, "dataB", &from, &bB);
295       }
296       CATCH(e) {
297         RETHROW0("Can't get a data message from line : %s");
298       }
299       VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
300             myline, gras_socket_peer_name(from));
301     }
302
303     /* a row brodcast */
304     if (myrow == step) {
305       VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
306       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
307         VERB1("ROW:   Send to %s", gras_socket_peer_name(socket_line[l - 1]));
308         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
309       }
310       xbt_matrix_free(bA);
311       bA = xbt_matrix_new_sub(mydataA,
312                               submatrix_size, submatrix_size, 0, 0, NULL);
313     } else {
314       TRY {
315         xbt_matrix_free(bA);
316         gras_msg_wait(1200, "dataA", &from, &bA);
317       }
318       CATCH(e) {
319         RETHROW0("Can't get a data message from row : %s");
320       }
321       VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
322             gras_socket_peer_name(from));
323     }
324     xbt_matrix_double_addmult(bA, bB, bC);
325
326   };
327
328   /* send Result to master */
329   result.C = bC;
330   result.linepos = myline;
331   result.rowpos = myrow;
332
333   TRY {
334     gras_msg_send(master, "result", &result);
335   }
336   CATCH(e) {
337     RETHROW0("Failed to send answer to server: %s");
338   }
339   VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
340         gras_socket_peer_name(master), gras_socket_peer_port(master));
341   /*  Free the allocated resources, and shut GRAS down */
342
343   xbt_matrix_free(bA);
344   xbt_matrix_free(bB);
345   xbt_matrix_free(bC);
346
347   xbt_matrix_free(mydataA);
348   xbt_matrix_free(mydataB);
349   /* FIXME: some are said to be unknown 
350      gras_socket_close(master);
351      gras_socket_close(from);
352      for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
353      if (socket_line[l])
354      gras_socket_close(socket_line[l]);
355      if (socket_row[l])
356      gras_socket_close(socket_row[l]); 
357      } */
358
359   return 0;
360 }
361
362 int slave(int argc, char *argv[])
363 {
364   gras_socket_t mysock;
365   gras_socket_t master = NULL;
366   int connected = 0;
367   int rank;
368
369   /* Init the GRAS's infrastructure */
370   gras_init(&argc, argv);
371   amok_pm_init();
372   if (argc != 3 && argc != 2)
373     xbt_die("Usage: slave masterhost:masterport [rank]");
374   if (argc == 2)
375     rank = -1;
376   else
377     rank = atoi(argv[2]);
378
379   /*  Register the known messages and my callback */
380   register_messages();
381   gras_cb_register("pmm_slave", pmm_worker_cb);
382
383   /* Create the connexions */
384   mysock = gras_socket_server_range(3000, 9999, 0, 0);
385   INFO1("Sensor %d starting", rank);
386   while (!connected) {
387     xbt_ex_t e;
388     TRY {
389       master = gras_socket_client_from_string(argv[1]);
390       connected = 1;
391     }
392     CATCH(e) {
393       if (e.category != system_error)
394         RETHROW;
395       xbt_ex_free(e);
396       gras_os_sleep(0.5);
397     }
398   }
399
400   /* Join and run the group */
401   rank = amok_pm_group_join(master, "pmm");
402   amok_pm_mainloop(600);
403
404   /* housekeeping */
405   gras_socket_close(mysock);
406   //  gras_socket_close(master); Unknown
407   gras_exit();
408   return 0;
409 }                               /* end_of_slave */