Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
66c5a7dab5011b8f20589c39b45bd17a2d97d446
[simgrid.git] / src / smpi / src / smpi_base.c
1 #include <stdio.h>
2 #include <sys/time.h>
3 #include "msg/msg.h"
4 #include "simix/simix.h"
5 #include "xbt/sysdep.h"
6 #include "xbt/xbt_portability.h"
7 #include "smpi.h"
8
9 smpi_mpi_request_t **smpi_pending_send_requests      = NULL;
10 smpi_mpi_request_t **smpi_last_pending_send_requests = NULL;
11
12 smpi_mpi_request_t **smpi_pending_recv_requests      = NULL;
13 smpi_mpi_request_t **smpi_last_pending_recv_requests = NULL;
14
15 smpi_received_t **smpi_received                      = NULL;
16 smpi_received_t **smpi_last_received                 = NULL;
17
18 smx_process_t *smpi_sender_processes                 = NULL;
19 smx_process_t *smpi_receiver_processes               = NULL;
20
21 int smpi_running_hosts = 0;
22
23 smpi_mpi_communicator_t smpi_mpi_comm_world;
24
25 smpi_mpi_status_t smpi_mpi_status_ignore;
26
27 smpi_mpi_datatype_t smpi_mpi_byte;
28 smpi_mpi_datatype_t smpi_mpi_int;
29 smpi_mpi_datatype_t smpi_mpi_double;
30
31 smpi_mpi_op_t smpi_mpi_land;
32 smpi_mpi_op_t smpi_mpi_sum;
33
34 static xbt_os_timer_t smpi_timer;
35 static int smpi_benchmarking;
36 static double smpi_reference;
37
/*
 * MPI_LAND reduction kernel on ints: *z = (*x && *y), i.e. 1 when both
 * operands are non-zero and 0 otherwise. As with &&, *y is only read when
 * *x is non-zero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z) {
  const int lhs = *(const int *)x;
  int *out = z;

  *out = lhs ? (*(const int *)y != 0) : 0;
}
41
/* MPI_SUM reduction kernel on ints: *z = *x + *y (operands may alias z). */
void smpi_mpi_sum_func(void *x, void *y, void *z) {
  const int *a = x;
  const int *b = y;
  int *sum = z;

  *sum = *a + *b;
}
45
46 void smpi_mpi_init() {
47   int i;
48   int size, rank;
49   smx_host_t *hosts;
50   smx_host_t host;
51   double duration;
52   m_task_t mtask;
53
54   // will eventually need mutex
55   smpi_running_hosts++;
56
57   // initialize some local variables
58   size  = SIMIX_host_get_number();
59   host  = SIMIX_host_self();
60   hosts = SIMIX_host_get_table();
61   for(i = 0; i < size && host != hosts[i]; i++);
62   rank  = i;
63
64   // node 0 sets the globals
65   if (0 == rank) {
66
67     // global communicator
68     smpi_mpi_comm_world.id           = 0;
69     smpi_mpi_comm_world.size         = size;
70     smpi_mpi_comm_world.barrier      = 0;
71     smpi_mpi_comm_world.hosts        = hosts;
72     smpi_mpi_comm_world.processes    = xbt_malloc(sizeof(m_process_t) * size);
73     smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
74
75     // mpi datatypes
76     smpi_mpi_byte.size               = (size_t)1;
77     smpi_mpi_int.size                = sizeof(int);
78     smpi_mpi_double.size             = sizeof(double);
79
80     // mpi operations
81     smpi_mpi_land.func               = &smpi_mpi_land_func;
82     smpi_mpi_sum.func                = &smpi_mpi_sum_func;
83
84     // smpi globals
85     smpi_pending_send_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
86     smpi_last_pending_send_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
87     smpi_pending_recv_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
88     smpi_last_pending_recv_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
89     smpi_received                    = xbt_malloc(sizeof(smpi_received_t*) * size);
90     smpi_last_received               = xbt_malloc(sizeof(smpi_received_t*) * size);
91     smpi_sender_processes            = xbt_malloc(sizeof(m_process_t) * size);
92     smpi_receiver_processes          = xbt_malloc(sizeof(m_process_t) * size);
93     for(i = 0; i < size; i++) {
94       smpi_pending_send_requests[i]      = NULL;
95       smpi_last_pending_send_requests[i] = NULL;
96       smpi_pending_recv_requests[i]      = NULL;
97       smpi_last_pending_recv_requests[i] = NULL;
98       smpi_received[i]                   = NULL;
99       smpi_last_received[i]              = NULL;
100     }
101     smpi_timer                      = xbt_os_timer_new();
102     smpi_reference                  = DEFAULT_POWER;
103     smpi_benchmarking               = 0;
104
105     // tell send/recv nodes to begin
106     for(i = 0; i < size; i++) {
107       mtask = MSG_task_create("READY", 0, 0, NULL);
108       MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
109       mtask = (m_task_t)0;
110       MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
111       MSG_task_destroy(mtask);
112       mtask = MSG_task_create("READY", 0, 0, NULL);
113       MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
114       mtask = (m_task_t)0;
115       MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
116       MSG_task_destroy(mtask);
117     }
118
119     // now everyone else
120     for(i = 1; i < size; i++) {
121       mtask = MSG_task_create("READY", 0, 0, NULL);
122       MSG_task_put(mtask, hosts[i], MPI_PORT);
123     }
124
125   } else {
126     // everyone needs to wait for node 0 to finish
127     mtask = (m_task_t)0;
128     MSG_task_get(&mtask, MPI_PORT);
129     MSG_task_destroy(mtask);
130     smpi_mpi_comm_world.processes[rank] = SIMIX_process_self();
131   }
132
133   // now that mpi_comm_world_processes is set, it's safe to set a barrier
134   smpi_barrier(&smpi_mpi_comm_world);
135 }
136
137 void smpi_mpi_finalize() {
138   int i;
139   smpi_running_hosts--;
140   if (0 <= smpi_running_hosts) {
141     for(i = 0; i < smpi_mpi_comm_world.size; i++) {
142       if(SIMIX_process_is_suspended(smpi_sender_processes[i])) {
143         SIMIX_process_resume(smpi_sender_processes[i]);
144       }
145       if(SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
146         SIMIX_process_resume(smpi_receiver_processes[i]);
147       }
148     }
149   } else {
150     xbt_free(smpi_mpi_comm_world.processes);
151     xbt_free(smpi_pending_send_requests);
152     xbt_free(smpi_last_pending_send_requests);
153     xbt_free(smpi_pending_recv_requests);
154     xbt_free(smpi_last_pending_recv_requests);
155     xbt_free(smpi_received);
156     xbt_free(smpi_last_received);
157     xbt_free(smpi_sender_processes);
158     xbt_free(smpi_receiver_processes);
159     xbt_os_timer_free(smpi_timer);
160   }
161 }
162
163 void smpi_complete(smpi_mpi_request_t *request) {
164   smpi_waitlist_node_t *current, *next;
165   request->completed = 1;
166   request->next      = NULL;
167   current = request->waitlist;
168   while(NULL != current) {
169     if(SIMIX_process_is_suspended(current->process)) {
170       SIMIX_process_resume(current->process);
171     }
172     next = current->next;
173     xbt_free(current);
174     current = next;
175   }
176   request->waitlist  = NULL;
177 }
178
179 int smpi_host_rank_self() {
180   return smpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
181 }
182
183 void smpi_isend(smpi_mpi_request_t *sendreq) {
184   int rank = smpi_host_rank_self();
185   if (NULL == smpi_last_pending_send_requests[rank]) {
186     smpi_pending_send_requests[rank] = sendreq;
187   } else {
188     smpi_last_pending_send_requests[rank]->next = sendreq;
189   }
190   smpi_last_pending_send_requests[rank] = sendreq;
191   if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
192     SIMIX_process_resume(smpi_sender_processes[rank]);
193   }
194 }
195
196 void smpi_match_requests(int rank) {
197   smpi_mpi_request_t *frequest, *prequest, *crequest;
198   smpi_received_t *freceived, *preceived, *creceived;
199   size_t dsize;
200   short int match;
201   frequest  = smpi_pending_recv_requests[rank];
202   prequest  = NULL;
203   crequest  = frequest;
204   while(NULL != crequest) {
205     freceived = smpi_received[rank];
206     preceived = NULL;
207     creceived = freceived;
208     match     = 0;
209     while(NULL != creceived && !match) {
210       if(crequest->comm->id == creceived->commid && 
211         (MPI_ANY_SOURCE == crequest->src || crequest->src == creceived->src) && 
212         crequest->tag == creceived->tag) {
213
214         // we have a match!
215         match = 1;
216
217         // pull the request from the queue
218         if(NULL == prequest) {
219           frequest = crequest->next;
220           smpi_pending_recv_requests[rank] = frequest;
221         } else {
222           prequest->next = crequest->next;
223         }
224         if(crequest == smpi_last_pending_recv_requests[rank]) {
225           smpi_last_pending_recv_requests[rank] = prequest;
226         }
227
228         // pull the received data from the queue
229         if(NULL == preceived) {
230           freceived = creceived->next;
231           smpi_received[rank] = freceived;
232         } else {
233           preceived->next = creceived->next;
234         }
235         if(creceived == smpi_last_received[rank]) {
236           smpi_last_received[rank] = preceived;
237         }
238
239         // for when request->src is any source
240         crequest->src = creceived->src;
241
242         // calculate data size
243         dsize = crequest->count * crequest->datatype->size;
244
245         // copy data to buffer
246         memcpy(crequest->buf, creceived->data, dsize);
247
248         // fwd through
249         crequest->fwdthrough = creceived->fwdthrough;
250
251         // get rid of received data node, no longer needed
252         xbt_free(creceived->data);
253         xbt_free(creceived);
254
255         if (crequest->fwdthrough == rank) {
256           smpi_complete(crequest);
257         } else {
258           crequest->src = rank;
259           crequest->dst = (rank + 1) % crequest->comm->size;
260           smpi_isend(crequest);
261         }
262
263       } else {
264         preceived = creceived;
265         creceived = creceived->next;
266       }
267     }
268     prequest = crequest;
269     crequest = crequest->next;
270   }
271 }
272
273 void smpi_bench_begin() {
274   xbt_assert0(!smpi_benchmarking, "Already benchmarking");
275   smpi_benchmarking = 1;
276   xbt_os_timer_start(smpi_timer);
277   return;
278 }
279
280 void smpi_bench_end() {
281   m_task_t ctask = NULL;
282   double duration;
283   xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
284   smpi_benchmarking = 0;
285   xbt_os_timer_stop(smpi_timer);
286   duration = xbt_os_timer_elapsed(smpi_timer);
287   ctask = MSG_task_create("computation", duration * smpi_reference, 0 , NULL);
288   MSG_task_execute(ctask);
289   MSG_task_destroy(ctask);
290   return;
291 }
292
293 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
294   int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request) {
295   int retval = MPI_SUCCESS;
296   *request = NULL;
297   if (NULL == buf && 0 < count) {
298     retval = MPI_ERR_INTERN;
299   } else if (0 > count) {
300     retval = MPI_ERR_COUNT;
301   } else if (NULL == datatype) {
302     retval = MPI_ERR_TYPE;
303   } else if (NULL == comm) {
304     retval = MPI_ERR_COMM;
305   } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
306     retval = MPI_ERR_RANK;
307   } else if (0 > dst || comm->size <= dst) {
308     retval = MPI_ERR_RANK;
309   } else if (0 > tag) {
310     retval = MPI_ERR_TAG;
311   } else {
312     *request = xbt_malloc(sizeof(smpi_mpi_request_t));
313     (*request)->buf        = buf;
314     (*request)->count      = count;
315     (*request)->datatype   = datatype;
316     (*request)->src        = src;
317     (*request)->dst        = dst;
318     (*request)->tag        = tag;
319     (*request)->comm       = comm;
320     (*request)->completed  = 0;
321     (*request)->fwdthrough = dst;
322     (*request)->waitlist   = NULL;
323     (*request)->next       = NULL;
324   }
325   return retval;
326 }
327
328 void smpi_barrier(smpi_mpi_communicator_t *comm) {
329   int i;
330   comm->barrier++;
331   if(comm->barrier < comm->size) {
332     SIMIX_process_suspend(SIMIX_process_self());
333   } else {
334     comm->barrier = 0;
335     for(i = 0; i < comm->size; i++) {
336       if (SIMIX_process_is_suspended(comm->processes[i])) {
337         SIMIX_process_resume(comm->processes[i]);
338       }
339     }
340   }
341 }
342
343 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host) {
344   int i;
345   for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
346   if (i >= comm->size) i = -1;
347   return i;
348 }
349
350 void smpi_irecv(smpi_mpi_request_t *recvreq) {
351   int rank = smpi_host_rank_self();
352   if (NULL == smpi_pending_recv_requests[rank]) {
353     smpi_pending_recv_requests[rank] = recvreq;
354   } else if (NULL != smpi_last_pending_recv_requests[rank]) {
355     smpi_last_pending_recv_requests[rank]->next = recvreq;
356   } else { // can't happen!
357     fprintf(stderr, "smpi_pending_recv_requests not null while smpi_last_pending_recv_requests null!\n");
358   }
359   smpi_last_pending_recv_requests[rank] = recvreq;
360   smpi_match_requests(rank);
361   if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
362     SIMIX_process_resume(smpi_receiver_processes[rank]);
363   }
364 }
365
366 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) {
367   smpi_waitlist_node_t *waitnode, *current;
368   if (NULL != request) {
369     if (!request->completed) {
370       waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
371       waitnode->process = SIMIX_process_self();
372       waitnode->next    = NULL;
373       if (NULL == request->waitlist) {
374         request->waitlist = waitnode;
375       } else {
376         for(current = request->waitlist; NULL != current->next; current = current->next);
377         current->next = waitnode;
378       }
379       SIMIX_process_suspend(waitnode->process);
380     }
381     if (NULL != status && MPI_STATUS_IGNORE != status) {
382       status->MPI_SOURCE = request->src;
383     }
384   }
385 }
386
387 void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses) {
388   int i;
389   for (i = 0; i < count; i++) {
390     smpi_wait(requests[i], &statuses[i]);
391   }
392 }
393
394 void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests) {
395   int i;
396   for (i = 0; i < count; i++) {
397     smpi_wait(requests[i], MPI_STATUS_IGNORE);
398   }
399 }
400
401 int smpi_sender(int argc, char *argv[]) {
402   smx_process_t process;
403   char taskname[50];
404   size_t dsize;
405   void *data;
406   smx_host_t dhost;
407   m_task_t mtask;
408   int rank, fc, ft;
409   smpi_mpi_request_t *sendreq;
410
411   process = SIMIX_process_self();
412
413   // wait for init
414   mtask = (m_task_t)0;
415   MSG_task_get(&mtask, SEND_SYNC_PORT);
416
417   rank = smpi_host_rank_self();
418
419   smpi_sender_processes[rank] = process;
420
421   // ready!
422   MSG_task_put(mtask, MSG_task_get_source(mtask), SEND_SYNC_PORT);
423
424   while (0 < smpi_running_hosts) {
425     sendreq = smpi_pending_send_requests[rank];
426     if (NULL != sendreq) {
427
428       // pull from queue if not a fwd or no more to fwd
429       if (sendreq->dst == sendreq->fwdthrough) {
430         smpi_pending_send_requests[rank] = sendreq->next;
431         if(sendreq == smpi_last_pending_send_requests[rank]) {
432           smpi_last_pending_send_requests[rank] = NULL;
433         }
434         ft = sendreq->dst;
435       } else {
436         fc = ((sendreq->fwdthrough - sendreq->dst + sendreq->comm->size) % sendreq->comm->size) / 2;
437         ft = (sendreq->dst + fc) % sendreq->comm->size;
438         //printf("node %d sending broadcast to node %d through node %d\n", rank, sendreq->dst, ft);
439       }
440
441       // create task to send
442       sprintf(taskname, "comm:%d,src:%d,dst:%d,tag:%d,ft:%d", sendreq->comm->id, sendreq->src, sendreq->dst, sendreq->tag, ft);
443       dsize = sendreq->count * sendreq->datatype->size;
444       data  = xbt_malloc(dsize);
445       memcpy(data, sendreq->buf, dsize);
446       mtask = MSG_task_create(taskname, 0, dsize, data);
447
448       // figure out which host to send it to
449       dhost = sendreq->comm->hosts[sendreq->dst];
450
451       // send task
452       #ifdef DEBUG
453         printf("host %s attempting to send to host %s\n", SIMIX_host_get_name(SIMIX_host_self()), SIMIX_host_get_name(dhost));
454       #endif
455       MSG_task_put(mtask, dhost, MPI_PORT);
456
457       if (sendreq->dst == sendreq->fwdthrough) {
458         smpi_complete(sendreq);
459       } else {
460         sendreq->dst = (sendreq->dst + fc + 1) % sendreq->comm->size;
461       }
462
463     } else {
464       SIMIX_process_suspend(process);
465     }
466   }
467   return 0;
468 }
469
470 int smpi_receiver(int argc, char **argv) {
471   smx_process_t process;
472   m_task_t mtask;
473   smpi_received_t *received;
474   int rank;
475   smpi_mpi_request_t *recvreq;
476
477   process = SIMIX_process_self();
478
479   // wait for init
480   mtask = (m_task_t)0;
481   MSG_task_get(&mtask, RECV_SYNC_PORT);
482
483   rank = smpi_host_rank_self();
484
485   // potential race condition...
486   smpi_receiver_processes[rank] = process;
487
488   // ready!
489   MSG_task_put(mtask, MSG_task_get_source(mtask), RECV_SYNC_PORT);
490
491   while (0 < smpi_running_hosts) {
492     recvreq = smpi_pending_recv_requests[rank];
493     if (NULL != recvreq) {
494       mtask = (m_task_t)0;
495
496       #ifdef DEBUG
497         printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
498           SIMIX_host_get_name(SIMIX_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
499       #endif
500       MSG_task_get(&mtask, MPI_PORT);
501
502       received = xbt_malloc(sizeof(smpi_received_t));
503
504       sscanf(MSG_task_get_name(mtask), "comm:%d,src:%d,dst:%d,tag:%d,ft:%d",
505         &received->commid, &received->src, &received->dst, &received->tag, &received->fwdthrough);
506       received->data = MSG_task_get_data(mtask);
507       received->next = NULL;
508
509       if (NULL == smpi_last_received[rank]) {
510         smpi_received[rank] = received;
511       } else {
512         smpi_last_received[rank]->next = received;
513       }
514       smpi_last_received[rank] = received;
515
516       MSG_task_destroy(mtask);
517
518       smpi_match_requests(rank);
519
520     } else {
521       SIMIX_process_suspend(process);
522     }
523   }
524   return 0;
525 }
526
// FIXME: move into own file
/*
 * smpi_gettimeofday: simulated gettimeofday().
 *
 * Ends the current benchmark section (charging it to the simulation). If tv
 * is non-NULL, it fills *tv with the SIMIX clock split into whole seconds
 * and microseconds. It then restarts benchmarking. tz is ignored. Returns 0,
 * or -1 when tv is NULL.
 */
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) {
  int rc = -1;

  smpi_bench_end();
  if (NULL != tv) {
    double clock_now = SIMIX_get_clock();
    tv->tv_sec  = clock_now;
    tv->tv_usec = (clock_now - (double)tv->tv_sec) * 1000000.0;
    rc = 0;
  }
  smpi_bench_begin();

  return rc;
}
542
543 unsigned int smpi_sleep(unsigned int seconds) {
544   m_task_t task = NULL;
545   smpi_bench_end();
546   task = MSG_task_create("sleep", seconds * DEFAULT_POWER, 0, NULL);
547   MSG_task_execute(task);
548   MSG_task_destroy(task);
549   smpi_bench_begin();
550   return 0;
551 }
552
553 void smpi_exit(int status) {
554   smpi_bench_end();
555   smpi_running_hosts--;
556   SIMIX_process_kill(SIMIX_process_self());
557   return;
558 }