Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bug fix.
[simgrid.git] / src / smpi / smpi_base.c
1 #include <stdio.h>
2 #include <signal.h>
3 #include <sys/time.h>
4
5 #include "private.h"
6
// Simulator-wide SMPI state (queues, mutexes, mallocators, timers).
// Allocated by smpi_global_init() and released by smpi_global_destroy().
SMPI_Global_t     smpi_global     = NULL;

// MPI-level state (world communicator, datatypes, operations).
// Allocated in smpi_mpi_init() by the process running on the first host of
// the host table, and released in smpi_mpi_finalize().
SMPI_MPI_Global_t smpi_mpi_global = NULL;

// Declares the "smpi" log subcategory (child of the root category) and makes
// it the default category for the logging macros used in this file.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi,XBT_LOG_ROOT_CAT, "All SMPI categories (see \ref SMPI_API)");
12
13 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
14 {
15         return comm->size;
16 }
17
18 // FIXME: smarter algorithm?
19 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
20 {
21         int i;
22
23         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
24
25         return i;
26 }
27
28 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
29 {
30         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
31 }
32
33 //int smpi_mpi_comm_world_rank_self()
34 //{
35 //      return smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, SIMIX_host_self());
36 //}
37
38 int smpi_sender(int argc, char **argv)
39 {
40         smx_process_t self;
41         smx_host_t shost;
42         int rank;
43
44         xbt_fifo_t request_queue;
45         smx_mutex_t request_queue_mutex;
46         int size;
47
48         int running_hosts_count;
49
50         smpi_mpi_request_t *request;
51
52         smx_host_t dhost;
53
54         smx_action_t communicate_action;
55
56         smpi_received_message_t *message;
57
58         int drank;
59
60         smx_process_t receiver_process;
61
62         self  = SIMIX_process_self();
63         shost = SIMIX_host_self();
64         rank  = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost);
65
66         // make sure root is done before own initialization
67         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
68         if (!smpi_global->root_ready) {
69                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
70         }
71         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
72
73         request_queue       = smpi_global->pending_send_request_queues[rank];
74         request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank];
75         size                = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
76
77         smpi_global->sender_processes[rank] = self;
78
79         // wait for all nodes to signal initializatin complete
80         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
81         smpi_global->ready_process_count++;
82         if (smpi_global->ready_process_count < 3 * size) {
83                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
84         } else {
85                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
86         }
87         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
88
89         do {
90
91                 SIMIX_mutex_lock(request_queue_mutex);
92                 request = xbt_fifo_shift(request_queue);
93                 SIMIX_mutex_unlock(request_queue_mutex);
94
95                 if (NULL == request) {
96                         SIMIX_process_suspend(self);
97                 } else {
98
99                         SIMIX_mutex_lock(request->mutex);
100
101                         // copy request to appropriate received queue
102                         message       = xbt_mallocator_get(smpi_global->message_mallocator);
103                         message->comm = request->comm;
104                         message->src  = request->src;
105                         message->dst  = request->dst;
106                         message->tag  = request->tag;
107                         message->buf  = xbt_malloc(request->datatype->size * request->count);
108                         memcpy(message->buf, request->buf, request->datatype->size * request->count);
109
110                         dhost = request->comm->hosts[request->dst];
111                         drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost);
112
113                         SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]);
114                         xbt_fifo_push(smpi_global->received_message_queues[drank], message);
115                         SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]);
116
117                         request->completed = 1;
118
119                         communicate_action = SIMIX_action_communicate(shost, dhost,
120                                 NULL, request->datatype->size * request->count * 1.0, -1.0);
121
122                         SIMIX_register_action_to_condition(communicate_action, request->cond);
123                         SIMIX_cond_wait(request->cond, request->mutex);
124                         SIMIX_unregister_action_to_condition(communicate_action, request->cond);
125
126                         SIMIX_mutex_unlock(request->mutex);
127
128                         // wake up receiver if necessary
129                         receiver_process = smpi_global->receiver_processes[drank];
130
131                         if (SIMIX_process_is_suspended(receiver_process)) {
132                                 SIMIX_process_resume(receiver_process);
133                         }
134
135                 }
136
137                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
138                 running_hosts_count = smpi_global->running_hosts_count;
139                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
140
141         } while (0 < running_hosts_count);
142
143         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
144         smpi_global->ready_process_count--;
145         if (smpi_global->ready_process_count == 0) {
146                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
147         } else if (smpi_global->ready_process_count < 0) {
148                 // FIXME: can't happen! abort!
149         }
150         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
151
152         return 0;
153 }
154
155 int smpi_receiver(int argc, char **argv)
156 {
157         smx_process_t self;
158         int rank;
159
160         xbt_fifo_t request_queue;
161         smx_mutex_t request_queue_mutex;
162         xbt_fifo_t message_queue;
163         smx_mutex_t message_queue_mutex;
164         int size;
165
166         int running_hosts_count;
167
168         smpi_mpi_request_t *request;
169         smpi_received_message_t *message;
170
171         xbt_fifo_item_t request_item;
172         xbt_fifo_item_t message_item;
173
174         self  = SIMIX_process_self();
175         rank  = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
176
177         // make sure root is done before own initialization
178         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
179         if (!smpi_global->root_ready) {
180                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
181         }
182         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
183
184         request_queue       = smpi_global->pending_recv_request_queues[rank];
185         request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
186         message_queue       = smpi_global->received_message_queues[rank];
187         message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
188         size                = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
189
190         smpi_global->receiver_processes[rank] = self;
191
192         // wait for all nodes to signal initializatin complete
193         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
194         smpi_global->ready_process_count++;
195         if (smpi_global->ready_process_count < 3 * size) {
196                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
197         } else {
198                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
199         }
200         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
201
202         do {
203                 request = NULL;
204                 message = NULL;
205
206                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
207
208                 // FIXME: not the best way to request multiple locks...
209                 SIMIX_mutex_lock(request_queue_mutex);
210                 SIMIX_mutex_lock(message_queue_mutex);
211                 for (request_item = xbt_fifo_get_first_item(request_queue);
212                         NULL != request_item;
213                         request_item = xbt_fifo_get_next_item(request_item)) {
214                         request = xbt_fifo_get_item_content(request_item);
215                         for (message_item = xbt_fifo_get_first_item(message_queue);
216                                 NULL != message_item;
217                                 message_item = xbt_fifo_get_next_item(message_item)) {
218                                 message = xbt_fifo_get_item_content(message_item);
219                                 if (request->comm == message->comm &&
220                                                 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
221                                                 request->tag == message->tag) {
222                                         xbt_fifo_remove_item(request_queue, request_item);
223                                         xbt_fifo_remove_item(message_queue, message_item);
224                                         goto stopsearch;
225                                 }
226                         }
227                 }
228 stopsearch:
229                 SIMIX_mutex_unlock(message_queue_mutex);
230                 SIMIX_mutex_unlock(request_queue_mutex);
231
232                 if (NULL == request || NULL == message) {
233                         SIMIX_process_suspend(self);
234                 } else {
235                         SIMIX_mutex_lock(request->mutex);
236
237                         memcpy(request->buf, message->buf, request->datatype->size * request->count);
238                         request->src = message->src;
239                         request->completed = 1;
240                         SIMIX_cond_broadcast(request->cond);
241
242                         SIMIX_mutex_unlock(request->mutex);
243
244                         xbt_free(message->buf);
245                         xbt_mallocator_release(smpi_global->message_mallocator, message);
246                 }
247
248                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
249                 running_hosts_count = smpi_global->running_hosts_count;
250                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
251
252         } while (0 < running_hosts_count);
253
254         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
255         smpi_global->ready_process_count--;
256         if (smpi_global->ready_process_count == 0) {
257                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
258         } else if (smpi_global->ready_process_count < 0) {
259                 // FIXME: can't happen, abort!
260         }
261         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
262
263         return 0;
264 }
265
266 void *smpi_request_new()
267 {
268         smpi_mpi_request_t *request = xbt_new(smpi_mpi_request_t, 1);
269
270         request->completed = 0;
271         request->mutex     = SIMIX_mutex_init();
272         request->cond      = SIMIX_cond_init();
273
274         return request;
275 }
276
277 void smpi_request_free(void *pointer)
278 {
279
280         smpi_mpi_request_t *request = pointer;
281
282         if (NULL != request) {
283                 SIMIX_cond_destroy(request->cond);
284                 SIMIX_mutex_destroy(request->mutex);
285                 xbt_free(request);
286         }
287
288         return;
289 }
290
291 void smpi_request_reset(void *pointer)
292 {
293         return;
294 }
295
296
297 void *smpi_message_new()
298 {
299         return xbt_new(smpi_received_message_t, 1);
300 }
301
302 void smpi_message_free(void *pointer)
303 {
304         if (NULL != pointer) {
305                 xbt_free(pointer);
306         }
307
308         return;
309 }
310
/*
 * Reset callback for message objects (see smpi_global_init()).
 * Intentionally does nothing.
 */
