Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
26a0b9c4cbd564cd5554587e3bb219104707aebf
[simgrid.git] / examples / msg / pmm / msg_pmm.c
1 /* pmm - parallel matrix multiplication "double diffusion"                  */
2
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010, 2011. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "msg/msg.h"
9 #include "xbt/matrix.h"
10 #include "xbt/log.h"
11 #include "xbt/xbt_os_time.h"
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pmm,
14                              "Messages specific for this msg example");
15
16 /* This example should always be executed using a MATRIX_SIZE multiple of
17  * GRID_SIZE and with GRID_SIZE^2 nodes. */
18 #define MATRIX_SIZE 900
19 #define GRID_SIZE 3
20
21 #define MAILBOX_NAME_SIZE 10
22 #define GRID_NUM_NODES GRID_SIZE * GRID_SIZE
23 #define NODE_MATRIX_SIZE MATRIX_SIZE / GRID_SIZE
24 #define NEIGHBOURS_COUNT GRID_SIZE - 1
25
26 /*
27  * The job sent to every node
28  */
29 typedef struct s_node_job{
30   int row;
31   int col;
32   int nodes_in_row[NEIGHBOURS_COUNT];
33   int nodes_in_col[NEIGHBOURS_COUNT];
34   xbt_matrix_t A;
35   xbt_matrix_t B;
36 } s_node_job_t, *node_job_t;
37
38 /**
39  * Structure for recovering results
40  */
41 typedef struct s_result {
42   int row;
43   int col;
44   xbt_matrix_t sC;
45 } s_result_t, *result_t;
46
47 int node(int argc, char **argv);
48 static void create_jobs(xbt_matrix_t A, xbt_matrix_t B, node_job_t *jobs);
49 static void broadcast_jobs(node_job_t *jobs);
50 static node_job_t wait_job(int selfid);
51 static void broadcast_matrix(xbt_matrix_t M, int num_nodes, int *nodes);
52 static void get_sub_matrix(xbt_matrix_t *sM, int selfid);
53 static void receive_results(result_t *results);
54 static void task_cleanup(void *arg);
55
56 int node(int argc, char **argv)
57 {
58   int k, myid;
59   char my_mbox[MAILBOX_NAME_SIZE];
60   node_job_t myjob, jobs[GRID_NUM_NODES];
61   xbt_matrix_t A, B, C, sA, sB, sC;
62   result_t result;
63
64   xbt_assert0(argc != 1, "Wrong number of arguments for this node");
65
66   /* Initialize the node's data-structures */
67   myid = atoi(argv[1]);
68   snprintf(my_mbox, MAILBOX_NAME_SIZE - 1, "%d", myid);
69   sC = xbt_matrix_double_new_zeros(NODE_MATRIX_SIZE, NODE_MATRIX_SIZE);
70
71   if(myid == 0){
72     /* Create the matrices to multiply and one to store the result */
73     A = xbt_matrix_double_new_id(MATRIX_SIZE, MATRIX_SIZE);
74     B = xbt_matrix_double_new_seq(MATRIX_SIZE, MATRIX_SIZE);
75     C = xbt_matrix_double_new_zeros(MATRIX_SIZE, MATRIX_SIZE);
76
77     /* Create the nodes' jobs */
78     create_jobs(A, B, jobs);
79
80     /* Get own job first */
81     myjob = jobs[0];
82
83     /* Broadcast the rest of the jobs to the other nodes */
84     broadcast_jobs(jobs + 1);
85
86   }else{
87     myjob = wait_job(myid);
88   }
89
90   /* Multiplication main-loop */
91   XBT_VERB("Start Multiplication's Main-loop");
92   for(k=0; k < GRID_SIZE; k++){
93     if(k == myjob->col){
94       XBT_VERB("Broadcast sA(%d,%d) to row %d", myjob->row, k, myjob->row);
95       broadcast_matrix(myjob->A, NEIGHBOURS_COUNT, myjob->nodes_in_row);
96     }
97
98     if(k == myjob->row){
99       XBT_VERB("Broadcast sB(%d,%d) to col %d", k, myjob->col, myjob->col);
100       broadcast_matrix(myjob->B, NEIGHBOURS_COUNT, myjob->nodes_in_col);
101     }
102
103     if(myjob->row == k && myjob->col == k){
104       xbt_matrix_double_addmult(myjob->A, myjob->B, sC);
105     }else if(myjob->row == k){
106       get_sub_matrix(&sA, myid);
107       xbt_matrix_double_addmult(sA, myjob->B, sC);
108       xbt_matrix_free(sA);
109     }else if(myjob->col == k){
110       get_sub_matrix(&sB, myid);
111       xbt_matrix_double_addmult(myjob->A, sB, sC);
112       xbt_matrix_free(sB);
113     }else{
114       get_sub_matrix(&sA, myid);
115       get_sub_matrix(&sB, myid);
116       xbt_matrix_double_addmult(sA, sB, sC);
117       xbt_matrix_free(sA);
118       xbt_matrix_free(sB);
119     }
120   }
121
122   /* Node 0: gather the results and reconstruct the final matrix */
123   if(myid == 0){
124     int node;
125     result_t results[GRID_NUM_NODES] = {0};
126
127     XBT_VERB("Multiplication done.");
128
129     /* Get the result from the nodes in the GRID */
130     receive_results(results);
131
132     /* First add our results */
133     xbt_matrix_copy_values(C, sC, NODE_MATRIX_SIZE, NODE_MATRIX_SIZE,
134                            0, 0, 0, 0, NULL);
135
136     /* Reconstruct the rest of the result matrix */
137     for (node = 1; node < GRID_NUM_NODES; node++){
138       xbt_matrix_copy_values(C, results[node]->sC,
139                              NODE_MATRIX_SIZE, NODE_MATRIX_SIZE,
140                              NODE_MATRIX_SIZE * results[node]->row,
141                              NODE_MATRIX_SIZE * results[node]->col,
142                              0, 0, NULL);
143       xbt_matrix_free(results[node]->sC);
144       xbt_free(results[node]);
145     }
146
147     //xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
148
149   /* The rest: return the result to node 0 */
150   }else{
151     m_task_t task;
152
153     XBT_VERB("Multiplication done. Send the sub-result.");
154
155     result = xbt_new0(s_result_t, 1);
156     result->row = myjob->row;
157     result->col = myjob->col;
158     result->sC =
159       xbt_matrix_new_sub(sC, NODE_MATRIX_SIZE, NODE_MATRIX_SIZE, 0, 0, NULL);
160     task = MSG_task_create("result",100,100,result);
161     MSG_task_dsend(task, "0", NULL);
162   }
163
164   /* Clean up and finish*/
165   xbt_matrix_free(myjob->A);
166   xbt_matrix_free(myjob->B);
167   xbt_free(myjob);
168   return 0;
169 }
170
171 /*
172  * Broadcast the jobs to the nodes of the grid (except to node 0)
173  */
174 static void broadcast_jobs(node_job_t *jobs)
175 {
176   int node;
177   char node_mbox[MAILBOX_NAME_SIZE];
178   m_task_t task;
179   msg_comm_t comms[GRID_NUM_NODES - 1] = {0};
180
181   XBT_VERB("Broadcast Jobs");
182   for (node = 1; node < GRID_NUM_NODES; node++){
183     task = MSG_task_create("Job", 100, 100, jobs[node-1]);
184     snprintf(node_mbox, MAILBOX_NAME_SIZE - 1, "%d", node);
185     comms[node-1] = MSG_task_isend(task, node_mbox);
186   }
187
188   MSG_comm_waitall(comms, GRID_NUM_NODES-1, -1);
189 }
190
191 static node_job_t wait_job(int selfid)
192 {
193   m_task_t task = NULL;
194   char self_mbox[MAILBOX_NAME_SIZE];
195   node_job_t job;
196   snprintf(self_mbox, MAILBOX_NAME_SIZE - 1, "%d", selfid);
197   MSG_task_receive(&task, self_mbox);
198   job = (node_job_t)MSG_task_get_data(task);
199   MSG_task_destroy(task);
200   XBT_VERB("Got Job (%d,%d)", job->row, job->col);
201
202   return job;
203 }
204
205 static void broadcast_matrix(xbt_matrix_t M, int num_nodes, int *nodes)
206 {
207   int node;
208   char node_mbox[MAILBOX_NAME_SIZE];
209   m_task_t task;
210   xbt_matrix_t sM;
211
212   for(node=0; node < num_nodes; node++){
213     snprintf(node_mbox, MAILBOX_NAME_SIZE - 1, "%d", nodes[node]);
214     sM = xbt_matrix_new_sub(M, NODE_MATRIX_SIZE, NODE_MATRIX_SIZE, 0, 0, NULL);
215     task = MSG_task_create("sub-matrix", 100, 100, sM);
216     MSG_task_dsend(task, node_mbox, task_cleanup);
217     XBT_DEBUG("sub-matrix sent to %s", node_mbox);
218   }
219
220 }
221
222 static void get_sub_matrix(xbt_matrix_t *sM, int selfid)
223 {
224   m_task_t task = NULL;
225   char node_mbox[MAILBOX_NAME_SIZE];
226
227   XBT_VERB("Get sub-matrix");
228
229   snprintf(node_mbox, MAILBOX_NAME_SIZE - 1, "%d", selfid);
230   MSG_task_receive(&task, node_mbox);
231   *sM = (xbt_matrix_t)MSG_task_get_data(task);
232   MSG_task_destroy(task);
233 }
234
235 static void task_cleanup(void *arg){
236   m_task_t task = (m_task_t)arg;
237   xbt_matrix_t m = (xbt_matrix_t)MSG_task_get_data(task);
238   xbt_matrix_free(m);
239   MSG_task_destroy(task);
240 }
241
242 /**
243  * \brief Main function.
244  */
245 int main(int argc, char *argv[])
246 {
247   xbt_os_timer_t timer = xbt_os_timer_new();
248
249   MSG_global_init(&argc, argv);
250
251   char **options = &argv[1];
252   const char* platform_file = options[0];
253   const char* application_file = options[1];
254
255   MSG_set_channel_number(0);
256   MSG_create_environment(platform_file);
257
258   MSG_function_register("node", node);
259   MSG_launch_application(application_file);
260
261   xbt_os_timer_start(timer);
262   MSG_error_t res = MSG_main();
263   xbt_os_timer_stop(timer);
264   XBT_CRITICAL("Simulated time: %g", MSG_get_clock());
265
266   MSG_clean();
267
268   if (res == MSG_OK)
269     return 0;
270   else
271     return 1;
272 }
273
274 static void create_jobs(xbt_matrix_t A, xbt_matrix_t B, node_job_t *jobs)
275 {
276   int node, j, k, row = 0, col = 0;
277
278   for (node = 0; node < GRID_NUM_NODES; node++){
279     XBT_VERB("Create job %d", node);
280     jobs[node] = xbt_new0(s_node_job_t, 1);
281     jobs[node]->row = row;
282     jobs[node]->col = col;
283
284     /* Compute who are the nodes in the same row and column */
285     /* than the node receiving this job */
286     for (j = 0, k = 0; j < GRID_SIZE; j++) {
287       if (node != (GRID_SIZE * row) + j) {
288         jobs[node]->nodes_in_row[k] = (GRID_SIZE * row) + j;
289         k++;
290       }
291     }
292
293     for (j = 0, k = 0; j < GRID_SIZE; j++) {
294       if (node != (GRID_SIZE * j) + col) {
295         jobs[node]->nodes_in_col[k] = (GRID_SIZE * j) + col;
296         k++;
297       }
298     }
299
300     /* Assign a sub matrix of A and B to the job */
301     jobs[node]->A =
302       xbt_matrix_new_sub(A, NODE_MATRIX_SIZE, NODE_MATRIX_SIZE,
303                          NODE_MATRIX_SIZE * row, NODE_MATRIX_SIZE * col,
304                          NULL);
305     jobs[node]->B =
306       xbt_matrix_new_sub(B, NODE_MATRIX_SIZE, NODE_MATRIX_SIZE,
307                          NODE_MATRIX_SIZE * row, NODE_MATRIX_SIZE * col,
308                          NULL);
309
310     if (++col >= GRID_SIZE){
311       col = 0;
312       row++;
313     }
314   }
315 }
316
317 static void receive_results(result_t *results){
318   int node;
319   msg_comm_t comms[GRID_NUM_NODES-1] = {0};
320   m_task_t tasks[GRID_NUM_NODES-1] = {0};
321
322   XBT_VERB("Receive Results.");
323
324   /* Get the result from the nodes in the GRID */
325   for (node = 1; node < GRID_NUM_NODES; node++){
326    comms[node-1] = MSG_task_irecv(&tasks[node-1], "0");
327   }
328
329   MSG_comm_waitall(comms, GRID_NUM_NODES - 1, -1);
330
331   /* Reconstruct the result matrix */
332   for (node = 1; node < GRID_NUM_NODES; node++){
333     results[node] = (result_t)MSG_task_get_data(tasks[node-1]);
334     MSG_task_destroy(tasks[node-1]);
335   }
336 }