Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
everything compiles and basic program runs again, but still getting a pointer
[simgrid.git] / src / smpi / src / smpi_base.c
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

#include "xbt/xbt_os_time.h"
#include "xbt/mallocator.h"
#include "smpi.h"
8
9 // FIXME: move globals into structure...
10
11 xbt_mallocator_t smpi_request_mallocator      = NULL;
12 xbt_mallocator_t smpi_message_mallocator      = NULL;
13
14 xbt_fifo_t *smpi_pending_send_requests        = NULL;
15 smx_mutex_t *smpi_pending_send_requests_mutex = NULL;
16
17 xbt_fifo_t *smpi_pending_recv_requests        = NULL;
18 smx_mutex_t *smpi_pending_recv_requests_mutex = NULL;
19
20 xbt_fifo_t *smpi_received_messages            = NULL;
21 smx_mutex_t *smpi_received_messages_mutex     = NULL;
22
23 smx_process_t *smpi_sender_processes        = NULL;
24 smx_process_t *smpi_receiver_processes      = NULL;
25
26 int smpi_running_hosts = 0;
27
28 smpi_mpi_communicator_t smpi_mpi_comm_world;
29
30 smpi_mpi_status_t smpi_mpi_status_ignore;
31
32 smpi_mpi_datatype_t smpi_mpi_byte;
33 smpi_mpi_datatype_t smpi_mpi_int;
34 smpi_mpi_datatype_t smpi_mpi_double;
35
36 smpi_mpi_op_t smpi_mpi_land;
37 smpi_mpi_op_t smpi_mpi_sum;
38
39 static xbt_os_timer_t smpi_timer;
40 static int smpi_benchmarking;
41 static double smpi_reference_speed;
42
43 // mutexes
44 smx_mutex_t smpi_running_hosts_mutex = NULL;
45 smx_mutex_t smpi_benchmarking_mutex  = NULL;
46 smx_mutex_t init_mutex = NULL;
47 smx_cond_t init_cond  = NULL;
48
49 int smpi_root_ready = 0;
50 int smpi_ready_count = 0;
51
52 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
53
54 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) 
55 {
56         return comm->size;
57 }
58
59 // FIXME: smarter algorithm?
60 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
61 {
62         int i;
63
64         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
65
66         return i;
67 }
68
69 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
70 {
71         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
72 }
73
74 int inline smpi_mpi_comm_world_rank_self()
75 {
76         return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
77 }
78
79 int smpi_sender(int argc, char **argv)
80 {
81         smx_process_t self;
82         smx_host_t shost;
83         int rank;
84         xbt_fifo_t request_queue;
85         smx_mutex_t request_queue_mutex;
86         int size;
87         int running_hosts = 0;
88         smpi_mpi_request_t *request;
89         smx_host_t dhost;
90         smx_action_t communicate_action;
91         smpi_received_message_t *scratch;
92         int drank;
93         smx_process_t waitproc;
94
95         self  = SIMIX_process_self();
96         shost = SIMIX_host_self();
97         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
98
99         // make sure root is done before own initialization
100         SIMIX_mutex_lock(init_mutex);
101         if (!smpi_root_ready) {
102                 SIMIX_cond_wait(init_cond, init_mutex);
103         }
104         SIMIX_mutex_unlock(init_mutex);
105
106         request_queue       = smpi_pending_send_requests[rank];
107         request_queue_mutex = smpi_pending_send_requests_mutex[rank];
108
109         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
110
111         smpi_sender_processes[rank] = self;
112
113         // wait for all nodes to signal initializatin complete
114         SIMIX_mutex_lock(init_mutex);
115         smpi_ready_count++;
116         if (smpi_ready_count < 3 * size) {
117                 SIMIX_cond_wait(init_cond, init_mutex);
118         } else {
119                 SIMIX_cond_broadcast(init_cond);
120         }
121         SIMIX_mutex_unlock(init_mutex);
122
123         SIMIX_mutex_lock(smpi_running_hosts_mutex);
124         running_hosts = smpi_running_hosts;
125         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
126
127         while (0 < running_hosts) {
128
129                 SIMIX_mutex_lock(request_queue_mutex);
130                 request = xbt_fifo_shift(request_queue);
131                 SIMIX_mutex_unlock(request_queue_mutex);
132
133                 if (NULL == request) {
134                         SIMIX_process_suspend(self);
135                 } else {
136                         SIMIX_mutex_lock(request->mutex);
137
138                         dhost = request->comm->hosts[request->dst];
139
140                         // FIXME: not at all sure I can assume magic just happens here....
141                         communicate_action = SIMIX_action_communicate(shost, dhost,
142                                 "communication", request->datatype->size * request->count * 1.0, -1.0);
143
144                         SIMIX_register_condition_to_action(communicate_action, request->cond);
145                         SIMIX_register_action_to_condition(communicate_action, request->cond);
146
147                         SIMIX_cond_wait(request->cond, request->mutex);
148
149                         // copy request to appropriate received queue
150                         scratch = xbt_mallocator_get(smpi_message_mallocator);
151                         scratch->comm = request->comm;
152                         scratch->src  = request->src;
153                         scratch->dst  = request->dst;
154                         scratch->tag  = request->tag;
155                         scratch->buf  = request->buf;
156                         drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
157                         SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
158                         xbt_fifo_push(smpi_received_messages[drank], scratch);
159                         SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
160
161                         request->completed = 1;
162
163                         // wake up receiver, then any waiting sender
164                         waitproc = smpi_receiver_processes[drank];
165
166                         do {
167                                 if (SIMIX_process_is_suspended(waitproc)) {
168                                         SIMIX_process_resume(waitproc);
169                                 }
170                         } while(waitproc = xbt_fifo_shift(request->waitlist));
171
172                         SIMIX_mutex_unlock(request->mutex);
173                 }
174
175                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
176                 running_hosts = smpi_running_hosts;
177                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
178         }
179
180         SIMIX_mutex_lock(init_mutex);
181         smpi_ready_count--;
182         if (smpi_ready_count <= 0) {
183                 SIMIX_cond_broadcast(init_cond);
184         }
185         SIMIX_mutex_unlock(init_mutex);
186
187         return 0;
188 }
189
190 int smpi_receiver(int argc, char **argv)
191 {
192         smx_process_t self;
193         int rank;
194         xbt_fifo_t request_queue;
195         smx_mutex_t request_queue_mutex;
196         xbt_fifo_t message_queue;
197         smx_mutex_t message_queue_mutex;
198         int size;
199         int running_hosts;
200         xbt_fifo_item_t request_item, message_item;
201         smpi_mpi_request_t *request;
202         smpi_received_message_t *message;
203         smx_process_t waitproc;
204
205         self  = SIMIX_process_self();
206         rank  = smpi_mpi_comm_world_rank_self();
207
208         // make sure root is done before own initialization
209         SIMIX_mutex_lock(init_mutex);
210         if (!smpi_root_ready) {
211                 SIMIX_cond_wait(init_cond, init_mutex);
212         }
213         SIMIX_mutex_unlock(init_mutex);
214
215         request_queue       = smpi_pending_recv_requests[rank];
216         request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
217
218         message_queue       = smpi_received_messages[rank];
219         message_queue_mutex = smpi_received_messages_mutex[rank];
220
221         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
222         smpi_receiver_processes[rank] = self;
223
224         // wait for all nodes to signal initializatin complete
225         SIMIX_mutex_lock(init_mutex);
226         smpi_ready_count++;
227         if (smpi_ready_count < 3 * size) {
228                 SIMIX_cond_wait(init_cond, init_mutex);
229         } else {
230                 SIMIX_cond_broadcast(init_cond);
231         }
232         SIMIX_mutex_unlock(init_mutex);
233
234         SIMIX_mutex_lock(smpi_running_hosts_mutex);
235         running_hosts = smpi_running_hosts;
236         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
237
238         while (0 < running_hosts) {
239
240                 request = NULL;
241                 message = NULL;
242
243                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
244
245                 // FIXME: not the best way to request multiple locks...
246                 SIMIX_mutex_lock(request_queue_mutex);
247                 SIMIX_mutex_lock(message_queue_mutex);
248                 for (request_item = xbt_fifo_get_first_item(request_queue);
249                         NULL != request_item;
250                         request_item = xbt_fifo_get_next_item(request_item)) {
251                         request = xbt_fifo_get_item_content(request_item);
252                         for (message_item = xbt_fifo_get_first_item(message_queue);
253                                 NULL != message_item;
254                                 message_item = xbt_fifo_get_next_item(message_item)) {
255                                 message = xbt_fifo_get_item_content(message_item);
256                                 if (request->comm == message->comm &&
257                                                 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
258                                                 request->tag == message->tag) {
259                                         xbt_fifo_remove_item(request_queue, request_item);
260                                         xbt_fifo_remove_item(message_queue, message_item);
261                                         goto stopsearch;
262                                 }
263                         }
264                 }
265 stopsearch:
266                 SIMIX_mutex_unlock(message_queue_mutex);
267                 SIMIX_mutex_unlock(request_queue_mutex);
268
269                 if (NULL == request || NULL == message) {
270                         SIMIX_process_suspend(self);
271                 } else {
272                         SIMIX_mutex_lock(request->mutex);
273                         memcpy(request->buf, message->buf, request->count * request->datatype->size);
274                         request->src = message->src;
275                         request->completed = 1;
276
277                         while (waitproc = xbt_fifo_shift(request->waitlist)) {
278                                 if (SIMIX_process_is_suspended(waitproc)) {
279                                         SIMIX_process_resume(waitproc);
280                                 }
281                         }
282                         SIMIX_mutex_unlock(request->mutex);
283
284                         xbt_mallocator_release(smpi_message_mallocator, message);
285                 }
286
287                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
288                 running_hosts = smpi_running_hosts;
289                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
290         }
291
292         SIMIX_mutex_lock(init_mutex);
293         smpi_ready_count--;
294         if (smpi_ready_count <= 0) {
295                 SIMIX_cond_broadcast(init_cond);
296         }
297         SIMIX_mutex_unlock(init_mutex);
298
299         return 0;
300 }
301
302 int smpi_run_simulation(int argc, char **argv)
303 {
304         smx_cond_t   cond           = NULL;
305         smx_action_t action         = NULL;
306
307         xbt_fifo_t   actions_failed = xbt_fifo_new();
308         xbt_fifo_t   actions_done   = xbt_fifo_new();
309
310         srand(SMPI_RAND_SEED);
311
312         SIMIX_global_init(&argc, argv);
313
314         init_mutex = SIMIX_mutex_init();
315         init_cond  = SIMIX_cond_init();
316
317         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
318         SIMIX_function_register("smpi_sender",         smpi_sender);
319         SIMIX_function_register("smpi_receiver",       smpi_receiver);
320         SIMIX_create_environment(argv[1]);
321         SIMIX_launch_application(argv[2]);
322
323         /* Prepare to display some more info when dying on Ctrl-C pressing */
324         //signal(SIGINT, inthandler);
325
326         /* Clean IO before the run */
327         fflush(stdout);
328         fflush(stderr);
329
330         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
331                 while (action = xbt_fifo_pop(actions_failed)) {
332                         DEBUG1("** %s failed **", action->name);
333                         while (cond = xbt_fifo_pop(action->cond_list)) {
334                                 SIMIX_cond_broadcast(cond);
335                         }
336                         SIMIX_action_destroy(action);
337                 }
338                 while (action = xbt_fifo_pop(actions_done)) {
339                         DEBUG1("** %s done **",action->name);
340                         while (cond = xbt_fifo_pop(action->cond_list)) {
341                                 SIMIX_cond_broadcast(cond);
342                         }
343                         SIMIX_action_destroy(action);
344                 }
345         }
346         xbt_fifo_free(actions_failed);
347         xbt_fifo_free(actions_done);
348         INFO1("simulation time %g", SIMIX_get_clock());
349         SIMIX_clean();
350         return 0;
351 }
352
/*
 * Logical-AND reduction callback for int operands.
 *
 * x, y: point to the two int inputs (y is only read when *x is non-zero).
 * z:    points to the int that receives 1 when both inputs are non-zero,
 *       0 otherwise.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;

        *out = (*lhs != 0) && (*rhs != 0);
}
357
/*
 * Sum reduction callback for int operands.
 *
 * x, y: point to the two int inputs.
 * z:    points to the int that receives *x + *y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;

        *out = *lhs + *rhs;
}
362
363 void *smpi_new_request()
364 {
365         return xbt_new(smpi_mpi_request_t, 1);
366 }
367
368 void *smpi_new_message()
369 {
370         return xbt_new(smpi_received_message_t, 1);
371 }
372
/*
 * Callback that deliberately does nothing; smpi_mpi_init() passes it as the
 * last argument of xbt_mallocator_new().
 *
 * pointer: ignored.
 */