void smpi_message_reset(void *pointer)
{
	(void)pointer;
}
315
316 void smpi_global_init()
317 {
318         int i;
319
320         int size = SIMIX_host_get_number();
321
322         smpi_global = xbt_new(s_SMPI_Global_t, 1);
323
324         // config variable
325         smpi_global->reference_speed                     = SMPI_DEFAULT_SPEED;
326
327         smpi_global->root_ready                          = 0;
328         smpi_global->ready_process_count                 = 0;
329
330         // start/stop
331         smpi_global->start_stop_mutex                    = SIMIX_mutex_init();
332         smpi_global->start_stop_cond                     = SIMIX_cond_init();
333
334         // processes
335         smpi_global->sender_processes                    = xbt_new(smx_process_t, size);
336         smpi_global->receiver_processes                  = xbt_new(smx_process_t, size);
337
338         // running hosts
339         smpi_global->running_hosts_count_mutex           = SIMIX_mutex_init();
340         smpi_global->running_hosts_count                 = 0;
341
342         // mallocators
343         smpi_global->request_mallocator                  = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE,
344                                                              smpi_request_new, smpi_request_free, smpi_request_reset);
345         smpi_global->message_mallocator                  = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE,
346                                                              smpi_message_new, smpi_message_free, smpi_message_reset);
347
348         //
349         smpi_global->pending_send_request_queues         = xbt_new(xbt_fifo_t,  size);
350         smpi_global->pending_send_request_queues_mutexes = xbt_new(smx_mutex_t, size);
351         smpi_global->pending_recv_request_queues         = xbt_new(xbt_fifo_t,  size);
352         smpi_global->pending_recv_request_queues_mutexes = xbt_new(smx_mutex_t, size);
353         smpi_global->received_message_queues             = xbt_new(xbt_fifo_t,  size);
354         smpi_global->received_message_queues_mutexes     = xbt_new(smx_mutex_t, size);
355         smpi_global->timers                              = xbt_new(xbt_os_timer_t, size);
356         smpi_global->timers_mutexes                      = xbt_new(smx_mutex_t, size);
357
358         for(i = 0; i < size; i++) {
359                 smpi_global->pending_send_request_queues[i]         = xbt_fifo_new();
360                 smpi_global->pending_send_request_queues_mutexes[i] = SIMIX_mutex_init();
361                 smpi_global->pending_recv_request_queues[i]         = xbt_fifo_new();
362                 smpi_global->pending_recv_request_queues_mutexes[i] = SIMIX_mutex_init();
363                 smpi_global->received_message_queues[i]             = xbt_fifo_new();
364                 smpi_global->received_message_queues_mutexes[i]     = SIMIX_mutex_init();
365                 smpi_global->timers[i]                              = xbt_os_timer_new();
366                 smpi_global->timers_mutexes[i]                      = SIMIX_mutex_init();
367         }
368
369 }
370
371 void smpi_global_destroy()
372 {
373         int i;
374
375         int size = SIMIX_host_get_number();
376
377         // start/stop
378         SIMIX_mutex_destroy(smpi_global->start_stop_mutex);
379         SIMIX_cond_destroy(smpi_global->start_stop_cond);
380
381         // processes
382         xbt_free(smpi_global->sender_processes);
383         xbt_free(smpi_global->receiver_processes);
384
385         // running hosts
386         SIMIX_mutex_destroy(smpi_global->running_hosts_count_mutex);
387
388         // mallocators
389         xbt_mallocator_free(smpi_global->request_mallocator);
390         xbt_mallocator_free(smpi_global->message_mallocator);
391
392         for(i = 0; i < size; i++) {
393                 xbt_fifo_free(smpi_global->pending_send_request_queues[i]);
394                 SIMIX_mutex_destroy(smpi_global->pending_send_request_queues_mutexes[i]);
395                 xbt_fifo_free(smpi_global->pending_recv_request_queues[i]);
396                 SIMIX_mutex_destroy(smpi_global->pending_recv_request_queues_mutexes[i]);
397                 xbt_fifo_free(smpi_global->received_message_queues[i]);
398                 SIMIX_mutex_destroy(smpi_global->received_message_queues_mutexes[i]);
399                 xbt_os_timer_free(smpi_global->timers[i]);
400                 SIMIX_mutex_destroy(smpi_global->timers_mutexes[i]);
401         }
402
403         xbt_free(smpi_global->pending_send_request_queues);
404         xbt_free(smpi_global->pending_send_request_queues_mutexes);
405         xbt_free(smpi_global->pending_recv_request_queues);
406         xbt_free(smpi_global->pending_recv_request_queues_mutexes);
407         xbt_free(smpi_global->received_message_queues);
408         xbt_free(smpi_global->received_message_queues_mutexes);
409         xbt_free(smpi_global->timers);
410         xbt_free(smpi_global->timers_mutexes);
411
412         xbt_free(smpi_global);
413 }
414
415 int smpi_run_simulation(int argc, char **argv)
416 {
417         smx_cond_t   cond           = NULL;
418         smx_action_t action         = NULL;
419
420         xbt_fifo_t   actions_failed = xbt_fifo_new();
421         xbt_fifo_t   actions_done   = xbt_fifo_new();
422
423         srand(SMPI_RAND_SEED);
424
425         SIMIX_global_init(&argc, argv);
426
427         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
428         SIMIX_function_register("smpi_sender",         smpi_sender);
429         SIMIX_function_register("smpi_receiver",       smpi_receiver);
430
431         // FIXME: ought to verify these files...
432         SIMIX_create_environment(argv[1]);
433
434         // must initialize globals between creating environment and launching app....
435         smpi_global_init();
436
437         SIMIX_launch_application(argv[2]);
438
439         /* Prepare to display some more info when dying on Ctrl-C pressing */
440         // FIXME: doesn't work
441         //signal(SIGINT, inthandler);
442
443         /* Clean IO before the run */
444         fflush(stdout);
445         fflush(stderr);
446
447         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
448                 while ((action = xbt_fifo_pop(actions_failed))) {
449                         DEBUG1("** %s failed **", action->name);
450                         while ((cond = xbt_fifo_pop(action->cond_list))) {
451                                 SIMIX_cond_broadcast(cond);
452                         }
453                         SIMIX_action_destroy(action);
454                 }
455                 while ((action = xbt_fifo_pop(actions_done))) {
456                         DEBUG1("** %s done **",action->name);
457                         while ((cond = xbt_fifo_pop(action->cond_list))) {
458                                 SIMIX_cond_broadcast(cond);
459                         }
460                         SIMIX_action_destroy(action);
461                 }
462         }
463
464         xbt_fifo_free(actions_failed);
465         xbt_fifo_free(actions_done);
466
467         INFO1("simulation time %g", SIMIX_get_clock());
468
469         smpi_global_destroy();
470
471         SIMIX_clean();
472
473         return 0;
474 }
475
/*
 * Logical AND operation on ints: stores 1 in *z when both *x and *y are
 * non-zero, 0 otherwise.  *y is only read when *x is non-zero.
 * All three pointers are treated as pointing to int.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
	const int *lhs = x;
	const int *rhs = y;
	int *out = z;
	int both;

	if (*lhs == 0) {
		both = 0;
	} else {
		both = (*rhs != 0);
	}

	*out = both;
}
480
/*
 * Sum operation on ints: stores *x + *y in *z.
 * All three pointers are treated as pointing to int.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
	const int *lhs = x;
	const int *rhs = y;
	int *out = z;
	const int total = *lhs + *rhs;

	*out = total;
}
485
486
487 void smpi_mpi_init()
488 {
489         smx_process_t process;
490         smx_host_t host;
491         smx_host_t *hosts;
492         int size;
493
494         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
495         smpi_global->running_hosts_count++;
496         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
497
498         // initialize some local variables
499         process = SIMIX_process_self();
500         host    = SIMIX_host_self();
501         hosts   = SIMIX_host_get_table();
502         size    = SIMIX_host_get_number();
503
504         // node 0 sets the globals
505         if (host == hosts[0]) {
506
507                 smpi_mpi_global                                = xbt_new(s_SMPI_MPI_Global_t, 1);
508
509                 // global communicator
510                 smpi_mpi_global->mpi_comm_world                = xbt_new(smpi_mpi_communicator_t, 1);
511                 smpi_mpi_global->mpi_comm_world->size          = size;
512                 smpi_mpi_global->mpi_comm_world->barrier_count = 0;
513                 smpi_mpi_global->mpi_comm_world->barrier_mutex = SIMIX_mutex_init();
514                 smpi_mpi_global->mpi_comm_world->barrier_cond  = SIMIX_cond_init();
515                 smpi_mpi_global->mpi_comm_world->hosts         = hosts;
516                 smpi_mpi_global->mpi_comm_world->processes     = xbt_new(smx_process_t, size);
517                 smpi_mpi_global->mpi_comm_world->processes[0]  = process;
518
519                 // mpi datatypes
520                 smpi_mpi_global->mpi_byte                      = xbt_new(smpi_mpi_datatype_t, 1);
521                 smpi_mpi_global->mpi_byte->size                = (size_t)1;
522                 smpi_mpi_global->mpi_int                       = xbt_new(smpi_mpi_datatype_t, 1);
523                 smpi_mpi_global->mpi_int->size                 = sizeof(int);
524                 smpi_mpi_global->mpi_double                    = xbt_new(smpi_mpi_datatype_t, 1);
525                 smpi_mpi_global->mpi_double->size              = sizeof(double);
526
527                 // mpi operations
528                 smpi_mpi_global->mpi_land                      = xbt_new(smpi_mpi_op_t, 1);
529                 smpi_mpi_global->mpi_land->func                = smpi_mpi_land_func;
530                 smpi_mpi_global->mpi_sum                       = xbt_new(smpi_mpi_op_t, 1);
531                 smpi_mpi_global->mpi_sum->func                 = smpi_mpi_sum_func;
532
533                 // signal all nodes to perform initialization
534                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
535                 smpi_global->root_ready = 1;
536                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
537                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
538
539         } else {
540
541                 // make sure root is done before own initialization
542                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
543                 if (!smpi_global->root_ready) {
544                         SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
545                 }
546                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
547
548                 smpi_mpi_global->mpi_comm_world->processes[smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world)] = process;
549         }
550
551         // wait for all nodes to signal initializatin complete
552         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
553         smpi_global->ready_process_count++;
554         if (smpi_global->ready_process_count < 3 * size) {
555                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
556         } else {
557                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
558         }
559         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
560
561         return;
562 }
563
564 void smpi_mpi_finalize()
565 {
566         int i;
567
568         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
569         i = --smpi_global->running_hosts_count;
570         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
571
572         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
573         smpi_global->ready_process_count--;
574         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
575
576         if (0 >= i) {
577
578                 // wake up senders/receivers
579                 for (i = 0; i < smpi_mpi_global->mpi_comm_world->size; i++) {
580                         if (SIMIX_process_is_suspended(smpi_global->sender_processes[i])) {
581                                 SIMIX_process_resume(smpi_global->sender_processes[i]);
582                         }
583                         if (SIMIX_process_is_suspended(smpi_global->receiver_processes[i])) {
584                                 SIMIX_process_resume(smpi_global->receiver_processes[i]);
585                         }
586                 }
587
588                 // wait for senders/receivers to exit...
589                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
590                 if (smpi_global->ready_process_count > 0) {
591                         SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
592                 }
593                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
594
595                 SIMIX_mutex_destroy(smpi_mpi_global->mpi_comm_world->barrier_mutex);
596                 SIMIX_cond_destroy(smpi_mpi_global->mpi_comm_world->barrier_cond);
597                 xbt_free(smpi_mpi_global->mpi_comm_world->processes);
598                 xbt_free(smpi_mpi_global->mpi_comm_world);
599
600                 xbt_free(smpi_mpi_global->mpi_byte);
601                 xbt_free(smpi_mpi_global->mpi_int);
602                 xbt_free(smpi_mpi_global->mpi_double);
603
604                 xbt_free(smpi_mpi_global->mpi_land);
605                 xbt_free(smpi_mpi_global->mpi_sum);
606
607                 xbt_free(smpi_mpi_global);
608         }
609
610 }
611
612 // FIXME: could cause trouble with multithreaded procs on same host...
613 void smpi_bench_begin()
614 {
615         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
616         SIMIX_mutex_lock(smpi_global->timers_mutexes[rank]);
617         xbt_os_timer_start(smpi_global->timers[rank]);
618         return;
619 }
620
621 void smpi_bench_end()
622 {
623         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
624         double duration;
625         smx_host_t host;
626         smx_action_t compute_action;
627         smx_mutex_t mutex;
628         smx_cond_t cond;
629
630         xbt_os_timer_stop(smpi_global->timers[rank]);
631
632         duration       = xbt_os_timer_elapsed(smpi_global->timers[rank]);
633         SIMIX_mutex_unlock(smpi_global->timers_mutexes[rank]);
634
635         host           = smpi_mpi_global->mpi_comm_world->hosts[rank];
636         compute_action = SIMIX_action_execute(host, NULL, duration * SMPI_DEFAULT_SPEED);
637         mutex          = SIMIX_mutex_init();
638         cond           = SIMIX_cond_init();
639
640         SIMIX_mutex_lock(mutex);
641         SIMIX_register_action_to_condition(compute_action, cond);
642         SIMIX_cond_wait(cond, mutex);
643         SIMIX_unregister_action_to_condition(compute_action, cond);
644         SIMIX_mutex_unlock(mutex);
645
646         SIMIX_mutex_destroy(mutex);
647         SIMIX_cond_destroy(cond);
648
649         // FIXME: check for success/failure?
650
651         return;
652 }
653
654 void smpi_barrier(smpi_mpi_communicator_t *comm)
655 {
656
657         SIMIX_mutex_lock(comm->barrier_mutex);
658         if(++comm->barrier_count < comm->size) {
659                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
660         } else {
661                 comm->barrier_count = 0;
662                 SIMIX_cond_broadcast(comm->barrier_cond);
663         }
664         SIMIX_mutex_unlock(comm->barrier_mutex);
665
666         return;
667 }
668
669 // FIXME: smarter algorithm...
670 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
671 {
672         int i;
673         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
674         if (i >= comm->size) i = -1;
675         return i;
676 }
677
678 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
679         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
680 {
681         int retval = MPI_SUCCESS;
682
683         *request = NULL;
684
685         if (0 > count) {
686                 retval = MPI_ERR_COUNT;
687         } else if (NULL == buf) {
688                 retval = MPI_ERR_INTERN;
689         } else if (NULL == datatype) {
690                 retval = MPI_ERR_TYPE;
691         } else if (NULL == comm) {
692                 retval = MPI_ERR_COMM;
693         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
694                 retval = MPI_ERR_RANK;
695         } else if (0 > dst || comm->size <= dst) {
696                 retval = MPI_ERR_RANK;
697         } else if (0 > tag) {
698                 retval = MPI_ERR_TAG;
699         } else {
700                 *request = xbt_mallocator_get(smpi_global->request_mallocator);
701                 (*request)->comm       = comm;
702                 (*request)->src        = src;
703                 (*request)->dst        = dst;
704                 (*request)->tag        = tag;
705                 (*request)->buf        = buf;
706                 (*request)->count      = count;
707                 (*request)->datatype   = datatype;
708         }
709         return retval;
710 }
711
712 int smpi_isend(smpi_mpi_request_t *request)
713 {
714         int retval = MPI_SUCCESS;
715         int rank   = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
716
717         if (NULL != request) {
718                 SIMIX_mutex_lock(smpi_global->pending_send_request_queues_mutexes[rank]);
719                 xbt_fifo_push(smpi_global->pending_send_request_queues[rank], request);
720                 SIMIX_mutex_unlock(smpi_global->pending_send_request_queues_mutexes[rank]);
721         }
722
723         if (SIMIX_process_is_suspended(smpi_global->sender_processes[rank])) {
724                 SIMIX_process_resume(smpi_global->sender_processes[rank]);
725         }
726
727         return retval;
728 }
729
730 int smpi_irecv(smpi_mpi_request_t *request)
731 {
732         int retval = MPI_SUCCESS;
733         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
734
735         if (NULL != request) {
736                 SIMIX_mutex_lock(smpi_global->pending_recv_request_queues_mutexes[rank]);
737                 xbt_fifo_push(smpi_global->pending_recv_request_queues[rank], request);
738                 SIMIX_mutex_unlock(smpi_global->pending_recv_request_queues_mutexes[rank]);
739         }
740
741         if (SIMIX_process_is_suspended(smpi_global->receiver_processes[rank])) {
742                 SIMIX_process_resume(smpi_global->receiver_processes[rank]);
743         }
744
745         return retval;
746 }
747
748 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
749 {
750         if (NULL != request) {
751                 SIMIX_mutex_lock(request->mutex);
752                 if (!request->completed) {
753                         SIMIX_cond_wait(request->cond, request->mutex);
754                 }
755                 if (NULL != status) {
756                         status->MPI_SOURCE = request->src;
757                 }
758                 SIMIX_mutex_unlock(request->mutex);
759         }
760 }
761
/*
 * gettimeofday() replacement reporting the simulated clock.
 *
 * Benchmarking is paused around the call (smpi_bench_end() before,
 * smpi_bench_begin() after).  When `tv` is non-NULL it receives the value of
 * SIMIX_get_clock() split into whole seconds and microseconds and 0 is
 * returned; a NULL `tv` yields -1.  `tz` is ignored.
 */
