AND (Algorithmique Numérique Distribuée) logo

Public GIT Repository
Added SMPI to the CVS repository; a lot of integration work still needs to be done.
[simgrid.git] / src / smpi / src / smpi_base.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "msg/msg.h"
#include "xbt/sysdep.h"
#include "xbt/xbt_portability.h"
#include "smpi.h"
7
8 smpi_mpi_request_t **smpi_pending_send_requests      = NULL;
9 smpi_mpi_request_t **smpi_last_pending_send_requests = NULL;
10
11 smpi_mpi_request_t **smpi_pending_recv_requests      = NULL;
12 smpi_mpi_request_t **smpi_last_pending_recv_requests = NULL;
13
14 smpi_received_t **smpi_received                      = NULL;
15 smpi_received_t **smpi_last_received                 = NULL;
16
17 m_process_t *smpi_sender_processes                   = NULL;
18 m_process_t *smpi_receiver_processes                 = NULL;
19
20 int smpi_running_hosts = 0;
21
22 smpi_mpi_communicator_t smpi_mpi_comm_world;
23
24 smpi_mpi_status_t smpi_mpi_status_ignore;
25
26 smpi_mpi_datatype_t smpi_mpi_byte;
27 smpi_mpi_datatype_t smpi_mpi_int;
28 smpi_mpi_datatype_t smpi_mpi_double;
29
30 smpi_mpi_op_t smpi_mpi_land;
31 smpi_mpi_op_t smpi_mpi_sum;
32
33 static xbt_os_timer_t smpi_timer;
34 static int smpi_benchmarking;
35 static double smpi_reference;
36
// Logical-AND reduction on ints: *z = (*x && *y), i.e. 1 or 0.
// *y is not read when *x is zero.
void smpi_mpi_land_func(void *x, void *y, void *z) {
  const int *lhs = x;
  const int *rhs = y;
  int *out = z;

  *out = (0 != *lhs) && (0 != *rhs);
}
40
// Sum reduction on ints: *z = *x + *y (the operands may alias the result).
void smpi_mpi_sum_func(void *x, void *y, void *z) {
  const int *lhs = x;
  const int *rhs = y;
  int *out = z;

  *out = *lhs + *rhs;
}
44
45 void smpi_mpi_init() {
46   int i;
47   int size, rank;
48   m_host_t *hosts;
49   m_host_t host;
50   double duration;
51   m_task_t mtask;
52
53   // will eventually need mutex
54   smpi_running_hosts++;
55
56   // initialize some local variables
57   size  = MSG_get_host_number();
58   host  = MSG_host_self();
59   hosts = MSG_get_host_table();
60   for(i = 0; i < size && host != hosts[i]; i++);
61   rank  = i;
62
63   // node 0 sets the globals
64   if (0 == rank) {
65
66     // global communicator
67     smpi_mpi_comm_world.id           = 0;
68     smpi_mpi_comm_world.size         = size;
69     smpi_mpi_comm_world.barrier      = 0;
70     smpi_mpi_comm_world.hosts        = hosts;
71     smpi_mpi_comm_world.processes    = xbt_malloc(sizeof(m_process_t) * size);
72     smpi_mpi_comm_world.processes[0] = MSG_process_self();
73
74     // mpi datatypes
75     smpi_mpi_byte.size               = (size_t)1;
76     smpi_mpi_int.size                = sizeof(int);
77     smpi_mpi_double.size             = sizeof(double);
78
79     // mpi operations
80     smpi_mpi_land.func               = &smpi_mpi_land_func;
81     smpi_mpi_sum.func                = &smpi_mpi_sum_func;
82
83     // smpi globals
84     smpi_pending_send_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
85     smpi_last_pending_send_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
86     smpi_pending_recv_requests       = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
87     smpi_last_pending_recv_requests  = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
88     smpi_received                    = xbt_malloc(sizeof(smpi_received_t*) * size);
89     smpi_last_received               = xbt_malloc(sizeof(smpi_received_t*) * size);
90     smpi_sender_processes            = xbt_malloc(sizeof(m_process_t) * size);
91     smpi_receiver_processes          = xbt_malloc(sizeof(m_process_t) * size);
92     for(i = 0; i < size; i++) {
93       smpi_pending_send_requests[i]      = NULL;
94       smpi_last_pending_send_requests[i] = NULL;
95       smpi_pending_recv_requests[i]      = NULL;
96       smpi_last_pending_recv_requests[i] = NULL;
97       smpi_received[i]                   = NULL;
98       smpi_last_received[i]              = NULL;
99     }
100     smpi_timer                      = xbt_os_timer_new();
101     smpi_reference                  = DEFAULT_POWER;
102     smpi_benchmarking               = 0;
103
104     // tell send/recv nodes to begin
105     for(i = 0; i < size; i++) {
106       mtask = MSG_task_create("READY", 0, 0, NULL);
107       MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
108       mtask = (m_task_t)0;
109       MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
110       MSG_task_destroy(mtask);
111       mtask = MSG_task_create("READY", 0, 0, NULL);
112       MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
113       mtask = (m_task_t)0;
114       MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
115       MSG_task_destroy(mtask);
116     }
117
118     // now everyone else
119     for(i = 1; i < size; i++) {
120       mtask = MSG_task_create("READY", 0, 0, NULL);
121       MSG_task_put(mtask, hosts[i], MPI_PORT);
122     }
123
124   } else {
125     // everyone needs to wait for node 0 to finish
126     mtask = (m_task_t)0;
127     MSG_task_get(&mtask, MPI_PORT);
128     MSG_task_destroy(mtask);
129     smpi_mpi_comm_world.processes[rank] = MSG_process_self();
130   }
131
132   // now that mpi_comm_world_processes is set, it's safe to set a barrier
133   smpi_barrier(&smpi_mpi_comm_world);
134 }
135
136 void smpi_mpi_finalize() {
137   int i;
138   smpi_running_hosts--;
139   if (0 <= smpi_running_hosts) {
140     for(i = 0; i < smpi_mpi_comm_world.size; i++) {
141       if(MSG_process_is_suspended(smpi_sender_processes[i])) {
142         MSG_process_resume(smpi_sender_processes[i]);
143       }
144       if(MSG_process_is_suspended(smpi_receiver_processes[i])) {
145         MSG_process_resume(smpi_receiver_processes[i]);
146       }
147     }
148   } else {
149     xbt_free(smpi_mpi_comm_world.processes);
150     xbt_free(smpi_pending_send_requests);
151     xbt_free(smpi_last_pending_send_requests);
152     xbt_free(smpi_pending_recv_requests);
153     xbt_free(smpi_last_pending_recv_requests);
154     xbt_free(smpi_received);
155     xbt_free(smpi_last_received);
156     xbt_free(smpi_sender_processes);
157     xbt_free(smpi_receiver_processes);
158     xbt_os_timer_free(smpi_timer);
159   }
160 }
161
162 void smpi_complete(smpi_mpi_request_t *request) {
163   smpi_waitlist_node_t *current, *next;
164   request->completed = 1;
165   request->next      = NULL;
166   current = request->waitlist;
167   while(NULL != current) {
168     if(MSG_process_is_suspended(current->process)) {
169       MSG_process_resume(current->process);
170     }
171     next = current->next;
172     xbt_free(current);
173     current = next;
174   }
175   request->waitlist  = NULL;
176 }
177
178 int smpi_host_rank_self() {
179   return smpi_comm_rank(&smpi_mpi_comm_world, MSG_host_self());
180 }
181
182 void smpi_isend(smpi_mpi_request_t *sendreq) {
183   int rank = smpi_host_rank_self();
184   if (NULL == smpi_last_pending_send_requests[rank]) {
185     smpi_pending_send_requests[rank] = sendreq;
186   } else {
187     smpi_last_pending_send_requests[rank]->next = sendreq;
188   }
189   smpi_last_pending_send_requests[rank] = sendreq;
190   if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
191     MSG_process_resume(smpi_sender_processes[rank]);
192   }
193 }
194
195 void smpi_match_requests(int rank) {
196   smpi_mpi_request_t *frequest, *prequest, *crequest;
197   smpi_received_t *freceived, *preceived, *creceived;
198   size_t dsize;
199   short int match;
200   frequest  = smpi_pending_recv_requests[rank];
201   prequest  = NULL;
202   crequest  = frequest;
203   while(NULL != crequest) {
204     freceived = smpi_received[rank];
205     preceived = NULL;
206     creceived = freceived;
207     match     = 0;
208     while(NULL != creceived && !match) {
209       if(crequest->comm->id == creceived->commid && 
210         (MPI_ANY_SOURCE == crequest->src || crequest->src == creceived->src) && 
211         crequest->tag == creceived->tag) {
212
213         // we have a match!
214         match = 1;
215
216         // pull the request from the queue
217         if(NULL == prequest) {
218           frequest = crequest->next;
219           smpi_pending_recv_requests[rank] = frequest;
220         } else {
221           prequest->next = crequest->next;
222         }
223         if(crequest == smpi_last_pending_recv_requests[rank]) {
224           smpi_last_pending_recv_requests[rank] = prequest;
225         }
226
227         // pull the received data from the queue
228         if(NULL == preceived) {
229           freceived = creceived->next;
230           smpi_received[rank] = freceived;
231         } else {
232           preceived->next = creceived->next;
233         }
234         if(creceived == smpi_last_received[rank]) {
235           smpi_last_received[rank] = preceived;
236         }
237
238         // for when request->src is any source
239         crequest->src = creceived->src;
240
241         // calculate data size
242         dsize = crequest->count * crequest->datatype->size;
243
244         // copy data to buffer
245         memcpy(crequest->buf, creceived->data, dsize);
246
247         // fwd through
248         crequest->fwdthrough = creceived->fwdthrough;
249
250         // get rid of received data node, no longer needed
251         xbt_free(creceived->data);
252         xbt_free(creceived);
253
254         if (crequest->fwdthrough == rank) {
255           smpi_complete(crequest);
256         } else {
257           crequest->src = rank;
258           crequest->dst = (rank + 1) % crequest->comm->size;
259           smpi_isend(crequest);
260         }
261
262       } else {
263         preceived = creceived;
264         creceived = creceived->next;
265       }
266     }
267     prequest = crequest;
268     crequest = crequest->next;
269   }
270 }
271
272 void smpi_bench_begin() {
273   xbt_assert0(!smpi_benchmarking, "Already benchmarking");
274   smpi_benchmarking = 1;
275   xbt_os_timer_start(smpi_timer);
276   return;
277 }
278
279 void smpi_bench_end() {
280   m_task_t ctask = NULL;
281   double duration;
282   xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
283   smpi_benchmarking = 0;
284   xbt_os_timer_stop(smpi_timer);
285   duration = xbt_os_timer_elapsed(smpi_timer);
286   ctask = MSG_task_create("computation", duration * smpi_reference, 0 , NULL);
287   MSG_task_execute(ctask);
288   MSG_task_destroy(ctask);
289   return;
290 }
291
292 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
293   int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request) {
294   int retval = MPI_SUCCESS;
295   *request = NULL;
296   if (NULL == buf && 0 < count) {
297     retval = MPI_ERR_INTERN;
298   } else if (0 > count) {
299     retval = MPI_ERR_COUNT;
300   } else if (NULL == datatype) {
301     retval = MPI_ERR_TYPE;
302   } else if (NULL == comm) {
303     retval = MPI_ERR_COMM;
304   } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
305     retval = MPI_ERR_RANK;
306   } else if (0 > dst || comm->size <= dst) {
307     retval = MPI_ERR_RANK;
308   } else if (0 > tag) {
309     retval = MPI_ERR_TAG;
310   } else {
311     *request = xbt_malloc(sizeof(smpi_mpi_request_t));
312     (*request)->buf        = buf;
313     (*request)->count      = count;
314     (*request)->datatype   = datatype;
315     (*request)->src        = src;
316     (*request)->dst        = dst;
317     (*request)->tag        = tag;
318     (*request)->comm       = comm;
319     (*request)->completed  = 0;
320     (*request)->fwdthrough = dst;
321     (*request)->waitlist   = NULL;
322     (*request)->next       = NULL;
323   }
324   return retval;
325 }
326
327 void smpi_barrier(smpi_mpi_communicator_t *comm) {
328   int i;
329   comm->barrier++;
330   if(comm->barrier < comm->size) {
331     MSG_process_suspend(MSG_process_self());
332   } else {
333     comm->barrier = 0;
334     for(i = 0; i < comm->size; i++) {
335       if (MSG_process_is_suspended(comm->processes[i])) {
336         MSG_process_resume(comm->processes[i]);
337       }
338     }
339   }
340 }
341
342 int smpi_comm_rank(smpi_mpi_communicator_t *comm, m_host_t host) {
343   int i;
344   for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
345   if (i >= comm->size) i = -1;
346   return i;
347 }
348
349 void smpi_irecv(smpi_mpi_request_t *recvreq) {
350   int rank = smpi_host_rank_self();
351   if (NULL == smpi_pending_recv_requests[rank]) {
352     smpi_pending_recv_requests[rank] = recvreq;
353   } else if (NULL != smpi_last_pending_recv_requests[rank]) {
354     smpi_last_pending_recv_requests[rank]->next = recvreq;
355   } else { // can't happen!
356     fprintf(stderr, "smpi_pending_recv_requests not null while smpi_last_pending_recv_requests null!\n");
357   }
358   smpi_last_pending_recv_requests[rank] = recvreq;
359   smpi_match_requests(rank);
360   if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
361     MSG_process_resume(smpi_receiver_processes[rank]);
362   }
363 }
364
365 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) {
366   smpi_waitlist_node_t *waitnode, *current;
367   if (NULL != request) {
368     if (!request->completed) {
369       waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
370       waitnode->process = MSG_process_self();
371       waitnode->next    = NULL;
372       if (NULL == request->waitlist) {
373         request->waitlist = waitnode;
374       } else {
375         for(current = request->waitlist; NULL != current->next; current = current->next);
376         current->next = waitnode;
377       }
378       MSG_process_suspend(waitnode->process);
379     }
380     if (NULL != status && MPI_STATUS_IGNORE != status) {
381       status->MPI_SOURCE = request->src;
382     }
383   }
384 }
385
386 void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses) {
387   int i;
388   for (i = 0; i < count; i++) {
389     smpi_wait(requests[i], &statuses[i]);
390   }
391 }
392
393 void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests) {
394   int i;
395   for (i = 0; i < count; i++) {
396     smpi_wait(requests[i], MPI_STATUS_IGNORE);
397   }
398 }
399
400 int smpi_sender(int argc, char *argv[]) {
401   m_process_t process;
402   char taskname[50];
403   size_t dsize;
404   void *data;
405   m_host_t dhost;
406   m_task_t mtask;
407   int rank, fc, ft;
408   smpi_mpi_request_t *sendreq;
409
410   process = MSG_process_self();
411
412   // wait for init
413   mtask = (m_task_t)0;
414   MSG_task_get(&mtask, SEND_SYNC_PORT);
415
416   rank = smpi_host_rank_self();
417
418   smpi_sender_processes[rank] = process;
419
420   // ready!
421   MSG_task_put(mtask, MSG_task_get_source(mtask), SEND_SYNC_PORT);
422
423   while (0 < smpi_running_hosts) {
424     sendreq = smpi_pending_send_requests[rank];
425     if (NULL != sendreq) {
426
427       // pull from queue if not a fwd or no more to fwd
428       if (sendreq->dst == sendreq->fwdthrough) {
429         smpi_pending_send_requests[rank] = sendreq->next;
430         if(sendreq == smpi_last_pending_send_requests[rank]) {
431           smpi_last_pending_send_requests[rank] = NULL;
432         }
433         ft = sendreq->dst;
434       } else {
435         fc = ((sendreq->fwdthrough - sendreq->dst + sendreq->comm->size) % sendreq->comm->size) / 2;
436         ft = (sendreq->dst + fc) % sendreq->comm->size;
437         //printf("node %d sending broadcast to node %d through node %d\n", rank, sendreq->dst, ft);
438       }
439
440       // create task to send
441       sprintf(taskname, "comm:%d,src:%d,dst:%d,tag:%d,ft:%d", sendreq->comm->id, sendreq->src, sendreq->dst, sendreq->tag, ft);
442       dsize = sendreq->count * sendreq->datatype->size;
443       data  = xbt_malloc(dsize);
444       memcpy(data, sendreq->buf, dsize);
445       mtask = MSG_task_create(taskname, 0, dsize, data);
446
447       // figure out which host to send it to
448       dhost = sendreq->comm->hosts[sendreq->dst];
449
450       // send task
451       #ifdef DEBUG
452         printf("host %s attempting to send to host %s\n", MSG_host_get_name(MSG_host_self()), MSG_host_get_name(dhost));
453       #endif
454       MSG_task_put(mtask, dhost, MPI_PORT);
455
456       if (sendreq->dst == sendreq->fwdthrough) {
457         smpi_complete(sendreq);
458       } else {
459         sendreq->dst = (sendreq->dst + fc + 1) % sendreq->comm->size;
460       }
461
462     } else {
463       MSG_process_suspend(process);
464     }
465   }
466   return 0;
467 }
468
469 int smpi_receiver(int argc, char **argv) {
470   m_process_t process;
471   m_task_t mtask;
472   smpi_received_t *received;
473   int rank;
474   smpi_mpi_request_t *recvreq;
475
476   process = MSG_process_self();
477
478   // wait for init
479   mtask = (m_task_t)0;
480   MSG_task_get(&mtask, RECV_SYNC_PORT);
481
482   rank = smpi_host_rank_self();
483
484   // potential race condition...
485   smpi_receiver_processes[rank] = process;
486
487   // ready!
488   MSG_task_put(mtask, MSG_task_get_source(mtask), RECV_SYNC_PORT);
489
490   while (0 < smpi_running_hosts) {
491     recvreq = smpi_pending_recv_requests[rank];
492     if (NULL != recvreq) {
493       mtask = (m_task_t)0;
494
495       #ifdef DEBUG
496         printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
497           MSG_host_get_name(MSG_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
498       #endif
499       MSG_task_get(&mtask, MPI_PORT);
500
501       received = xbt_malloc(sizeof(smpi_received_t));
502
503       sscanf(MSG_task_get_name(mtask), "comm:%d,src:%d,dst:%d,tag:%d,ft:%d",
504         &received->commid, &received->src, &received->dst, &received->tag, &received->fwdthrough);
505       received->data = MSG_task_get_data(mtask);
506       received->next = NULL;
507
508       if (NULL == smpi_last_received[rank]) {
509         smpi_received[rank] = received;
510       } else {
511         smpi_last_received[rank]->next = received;
512       }
513       smpi_last_received[rank] = received;
514
515       MSG_task_destroy(mtask);
516
517       smpi_match_requests(rank);
518
519     } else {
520       MSG_process_suspend(process);
521     }
522   }
523   return 0;
524 }
525
// FIXME: move into own file
// Simulated gettimeofday(): fills `tv` from the simulated clock
// (MSG_get_clock()); `tz` is ignored. Returns -1 when `tv` is NULL, else 0.
// The current benchmark interval is closed before the clock is read and a
// new one is opened afterwards.
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) {
  double now;
  int retval;

  smpi_bench_end();
  if (NULL != tv) {
    now = MSG_get_clock();
    tv->tv_sec  = now;
    tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
    retval = 0;
  } else {
    retval = -1;
  }
  smpi_bench_begin();

  return retval;
}
541
542 unsigned int smpi_sleep(unsigned int seconds) {
543   m_task_t task = NULL;
544   smpi_bench_end();
545   task = MSG_task_create("sleep", seconds * DEFAULT_POWER, 0, NULL);
546   MSG_task_execute(task);
547   MSG_task_destroy(task);
548   smpi_bench_begin();
549   return 0;
550 }
551
552 void smpi_exit(int status) {
553   smpi_bench_end();
554   smpi_running_hosts--;
555   MSG_process_kill(MSG_process_self());
556   return;
557 }