1 /* pmm - parallel matrix multiplication "double diffusion" */
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/matrix.h"
11 #include "amok/peermanagement.h"
/* Side length of the square grid of slave processes. */
#define PROC_MATRIX_SIZE 3
/* Number of other slaves sharing a grid line (or a grid row) with a given
 * slave. The expansion is parenthesized so that the macro keeps its value
 * when used inside a larger expression (e.g. 2 * NEIGHBOR_COUNT). */
#define NEIGHBOR_COUNT (PROC_MATRIX_SIZE - 1)
/* Total number of slaves in the grid. */
#define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)

/* Side length of the full square data matrices. */
#define DATA_MATRIX_SIZE 18
/* Side length of the square sub-matrix assigned to each slave. */
const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
/* Logging category of this file, named "pmm" (presumably the one which the
 * XBT_INFO / XBT_VERB / XBT_WARN calls below log to -- confirm in xbt/log.h) */
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
22 /* Payload of the "result" message (declared in register_messages()): one
 * block of the product matrix, tagged with the grid position it belongs to.
 * pmm_worker_cb() sets linepos/rowpos to its own position, and master() uses
 * them to compute where the values of C go in the full result matrix. */
23 XBT_DEFINE_TYPE(s_result, struct s_result {
24 int linepos; int rowpos;
25 xbt_matrix_t C XBT_ANNOTE(subtype, double);
28 typedef struct s_result result_t;
30 /* Payload of the "pmm_slave" message which master() sends to each slave:
 * - line[] / row[]: the NEIGHBOR_COUNT other peers sharing the slave's line,
 *   respectively its row, in the process grid (the slave itself is skipped);
 * - A / B: the sub-matrices of the two operands assigned to this slave.
 * master() and pmm_worker_cb() also access linepos and rowpos members, which
 * hold the grid position of the slave. */
31 XBT_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
34 xbt_peer_t line[NEIGHBOR_COUNT];
35 xbt_peer_t row[NEIGHBOR_COUNT];
36 xbt_matrix_t A XBT_ANNOTE(subtype, double);
37 xbt_matrix_t B XBT_ANNOTE(subtype, double);
40 typedef struct s_pmm_assignment s_pmm_assignment_t;
42 /* Register the messages which may be sent (common to client and server).
 *
 * Five message types are declared: "result" and "pmm_slave" carry the two
 * structures described with XBT_DEFINE_TYPE, "dataA" and "dataB" carry a
 * matrix description, and "pmm_sync" is declared with 0 as payload
 * description. */
43 static void register_messages(void)
45 xbt_datadesc_type_t result_type;
46 xbt_datadesc_type_t pmm_assignment_type;
/* Publish the array size under the name used in the structure definition;
 * presumably it has to be known before the symbols are looked up -- confirm */
48 xbt_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
49 result_type = xbt_datadesc_by_symbol(s_result);
50 pmm_assignment_type = xbt_datadesc_by_symbol(s_pmm_assignment);
52 /* sent by a slave to the master: its final result */
53 gras_msgtype_declare("result", result_type);
55 /* sent from master to slave to assign a position and some data */
56 gras_msgtype_declare("pmm_slave", pmm_assignment_type);
58 /* data exchanged between slaves: a block of A, respectively of B */
59 gras_msgtype_declare("dataA",
60 xbt_datadesc_matrix(xbt_datadesc_by_name
62 gras_msgtype_declare("dataB",
63 xbt_datadesc_matrix(xbt_datadesc_by_name
66 /* synchronization message, exchanged between the master and the slaves */
67 gras_msgtype_declare("pmm_sync", 0);
/* Try to open a client socket to `host` with
 * gras_socket_client_from_string().
 *
 * Only the rethrow path is spelled out below: an exception whose category is
 * not system_error is propagated with an extra message.
 * NOTE(review): slave() calls this in a loop for as long as it yields a NULL
 * socket, so a system_error presumably makes it return NULL instead of
 * throwing -- confirm. */
70 static xbt_socket_t try_gras_socket_client_from_string(const char *host)
72 volatile xbt_socket_t sock = NULL;
75 sock = gras_socket_client_from_string(host);
78 if (e.category != system_error)
79 /* dunno what happened, let the exception go through */
80 RETHROWF("Unable to connect to the server: %s");
/* Wrapper around gras_msg_wait() which adds context to a failure.
 *
 * timeout, msgt_want, expeditor and payload are handed over to
 * gras_msg_wait() unchanged. error_msg is the text passed for the first %s
 * of the rethrown exception (the second %s is presumably filled by RETHROWF
 * with the message of the original exception -- confirm in xbt/ex.h). */
86 static void my_gras_msg_wait(double timeout, const char* msgt_want,
87 xbt_socket_t* expeditor, void *payload,
88 const char *error_msg)
91 gras_msg_wait(timeout, msgt_want, expeditor, payload);
94 RETHROWF("%s: %s", error_msg);
98 /* Function prototypes of the two (argc, argv) functions defined below */
99 int slave(int argc, char *argv[]);
100 int master(int argc, char *argv[]);
103 /* **********************************************************************
105 * **********************************************************************/
107 /* Global private data.
 * NOTE(review): neither variable is used by the code visible here, and they
 * are not declared static although described as private -- confirm whether
 * they are still needed. */
109 int nbr_row, nbr_line;
/* Code of the master.
 *
 * argv[1] is the port the master listens on. The master builds the three
 * DATA_MATRIX_SIZE x DATA_MATRIX_SIZE matrices A, B and C, waits until at
 * least SLAVE_COUNT peers are in the "pmm" group, sends each of the first
 * SLAVE_COUNT peers a "pmm_slave" assignment (grid position, neighbors, one
 * block of A and one of B), relays PROC_MATRIX_SIZE rounds of "pmm_sync"
 * messages, gathers the "result" blocks into C and finally checks C with
 * xbt_matrix_double_is_seq(). */
114 int master(int argc, char *argv[])
119 xbt_matrix_t A, B, C;
124 xbt_dynar_t peers; /* group of slaves */
125 xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
126 xbt_socket_t socket[SLAVE_COUNT]; /* sockets for broadcast to slaves */
128 /* Init the GRAS's infrastructure */
129 gras_init(&argc, argv);
133 /* Initialize data matrices. The final check expects C to be "seq" like
 * B, so A is presumably the identity matrix -- confirm in xbt/matrix.h */
134 A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
135 B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
136 C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
138 /* Create the connections: listen on the port given as first argument */
139 xbt_assert(argc > 1, "Usage: master <port>");
140 gras_socket_server(atoi(argv[1]));
141 peers = amok_pm_group_new("pmm");
143 /* friends, we're ready. Come and play */
144 XBT_INFO("Wait for peers for 2 sec");
145 gras_msg_handleall(2);
/* Keep handling incoming messages, 2 more seconds at a time, until the group
 * holds at least SLAVE_COUNT peers.
 * NOTE(review): %ld is used for the value of xbt_dynar_length() -- confirm
 * that the format matches its return type. */
146 while (xbt_dynar_length(peers) < SLAVE_COUNT) {
147 XBT_INFO("Got only %ld pals (of %d). Wait 2 more seconds",
148 xbt_dynar_length(peers),SLAVE_COUNT);
149 gras_msg_handleall(2);
151 XBT_INFO("Good. Got %ld pals", xbt_dynar_length(peers));
/* Copy the first SLAVE_COUNT peers into grid[] and connect to each of them */
153 for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
154 xbt_dynar_get_cpy(peers, i, &grid[i]);
155 socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
157 xbt_assert(i == SLAVE_COUNT,
158 "Not enough slaves for this setting (got %d of %d). Change the deployment file",
161 /* Kill supernumerary slaves. The loop header has no increment: each
 * iteration removes the peer at index SLAVE_COUNT from the group. */
162 for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
165 xbt_dynar_remove_at(peers, i, &h);
166 XBT_INFO("Too much slaves. Killing %s:%d", h->name, h->port);
167 amok_pm_kill_hp(h->name, h->port);
172 /* Assign job to slaves: slave i gets the grid position (line, row) */
173 int row = 0, line = 0;
174 XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
175 for (i = 0; i < SLAVE_COUNT; i++) {
176 s_pmm_assignment_t assignment;
179 assignment.linepos = line; // assigned line
180 assignment.rowpos = row; // assigned row
/* Neighbors sharing the same row index: one candidate per line j, the slave
 * i itself being skipped */
183 for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
184 if (i != j * PROC_MATRIX_SIZE + (row)) {
185 assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
/* Neighbors sharing the same line index: one candidate per row j, the slave
 * i itself being skipped */
189 for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
190 if (i != (line) * PROC_MATRIX_SIZE + j) {
191 assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
/* Extract the submatrix_size x submatrix_size blocks of A and of B located
 * at offset (submatrix_size * line, submatrix_size * row) */
196 assignment.A = xbt_matrix_new_sub(A,
197 submatrix_size, submatrix_size,
198 submatrix_size * line,
199 submatrix_size * row, NULL);
201 xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
202 submatrix_size * line, submatrix_size * row,
205 if (row >= PROC_MATRIX_SIZE) {
/* Ship the assignment, then release the two blocks extracted for it */
210 gras_msg_send(socket[i], "pmm_slave", &assignment);
211 xbt_matrix_free(assignment.A);
212 xbt_matrix_free(assignment.B);
215 /* synchronize slaves: PROC_MATRIX_SIZE rounds, in each of which the master
 * waits for SLAVE_COUNT "pmm_sync" messages before sending one to every
 * slave */
216 for (i = 0; i < PROC_MATRIX_SIZE; i++) {
218 for (j = 0; j < SLAVE_COUNT; j++)
219 gras_msg_wait(600, "pmm_sync", NULL, NULL);
220 for (j = 0; j < SLAVE_COUNT; j++)
221 gras_msg_send(socket[j], "pmm_sync", NULL);
224 /* Retrieve the results: one "result" message per slave, in any order */
225 for (i = 0; i < SLAVE_COUNT; i++) {
226 gras_msg_wait(6000, "result", &from, &result);
227 XBT_VERB("%d slaves are done already. Waiting for %d", i + 1,
/* Place the received block in C according to the position it carries, then
 * release the received matrix */
229 xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
230 submatrix_size * result.linepos,
231 submatrix_size * result.rowpos, 0, 0, NULL);
232 xbt_matrix_free(result.C);
/* Check the result, and display it when the matrix is small enough */
236 if (xbt_matrix_double_is_seq(C))
237 XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
239 XBT_WARN("the result seems wrong");
240 if (DATA_MATRIX_SIZE < 30) {
241 XBT_INFO("The Result of Multiplication is :");
242 xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
244 XBT_INFO("Matrix size too big (%d>30) to be displayed here",
249 amok_pm_group_shutdown("pmm"); /* Ok, we're out of here */
/* Close the sockets which were opened to the slaves */
251 for (i = 0; i < SLAVE_COUNT; i++)
252 gras_socket_close(socket[i]);
259 } /* end_of_master */
261 /* **********************************************************************
263 * **********************************************************************/
/* Callback registered by slave() for the "pmm_slave" message.
 *
 * ctx is used to retrieve the socket of the sender (the master), and payload
 * points to the s_pmm_assignment_t which was received. The worker connects
 * to its line and row neighbors, then runs PROC_MATRIX_SIZE steps. Each step
 * starts with a "pmm_sync" exchange with the master; blocks of B and A are
 * then either sent to the neighbors or received from one of them, and
 * xbt_matrix_double_addmult() is called on bA, bB and bC. A "result" message
 * is finally sent to the master. */
265 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
267 /* Recover my initialized Data and My Position (the payload structure is
 * copied by value) */
268 s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
269 xbt_socket_t master = gras_msg_cb_ctx_from(ctx);
/* Blocks of A and B used by the current step.
 * NOTE(review): both are allocated here and overwritten later on, either by
 * xbt_matrix_new_sub() or by a reception; the matching xbt_matrix_free()
 * calls are not visible here -- confirm that they exist. */
272 xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
273 sizeof(double), NULL);
274 xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
275 sizeof(double), NULL);
278 xbt_matrix_t mydataA, mydataB;
/* zero-filled block, presumably the accumulator bC used below -- confirm */
280 xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
284 xbt_socket_t from; /* to exchange data with my neighbor */
286 /* sockets for broadcast to the other slaves */
287 xbt_socket_t socket_line[PROC_MATRIX_SIZE - 1];
288 xbt_socket_t socket_row[PROC_MATRIX_SIZE - 1];
289 memset(socket_line, 0, sizeof(socket_line));
290 memset(socket_row, 0, sizeof(socket_row));
294 gras_os_sleep(1); /* wait for my pals */
296 myline = assignment.linepos;
297 myrow = assignment.rowpos;
298 mydataA = assignment.A;
299 mydataB = assignment.B;
302 XBT_INFO("Receive my pos and assignment");
304 XBT_INFO("Receive my pos (%d,%d) and assignment", myline, myrow);
306 /* Get my neighborhood from the assignment message (skipping myself): open
 * a socket to each peer, then release the peer description received */
307 for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
308 socket_line[i] = gras_socket_client(assignment.line[i]->name,
309 assignment.line[i]->port);
310 xbt_peer_free(assignment.line[i]);
312 for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
313 socket_row[i] = gras_socket_client(assignment.row[i]->name,
314 assignment.row[i]->port);
315 xbt_peer_free(assignment.row[i]);
318 for (step = 0; step < PROC_MATRIX_SIZE; step++) {
/* Tell the master that I am ready for this step, and wait for its go */
319 gras_msg_send(master, "pmm_sync", NULL);
320 gras_msg_wait(600, "pmm_sync", NULL, NULL);
322 /* a line broadcast: the slaves of line `step` send their block of B over
 * socket_row[] and use it as bB */
323 if (myline == step) {
324 XBT_VERB("LINE: step(%d) = Myline(%d). Broadcast my data.", step,
326 for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
327 XBT_VERB("LINE: Send to %s", xbt_socket_peer_name(socket_row[l]));
328 gras_msg_send(socket_row[l], "dataB", &mydataB);
333 bB = xbt_matrix_new_sub(mydataB,
334 submatrix_size, submatrix_size, 0, 0, NULL);
/* reception of a "dataB" block into bB (600 second timeout) */
337 my_gras_msg_wait(600, "dataB", &from, &bB,
338 "Can't get a data message from line");
339 XBT_VERB("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
340 myline, xbt_socket_peer_name(from));
/* The same exchange for A: the block is sent over socket_line[] and used as
 * bA */
345 XBT_VERB("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
346 for (l = 1; l < PROC_MATRIX_SIZE; l++) {
347 XBT_VERB("ROW: Send to %s",
348 xbt_socket_peer_name(socket_line[l - 1]));
349 gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
352 bA = xbt_matrix_new_sub(mydataA,
353 submatrix_size, submatrix_size, 0, 0, NULL);
/* reception of a "dataA" block into bA (1200 second timeout, twice the one
 * used for "dataB") */
356 my_gras_msg_wait(1200, "dataA", &from, &bA,
357 "Can't get a data message from row");
358 XBT_VERB("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
359 xbt_socket_peer_name(from));
/* presumably accumulates the product bA x bB into bC -- confirm in
 * xbt/matrix.h */
361 xbt_matrix_double_addmult(bA, bB, bC);
365 /* send Result to master, tagged with my position in the grid */
367 result.linepos = myline;
368 result.rowpos = myrow;
371 gras_msg_send(master, "result", &result);
374 RETHROWF("Failed to send answer to server: %s");
376 XBT_VERB(">>>>>>>> Result sent to %s:%d <<<<<<<<",
377 xbt_socket_peer_name(master), xbt_socket_peer_port(master));
378 /* Free the allocated resources, and shut GRAS down */
384 xbt_matrix_free(mydataA);
385 xbt_matrix_free(mydataB);
386 /* FIXME: some are said to be unknown
387 gras_socket_close(master);
388 gras_socket_close(from);
389 for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
391 gras_socket_close(socket_line[l]);
393 gras_socket_close(socket_row[l]);
// Code of a slave.
//
// Per the usage string, argv[1] is the location of the master
// (masterhost:masterport) and argv[2] an optional rank. The slave registers
// pmm_worker_cb() as the callback of the "pmm_slave" message, asks for a
// server socket in the 3000..9999 range, connects to the master (trying
// again for as long as no socket is obtained), joins the "pmm" group and
// runs the group main loop.
399 int slave(int argc, char *argv[])
402 xbt_socket_t master = NULL;
405 /* Init the GRAS's infrastructure */
406 gras_init(&argc, argv);
408 if (argc != 3 && argc != 2)
409 xbt_die("Usage: slave masterhost:masterport [rank]");
/* NOTE(review): argv[2] only exists when argc == 3; the guard of this
 * assignment is not visible here -- confirm */
413 rank = atoi(argv[2]);
415 /* Register the known messages and my callback */
417 gras_cb_register("pmm_slave", pmm_worker_cb);
419 /* Create the connections */
420 mysock = gras_socket_server_range(3000, 9999, 0, 0);
421 XBT_INFO("Sensor %d starting", rank);
/* Loop for as long as the connection attempt does not yield a socket */
422 while (!(master = try_gras_socket_client_from_string(argv[1])))
425 /* Join and run the group; rank is overwritten with the value returned by
 * amok_pm_group_join() */
426 rank = amok_pm_group_join(master, "pmm");
427 amok_pm_mainloop(600);
430 gras_socket_close(mysock);
431 // gras_socket_close(master); Unknown