2 /* pmm - parallel matrix multiplication "double diffusion" */
4 /* Copyright (c) 2006-2008 The SimGrid team. All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/matrix.h"
11 #include "amok/peermanagement.h"
/* Side length of the square grid of slave processes. */
#define PROC_MATRIX_SIZE 3
/* Number of other processes sharing a grid line (or row) with a given one.
 * The replacement list is parenthesized so that the macro keeps its value
 * when used inside a larger expression (e.g. 2 * NEIGHBOR_COUNT). */
#define NEIGHBOR_COUNT (PROC_MATRIX_SIZE - 1)
/* Total number of slaves needed to fill the process grid. */
#define SLAVE_COUNT (PROC_MATRIX_SIZE * PROC_MATRIX_SIZE)

/* Side length of the square data matrices built by the master. */
#define DATA_MATRIX_SIZE 18
/* Side length of the square block of data handled by each slave. */
const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
20 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
22 /* Payload of the "result" message a slave sends back to the master: the
 * sender's position in the process grid together with its block C (a matrix
 * of doubles, per the subtype annotation).
 * The struct is declared through GRAS_DEFINE_TYPE so that
 * register_messages() can obtain its description with
 * gras_datadesc_by_symbol(s_result). */
23 GRAS_DEFINE_TYPE(s_result, struct s_result {
25 int rowpos; xbt_matrix_t C GRAS_ANNOTE(subtype, double);});
27 typedef struct s_result result_t;
29 /* Payload of the "pmm_slave" message the master sends to each slave.
 * line[] and row[] list the NEIGHBOR_COUNT peers sharing the slave's grid
 * line and grid row respectively (the slave itself is left out by master()),
 * and A and B are the slave's blocks of the input matrices (matrices of
 * doubles, per the subtype annotation).
 * The struct is declared through GRAS_DEFINE_TYPE so that
 * register_messages() can obtain its description with
 * gras_datadesc_by_symbol(s_pmm_assignment). */
30 GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
33 xbt_peer_t line[NEIGHBOR_COUNT];
34 xbt_peer_t row[NEIGHBOR_COUNT];
35 xbt_matrix_t A GRAS_ANNOTE(subtype, double);
36 xbt_matrix_t B GRAS_ANNOTE(subtype, double);});
38 typedef struct s_pmm_assignment s_pmm_assignment_t;
40 /* Register the messages which may be sent (common to client and server).
 *
 * Four message types are declared: "result" (slave to master), "pmm_slave"
 * (master to slave), and "dataA" / "dataB" (slave to slave), the last two
 * carrying a matrix of doubles. */
41 static void register_messages(void)
43 gras_datadesc_type_t result_type;
44 gras_datadesc_type_t pmm_assignment_type;
/* NOTE(review): presumably this makes the NEIGHBOR_COUNT array bound used
 * in s_pmm_assignment known by name to the datadesc layer -- confirm
 * against the GRAS datadesc documentation. It is done before the types
 * are looked up. */
