Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
543a029441cdf1d487b2e666d5f94d04829b35a8
[simgrid.git] / src / smpi / src / smpi_base.c
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "msg/msg.h"
#include "simix/simix.h"
#include "xbt/sysdep.h"
#include "xbt/xbt_portability.h"
#include "smpi.h"
8
9 smpi_mpi_request_t **smpi_pending_send_requests      = NULL;
10 smpi_mpi_request_t **smpi_last_pending_send_requests = NULL;
11
12 smpi_mpi_request_t **smpi_pending_recv_requests      = NULL;
13 smpi_mpi_request_t **smpi_last_pending_recv_requests = NULL;
14
15 smpi_received_t **smpi_received                      = NULL;
16 smpi_received_t **smpi_last_received                 = NULL;
17
18 smx_process_t *smpi_sender_processes                 = NULL;
19 smx_process_t *smpi_receiver_processes               = NULL;
20
21 int smpi_running_hosts = 0;
22
23 smpi_mpi_communicator_t smpi_mpi_comm_world;
24
25 smpi_mpi_status_t smpi_mpi_status_ignore;
26
27 smpi_mpi_datatype_t smpi_mpi_byte;
28 smpi_mpi_datatype_t smpi_mpi_int;
29 smpi_mpi_datatype_t smpi_mpi_double;
30
31 smpi_mpi_op_t smpi_mpi_land;
32 smpi_mpi_op_t smpi_mpi_sum;
33
34 static xbt_os_timer_t smpi_timer;
35 static int smpi_benchmarking;
36 static double smpi_reference;
37
38 int smpi_run_simulation(int *argc, char **argv) {
39          smx_cond_t cond = NULL;
40          smx_action_t smx_action;
41          xbt_fifo_t actions_done = xbt_fifo_new();
42          xbt_fifo_t actions_failed = xbt_fifo_new();
43
44          srand(SEED);
45          SIMIX_global_init(&argc, argv);
46          SIMIX_function_register("smpi_main",     smpi_main);
47          SIMIX_function_register("smpi_sender",   smpi_sender);
48          SIMIX_function_register("smpi_receiver", smpi_receiver);
49   SIMIX_create_environment(argv[1]);
50   SIMIX_launch_application(argv[2]);
51
52          /* Prepare to display some more info when dying on Ctrl-C pressing */
53          signal(SIGINT,inthandler);
54
55          /* Clean IO before the run */
56          fflush(stdout);
57          fflush(stderr);
58
59          //surf_solve(); /* Takes traces into account. Returns 0.0 */
60          /* xbt_fifo_size(msg_global->process_to_run) */
61
62          while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
63
64                  while ( (smx_action = xbt_fifo_pop(actions_failed)) ) {
65
66
67                          DEBUG1("** %s failed **",smx_action->name);
68                          while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) {
69                                  SIMIX_cond_broadcast(cond);
70                          }
71                          /* action finished, destroy it */
72                  //      SIMIX_action_destroy(smx_action);
73                  }
74
75                  while ( (smx_action = xbt_fifo_pop(actions_done)) ) {
76
77                          DEBUG1("** %s done **",smx_action->name);
78                          while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) {
79                                  SIMIX_cond_broadcast(cond);
80                          }
81                          /* action finished, destroy it */
82                          //SIMIX_action_destroy(smx_action);
83                  }
84          }
85          xbt_fifo_free(actions_failed);
86          xbt_fifo_free(actions_done);
87  INFO1("simulation time %g", SIMIX_get_clock());
88   SIMIX_clean();
89   return 0;
90 }
91
/*
 * Logical-AND reduction operator on ints: stores 1 in *z when both *x and *y
 * are non-zero, 0 otherwise.  The void * signature matches the op table
 * (smpi_mpi_land.func).
 */
void smpi_mpi_land_func(void *x, void *y, void *z) {
  const int lhs = *(const int *)x;
  const int rhs = *(const int *)y;

  if (lhs != 0 && rhs != 0) {
    *(int *)z = 1;
  } else {
    *(int *)z = 0;
  }
}
95
/*
 * Sum reduction operator on ints: stores *x + *y in *z.  The void * signature
 * matches the op table (smpi_mpi_sum.func).
 */
