Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
updated internal data structures to allow smpi to run outside of source tree.
[simgrid.git] / src / smpi / smpi_base.c
1 #include <stdio.h>
2 #include <signal.h>
3 #include <sys/time.h>
4
5 #include "private.h"
6
7 SMPI_Global_t     smpi_global     = NULL;
8
9 SMPI_MPI_Global_t smpi_mpi_global = NULL;
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi,XBT_LOG_ROOT_CAT, "All SMPI categories (see \ref SMPI_API)");
12
13 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
14 {
15         return comm->size;
16 }
17
18 // FIXME: smarter algorithm?
19 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
20 {
21         int i;
22
23         for(i = comm->size - 1; i > 0 && host != comm->simdata->hosts[i]; i--);
24
25         return i;
26 }
27
28 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
29 {
30         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
31 }
32
33 int smpi_sender(int argc, char **argv)
34 {
35         smx_process_t self;
36         smx_host_t shost;
37         int rank;
38
39         xbt_fifo_t request_queue;
40         smx_mutex_t request_queue_mutex;
41         int size;
42
43         int running_hosts_count;
44
45         smpi_mpi_request_t *request;
46
47         smx_host_t dhost;
48
49         smx_action_t action;
50
51         smpi_received_message_t *message;
52
53         int drank;
54
55         smx_process_t receiver_process;
56
57         self  = SIMIX_process_self();
58         shost = SIMIX_host_self();
59         rank  = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost);
60
61         // make sure root is done before own initialization
62         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
63         if (!smpi_global->root_ready) {
64                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
65         }
66         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
67
68         request_queue       = smpi_global->pending_send_request_queues[rank];
69         request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank];
70         size                = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
71
72         smpi_global->sender_processes[rank] = self;
73
74         // wait for all nodes to signal initializatin complete
75         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
76         smpi_global->ready_process_count++;
77         if (smpi_global->ready_process_count < 3 * size) {
78                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
79         } else {
80                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
81         }
82         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
83
84         do {
85
86                 SIMIX_mutex_lock(request_queue_mutex);
87                 request = xbt_fifo_shift(request_queue);
88                 SIMIX_mutex_unlock(request_queue_mutex);
89
90                 if (NULL == request) {
91                         SIMIX_process_suspend(self);
92                 } else {
93
94                         message       = xbt_mallocator_get(smpi_global->message_mallocator);
95
96                         SIMIX_mutex_lock(request->simdata->mutex);
97
98                         message->comm = request->comm;
99                         message->src  = request->src;
100                         message->dst  = request->dst;
101                         message->tag  = request->tag;
102                         message->buf  = xbt_malloc(request->datatype->size * request->count);
103                         memcpy(message->buf, request->buf, request->datatype->size * request->count);
104
105                         dhost = request->comm->simdata->hosts[request->dst];
106                         drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost);
107
108                         SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]);
109                         xbt_fifo_push(smpi_global->received_message_queues[drank], message);
110                         SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]);
111
112                         request->completed = 1;
113
114                         action = SIMIX_action_communicate(shost, dhost, NULL, request->datatype->size * request->count * 1.0, -1.0);
115
116                         SIMIX_register_action_to_condition(action, request->simdata->cond);
117
118                         SIMIX_cond_wait(request->simdata->cond, request->simdata->mutex);
119
120                         SIMIX_mutex_unlock(request->simdata->mutex);
121
122                         // wake up receiver if necessary
123                         receiver_process = smpi_global->receiver_processes[drank];
124                         if (SIMIX_process_is_suspended(receiver_process)) {
125                                 SIMIX_process_resume(receiver_process);
126                         }
127
128                 }
129
130                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
131                 running_hosts_count = smpi_global->running_hosts_count;
132                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
133
134         } while (0 < running_hosts_count);
135
136         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
137         smpi_global->ready_process_count--;
138         if (smpi_global->ready_process_count == 0) {
139                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
140         } else if (smpi_global->ready_process_count < 0) {
141                 // FIXME: can't happen! abort!
142         }
143         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
144
145         return 0;
146 }
147
148 int smpi_receiver(int argc, char **argv)
149 {
150         smx_process_t self;
151         int rank;
152
153         xbt_fifo_t request_queue;
154         smx_mutex_t request_queue_mutex;
155         xbt_fifo_t message_queue;
156         smx_mutex_t message_queue_mutex;
157         int size;
158
159         int running_hosts_count;
160
161         smpi_mpi_request_t *request;
162         smpi_received_message_t *message;
163
164         xbt_fifo_item_t request_item;
165         xbt_fifo_item_t message_item;
166
167         self  = SIMIX_process_self();
168         rank  = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
169
170         // make sure root is done before own initialization
171         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
172         if (!smpi_global->root_ready) {
173                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
174         }
175         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
176
177         request_queue       = smpi_global->pending_recv_request_queues[rank];
178         request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
179         message_queue       = smpi_global->received_message_queues[rank];
180         message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
181         size                = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
182
183         smpi_global->receiver_processes[rank] = self;
184
185         // wait for all nodes to signal initializatin complete
186         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
187         smpi_global->ready_process_count++;
188         if (smpi_global->ready_process_count < 3 * size) {
189                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
190         } else {
191                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
192         }
193         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
194
195         do {
196                 request = NULL;
197                 message = NULL;
198
199                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
200
201                 // FIXME: not the best way to request multiple locks...
202                 SIMIX_mutex_lock(request_queue_mutex);
203                 SIMIX_mutex_lock(message_queue_mutex);
204                 for (request_item = xbt_fifo_get_first_item(request_queue);
205                         NULL != request_item;
206                         request_item = xbt_fifo_get_next_item(request_item)) {
207                         request = xbt_fifo_get_item_content(request_item);
208                         for (message_item = xbt_fifo_get_first_item(message_queue);
209                                 NULL != message_item;
210                                 message_item = xbt_fifo_get_next_item(message_item)) {
211                                 message = xbt_fifo_get_item_content(message_item);
212                                 if (request->comm == message->comm &&
213                                                 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
214                                                 request->tag == message->tag) {
215                                         xbt_fifo_remove_item(request_queue, request_item);
216                                         xbt_fifo_remove_item(message_queue, message_item);
217                                         goto stopsearch;
218                                 }
219                         }
220                 }
221 stopsearch:
222                 SIMIX_mutex_unlock(message_queue_mutex);
223                 SIMIX_mutex_unlock(request_queue_mutex);
224
225                 if (NULL == request || NULL == message) {
226                         SIMIX_process_suspend(self);
227                 } else {
228                         SIMIX_mutex_lock(request->simdata->mutex);
229
230                         memcpy(request->buf, message->buf, request->datatype->size * request->count);
231                         request->src = message->src;
232                         request->completed = 1;
233                         SIMIX_cond_broadcast(request->simdata->cond);
234
235                         SIMIX_mutex_unlock(request->simdata->mutex);
236
237                         xbt_free(message->buf);
238                         xbt_mallocator_release(smpi_global->message_mallocator, message);
239                 }
240
241                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
242                 running_hosts_count = smpi_global->running_hosts_count;
243                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
244
245         } while (0 < running_hosts_count);
246
247         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
248         smpi_global->ready_process_count--;
249         if (smpi_global->ready_process_count == 0) {
250                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
251         } else if (smpi_global->ready_process_count < 0) {
252                 // FIXME: can't happen, abort!
253         }
254         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
255
256         return 0;
257 }
258
259 void *smpi_request_new()
260 {
261         smpi_mpi_request_t *request = xbt_new(smpi_mpi_request_t, 1);
262
263         request->completed = 0;
264         request->simdata            = xbt_new(s_smpi_mpi_request_simdata_t, 1);
265         request->simdata->mutex     = SIMIX_mutex_init();
266         request->simdata->cond      = SIMIX_cond_init();
267
268         return request;
269 }
270
271 void smpi_request_free(void *pointer)
272 {
273
274         smpi_mpi_request_t *request = pointer;
275
276         if (NULL != request) {
277                 SIMIX_cond_destroy(request->simdata->cond);
278                 SIMIX_mutex_destroy(request->simdata->mutex);
279                 xbt_free(request->simdata);
280                 xbt_free(request);
281         }
282
283         return;
284 }
285
286 void smpi_request_reset(void *pointer)
287 {
288         return;
289 }
290
291
292 void *smpi_message_new()
293 {
294         return xbt_new(smpi_received_message_t, 1);
295 }
296
/*
 * Destructor callback of the message mallocator: frees the message record
 * itself (not its payload buffer). A NULL pointer is ignored.
 */