46 gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
47 result_type = gras_datadesc_by_symbol(s_result);
48 pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
50 /* receive a final result from slave */
51 gras_msgtype_declare("result", result_type);
53 /* send from master to slave to assign a position and some data */
54 gras_msgtype_declare("pmm_slave", pmm_assignment_type);
56 /* send data between slaves */
57 gras_msgtype_declare("dataA",
58 gras_datadesc_matrix(gras_datadesc_by_name("double"),
60 gras_msgtype_declare("dataB",
61 gras_datadesc_matrix(gras_datadesc_by_name("double"),
65 /* Function prototypes: entry points of the two process roles, both
 * defined further down in this file. */
66 int slave(int argc, char *argv[]);
67 int master(int argc, char *argv[]);
70 /* **********************************************************************
72 * **********************************************************************/
74 /* Global private data */
/* NOTE(review): nbr_row and nbr_line are not referenced by any code visible
 * in this part of the file -- check whether they are still needed. */
76 int nbr_row, nbr_line;
/* Entry point of the master process.
 *
 * argv[1] is the port the master listens on (see the usage assertion
 * below). The function creates the DATA_MATRIX_SIZE-square matrices A, B
 * and C, waits for slaves to join the "pmm" peer group, sends each of the
 * first SLAVE_COUNT peers a "pmm_slave" assignment (grid position,
 * neighbours, one block of A and one block of B), then collects one
 * "result" message per slave into C, checks C and shuts the group down. */
81 int master(int argc, char *argv[])
91 xbt_dynar_t peers; /* group of slaves */
92 xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
93 gras_socket_t socket[SLAVE_COUNT]; /* sockets for broadcast to slaves */
95 /* Init the GRAS's infrastructure */
96 gras_init(&argc, argv);
100 /* Initialize data matrices */
/* NOTE(review): judging by the constructor names, A is presumably the
 * identity and B a "sequence" matrix, which would explain why the final
 * check expects C to be a sequence matrix too -- confirm in xbt/matrix.h. */
101 A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
102 B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
103 C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
105 /* Create the connections */
106 xbt_assert0(argc > 1, "Usage: master <port>");
107 gras_socket_server(atoi(argv[1]));
108 peers = amok_pm_group_new("pmm");
110 /* friends, we're ready. Come and play */
111 INFO0("Wait for peers for 2 sec");
112 gras_msg_handleall(2);
/* NOTE(review): the literal 9 equals SLAVE_COUNT only while
 * PROC_MATRIX_SIZE is 3; consider using SLAVE_COUNT here so the two cannot
 * drift apart. */
113 while (xbt_dynar_length(peers)<9) {
114 INFO1("Got only %ld pals. Wait 2 more seconds", xbt_dynar_length(peers));
115 gras_msg_handleall(2);
117 INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
/* Copy the first SLAVE_COUNT peers into grid[] and open a client socket to
 * each of them. */
119 for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
120 xbt_dynar_get_cpy(peers, i, &grid[i]);
121 socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
123 xbt_assert2(i == SLAVE_COUNT,
124 "Not enough slaves for this setting (got %d of %d). Change the deployment file",
127 /* Kill supernumerary slaves */
/* No increment in the loop header: every pass removes the element at index
 * i from the dynar, so the condition is re-evaluated against the shrinking
 * length. */
128 for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
131 xbt_dynar_remove_at(peers, i, &h);
132 INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
133 amok_pm_kill_hp(h->name, h->port);
138 /* Assign job to slaves */
/* Slave i is given the position (line, row). The neighbour loops below
 * compare i against j * PROC_MATRIX_SIZE + row and
 * line * PROC_MATRIX_SIZE + j so that the slave itself is left out of its
 * own neighbour lists. */
139 int row = 0, line = 0;
140 INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
141 for (i = 0; i < SLAVE_COUNT; i++) {
142 s_pmm_assignment_t assignment;
145 assignment.linepos = line; // assigned line
146 assignment.rowpos = row; // assigned row
/* Peers sharing this slave's row index (one candidate per grid line). */
149 for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
150 if (i != j * PROC_MATRIX_SIZE + (row)) {
151 assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
/* Peers sharing this slave's line index (one candidate per grid row). */
155 for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
156 if (i != (line) * PROC_MATRIX_SIZE + j) {
157 assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
/* Extract the submatrix_size-square blocks of A and B located at
 * (submatrix_size * line, submatrix_size * row). */
162 assignment.A = xbt_matrix_new_sub(A,
163 submatrix_size, submatrix_size,
164 submatrix_size * line,
165 submatrix_size * row, NULL);
167 xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
168 submatrix_size * line, submatrix_size * row, NULL);
170 if (row >= PROC_MATRIX_SIZE) {
175 gras_msg_send(socket[i], "pmm_slave", &assignment);
/* The blocks were only needed to build the message; release them once it
 * has been sent. */
176 xbt_matrix_free(assignment.A);
177 xbt_matrix_free(assignment.B);
180 /* (have a rest while the slaves perform the multiplication) */
182 /* Retrieve the results */
/* One "result" message is awaited per slave. Each received block is copied
 * into C at the offset given by the sender's (linepos, rowpos), then freed.
 * NOTE(review): 6000 is the gras_msg_wait timeout; unit presumably seconds
 * -- confirm. */
183 for (i = 0; i < SLAVE_COUNT; i++) {
184 gras_msg_wait(6000, "result", &from, &result);
185 VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT);
186 xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
187 submatrix_size * result.linepos,
188 submatrix_size * result.rowpos, 0, 0, NULL);
189 xbt_matrix_free(result.C);
/* Sanity check on the assembled product. */
193 if (xbt_matrix_double_is_seq(C))
194 INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
196 WARN0("the result seems wrong");
/* Only small matrices are dumped to the log. */
197 if (DATA_MATRIX_SIZE < 30) {
198 INFO0("The Result of Multiplication is :");
199 xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
201 INFO1("Matrix size too big (%d>30) to be displayed here",
206 amok_pm_group_shutdown("pmm"); /* Ok, we're out of here */
208 for (i = 0; i < SLAVE_COUNT; i++)
209 gras_socket_close(socket[i]);
216 } /* end_of_master */
218 /* **********************************************************************
220 * **********************************************************************/
/* Callback run by a slave when its "pmm_slave" assignment arrives (slave()
 * registers it for that message).
 *
 * payload points to an s_pmm_assignment_t holding this slave's grid
 * position, its neighbours and its blocks of A and B. The callback opens
 * sockets to the neighbours, then runs PROC_MATRIX_SIZE steps in which a B
 * block and an A block are either broadcast or received ("dataB" / "dataA"
 * messages) and fed to xbt_matrix_double_addmult(), and finally sends a
 * "result" message to the peer the assignment came from. */
222 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
224 /* Recover my initialized Data and My Position */
225 s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
226 gras_socket_t master = gras_msg_cb_ctx_from(ctx);
/* NOTE(review): bA and bB get fresh matrices here, but are later
 * overwritten by xbt_matrix_new_sub() or handed by address to
 * gras_msg_wait(); check whether these initial allocations leak. */
231 xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
232 sizeof(double), NULL);
233 xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
234 sizeof(double), NULL);
237 xbt_matrix_t mydataA, mydataB;
239 xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
243 gras_socket_t from; /* to exchange data with my neighbor */
245 /* sockets for broadcast to other slaves
 * (PROC_MATRIX_SIZE - 1 entries, i.e. one per neighbour on the line / row) */
246 gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
247 gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
248 memset(socket_line, 0, sizeof(socket_line));
249 memset(socket_row, 0, sizeof(socket_row));
253 gras_os_sleep(1); /* wait for my pals */
255 myline = assignment.linepos;
256 myrow = assignment.rowpos;
257 mydataA = assignment.A;
258 mydataB = assignment.B;
261 INFO0("Receive my pos and assignment");
263 INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
265 /* Get my neighborhood from the assignment message (skipping myself).
 * Each peer descriptor is freed as soon as its socket has been opened. */
266 for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
267 socket_line[i] = gras_socket_client(assignment.line[i]->name,
268 assignment.line[i]->port);
269 xbt_peer_free(assignment.line[i]);
271 for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
272 socket_row[i] = gras_socket_client(assignment.row[i]->name,
273 assignment.row[i]->port);
274 xbt_peer_free(assignment.row[i]);
/* One step per line/row index of the process grid. */
277 for (step = 0; step < PROC_MATRIX_SIZE; step++) {
279 /* LINE phase (a line broadcast): a slave whose line index equals the step
 * sends its B block to the peers in socket_row and keeps a copy in bB;
 * per the log messages below, the other slaves receive a "dataB" message
 * into bB instead. */
280 if (myline == step) {
281 VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline);
282 for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
283 VERB1("LINE: Send to %s", gras_socket_peer_name(socket_row[l]));
284 gras_msg_send(socket_row[l], "dataB", &mydataB);
289 bB = xbt_matrix_new_sub(mydataB,
290 submatrix_size, submatrix_size, 0, 0, NULL);
294 gras_msg_wait(600, "dataB", &from, &bB);
297 RETHROW0("Can't get a data message from line : %s");
299 VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
300 myline, gras_socket_peer_name(from));
/* ROW phase (myrow == step, per the log message): send my A block through
 * socket_line and keep a copy in bA; per the log messages below, the other
 * slaves receive a "dataA" message into bA instead. */
305 VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
306 for (l = 1; l < PROC_MATRIX_SIZE; l++) {
307 VERB1("ROW: Send to %s", gras_socket_peer_name(socket_line[l - 1]));
308 gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
311 bA = xbt_matrix_new_sub(mydataA,
312 submatrix_size, submatrix_size, 0, 0, NULL);
/* NOTE(review): this wait uses a timeout of 1200 while the "dataB" wait
 * above uses 600 -- confirm the difference is intended. */
316 gras_msg_wait(1200, "dataA", &from, &bA);
319 RETHROW0("Can't get a data message from row : %s");
321 VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
322 gras_socket_peer_name(from));
/* Combine this step's blocks; presumably bC accumulates bA * bB -- see
 * xbt_matrix_double_addmult() in xbt/matrix.h. */
324 xbt_matrix_double_addmult(bA, bB, bC);
328 /* send Result to master */
330 result.linepos = myline;
331 result.rowpos = myrow;
334 gras_msg_send(master, "result", &result);
337 RETHROW0("Failed to send answer to server: %s");
339 VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
340 gras_socket_peer_name(master), gras_socket_peer_port(master));
341 /* Free the allocated resources, and shut GRAS down */
347 xbt_matrix_free(mydataA);
348 xbt_matrix_free(mydataB);
349 /* FIXME: some are said to be unknown
350 gras_socket_close(master);
351 gras_socket_close(from);
352 for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
354 gras_socket_close(socket_line[l]);
356 gras_socket_close(socket_row[l]);
362 int slave(int argc, char *argv[])
364 gras_socket_t mysock;
365 gras_socket_t master = NULL;
369 /* Init the GRAS's infrastructure */
370 gras_init(&argc, argv);
372 if (argc != 3 && argc != 2)
373 xbt_die("Usage: slave masterhost:masterport [rank]");
377 rank = atoi(argv[2]);
379 /* Register the known messages and my callback */
381 gras_cb_register("pmm_slave", pmm_worker_cb);
383 /* Create the connexions */
384 mysock = gras_socket_server_range(3000, 9999, 0, 0);
385 INFO1("Sensor %d starting", rank);
389 master = gras_socket_client_from_string(argv[1]);
393 if (e.category != system_error)
400 /* Join and run the group */
401 rank = amok_pm_group_join(master, "pmm");
402 amok_pm_mainloop(600);
405 gras_socket_close(mysock);
406 // gras_socket_close(master); Unknown