void smpi_mpi_sum_func(void *x, void *y, void *z) {
  const int lhs = *(const int *)x;
  const int rhs = *(const int *)y;
  int *out = (int *)z;

  *out = lhs + rhs;
}
99
100 void smpi_mpi_init() {
101   int i;
102   int size, rank;
103   smx_host_t *hosts;
104   smx_host_t host;
105   double duration;
106   m_task_t mtask;
107
108   // will eventually need mutex
109   smpi_running_hosts++;
110
111   // initialize some local variables
112   size  = SIMIX_host_get_number();
113   host  = SIMIX_host_self();
114   hosts = SIMIX_host_get_table();
115   for(i = 0; i < size && host != hosts[i]; i++);
116   rank  = i;
117
118   // node 0 sets the globals
119   if (0 == rank) {
120
121     // global communicator
122     smpi_mpi_comm_world.id           = 0;
123     smpi_mpi_comm_world.size         = size;
124     smpi_mpi_comm_world.barrier      = 0;
125     smpi_mpi_comm_world.hosts        = hosts;
126     smpi_mpi_comm_world.processes    = xbt_malloc(sizeof(m_process_t) * size);
127     smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
128
129     // mpi datatypes
130     smpi_mpi_byte.size               = (size_t)1;
131     smpi_mpi_int.size                = sizeof(int);
132     smpi_mpi_double.size             = sizeof(double);
133
134     // mpi operations
135     smpi_mpi_land.func               = &smpi_mpi_land_func;
136     smpi_mpi_sum.func                = &smpi_mpi_sum_func;
137
138     // smpi globals
139     smpi_pending_send_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
140     smpi_last_pending_send_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
141     smpi_pending_recv_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
142     smpi_last_pending_recv_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
143     smpi_received                    = xbt_malloc(sizeof(smpi_received_t*) * size);
144     smpi_last_received               = xbt_malloc(sizeof(smpi_received_t*) * size);
145     smpi_sender_processes            = xbt_malloc(sizeof(m_process_t) * size);
146     smpi_receiver_processes          = xbt_malloc(sizeof(m_process_t) * size);
147     for(i = 0; i < size; i++) {
148       smpi_pending_send_requests[i]      = NULL;
149       smpi_last_pending_send_requests[i] = NULL;
150       smpi_pending_recv_requests[i]      = NULL;
151       smpi_last_pending_recv_requests[i] = NULL;
152       smpi_received[i]                   = NULL;
153       smpi_last_received[i]              = NULL;
154     }
155     smpi_timer                      = xbt_os_timer_new();
156     smpi_reference                  = DEFAULT_POWER;
157     smpi_benchmarking               = 0;
158
159     // tell send/recv nodes to begin
160     for(i = 0; i < size; i++) {
161       mtask = MSG_task_create("READY", 0, 0, NULL);
162       MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
163       mtask = (m_task_t)0;
164       MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
165       MSG_task_destroy(mtask);
166       mtask = MSG_task_create("READY", 0, 0, NULL);
167       MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
168       mtask = (m_task_t)0;
169       MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
170       MSG_task_destroy(mtask);
171     }
172
173     // now everyone else
174     for(i = 1; i < size; i++) {
175       mtask = MSG_task_create("READY", 0, 0, NULL);
176       MSG_task_put(mtask, hosts[i], MPI_PORT);
177     }
178
179   } else {
180     // everyone needs to wait for node 0 to finish
181     mtask = (m_task_t)0;
182     MSG_task_get(&mtask, MPI_PORT);
183     MSG_task_destroy(mtask);
184     smpi_mpi_comm_world.processes[rank] = SIMIX_process_self();
185   }
186
187   // now that mpi_comm_world_processes is set, it's safe to set a barrier
188   smpi_barrier(&smpi_mpi_comm_world);
189 }
190
191 void smpi_mpi_finalize() {
192   int i;
193   smpi_running_hosts--;
194   if (0 <= smpi_running_hosts) {
195     for(i = 0; i < smpi_mpi_comm_world.size; i++) {
196       if(SIMIX_process_is_suspended(smpi_sender_processes[i])) {
197         SIMIX_process_resume(smpi_sender_processes[i]);
198       }
199       if(SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
200         SIMIX_process_resume(smpi_receiver_processes[i]);
201       }
202     }
203   } else {
204     xbt_free(smpi_mpi_comm_world.processes);
205     xbt_free(smpi_pending_send_requests);
206     xbt_free(smpi_last_pending_send_requests);
207     xbt_free(smpi_pending_recv_requests);
208     xbt_free(smpi_last_pending_recv_requests);
209     xbt_free(smpi_received);
210     xbt_free(smpi_last_received);
211     xbt_free(smpi_sender_processes);
212     xbt_free(smpi_receiver_processes);
213     xbt_os_timer_free(smpi_timer);
214   }
215 }
216
217 void smpi_complete(smpi_mpi_request_t *request) {
218   smpi_waitlist_node_t *current, *next;
219   request->completed = 1;
220   request->next      = NULL;
221   current = request->waitlist;
222   while(NULL != current) {
223     if(SIMIX_process_is_suspended(current->process)) {
224       SIMIX_process_resume(current->process);
225     }
226     next = current->next;
227     xbt_free(current);
228     current = next;
229   }
230   request->waitlist  = NULL;
231 }
232
233 int smpi_host_rank_self() {
234   return smpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
235 }
236
237 void smpi_isend(smpi_mpi_request_t *sendreq) {
238   int rank = smpi_host_rank_self();
239   if (NULL == smpi_last_pending_send_requests[rank]) {
240     smpi_pending_send_requests[rank] = sendreq;
241   } else {
242     smpi_last_pending_send_requests[rank]->next = sendreq;
243   }
244   smpi_last_pending_send_requests[rank] = sendreq;
245   if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
246     SIMIX_process_resume(smpi_sender_processes[rank]);
247   }
248 }
249
250 void smpi_match_requests(int rank) {
251   smpi_mpi_request_t *frequest, *prequest, *crequest;
252   smpi_received_t *freceived, *preceived, *creceived;
253   size_t dsize;
254   short int match;
255   frequest  = smpi_pending_recv_requests[rank];
256   prequest  = NULL;
257   crequest  = frequest;
258   while(NULL != crequest) {
259     freceived = smpi_received[rank];
260     preceived = NULL;
261     creceived = freceived;
262     match     = 0;
263     while(NULL != creceived && !match) {
264       if(crequest->comm->id == creceived->commid && 
265         (MPI_ANY_SOURCE == crequest->src || crequest->src == creceived->src) && 
266         crequest->tag == creceived->tag) {
267
268         // we have a match!
269         match = 1;
270
271         // pull the request from the queue
272         if(NULL == prequest) {
273           frequest = crequest->next;
274           smpi_pending_recv_requests[rank] = frequest;
275         } else {
276           prequest->next = crequest->next;
277         }
278         if(crequest == smpi_last_pending_recv_requests[rank]) {
279           smpi_last_pending_recv_requests[rank] = prequest;
280         }
281
282         // pull the received data from the queue
283         if(NULL == preceived) {
284           freceived = creceived->next;
285           smpi_received[rank] = freceived;
286         } else {
287           preceived->next = creceived->next;
288         }
289         if(creceived == smpi_last_received[rank]) {
290           smpi_last_received[rank] = preceived;
291         }
292
293         // for when request->src is any source
294         crequest->src = creceived->src;
295
296         // calculate data size
297         dsize = crequest->count * crequest->datatype->size;
298
299         // copy data to buffer
300         memcpy(crequest->buf, creceived->data, dsize);
301
302         // fwd through
303         crequest->fwdthrough = creceived->fwdthrough;
304
305         // get rid of received data node, no longer needed
306         xbt_free(creceived->data);
307         xbt_free(creceived);
308
309         if (crequest->fwdthrough == rank) {
310           smpi_complete(crequest);
311         } else {
312           crequest->src = rank;
313           crequest->dst = (rank + 1) % crequest->comm->size;
314           smpi_isend(crequest);
315         }
316
317       } else {
318         preceived = creceived;
319         creceived = creceived->next;
320       }
321     }
322     prequest = crequest;
323     crequest = crequest->next;
324   }
325 }
326
327 void smpi_bench_begin() {
328   xbt_assert0(!smpi_benchmarking, "Already benchmarking");
329   smpi_benchmarking = 1;
330   xbt_os_timer_start(smpi_timer);
331   return;
332 }
333
334 void smpi_bench_end() {
335   m_task_t ctask = NULL;
336   double duration;
337   xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
338   smpi_benchmarking = 0;
339   xbt_os_timer_stop(smpi_timer);
340   duration = xbt_os_timer_elapsed(smpi_timer);
341   ctask = MSG_task_create("computation", duration * smpi_reference, 0 , NULL);
342   MSG_task_execute(ctask);
343   MSG_task_destroy(ctask);
344   return;
345 }
346
347 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
348   int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request) {
349   int retval = MPI_SUCCESS;
350   *request = NULL;
351   if (NULL == buf && 0 < count) {
352     retval = MPI_ERR_INTERN;
353   } else if (0 > count) {
354     retval = MPI_ERR_COUNT;
355   } else if (NULL == datatype) {
356     retval = MPI_ERR_TYPE;
357   } else if (NULL == comm) {
358     retval = MPI_ERR_COMM;
359   } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
360     retval = MPI_ERR_RANK;
361   } else if (0 > dst || comm->size <= dst) {
362     retval = MPI_ERR_RANK;
363   } else if (0 > tag) {
364     retval = MPI_ERR_TAG;
365   } else {
366     *request = xbt_malloc(sizeof(smpi_mpi_request_t));
367     (*request)->buf        = buf;
368     (*request)->count      = count;
369     (*request)->datatype   = datatype;
370     (*request)->src        = src;
371     (*request)->dst        = dst;
372     (*request)->tag        = tag;
373     (*request)->comm       = comm;
374     (*request)->completed  = 0;
375     (*request)->fwdthrough = dst;
376     (*request)->waitlist   = NULL;
377     (*request)->next       = NULL;
378   }
379   return retval;
380 }
381
382 void smpi_barrier(smpi_mpi_communicator_t *comm) {
383   int i;
384   comm->barrier++;
385   if(comm->barrier < comm->size) {
386     SIMIX_process_suspend(SIMIX_process_self());
387   } else {
388     comm->barrier = 0;
389     for(i = 0; i < comm->size; i++) {
390       if (SIMIX_process_is_suspended(comm->processes[i])) {
391         SIMIX_process_resume(comm->processes[i]);
392       }
393     }
394   }
395 }
396
397 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host) {
398   int i;
399   for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
400   if (i >= comm->size) i = -1;
401   return i;
402 }
403
404 void smpi_irecv(smpi_mpi_request_t *recvreq) {
405   int rank = smpi_host_rank_self();
406   if (NULL == smpi_pending_recv_requests[rank]) {
407     smpi_pending_recv_requests[rank] = recvreq;
408   } else if (NULL != smpi_last_pending_recv_requests[rank]) {
409     smpi_last_pending_recv_requests[rank]->next = recvreq;
410   } else { // can't happen!
411     fprintf(stderr, "smpi_pending_recv_requests not null while smpi_last_pending_recv_requests null!\n");
412   }
413   smpi_last_pending_recv_requests[rank] = recvreq;
414   smpi_match_requests(rank);
415   if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
416     SIMIX_process_resume(smpi_receiver_processes[rank]);
417   }
418 }
419
420 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) {
421   smpi_waitlist_node_t *waitnode, *current;
422   if (NULL != request) {
423     if (!request->completed) {
424       waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
425       waitnode->process = SIMIX_process_self();
426       waitnode->next    = NULL;
427       if (NULL == request->waitlist) {
428         request->waitlist = waitnode;
429       } else {
430         for(current = request->waitlist; NULL != current->next; current = current->next);
431         current->next = waitnode;
432       }
433       SIMIX_process_suspend(waitnode->process);
434     }
435     if (NULL != status && MPI_STATUS_IGNORE != status) {
436       status->MPI_SOURCE = request->src;
437     }
438   }
439 }
440
441 void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses) {
442   int i;
443   for (i = 0; i < count; i++) {
444     smpi_wait(requests[i], &statuses[i]);
445   }
446 }
447
448 void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests) {
449   int i;
450   for (i = 0; i < count; i++) {
451     smpi_wait(requests[i], MPI_STATUS_IGNORE);
452   }
453 }
454
455 int smpi_sender(int argc, char *argv[]) {
456   smx_process_t process;
457   char taskname[50];
458   size_t dsize;
459   void *data;
460   smx_host_t dhost;
461   m_task_t mtask;
462   int rank, fc, ft;
463   smpi_mpi_request_t *sendreq;
464
465   process = SIMIX_process_self();
466
467   // wait for init
468   mtask = (m_task_t)0;
469   MSG_task_get(&mtask, SEND_SYNC_PORT);
470
471   rank = smpi_host_rank_self();
472
473   smpi_sender_processes[rank] = process;
474
475   // ready!
476   MSG_task_put(mtask, MSG_task_get_source(mtask), SEND_SYNC_PORT);
477
478   while (0 < smpi_running_hosts) {
479     sendreq = smpi_pending_send_requests[rank];
480     if (NULL != sendreq) {
481
482       // pull from queue if not a fwd or no more to fwd
483       if (sendreq->dst == sendreq->fwdthrough) {
484         smpi_pending_send_requests[rank] = sendreq->next;
485         if(sendreq == smpi_last_pending_send_requests[rank]) {
486           smpi_last_pending_send_requests[rank] = NULL;
487         }
488         ft = sendreq->dst;
489       } else {
490         fc = ((sendreq->fwdthrough - sendreq->dst + sendreq->comm->size) % sendreq->comm->size) / 2;
491         ft = (sendreq->dst + fc) % sendreq->comm->size;
492         //printf("node %d sending broadcast to node %d through node %d\n", rank, sendreq->dst, ft);
493       }
494
495       // create task to send
496       sprintf(taskname, "comm:%d,src:%d,dst:%d,tag:%d,ft:%d", sendreq->comm->id, sendreq->src, sendreq->dst, sendreq->tag, ft);
497       dsize = sendreq->count * sendreq->datatype->size;
498       data  = xbt_malloc(dsize);
499       memcpy(data, sendreq->buf, dsize);
500       mtask = MSG_task_create(taskname, 0, dsize, data);
501
502       // figure out which host to send it to
503       dhost = sendreq->comm->hosts[sendreq->dst];
504
505       // send task
506       #ifdef DEBUG
507         printf("host %s attempting to send to host %s\n", SIMIX_host_get_name(SIMIX_host_self()), SIMIX_host_get_name(dhost));
508       #endif
509       MSG_task_put(mtask, dhost, MPI_PORT);
510
511       if (sendreq->dst == sendreq->fwdthrough) {
512         smpi_complete(sendreq);
513       } else {
514         sendreq->dst = (sendreq->dst + fc + 1) % sendreq->comm->size;
515       }
516
517     } else {
518       SIMIX_process_suspend(process);
519     }
520   }
521   return 0;
522 }
523
524 int smpi_receiver(int argc, char **argv) {
525   smx_process_t process;
526   m_task_t mtask;
527   smpi_received_t *received;
528   int rank;
529   smpi_mpi_request_t *recvreq;
530
531   process = SIMIX_process_self();
532
533   // wait for init
534   mtask = (m_task_t)0;
535   MSG_task_get(&mtask, RECV_SYNC_PORT);
536
537   rank = smpi_host_rank_self();
538
539   // potential race condition...
540   smpi_receiver_processes[rank] = process;
541
542   // ready!
543   MSG_task_put(mtask, MSG_task_get_source(mtask), RECV_SYNC_PORT);
544
545   while (0 < smpi_running_hosts) {
546     recvreq = smpi_pending_recv_requests[rank];
547     if (NULL != recvreq) {
548       mtask = (m_task_t)0;
549
550       #ifdef DEBUG
551         printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
552           SIMIX_host_get_name(SIMIX_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
553       #endif
554       MSG_task_get(&mtask, MPI_PORT);
555
556       received = xbt_malloc(sizeof(smpi_received_t));
557
558       sscanf(MSG_task_get_name(mtask), "comm:%d,src:%d,dst:%d,tag:%d,ft:%d",
559         &received->commid, &received->src, &received->dst, &received->tag, &received->fwdthrough);
560       received->data = MSG_task_get_data(mtask);
561       received->next = NULL;
562
563       if (NULL == smpi_last_received[rank]) {
564         smpi_received[rank] = received;
565       } else {
566         smpi_last_received[rank]->next = received;
567       }
568       smpi_last_received[rank] = received;
569
570       MSG_task_destroy(mtask);
571
572       smpi_match_requests(rank);
573
574     } else {
575       SIMIX_process_suspend(process);
576     }
577   }
578   return 0;
579 }
580
// FIXME: move into own file
/*
 * gettimeofday() replacement that reports the simulated clock.  Benchmarking
 * is paused around the call so that its own cost is not charged to the
 * simulation.  `tz` is ignored.  Returns -1 when `tv` is NULL, 0 otherwise.
 */
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) {
  int retval = -1;

  smpi_bench_end();
  if (NULL != tv) {
    double now = SIMIX_get_clock();

    tv->tv_sec  = now;
    tv->tv_usec = (now - (double)tv->tv_sec) * 1000000.0;
    retval = 0;
  }
  smpi_bench_begin();

  return retval;
}
596
597 unsigned int smpi_sleep(unsigned int seconds) {
598   m_task_t task = NULL;
599   smpi_bench_end();
600   task = MSG_task_create("sleep", seconds * DEFAULT_POWER, 0, NULL);
601   MSG_task_execute(task);
602   MSG_task_destroy(task);
603   smpi_bench_begin();
604   return 0;
605 }
606
607 void smpi_exit(int status) {
608   smpi_bench_end();
609   smpi_running_hosts--;
610   SIMIX_process_kill(SIMIX_process_self());
611   return;
612 }