void smpi_message_free(void *pointer)
{
        if (NULL == pointer) {
                return;
        }

        xbt_free(pointer);
}
305
/*
 * Reset callback of the message mallocator. Intentionally does nothing.
 */
void smpi_message_reset(void *pointer)
{
        (void) pointer;
}
310
311 void smpi_global_init()
312 {
313         int i;
314
315         int size = SIMIX_host_get_number();
316
317         smpi_global = xbt_new(s_SMPI_Global_t, 1);
318
319         // config variable
320         smpi_global->reference_speed                     = SMPI_DEFAULT_SPEED;
321
322         smpi_global->root_ready                          = 0;
323         smpi_global->ready_process_count                 = 0;
324
325         // start/stop
326         smpi_global->start_stop_mutex                    = SIMIX_mutex_init();
327         smpi_global->start_stop_cond                     = SIMIX_cond_init();
328
329         // processes
330         smpi_global->sender_processes                    = xbt_new(smx_process_t, size);
331         smpi_global->receiver_processes                  = xbt_new(smx_process_t, size);
332
333         // running hosts
334         smpi_global->running_hosts_count_mutex           = SIMIX_mutex_init();
335         smpi_global->running_hosts_count                 = 0;
336
337         // mallocators
338         smpi_global->request_mallocator                  = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE,
339                                                              smpi_request_new, smpi_request_free, smpi_request_reset);
340         smpi_global->message_mallocator                  = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE,
341                                                              smpi_message_new, smpi_message_free, smpi_message_reset);
342
343         //
344         smpi_global->pending_send_request_queues         = xbt_new(xbt_fifo_t,  size);
345         smpi_global->pending_send_request_queues_mutexes = xbt_new(smx_mutex_t, size);
346         smpi_global->pending_recv_request_queues         = xbt_new(xbt_fifo_t,  size);
347         smpi_global->pending_recv_request_queues_mutexes = xbt_new(smx_mutex_t, size);
348         smpi_global->received_message_queues             = xbt_new(xbt_fifo_t,  size);
349         smpi_global->received_message_queues_mutexes     = xbt_new(smx_mutex_t, size);
350         smpi_global->timers                              = xbt_new(xbt_os_timer_t, size);
351         smpi_global->timers_mutexes                      = xbt_new(smx_mutex_t, size);
352
353         for(i = 0; i < size; i++) {
354                 smpi_global->pending_send_request_queues[i]         = xbt_fifo_new();
355                 smpi_global->pending_send_request_queues_mutexes[i] = SIMIX_mutex_init();
356                 smpi_global->pending_recv_request_queues[i]         = xbt_fifo_new();
357                 smpi_global->pending_recv_request_queues_mutexes[i] = SIMIX_mutex_init();
358                 smpi_global->received_message_queues[i]             = xbt_fifo_new();
359                 smpi_global->received_message_queues_mutexes[i]     = SIMIX_mutex_init();
360                 smpi_global->timers[i]                              = xbt_os_timer_new();
361                 smpi_global->timers_mutexes[i]                      = SIMIX_mutex_init();
362         }
363
364 }
365
366 void smpi_global_destroy()
367 {
368         int i;
369
370         int size = SIMIX_host_get_number();
371
372         // start/stop
373         SIMIX_mutex_destroy(smpi_global->start_stop_mutex);
374         SIMIX_cond_destroy(smpi_global->start_stop_cond);
375
376         // processes
377         xbt_free(smpi_global->sender_processes);
378         xbt_free(smpi_global->receiver_processes);
379
380         // running hosts
381         SIMIX_mutex_destroy(smpi_global->running_hosts_count_mutex);
382
383         // mallocators
384         xbt_mallocator_free(smpi_global->request_mallocator);
385         xbt_mallocator_free(smpi_global->message_mallocator);
386
387         for(i = 0; i < size; i++) {
388                 xbt_fifo_free(smpi_global->pending_send_request_queues[i]);
389                 SIMIX_mutex_destroy(smpi_global->pending_send_request_queues_mutexes[i]);
390                 xbt_fifo_free(smpi_global->pending_recv_request_queues[i]);
391                 SIMIX_mutex_destroy(smpi_global->pending_recv_request_queues_mutexes[i]);
392                 xbt_fifo_free(smpi_global->received_message_queues[i]);
393                 SIMIX_mutex_destroy(smpi_global->received_message_queues_mutexes[i]);
394                 xbt_os_timer_free(smpi_global->timers[i]);
395                 SIMIX_mutex_destroy(smpi_global->timers_mutexes[i]);
396         }
397
398         xbt_free(smpi_global->pending_send_request_queues);
399         xbt_free(smpi_global->pending_send_request_queues_mutexes);
400         xbt_free(smpi_global->pending_recv_request_queues);
401         xbt_free(smpi_global->pending_recv_request_queues_mutexes);
402         xbt_free(smpi_global->received_message_queues);
403         xbt_free(smpi_global->received_message_queues_mutexes);
404         xbt_free(smpi_global->timers);
405         xbt_free(smpi_global->timers_mutexes);
406
407         xbt_free(smpi_global);
408 }
409
410 int smpi_run_simulation(int argc, char **argv)
411 {
412         xbt_fifo_item_t cond_item   = NULL;
413         smx_cond_t   cond           = NULL;
414         smx_action_t action         = NULL;
415
416         xbt_fifo_t   actions_failed = xbt_fifo_new();
417         xbt_fifo_t   actions_done   = xbt_fifo_new();
418
419         srand(SMPI_RAND_SEED);
420
421         SIMIX_global_init(&argc, argv);
422
423         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
424         SIMIX_function_register("smpi_sender",         smpi_sender);
425         SIMIX_function_register("smpi_receiver",       smpi_receiver);
426
427         // FIXME: ought to verify these files...
428         SIMIX_create_environment(argv[1]);
429
430         // must initialize globals between creating environment and launching app....
431         smpi_global_init();
432
433         SIMIX_launch_application(argv[2]);
434
435         /* Prepare to display some more info when dying on Ctrl-C pressing */
436         // FIXME: doesn't work
437         //signal(SIGINT, inthandler);
438
439         /* Clean IO before the run */
440         fflush(stdout);
441         fflush(stderr);
442
443         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
444                 while ((action = xbt_fifo_pop(actions_failed))) {
445                         DEBUG1("** %s failed **", action->name);
446                         xbt_fifo_foreach(action->cond_list, cond_item, cond, smx_cond_t) {
447                                 SIMIX_cond_broadcast(cond);
448                                 SIMIX_unregister_action_to_condition(action, cond);
449                         }
450                         SIMIX_action_destroy(action);
451                 }
452                 while ((action = xbt_fifo_pop(actions_done))) {
453                         DEBUG1("** %s done **",action->name);
454                         xbt_fifo_foreach(action->cond_list, cond_item, cond, smx_cond_t) {
455                                 SIMIX_cond_broadcast(cond);
456                                 SIMIX_unregister_action_to_condition(action, cond);
457                         }
458                         SIMIX_action_destroy(action);
459                 }
460         }
461
462         xbt_fifo_free(actions_failed);
463         xbt_fifo_free(actions_done);
464
465         INFO1("simulation time %g", SIMIX_get_clock());
466
467         smpi_global_destroy();
468
469         SIMIX_clean();
470
471         return 0;
472 }
473
/*
 * Logical-AND reduction on ints: stores 1 in *z when both *x and *y are
 * non-zero, 0 otherwise. All three pointers are treated as int pointers.
 * Both operands are read before the result is written, so z may alias x or y.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;
        int both_set = 0;

        if (*lhs != 0 && *rhs != 0) {
                both_set = 1;
        }

        *out = both_set;
}
478
/*
 * Sum reduction on ints: stores *x + *y in *z. All three pointers are treated
 * as int pointers. Both operands are read before the result is written, so z
 * may alias x or y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;
        int total = *lhs + *rhs;

        *out = total;
}
483
484
485 void smpi_mpi_init()
486 {
487         smx_process_t process;
488         smx_host_t host;
489         smx_host_t *hosts;
490         int size;
491
492         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
493         smpi_global->running_hosts_count++;
494         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
495
496         // initialize some local variables
497         process = SIMIX_process_self();
498         host    = SIMIX_host_self();
499         hosts   = SIMIX_host_get_table();
500         size    = SIMIX_host_get_number();
501
502         // node 0 sets the globals
503         if (host == hosts[0]) {
504
505                 smpi_mpi_global                                = xbt_new(s_SMPI_MPI_Global_t, 1);
506
507                 // global communicator
508                 smpi_mpi_global->mpi_comm_world                         = xbt_new(smpi_mpi_communicator_t, 1);
509                 smpi_mpi_global->mpi_comm_world->size                   = size;
510                 smpi_mpi_global->mpi_comm_world->simdata                = xbt_new(s_smpi_mpi_communicator_simdata_t, 1);
511                 smpi_mpi_global->mpi_comm_world->simdata->barrier_count = 0;
512                 smpi_mpi_global->mpi_comm_world->simdata->barrier_mutex = SIMIX_mutex_init();
513                 smpi_mpi_global->mpi_comm_world->simdata->barrier_cond  = SIMIX_cond_init();
514                 smpi_mpi_global->mpi_comm_world->simdata->hosts         = hosts;
515                 smpi_mpi_global->mpi_comm_world->simdata->processes     = xbt_new(smx_process_t, size);
516                 smpi_mpi_global->mpi_comm_world->simdata->processes[0]  = process;
517
518                 // mpi datatypes
519                 smpi_mpi_global->mpi_byte                      = xbt_new(smpi_mpi_datatype_t, 1);
520                 smpi_mpi_global->mpi_byte->size                = (size_t)1;
521                 smpi_mpi_global->mpi_int                       = xbt_new(smpi_mpi_datatype_t, 1);
522                 smpi_mpi_global->mpi_int->size                 = sizeof(int);
523                 smpi_mpi_global->mpi_double                    = xbt_new(smpi_mpi_datatype_t, 1);
524                 smpi_mpi_global->mpi_double->size              = sizeof(double);
525
526                 // mpi operations
527                 smpi_mpi_global->mpi_land                      = xbt_new(smpi_mpi_op_t, 1);
528                 smpi_mpi_global->mpi_land->func                = smpi_mpi_land_func;
529                 smpi_mpi_global->mpi_sum                       = xbt_new(smpi_mpi_op_t, 1);
530                 smpi_mpi_global->mpi_sum->func                 = smpi_mpi_sum_func;
531
532                 // signal all nodes to perform initialization
533                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
534                 smpi_global->root_ready = 1;
535                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
536                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
537
538         } else {
539
540                 // make sure root is done before own initialization
541                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
542                 if (!smpi_global->root_ready) {
543                         SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
544                 }
545                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
546
547                 smpi_mpi_global->mpi_comm_world->simdata->processes[smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world)] = process;
548         }
549
550         // wait for all nodes to signal initializatin complete
551         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
552         smpi_global->ready_process_count++;
553         if (smpi_global->ready_process_count < 3 * size) {
554                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
555         } else {
556                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
557         }
558         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
559
560         return;
561 }
562
563 void smpi_mpi_finalize()
564 {
565         int i;
566
567         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
568         i = --smpi_global->running_hosts_count;
569         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
570
571         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
572         smpi_global->ready_process_count--;
573         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
574
575         if (0 >= i) {
576
577                 // wake up senders/receivers
578                 for (i = 0; i < smpi_mpi_global->mpi_comm_world->size; i++) {
579                         if (SIMIX_process_is_suspended(smpi_global->sender_processes[i])) {
580                                 SIMIX_process_resume(smpi_global->sender_processes[i]);
581                         }
582                         if (SIMIX_process_is_suspended(smpi_global->receiver_processes[i])) {
583                                 SIMIX_process_resume(smpi_global->receiver_processes[i]);
584                         }
585                 }
586
587                 // wait for senders/receivers to exit...
588                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
589                 if (smpi_global->ready_process_count > 0) {
590                         SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
591                 }
592                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
593
594                 SIMIX_mutex_destroy(smpi_mpi_global->mpi_comm_world->simdata->barrier_mutex);
595                 SIMIX_cond_destroy(smpi_mpi_global->mpi_comm_world->simdata->barrier_cond);
596                 xbt_free(smpi_mpi_global->mpi_comm_world->simdata->processes);
597                 xbt_free(smpi_mpi_global->mpi_comm_world->simdata);
598                 xbt_free(smpi_mpi_global->mpi_comm_world);
599
600                 xbt_free(smpi_mpi_global->mpi_byte);
601                 xbt_free(smpi_mpi_global->mpi_int);
602                 xbt_free(smpi_mpi_global->mpi_double);
603
604                 xbt_free(smpi_mpi_global->mpi_land);
605                 xbt_free(smpi_mpi_global->mpi_sum);
606
607                 xbt_free(smpi_mpi_global);
608         }
609
610 }
611
612 // FIXME: could cause trouble with multithreaded procs on same host...
613 void smpi_bench_begin()
614 {
615         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
616         SIMIX_mutex_lock(smpi_global->timers_mutexes[rank]);
617         xbt_os_timer_start(smpi_global->timers[rank]);
618         return;
619 }
620
621 void smpi_bench_end()
622 {
623         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
624         double duration;
625         smx_host_t host;
626         smx_action_t compute_action;
627         smx_mutex_t mutex;
628         smx_cond_t cond;
629
630         xbt_os_timer_stop(smpi_global->timers[rank]);
631
632         duration       = xbt_os_timer_elapsed(smpi_global->timers[rank]);
633         SIMIX_mutex_unlock(smpi_global->timers_mutexes[rank]);
634
635         host           = smpi_mpi_global->mpi_comm_world->simdata->hosts[rank];
636         compute_action = SIMIX_action_execute(host, NULL, duration * SMPI_DEFAULT_SPEED);
637         mutex          = SIMIX_mutex_init();
638         cond           = SIMIX_cond_init();
639
640         SIMIX_mutex_lock(mutex);
641         SIMIX_register_action_to_condition(compute_action, cond);
642         SIMIX_cond_wait(cond, mutex);
643         //SIMIX_unregister_action_to_condition(compute_action, cond);
644         SIMIX_mutex_unlock(mutex);
645
646         SIMIX_mutex_destroy(mutex);
647         SIMIX_cond_destroy(cond);
648
649         // FIXME: check for success/failure?
650
651         return;
652 }
653
654 void smpi_barrier(smpi_mpi_communicator_t *comm)
655 {
656
657         SIMIX_mutex_lock(comm->simdata->barrier_mutex);
658         if(++comm->simdata->barrier_count < comm->size) {
659                 SIMIX_cond_wait(comm->simdata->barrier_cond, comm->simdata->barrier_mutex);
660         } else {
661                 comm->simdata->barrier_count = 0;
662                 SIMIX_cond_broadcast(comm->simdata->barrier_cond);
663         }
664         SIMIX_mutex_unlock(comm->simdata->barrier_mutex);
665
666         return;
667 }
668
669 // FIXME: smarter algorithm...
670 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
671 {
672         int i;
673         for(i = 0; i < comm->size && host != comm->simdata->hosts[i]; i++);
674         if (i >= comm->size) i = -1;
675         return i;
676 }
677
678 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
679         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
680 {
681         int retval = MPI_SUCCESS;
682
683         *request = NULL;
684
685         if (0 > count) {
686                 retval = MPI_ERR_COUNT;
687         } else if (NULL == buf) {
688                 retval = MPI_ERR_INTERN;
689         } else if (NULL == datatype) {
690                 retval = MPI_ERR_TYPE;
691         } else if (NULL == comm) {
692                 retval = MPI_ERR_COMM;
693         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
694                 retval = MPI_ERR_RANK;
695         } else if (0 > dst || comm->size <= dst) {
696                 retval = MPI_ERR_RANK;
697         } else if (0 > tag) {
698                 retval = MPI_ERR_TAG;
699         } else {
700                 *request = xbt_mallocator_get(smpi_global->request_mallocator);
701                 (*request)->comm       = comm;
702                 (*request)->src        = src;
703                 (*request)->dst        = dst;
704                 (*request)->tag        = tag;
705                 (*request)->buf        = buf;
706                 (*request)->count      = count;
707                 (*request)->datatype   = datatype;
708         }
709         return retval;
710 }
711
712 int smpi_isend(smpi_mpi_request_t *request)
713 {
714         int retval = MPI_SUCCESS;
715         int rank   = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
716
717         if (NULL != request) {
718                 SIMIX_mutex_lock(smpi_global->pending_send_request_queues_mutexes[rank]);
719                 xbt_fifo_push(smpi_global->pending_send_request_queues[rank], request);
720                 SIMIX_mutex_unlock(smpi_global->pending_send_request_queues_mutexes[rank]);
721         }
722
723         if (SIMIX_process_is_suspended(smpi_global->sender_processes[rank])) {
724                 SIMIX_process_resume(smpi_global->sender_processes[rank]);
725         }
726
727         return retval;
728 }
729
730 int smpi_irecv(smpi_mpi_request_t *request)
731 {
732         int retval = MPI_SUCCESS;
733         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
734
735         if (NULL != request) {
736                 SIMIX_mutex_lock(smpi_global->pending_recv_request_queues_mutexes[rank]);
737                 xbt_fifo_push(smpi_global->pending_recv_request_queues[rank], request);
738                 SIMIX_mutex_unlock(smpi_global->pending_recv_request_queues_mutexes[rank]);
739         }
740
741         if (SIMIX_process_is_suspended(smpi_global->receiver_processes[rank])) {
742                 SIMIX_process_resume(smpi_global->receiver_processes[rank]);
743         }
744
745         return retval;
746 }
747
748 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
749 {
750         if (NULL != request) {
751                 SIMIX_mutex_lock(request->simdata->mutex);
752                 if (!request->completed) {
753                         SIMIX_cond_wait(request->simdata->cond, request->simdata->mutex);
754                 }
755                 if (NULL != status) {
756                         status->MPI_SOURCE = request->src;
757                 }
758                 SIMIX_mutex_unlock(request->simdata->mutex);
759         }
760 }
761
/*
 * gettimeofday() replacement that reports the simulated clock.
 *
 * The call is bracketed by smpi_bench_end()/smpi_bench_begin(). With a NULL
 * `tv` it returns -1; otherwise it splits SIMIX_get_clock() into whole
 * seconds and microseconds and returns 0. `tz` is ignored.
 */