void smpi_do_nothing(void *pointer)
{
        (void)pointer;
}
377
378 void smpi_mpi_init()
379 {
380         int i;
381         int size;
382         smx_process_t process;
383         smx_host_t *hosts;
384         smx_host_t host;
385         double duration;
386
387         // initialize some local variables
388         host  = SIMIX_host_self();
389         hosts = SIMIX_host_get_table();
390         size  = SIMIX_host_get_number();
391
392         // node 0 sets the globals
393         if (host == hosts[0]) {
394
395                 // processes
396                 smpi_sender_processes             = xbt_new(smx_process_t, size);
397                 smpi_receiver_processes           = xbt_new(smx_process_t, size);
398
399                 // running hosts
400                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
401                 smpi_running_hosts                = size;
402
403                 // global communicator
404                 smpi_mpi_comm_world.size          = size;
405                 smpi_mpi_comm_world.barrier       = 0;
406                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
407                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
408                 smpi_mpi_comm_world.hosts         = hosts;
409                 smpi_mpi_comm_world.processes     = xbt_new(smx_process_t, size);
410                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
411
412                 // mpi datatypes
413                 smpi_mpi_byte.size                = (size_t)1;
414                 smpi_mpi_int.size                 = sizeof(int);
415                 smpi_mpi_double.size              = sizeof(double);
416
417                 // mpi operations
418                 smpi_mpi_land.func                = &smpi_mpi_land_func;
419                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
420
421                 // smpi globals
422                 smpi_request_mallocator           = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, smpi_do_nothing);
423                 smpi_message_mallocator           = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, smpi_do_nothing);
424                 smpi_pending_send_requests        = xbt_new(xbt_fifo_t,  size);
425                 smpi_pending_send_requests_mutex  = xbt_new(smx_mutex_t, size);
426                 smpi_pending_recv_requests        = xbt_new(xbt_fifo_t,  size);
427                 smpi_pending_recv_requests_mutex  = xbt_new(smx_mutex_t, size);
428                 smpi_received_messages            = xbt_new(xbt_fifo_t,  size);
429                 smpi_received_messages_mutex      = xbt_new(smx_mutex_t, size);
430
431                 for(i = 0; i < size; i++) {
432                         smpi_pending_send_requests[i]       = xbt_fifo_new();
433                         smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init();
434                         smpi_pending_recv_requests[i]       = xbt_fifo_new();
435                         smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init();
436                         smpi_received_messages[i]           = xbt_fifo_new();
437                         smpi_received_messages_mutex[i]     = SIMIX_mutex_init();
438                 }
439
440                 smpi_timer                      = xbt_os_timer_new();
441                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
442                 smpi_benchmarking               = 0;
443                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
444
445                 // signal all nodes to perform initialization
446                 SIMIX_mutex_lock(init_mutex);
447                 smpi_root_ready = 1;
448                 SIMIX_cond_broadcast(init_cond);
449                 SIMIX_mutex_unlock(init_mutex);
450
451         } else {
452
453                 // make sure root is done before own initialization
454                 SIMIX_mutex_lock(init_mutex);
455                 if (!smpi_root_ready) {
456                         SIMIX_cond_wait(init_cond, init_mutex);
457                 }
458                 SIMIX_mutex_unlock(init_mutex);
459
460                 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
461
462         }
463
464         // wait for all nodes to signal initializatin complete
465         SIMIX_mutex_lock(init_mutex);
466         smpi_ready_count++;
467         if (smpi_ready_count < 3 * size) {
468                 SIMIX_cond_wait(init_cond, init_mutex);
469         } else {
470                 SIMIX_cond_broadcast(init_cond);
471         }
472         SIMIX_mutex_unlock(init_mutex);
473
474 }
475
476 void smpi_mpi_finalize()
477 {
478         int i;
479
480         SIMIX_mutex_lock(smpi_running_hosts_mutex);
481         i = --smpi_running_hosts;
482         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
483
484         SIMIX_mutex_lock(init_mutex);
485         smpi_ready_count--;
486         SIMIX_mutex_unlock(init_mutex);
487
488         if (0 >= i) {
489
490                 // wake up senders/receivers
491                 for (i = 0; i < smpi_mpi_comm_world.size; i++) {
492                         if (SIMIX_process_is_suspended(smpi_sender_processes[i])) {
493                                 SIMIX_process_resume(smpi_sender_processes[i]);
494                         }
495                         if (SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
496                                 SIMIX_process_resume(smpi_receiver_processes[i]);
497                         }
498                 }
499
500                 // wait for senders/receivers to exit...
501                 SIMIX_mutex_lock(init_mutex);
502                 if (smpi_ready_count > 0) {
503                         SIMIX_cond_wait(init_cond, init_mutex);
504                 }
505                 SIMIX_mutex_unlock(init_mutex);
506
507                 SIMIX_mutex_destroy(init_mutex);
508                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
509
510                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
511                         xbt_fifo_free(smpi_pending_send_requests[i]);
512                         SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]);
513                         xbt_fifo_free(smpi_pending_recv_requests[i]);
514                         SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]);
515                         xbt_fifo_free(smpi_received_messages[i]);
516                         SIMIX_mutex_destroy(smpi_received_messages_mutex[i]);
517                 }
518
519                 xbt_mallocator_free(smpi_request_mallocator);
520                 xbt_mallocator_free(smpi_message_mallocator);
521                 xbt_free(smpi_pending_send_requests);
522                 xbt_free(smpi_pending_send_requests_mutex);
523                 xbt_free(smpi_pending_recv_requests);
524                 xbt_free(smpi_pending_recv_requests_mutex);
525                 xbt_free(smpi_received_messages);
526                 xbt_free(smpi_received_messages_mutex);
527
528                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
529                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
530                 xbt_free(smpi_mpi_comm_world.processes);
531
532                 xbt_os_timer_free(smpi_timer);
533         }
534
535 }
536
537 void smpi_bench_begin()
538 {
539         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
540         smpi_benchmarking = 1;
541         xbt_os_timer_start(smpi_timer);
542         return;
543 }
544
545 void smpi_bench_end()
546 {
547         double duration;
548         smx_host_t host;
549         smx_action_t compute_action;
550         smx_mutex_t mutex;
551         smx_cond_t cond;
552
553         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
554         smpi_benchmarking = 0;
555         xbt_os_timer_stop(smpi_timer);
556         duration = xbt_os_timer_elapsed(smpi_timer);
557         host           = SIMIX_host_self();
558         compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
559         mutex          = SIMIX_mutex_init();
560         cond           = SIMIX_cond_init();
561         SIMIX_mutex_lock(mutex);
562         SIMIX_register_condition_to_action(compute_action, cond);
563         SIMIX_register_action_to_condition(compute_action, cond);
564         SIMIX_cond_wait(cond, mutex);
565         SIMIX_mutex_unlock(mutex);
566         SIMIX_mutex_destroy(mutex);
567         SIMIX_cond_destroy(cond);
568         // FIXME: check for success/failure?
569         return;
570 }
571
572 void smpi_barrier(smpi_mpi_communicator_t *comm) {
573         int i;
574         SIMIX_mutex_lock(comm->barrier_mutex);
575         comm->barrier++;
576         if(i < comm->size) {
577                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
578         } else {
579                 comm->barrier = 0;
580                 SIMIX_cond_broadcast(comm->barrier_cond);
581         }
582         SIMIX_mutex_unlock(comm->barrier_mutex);
583 }
584
585 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
586 {
587         int i;
588         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
589         if (i >= comm->size) i = -1;
590         return i;
591 }
592
593 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
594         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
595 {
596         int retval = MPI_SUCCESS;
597
598         *request = NULL;
599
600         if (0 > count) {
601                 retval = MPI_ERR_COUNT;
602         } else if (NULL == buf) {
603                 retval = MPI_ERR_INTERN;
604         } else if (NULL == datatype) {
605                 retval = MPI_ERR_TYPE;
606         } else if (NULL == comm) {
607                 retval = MPI_ERR_COMM;
608         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
609                 retval = MPI_ERR_RANK;
610         } else if (0 > dst || comm->size <= dst) {
611                 retval = MPI_ERR_RANK;
612         } else if (0 > tag) {
613                 retval = MPI_ERR_TAG;
614         } else {
615                 *request = xbt_mallocator_get(smpi_request_mallocator);
616                 (*request)->comm       = comm;
617                 (*request)->src        = src;
618                 (*request)->dst        = dst;
619                 (*request)->tag        = tag;
620                 (*request)->buf        = buf;
621                 (*request)->count      = count;
622                 (*request)->datatype   = datatype;
623                 (*request)->completed  = 0;
624                 (*request)->mutex      = SIMIX_mutex_init();
625                 (*request)->cond       = SIMIX_cond_init();
626                 (*request)->waitlist   = NULL;
627         }
628         return retval;
629 }
630
631 int smpi_isend(smpi_mpi_request_t *request)
632 {
633         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
634
635         SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
636         xbt_fifo_push(smpi_pending_send_requests[rank], request);
637         SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
638
639         if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
640                 SIMIX_process_resume(smpi_sender_processes[rank]);
641         }
642 }
643
644 int smpi_irecv(smpi_mpi_request_t *request)
645 {
646         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
647
648         SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
649         xbt_fifo_push(smpi_pending_recv_requests[rank], request);
650         SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
651
652         if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
653                 SIMIX_process_resume(smpi_receiver_processes[rank]);
654         }
655 }
656
657 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
658 {
659         smx_process_t self;
660         int suspend = 0;
661         self = SIMIX_process_self();
662
663         if (NULL != request) {
664                 SIMIX_mutex_lock(request->mutex);
665                 if (!request->completed) {
666                         xbt_fifo_push(request->waitlist, self);
667                         suspend = 1;
668                 }
669                 SIMIX_mutex_unlock(request->mutex);
670                 if (suspend) {
671                         SIMIX_process_suspend(self);
672                 }
673                 if (NULL != status && MPI_STATUS_IGNORE != status) {
674                         SIMIX_mutex_lock(request->mutex);
675                         status->MPI_SOURCE = request->src;
676                         SIMIX_mutex_unlock(request->mutex);
677                 }
678         }
679 }
680
// FIXME: move into own file
/*
 * gettimeofday() look-alike that reports the simulated clock.
 *
 * The call is bracketed by smpi_bench_end()/smpi_bench_begin(). When tv is
 * not NULL, the value of SIMIX_get_clock() is split into whole seconds
 * (tv_sec) and the remaining fraction expressed in microseconds (tv_usec).
 *
 * tv: output structure; NULL makes the call fail.
 * tz: not used.
 * Returns 0 on success, -1 when tv is NULL.
 */
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
        double sim_now;
        int rc = -1;

        smpi_bench_end();

        if (NULL != tv) {
                sim_now = SIMIX_get_clock();
                tv->tv_sec  = sim_now;
                tv->tv_usec = ((sim_now - (double)tv->tv_sec) * 1000000.0);
                rc = 0;
        }

        smpi_bench_begin();

        return rc;
}
697
698 unsigned int smpi_sleep(unsigned int seconds)
699 {
700         smx_mutex_t mutex;
701         smx_cond_t cond;
702         smx_host_t host;
703         smx_action_t sleep_action;
704
705         smpi_bench_end();
706         host         = SIMIX_host_self();
707         sleep_action = SIMIX_action_sleep(host, seconds);
708         mutex        = SIMIX_mutex_init();
709         cond         = SIMIX_cond_init();
710         SIMIX_mutex_lock(mutex);
711         SIMIX_register_condition_to_action(sleep_action, cond);
712         SIMIX_register_action_to_condition(sleep_action, cond);
713         SIMIX_cond_wait(cond, mutex);
714         SIMIX_mutex_unlock(mutex);
715         SIMIX_mutex_destroy(mutex);
716         SIMIX_cond_destroy(cond);
717         // FIXME: check for success/failure?
718         smpi_bench_begin();
719         return 0;
720 }
721
722 void smpi_exit(int status)
723 {
724         smpi_bench_end();
725         SIMIX_mutex_lock(smpi_running_hosts_mutex);
726         smpi_running_hosts--;
727         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
728         SIMIX_process_kill(SIMIX_process_self());
729         return;
730 }