Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a84f8a561092f50e547d8258dfea68f0a9b14a13
[simgrid.git] / src / smpi / src / smpi_base.c
1 #include <stdio.h>
2
3 #include <signal.h>
4 #include <sys/time.h>
5 #include "xbt/xbt_os_time.h"
6 #include "xbt/mallocator.h"
7 #include "smpi.h"
8
9 // FIXME: move globals into structure...
10
11 xbt_mallocator_t smpi_request_mallocator      = NULL;
12 xbt_mallocator_t smpi_message_mallocator      = NULL;
13
14 xbt_fifo_t *smpi_pending_send_requests        = NULL;
15 smx_mutex_t *smpi_pending_send_requests_mutex = NULL;
16
17 xbt_fifo_t *smpi_pending_recv_requests        = NULL;
18 smx_mutex_t *smpi_pending_recv_requests_mutex = NULL;
19
20 xbt_fifo_t *smpi_received_messages            = NULL;
21 smx_mutex_t *smpi_received_messages_mutex     = NULL;
22
23 smx_process_t *smpi_sender_processes        = NULL;
24 smx_process_t *smpi_receiver_processes      = NULL;
25
26 int smpi_running_hosts = 0;
27
28 smpi_mpi_communicator_t smpi_mpi_comm_world;
29
30 smpi_mpi_status_t smpi_mpi_status_ignore;
31
32 smpi_mpi_datatype_t smpi_mpi_byte;
33 smpi_mpi_datatype_t smpi_mpi_int;
34 smpi_mpi_datatype_t smpi_mpi_double;
35
36 smpi_mpi_op_t smpi_mpi_land;
37 smpi_mpi_op_t smpi_mpi_sum;
38
39 static xbt_os_timer_t smpi_timer;
40 static int smpi_benchmarking;
41 static double smpi_reference_speed;
42
43 // mutexes
44 smx_mutex_t smpi_running_hosts_mutex = NULL;
45 smx_mutex_t smpi_benchmarking_mutex  = NULL;
46 smx_mutex_t init_mutex = NULL;
47 smx_cond_t init_cond  = NULL;
48
49 int smpi_root_ready = 0;
50 int smpi_ready_count = 0;
51
52 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
53
54 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) 
55 {
56         return comm->size;
57 }
58
59 // FIXME: smarter algorithm?
60 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
61 {
62         int i;
63
64         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
65
66         return i;
67 }
68
69 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
70 {
71         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
72 }
73
74 int inline smpi_mpi_comm_world_rank_self()
75 {
76         return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
77 }
78
79 int smpi_sender(int argc, char **argv)
80 {
81         smx_process_t self;
82         smx_host_t shost;
83         int rank;
84         xbt_fifo_t request_queue;
85         smx_mutex_t request_queue_mutex;
86         int size;
87         int running_hosts = 0;
88         smpi_mpi_request_t *request;
89         smx_host_t dhost;
90         smx_action_t communicate_action;
91         smpi_received_message_t *scratch;
92         int drank;
93         smx_process_t waitproc;
94
95         self  = SIMIX_process_self();
96         shost = SIMIX_host_self();
97         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
98
99         // make sure root is done before own initialization
100         SIMIX_mutex_lock(init_mutex);
101         if (!smpi_root_ready) {
102                 SIMIX_cond_wait(init_cond, init_mutex);
103         }
104         SIMIX_mutex_unlock(init_mutex);
105
106         request_queue       = smpi_pending_send_requests[rank];
107         request_queue_mutex = smpi_pending_send_requests_mutex[rank];
108
109         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
110
111         smpi_sender_processes[rank] = self;
112
113         // wait for all nodes to signal initializatin complete
114         SIMIX_mutex_lock(init_mutex);
115         smpi_ready_count++;
116         if (smpi_ready_count < 3 * size) {
117                 SIMIX_cond_wait(init_cond, init_mutex);
118         } else {
119                 SIMIX_cond_broadcast(init_cond);
120         }
121         SIMIX_mutex_unlock(init_mutex);
122
123         SIMIX_mutex_lock(smpi_running_hosts_mutex);
124         running_hosts = smpi_running_hosts;
125         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
126
127         while (0 < running_hosts) {
128
129                 SIMIX_mutex_lock(request_queue_mutex);
130                 request = xbt_fifo_shift(request_queue);
131                 SIMIX_mutex_unlock(request_queue_mutex);
132
133                 if (NULL == request) {
134                         SIMIX_process_suspend(self);
135                 } else {
136                         SIMIX_mutex_lock(request->mutex);
137
138                         dhost = request->comm->hosts[request->dst];
139
140                         // FIXME: not at all sure I can assume magic just happens here....
141                         communicate_action = SIMIX_action_communicate(shost, dhost,
142                                 "communication", request->datatype->size * request->count * 1.0, -1.0);
143
144                         SIMIX_register_condition_to_action(communicate_action, request->cond);
145                         SIMIX_register_action_to_condition(communicate_action, request->cond);
146
147                         SIMIX_cond_wait(request->cond, request->mutex);
148
149                         // copy request to appropriate received queue
150                         scratch = xbt_mallocator_get(smpi_message_mallocator);
151                         scratch->comm = request->comm;
152                         scratch->src  = request->src;
153                         scratch->dst  = request->dst;
154                         scratch->tag  = request->tag;
155                         scratch->buf  = xbt_malloc(request->datatype->size * request->count);
156                         memcpy(scratch->buf, request->buf, request->datatype->size * request->count);
157                         drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
158                         SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
159                         xbt_fifo_push(smpi_received_messages[drank], scratch);
160                         SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
161
162                         request->completed = 1;
163
164                         // wake up receiver, then any waiting sender
165                         waitproc = smpi_receiver_processes[drank];
166
167                         do {
168                                 if (SIMIX_process_is_suspended(waitproc)) {
169                                         SIMIX_process_resume(waitproc);
170                                 }
171                         } while(waitproc = xbt_fifo_shift(request->waitlist));
172
173                         SIMIX_mutex_unlock(request->mutex);
174                 }
175
176                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
177                 running_hosts = smpi_running_hosts;
178                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
179         }
180
181         SIMIX_mutex_lock(init_mutex);
182         smpi_ready_count--;
183         if (smpi_ready_count <= 0) {
184                 SIMIX_cond_broadcast(init_cond);
185         }
186         SIMIX_mutex_unlock(init_mutex);
187
188         return 0;
189 }
190
191 int smpi_receiver(int argc, char **argv)
192 {
193         smx_process_t self;
194         int rank;
195         xbt_fifo_t request_queue;
196         smx_mutex_t request_queue_mutex;
197         xbt_fifo_t message_queue;
198         smx_mutex_t message_queue_mutex;
199         int size;
200         int running_hosts;
201         xbt_fifo_item_t request_item, message_item;
202         smpi_mpi_request_t *request;
203         smpi_received_message_t *message;
204         smx_process_t waitproc;
205
206         self  = SIMIX_process_self();
207         rank  = smpi_mpi_comm_world_rank_self();
208
209         // make sure root is done before own initialization
210         SIMIX_mutex_lock(init_mutex);
211         if (!smpi_root_ready) {
212                 SIMIX_cond_wait(init_cond, init_mutex);
213         }
214         SIMIX_mutex_unlock(init_mutex);
215
216         request_queue       = smpi_pending_recv_requests[rank];
217         request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
218
219         message_queue       = smpi_received_messages[rank];
220         message_queue_mutex = smpi_received_messages_mutex[rank];
221
222         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
223         smpi_receiver_processes[rank] = self;
224
225         // wait for all nodes to signal initializatin complete
226         SIMIX_mutex_lock(init_mutex);
227         smpi_ready_count++;
228         if (smpi_ready_count < 3 * size) {
229                 SIMIX_cond_wait(init_cond, init_mutex);
230         } else {
231                 SIMIX_cond_broadcast(init_cond);
232         }
233         SIMIX_mutex_unlock(init_mutex);
234
235         SIMIX_mutex_lock(smpi_running_hosts_mutex);
236         running_hosts = smpi_running_hosts;
237         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
238
239         while (0 < running_hosts) {
240
241                 request = NULL;
242                 message = NULL;
243
244                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
245
246                 // FIXME: not the best way to request multiple locks...
247                 SIMIX_mutex_lock(request_queue_mutex);
248                 SIMIX_mutex_lock(message_queue_mutex);
249                 for (request_item = xbt_fifo_get_first_item(request_queue);
250                         NULL != request_item;
251                         request_item = xbt_fifo_get_next_item(request_item)) {
252                         request = xbt_fifo_get_item_content(request_item);
253                         for (message_item = xbt_fifo_get_first_item(message_queue);
254                                 NULL != message_item;
255                                 message_item = xbt_fifo_get_next_item(message_item)) {
256                                 message = xbt_fifo_get_item_content(message_item);
257                                 if (request->comm == message->comm &&
258                                                 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
259                                                 request->tag == message->tag) {
260                                         xbt_fifo_remove_item(request_queue, request_item);
261                                         xbt_fifo_remove_item(message_queue, message_item);
262                                         goto stopsearch;
263                                 }
264                         }
265                 }
266 stopsearch:
267                 SIMIX_mutex_unlock(message_queue_mutex);
268                 SIMIX_mutex_unlock(request_queue_mutex);
269
270                 if (NULL == request || NULL == message) {
271                         SIMIX_process_suspend(self);
272                 } else {
273                         SIMIX_mutex_lock(request->mutex);
274                         memcpy(request->buf, message->buf, request->count * request->datatype->size);
275                         request->src = message->src;
276                         request->completed = 1;
277
278                         while (waitproc = xbt_fifo_shift(request->waitlist)) {
279                                 if (SIMIX_process_is_suspended(waitproc)) {
280                                         SIMIX_process_resume(waitproc);
281                                 }
282                         }
283                         SIMIX_mutex_unlock(request->mutex);
284
285                         xbt_mallocator_release(smpi_message_mallocator, message);
286                 }
287
288                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
289                 running_hosts = smpi_running_hosts;
290                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
291         }
292
293         SIMIX_mutex_lock(init_mutex);
294         smpi_ready_count--;
295         if (smpi_ready_count <= 0) {
296                 SIMIX_cond_broadcast(init_cond);
297         }
298         SIMIX_mutex_unlock(init_mutex);
299
300         return 0;
301 }
302
303 int smpi_run_simulation(int argc, char **argv)
304 {
305         smx_cond_t   cond           = NULL;
306         smx_action_t action         = NULL;
307
308         xbt_fifo_t   actions_failed = xbt_fifo_new();
309         xbt_fifo_t   actions_done   = xbt_fifo_new();
310
311         srand(SMPI_RAND_SEED);
312
313         SIMIX_global_init(&argc, argv);
314
315         init_mutex = SIMIX_mutex_init();
316         init_cond  = SIMIX_cond_init();
317
318         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
319         SIMIX_function_register("smpi_sender",         smpi_sender);
320         SIMIX_function_register("smpi_receiver",       smpi_receiver);
321         SIMIX_create_environment(argv[1]);
322         SIMIX_launch_application(argv[2]);
323
324         /* Prepare to display some more info when dying on Ctrl-C pressing */
325         //signal(SIGINT, inthandler);
326
327         /* Clean IO before the run */
328         fflush(stdout);
329         fflush(stderr);
330
331         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
332                 while (action = xbt_fifo_pop(actions_failed)) {
333                         DEBUG1("** %s failed **", action->name);
334                         while (cond = xbt_fifo_pop(action->cond_list)) {
335                                 SIMIX_cond_broadcast(cond);
336                         }
337                         SIMIX_action_destroy(action);
338                 }
339                 while (action = xbt_fifo_pop(actions_done)) {
340                         DEBUG1("** %s done **",action->name);
341                         while (cond = xbt_fifo_pop(action->cond_list)) {
342                                 SIMIX_cond_broadcast(cond);
343                         }
344                         SIMIX_action_destroy(action);
345                 }
346         }
347         xbt_fifo_free(actions_failed);
348         xbt_fifo_free(actions_done);
349         INFO1("simulation time %g", SIMIX_get_clock());
350         SIMIX_clean();
351         return 0;
352 }
353
/*
 * Logical-AND reduction on ints: stores 1 in *z when both *x and *y are
 * non-zero, 0 otherwise.  All three pointers are treated as int pointers;
 * *y is only read when *x is non-zero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;

        *out = (*lhs != 0) && (*rhs != 0);
}
358
/*
 * Sum reduction on ints: stores *x + *y in *z.  All three pointers are
 * treated as int pointers.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;

        *out = *lhs + *rhs;
}
363
364 void *smpi_new_request()
365 {
366         return xbt_new(smpi_mpi_request_t, 1);
367 }
368
369 void smpi_free_request(void *pointer) {
370         smpi_mpi_request_t *request = pointer;
371         if (NULL != request) {
372                 xbt_fifo_free(request->waitlist);
373                 xbt_free(request);
374         }
375 }
376
377 void *smpi_new_message()
378 {
379         return xbt_new(smpi_received_message_t, 1);
380 }
381
/* No-op callback, passed as the reset function of both mallocators. */
void smpi_do_nothing(void *pointer)
{
        (void)pointer;
}
386
387 void smpi_mpi_init()
388 {
389         int i;
390         int size;
391         smx_process_t process;
392         smx_host_t *hosts;
393         smx_host_t host;
394         double duration;
395
396         // initialize some local variables
397         host  = SIMIX_host_self();
398         hosts = SIMIX_host_get_table();
399         size  = SIMIX_host_get_number();
400
401         // node 0 sets the globals
402         if (host == hosts[0]) {
403
404                 // processes
405                 smpi_sender_processes             = xbt_new(smx_process_t, size);
406                 smpi_receiver_processes           = xbt_new(smx_process_t, size);
407
408                 // running hosts
409                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
410                 smpi_running_hosts                = size;
411
412                 // global communicator
413                 smpi_mpi_comm_world.size          = size;
414                 smpi_mpi_comm_world.barrier       = 0;
415                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
416                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
417                 smpi_mpi_comm_world.hosts         = hosts;
418                 smpi_mpi_comm_world.processes     = xbt_new(smx_process_t, size);
419                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
420
421                 // mpi datatypes
422                 smpi_mpi_byte.size                = (size_t)1;
423                 smpi_mpi_int.size                 = sizeof(int);
424                 smpi_mpi_double.size              = sizeof(double);
425
426                 // mpi operations
427                 smpi_mpi_land.func                = &smpi_mpi_land_func;
428                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
429
430                 // smpi globals
431                 smpi_request_mallocator           = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, smpi_free_request, smpi_do_nothing);
432                 smpi_message_mallocator           = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, smpi_do_nothing);
433                 smpi_pending_send_requests        = xbt_new(xbt_fifo_t,  size);
434                 smpi_pending_send_requests_mutex  = xbt_new(smx_mutex_t, size);
435                 smpi_pending_recv_requests        = xbt_new(xbt_fifo_t,  size);
436                 smpi_pending_recv_requests_mutex  = xbt_new(smx_mutex_t, size);
437                 smpi_received_messages            = xbt_new(xbt_fifo_t,  size);
438                 smpi_received_messages_mutex      = xbt_new(smx_mutex_t, size);
439
440                 for(i = 0; i < size; i++) {
441                         smpi_pending_send_requests[i]       = xbt_fifo_new();
442                         smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init();
443                         smpi_pending_recv_requests[i]       = xbt_fifo_new();
444                         smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init();
445                         smpi_received_messages[i]           = xbt_fifo_new();
446                         smpi_received_messages_mutex[i]     = SIMIX_mutex_init();
447                 }
448
449                 smpi_timer                      = xbt_os_timer_new();
450                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
451                 smpi_benchmarking               = 0;
452                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
453
454                 // signal all nodes to perform initialization
455                 SIMIX_mutex_lock(init_mutex);
456                 smpi_root_ready = 1;
457                 SIMIX_cond_broadcast(init_cond);
458                 SIMIX_mutex_unlock(init_mutex);
459
460         } else {
461
462                 // make sure root is done before own initialization
463                 SIMIX_mutex_lock(init_mutex);
464                 if (!smpi_root_ready) {
465                         SIMIX_cond_wait(init_cond, init_mutex);
466                 }
467                 SIMIX_mutex_unlock(init_mutex);
468
469                 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
470
471         }
472
473         // wait for all nodes to signal initializatin complete
474         SIMIX_mutex_lock(init_mutex);
475         smpi_ready_count++;
476         if (smpi_ready_count < 3 * size) {
477                 SIMIX_cond_wait(init_cond, init_mutex);
478         } else {
479                 SIMIX_cond_broadcast(init_cond);
480         }
481         SIMIX_mutex_unlock(init_mutex);
482
483 }
484
485 void smpi_mpi_finalize()
486 {
487         int i;
488
489         SIMIX_mutex_lock(smpi_running_hosts_mutex);
490         i = --smpi_running_hosts;
491         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
492
493         SIMIX_mutex_lock(init_mutex);
494         smpi_ready_count--;
495         SIMIX_mutex_unlock(init_mutex);
496
497         if (0 >= i) {
498
499                 // wake up senders/receivers
500                 for (i = 0; i < smpi_mpi_comm_world.size; i++) {
501                         if (SIMIX_process_is_suspended(smpi_sender_processes[i])) {
502                                 SIMIX_process_resume(smpi_sender_processes[i]);
503                         }
504                         if (SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
505                                 SIMIX_process_resume(smpi_receiver_processes[i]);
506                         }
507                 }
508
509                 // wait for senders/receivers to exit...
510                 SIMIX_mutex_lock(init_mutex);
511                 if (smpi_ready_count > 0) {
512                         SIMIX_cond_wait(init_cond, init_mutex);
513                 }
514                 SIMIX_mutex_unlock(init_mutex);
515
516                 SIMIX_mutex_destroy(init_mutex);
517                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
518
519                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
520                         xbt_fifo_free(smpi_pending_send_requests[i]);
521                         SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]);
522                         xbt_fifo_free(smpi_pending_recv_requests[i]);
523                         SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]);
524                         xbt_fifo_free(smpi_received_messages[i]);
525                         SIMIX_mutex_destroy(smpi_received_messages_mutex[i]);
526                 }
527
528                 xbt_mallocator_free(smpi_request_mallocator);
529                 xbt_mallocator_free(smpi_message_mallocator);
530                 xbt_free(smpi_pending_send_requests);
531                 xbt_free(smpi_pending_send_requests_mutex);
532                 xbt_free(smpi_pending_recv_requests);
533                 xbt_free(smpi_pending_recv_requests_mutex);
534                 xbt_free(smpi_received_messages);
535                 xbt_free(smpi_received_messages_mutex);
536
537                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
538                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
539                 xbt_free(smpi_mpi_comm_world.processes);
540
541                 xbt_os_timer_free(smpi_timer);
542         }
543
544 }
545
546 void smpi_bench_begin()
547 {
548         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
549         smpi_benchmarking = 1;
550         xbt_os_timer_start(smpi_timer);
551         return;
552 }
553
554 void smpi_bench_end()
555 {
556         double duration;
557         smx_host_t host;
558         smx_action_t compute_action;
559         smx_mutex_t mutex;
560         smx_cond_t cond;
561
562         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
563         smpi_benchmarking = 0;
564         xbt_os_timer_stop(smpi_timer);
565         duration = xbt_os_timer_elapsed(smpi_timer);
566         host           = SIMIX_host_self();
567         compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
568         mutex          = SIMIX_mutex_init();
569         cond           = SIMIX_cond_init();
570         SIMIX_mutex_lock(mutex);
571         SIMIX_register_condition_to_action(compute_action, cond);
572         SIMIX_register_action_to_condition(compute_action, cond);
573         SIMIX_cond_wait(cond, mutex);
574         SIMIX_mutex_unlock(mutex);
575         SIMIX_mutex_destroy(mutex);
576         SIMIX_cond_destroy(cond);
577         // FIXME: check for success/failure?
578         return;
579 }
580
581 void smpi_barrier(smpi_mpi_communicator_t *comm) {
582         int i;
583         SIMIX_mutex_lock(comm->barrier_mutex);
584         comm->barrier++;
585         if(i < comm->size) {
586                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
587         } else {
588                 comm->barrier = 0;
589                 SIMIX_cond_broadcast(comm->barrier_cond);
590         }
591         SIMIX_mutex_unlock(comm->barrier_mutex);
592 }
593
594 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
595 {
596         int i;
597         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
598         if (i >= comm->size) i = -1;
599         return i;
600 }
601
602 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
603         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
604 {
605         int retval = MPI_SUCCESS;
606
607         *request = NULL;
608
609         if (0 > count) {
610                 retval = MPI_ERR_COUNT;
611         } else if (NULL == buf) {
612                 retval = MPI_ERR_INTERN;
613         } else if (NULL == datatype) {
614                 retval = MPI_ERR_TYPE;
615         } else if (NULL == comm) {
616                 retval = MPI_ERR_COMM;
617         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
618                 retval = MPI_ERR_RANK;
619         } else if (0 > dst || comm->size <= dst) {
620                 retval = MPI_ERR_RANK;
621         } else if (0 > tag) {
622                 retval = MPI_ERR_TAG;
623         } else {
624                 *request = xbt_mallocator_get(smpi_request_mallocator);
625                 (*request)->comm       = comm;
626                 (*request)->src        = src;
627                 (*request)->dst        = dst;
628                 (*request)->tag        = tag;
629                 (*request)->buf        = buf;
630                 (*request)->count      = count;
631                 (*request)->datatype   = datatype;
632                 (*request)->completed  = 0;
633                 (*request)->mutex      = SIMIX_mutex_init();
634                 (*request)->cond       = SIMIX_cond_init();
635                 (*request)->waitlist   = xbt_fifo_new();
636         }
637         return retval;
638 }
639
640 int smpi_isend(smpi_mpi_request_t *request)
641 {
642         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
643
644         SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
645         xbt_fifo_push(smpi_pending_send_requests[rank], request);
646         SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
647
648         if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
649                 SIMIX_process_resume(smpi_sender_processes[rank]);
650         }
651 }
652
653 int smpi_irecv(smpi_mpi_request_t *request)
654 {
655         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
656
657         SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
658         xbt_fifo_push(smpi_pending_recv_requests[rank], request);
659         SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
660
661         if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
662                 SIMIX_process_resume(smpi_receiver_processes[rank]);
663         }
664 }
665
666 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
667 {
668         smx_process_t self;
669         int suspend = 0;
670         self = SIMIX_process_self();
671
672         if (NULL != request) {
673                 SIMIX_mutex_lock(request->mutex);
674                 if (!request->completed) {
675                         xbt_fifo_push(request->waitlist, self);
676                         suspend = 1;
677                 }
678                 SIMIX_mutex_unlock(request->mutex);
679                 if (suspend) {
680                         SIMIX_process_suspend(self);
681                 }
682                 if (NULL != status && MPI_STATUS_IGNORE != status) {
683                         SIMIX_mutex_lock(request->mutex);
684                         status->MPI_SOURCE = request->src;
685                         SIMIX_mutex_unlock(request->mutex);
686                 }
687         }
688 }
689
// FIXME: move into own file
/*
 * gettimeofday() replacement that reports the simulated clock.
 *
 * Benchmarking is paused for the duration of the call.  When `tv` is
 * non-NULL it receives SIMIX_get_clock() split into whole seconds and
 * microseconds and 0 is returned; when `tv` is NULL, -1 is returned.
 * `tz` is not used.
 */
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
        int rc = -1;

        smpi_bench_end();

        if (tv != NULL) {
                double now = SIMIX_get_clock();

                tv->tv_sec  = now;
                tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
                rc = 0;
        }

        smpi_bench_begin();

        return rc;
}
706
707 unsigned int smpi_sleep(unsigned int seconds)
708 {
709         smx_mutex_t mutex;
710         smx_cond_t cond;
711         smx_host_t host;
712         smx_action_t sleep_action;
713
714         smpi_bench_end();
715         host         = SIMIX_host_self();
716         sleep_action = SIMIX_action_sleep(host, seconds);
717         mutex        = SIMIX_mutex_init();
718         cond         = SIMIX_cond_init();
719         SIMIX_mutex_lock(mutex);
720         SIMIX_register_condition_to_action(sleep_action, cond);
721         SIMIX_register_action_to_condition(sleep_action, cond);
722         SIMIX_cond_wait(cond, mutex);
723         SIMIX_mutex_unlock(mutex);
724         SIMIX_mutex_destroy(mutex);
725         SIMIX_cond_destroy(cond);
726         // FIXME: check for success/failure?
727         smpi_bench_begin();
728         return 0;
729 }
730
731 void smpi_exit(int status)
732 {
733         smpi_bench_end();
734         SIMIX_mutex_lock(smpi_running_hosts_mutex);
735         smpi_running_hosts--;
736         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
737         SIMIX_process_kill(SIMIX_process_self());
738         return;
739 }