// FIXME: move into own file
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
	int status;
	double sim_now;

	smpi_bench_end();

	if (tv != NULL) {
		sim_now = SIMIX_get_clock();
		tv->tv_sec  = sim_now;
		tv->tv_usec = ((sim_now - (double)tv->tv_sec) * 1000000.0);
		status = 0;
	} else {
		status = -1;
	}

	smpi_bench_begin();

	return status;
}
778
779 unsigned int smpi_sleep(unsigned int seconds)
780 {
781         smx_mutex_t mutex;
782         smx_cond_t cond;
783         smx_host_t host;
784         smx_action_t sleep_action;
785
786         smpi_bench_end();
787         host         = SIMIX_host_self();
788         sleep_action = SIMIX_action_sleep(host, seconds);
789         mutex        = SIMIX_mutex_init();
790         cond         = SIMIX_cond_init();
791
792         SIMIX_mutex_lock(mutex);
793         SIMIX_register_action_to_condition(sleep_action, cond);
794         SIMIX_cond_wait(cond, mutex);
795         SIMIX_unregister_action_to_condition(sleep_action, cond);
796         SIMIX_mutex_unlock(mutex);
797
798         SIMIX_mutex_destroy(mutex);
799         SIMIX_cond_destroy(cond);
800
801         // FIXME: check for success/failure?
802
803         smpi_bench_begin();
804         return 0;
805 }
806
807 void smpi_exit(int status)
808 {
809         smpi_bench_end();
810         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
811         smpi_global->running_hosts_count--;
812         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
813         SIMIX_process_kill(SIMIX_process_self());
814         return;
815 }