// FIXME: move into own file
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
        int status = -1;

        smpi_bench_end();

        if (NULL != tv) {
                double clock_now = SIMIX_get_clock();

                tv->tv_sec  = clock_now;
                // fractional part of the clock, expressed in microseconds
                tv->tv_usec = ((clock_now - (double)tv->tv_sec) * 1000000.0);
                status = 0;
        }

        smpi_bench_begin();

        return status;
}
778
779 unsigned int smpi_sleep(unsigned int seconds)
780 {
781         smx_mutex_t mutex;
782         smx_cond_t cond;
783         smx_host_t host;
784         smx_action_t sleep_action;
785
786         smpi_bench_end();
787         host         = SIMIX_host_self();
788         sleep_action = SIMIX_action_sleep(host, seconds);
789         mutex        = SIMIX_mutex_init();
790         cond         = SIMIX_cond_init();
791
792         SIMIX_mutex_lock(mutex);
793         SIMIX_register_action_to_condition(sleep_action, cond);
794         SIMIX_cond_wait(cond, mutex);
795         //SIMIX_unregister_action_to_condition(sleep_action, cond);
796         SIMIX_mutex_unlock(mutex);
797
798         SIMIX_mutex_destroy(mutex);
799         SIMIX_cond_destroy(cond);
800
801         // FIXME: check for success/failure?
802
803         smpi_bench_begin();
804         return 0;
805 }
806
807 void smpi_exit(int status)
808 {
809         smpi_bench_end();
810         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
811         smpi_global->running_hosts_count--;
812         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
813         SIMIX_process_kill(SIMIX_process_self());
814         return;
815 }