Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ad53360abb47bc9852c71dc6cfa99e108154c28e
[simgrid.git] / src / smpi / smpi_base.c
1 #include <stdio.h>
2 #include <signal.h>
3 #include <sys/time.h>
4
5 #include "private.h"
6
7 SMPI_Global_t     smpi_global     = NULL;
8
9 SMPI_MPI_Global_t smpi_mpi_global = NULL;
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi,XBT_LOG_ROOT_CAT, "All SMPI categories (see \ref SMPI_API)");
12
13 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
14 {
15         return comm->size;
16 }
17
18 // FIXME: smarter algorithm?
19 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
20 {
21         int i;
22
23         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
24
25         return i;
26 }
27
28 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
29 {
30         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
31 }
32
33 //int smpi_mpi_comm_world_rank_self()
34 //{
35 //      return smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, SIMIX_host_self());
36 //}
37
38 int smpi_sender(int argc, char **argv)
39 {
40         smx_process_t self;
41         smx_host_t shost;
42         int rank;
43
44         xbt_fifo_t request_queue;
45         smx_mutex_t request_queue_mutex;
46         int size;
47
48         int running_hosts_count;
49
50         smpi_mpi_request_t *request;
51
52         smx_host_t dhost;
53
54         smx_action_t communicate_action;
55
56         smpi_received_message_t *message;
57
58         int drank;
59
60         smx_process_t receiver_process;
61
62         self  = SIMIX_process_self();
63         shost = SIMIX_host_self();
64         rank  = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost);
65
66         // make sure root is done before own initialization
67         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
68         if (!smpi_global->root_ready) {
69                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
70         }
71         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
72
73         request_queue       = smpi_global->pending_send_request_queues[rank];
74         request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank];
75         size                = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
76
77         smpi_global->sender_processes[rank] = self;
78
79         // wait for all nodes to signal initializatin complete
80         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
81         smpi_global->ready_process_count++;
82         if (smpi_global->ready_process_count < 3 * size) {
83                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
84         } else {
85                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
86         }
87         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
88
89         do {
90
91                 SIMIX_mutex_lock(request_queue_mutex);
92                 request = xbt_fifo_shift(request_queue);
93                 SIMIX_mutex_unlock(request_queue_mutex);
94
95                 if (NULL == request) {
96                         SIMIX_process_suspend(self);
97                 } else {
98
99                         SIMIX_mutex_lock(request->mutex);
100
101                         // copy request to appropriate received queue
102                         message       = xbt_mallocator_get(smpi_global->message_mallocator);
103                         message->comm = request->comm;
104                         message->src  = request->src;
105                         message->dst  = request->dst;
106                         message->tag  = request->tag;
107                         message->buf  = xbt_malloc(request->datatype->size * request->count);
108                         memcpy(message->buf, request->buf, request->datatype->size * request->count);
109
110                         dhost = request->comm->hosts[request->dst];
111                         drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost);
112
113                         SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]);
114                         xbt_fifo_push(smpi_global->received_message_queues[drank], message);
115                         SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]);
116
117                         request->completed = 1;
118
119                         communicate_action = SIMIX_action_communicate(shost, dhost,
120                                 NULL, request->datatype->size * request->count * 1.0, -1.0);
121
122                         SIMIX_register_action_to_condition(communicate_action, request->cond);
123                         SIMIX_cond_wait(request->cond, request->mutex);
124                         //SIMIX_unregister_action_to_condition(communicate_action, request->cond);
125
126                         SIMIX_mutex_unlock(request->mutex);
127
128                         // wake up receiver if necessary
129                         receiver_process = smpi_global->receiver_processes[drank];
130
131                         if (SIMIX_process_is_suspended(receiver_process)) {
132                                 SIMIX_process_resume(receiver_process);
133                         }
134
135                 }
136
137                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
138                 running_hosts_count = smpi_global->running_hosts_count;
139                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
140
141         } while (0 < running_hosts_count);
142
143         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
144         smpi_global->ready_process_count--;
145         if (smpi_global->ready_process_count == 0) {
146                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
147         } else if (smpi_global->ready_process_count < 0) {
148                 // FIXME: can't happen! abort!
149         }
150         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
151
152         return 0;
153 }
154
155 int smpi_receiver(int argc, char **argv)
156 {
157         smx_process_t self;
158         int rank;
159
160         xbt_fifo_t request_queue;
161         smx_mutex_t request_queue_mutex;
162         xbt_fifo_t message_queue;
163         smx_mutex_t message_queue_mutex;
164         int size;
165
166         int running_hosts_count;
167
168         smpi_mpi_request_t *request;
169         smpi_received_message_t *message;
170
171         xbt_fifo_item_t request_item;
172         xbt_fifo_item_t message_item;
173
174         self  = SIMIX_process_self();
175         rank  = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
176
177         // make sure root is done before own initialization
178         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
179         if (!smpi_global->root_ready) {
180                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
181         }
182         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
183
184         request_queue       = smpi_global->pending_recv_request_queues[rank];
185         request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
186         message_queue       = smpi_global->received_message_queues[rank];
187         message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
188         size                = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
189
190         smpi_global->receiver_processes[rank] = self;
191
192         // wait for all nodes to signal initializatin complete
193         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
194         smpi_global->ready_process_count++;
195         if (smpi_global->ready_process_count < 3 * size) {
196                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
197         } else {
198                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
199         }
200         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
201
202         do {
203                 request = NULL;
204                 message = NULL;
205
206                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
207
208                 // FIXME: not the best way to request multiple locks...
209                 SIMIX_mutex_lock(request_queue_mutex);
210                 SIMIX_mutex_lock(message_queue_mutex);
211                 for (request_item = xbt_fifo_get_first_item(request_queue);
212                         NULL != request_item;
213                         request_item = xbt_fifo_get_next_item(request_item)) {
214                         request = xbt_fifo_get_item_content(request_item);
215                         for (message_item = xbt_fifo_get_first_item(message_queue);
216                                 NULL != message_item;
217                                 message_item = xbt_fifo_get_next_item(message_item)) {
218                                 message = xbt_fifo_get_item_content(message_item);
219                                 if (request->comm == message->comm &&
220                                                 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
221                                                 request->tag == message->tag) {
222                                         xbt_fifo_remove_item(request_queue, request_item);
223                                         xbt_fifo_remove_item(message_queue, message_item);
224                                         goto stopsearch;
225                                 }
226                         }
227                 }
228 stopsearch:
229                 SIMIX_mutex_unlock(message_queue_mutex);
230                 SIMIX_mutex_unlock(request_queue_mutex);
231
232                 if (NULL == request || NULL == message) {
233                         SIMIX_process_suspend(self);
234                 } else {
235                         SIMIX_mutex_lock(request->mutex);
236
237                         memcpy(request->buf, message->buf, request->datatype->size * request->count);
238                         request->src = message->src;
239                         request->completed = 1;
240                         SIMIX_cond_broadcast(request->cond);
241
242                         SIMIX_mutex_unlock(request->mutex);
243
244                         xbt_free(message->buf);
245                         xbt_mallocator_release(smpi_global->message_mallocator, message);
246                 }
247
248                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
249                 running_hosts_count = smpi_global->running_hosts_count;
250                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
251
252         } while (0 < running_hosts_count);
253
254         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
255         smpi_global->ready_process_count--;
256         if (smpi_global->ready_process_count == 0) {
257                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
258         } else if (smpi_global->ready_process_count < 0) {
259                 // FIXME: can't happen, abort!
260         }
261         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
262
263         return 0;
264 }
265
266 void *smpi_request_new()
267 {
268         smpi_mpi_request_t *request = xbt_new(smpi_mpi_request_t, 1);
269
270         request->completed = 0;
271         request->mutex     = SIMIX_mutex_init();
272         request->cond      = SIMIX_cond_init();
273
274         return request;
275 }
276
277 void smpi_request_free(void *pointer)
278 {
279
280         smpi_mpi_request_t *request = pointer;
281
282         if (NULL != request) {
283                 SIMIX_cond_destroy(request->cond);
284                 SIMIX_mutex_destroy(request->mutex);
285                 xbt_free(request);
286         }
287
288         return;
289 }
290
291 void smpi_request_reset(void *pointer)
292 {
293         return;
294 }
295
296
297 void *smpi_message_new()
298 {
299         return xbt_new(smpi_received_message_t, 1);
300 }
301
302 void smpi_message_free(void *pointer)
303 {
304         if (NULL != pointer) {
305                 xbt_free(pointer);
306         }
307
308         return;
309 }
310
311 void smpi_message_reset(void *pointer)
312 {
313         return;
314 }
315
316 void smpi_global_init()
317 {
318         int i;
319
320         int size = SIMIX_host_get_number();
321
322         smpi_global = xbt_new(s_SMPI_Global_t, 1);
323
324         // config variable
325         smpi_global->reference_speed                     = SMPI_DEFAULT_SPEED;
326
327         smpi_global->root_ready                          = 0;
328         smpi_global->ready_process_count                 = 0;
329
330         // start/stop
331         smpi_global->start_stop_mutex                    = SIMIX_mutex_init();
332         smpi_global->start_stop_cond                     = SIMIX_cond_init();
333
334         // processes
335         smpi_global->sender_processes                    = xbt_new(smx_process_t, size);
336         smpi_global->receiver_processes                  = xbt_new(smx_process_t, size);
337
338         // running hosts
339         smpi_global->running_hosts_count_mutex           = SIMIX_mutex_init();
340         smpi_global->running_hosts_count                 = 0;
341
342         // mallocators
343         smpi_global->request_mallocator                  = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE,
344                                                              smpi_request_new, smpi_request_free, smpi_request_reset);
345         smpi_global->message_mallocator                  = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE,
346                                                              smpi_message_new, smpi_message_free, smpi_message_reset);
347
348         //
349         smpi_global->pending_send_request_queues         = xbt_new(xbt_fifo_t,  size);
350         smpi_global->pending_send_request_queues_mutexes = xbt_new(smx_mutex_t, size);
351         smpi_global->pending_recv_request_queues         = xbt_new(xbt_fifo_t,  size);
352         smpi_global->pending_recv_request_queues_mutexes = xbt_new(smx_mutex_t, size);
353         smpi_global->received_message_queues             = xbt_new(xbt_fifo_t,  size);
354         smpi_global->received_message_queues_mutexes     = xbt_new(smx_mutex_t, size);
355         smpi_global->timers                              = xbt_new(xbt_os_timer_t, size);
356         smpi_global->timers_mutexes                      = xbt_new(smx_mutex_t, size);
357
358         for(i = 0; i < size; i++) {
359                 smpi_global->pending_send_request_queues[i]         = xbt_fifo_new();
360                 smpi_global->pending_send_request_queues_mutexes[i] = SIMIX_mutex_init();
361                 smpi_global->pending_recv_request_queues[i]         = xbt_fifo_new();
362                 smpi_global->pending_recv_request_queues_mutexes[i] = SIMIX_mutex_init();
363                 smpi_global->received_message_queues[i]             = xbt_fifo_new();
364                 smpi_global->received_message_queues_mutexes[i]     = SIMIX_mutex_init();
365                 smpi_global->timers[i]                              = xbt_os_timer_new();
366                 smpi_global->timers_mutexes[i]                      = SIMIX_mutex_init();
367         }
368
369 }
370
371 void smpi_global_destroy()
372 {
373         int i;
374
375         int size = SIMIX_host_get_number();
376
377         // start/stop
378         SIMIX_mutex_destroy(smpi_global->start_stop_mutex);
379         SIMIX_cond_destroy(smpi_global->start_stop_cond);
380
381         // processes
382         xbt_free(smpi_global->sender_processes);
383         xbt_free(smpi_global->receiver_processes);
384
385         // running hosts
386         SIMIX_mutex_destroy(smpi_global->running_hosts_count_mutex);
387
388         // mallocators
389         xbt_mallocator_free(smpi_global->request_mallocator);
390         xbt_mallocator_free(smpi_global->message_mallocator);
391
392         for(i = 0; i < size; i++) {
393                 xbt_fifo_free(smpi_global->pending_send_request_queues[i]);
394                 SIMIX_mutex_destroy(smpi_global->pending_send_request_queues_mutexes[i]);
395                 xbt_fifo_free(smpi_global->pending_recv_request_queues[i]);
396                 SIMIX_mutex_destroy(smpi_global->pending_recv_request_queues_mutexes[i]);
397                 xbt_fifo_free(smpi_global->received_message_queues[i]);
398                 SIMIX_mutex_destroy(smpi_global->received_message_queues_mutexes[i]);
399                 xbt_os_timer_free(smpi_global->timers[i]);
400                 SIMIX_mutex_destroy(smpi_global->timers_mutexes[i]);
401         }
402
403         xbt_free(smpi_global->pending_send_request_queues);
404         xbt_free(smpi_global->pending_send_request_queues_mutexes);
405         xbt_free(smpi_global->pending_recv_request_queues);
406         xbt_free(smpi_global->pending_recv_request_queues_mutexes);
407         xbt_free(smpi_global->received_message_queues);
408         xbt_free(smpi_global->received_message_queues_mutexes);
409         xbt_free(smpi_global->timers);
410         xbt_free(smpi_global->timers_mutexes);
411
412         xbt_free(smpi_global);
413 }
414
415 int smpi_run_simulation(int argc, char **argv)
416 {
417         xbt_fifo_item_t cond_item   = NULL;
418         smx_cond_t   cond           = NULL;
419         smx_action_t action         = NULL;
420
421         xbt_fifo_t   actions_failed = xbt_fifo_new();
422         xbt_fifo_t   actions_done   = xbt_fifo_new();
423
424         srand(SMPI_RAND_SEED);
425
426         SIMIX_global_init(&argc, argv);
427
428         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
429         SIMIX_function_register("smpi_sender",         smpi_sender);
430         SIMIX_function_register("smpi_receiver",       smpi_receiver);
431
432         // FIXME: ought to verify these files...
433         SIMIX_create_environment(argv[1]);
434
435         // must initialize globals between creating environment and launching app....
436         smpi_global_init();
437
438         SIMIX_launch_application(argv[2]);
439
440         /* Prepare to display some more info when dying on Ctrl-C pressing */
441         // FIXME: doesn't work
442         //signal(SIGINT, inthandler);
443
444         /* Clean IO before the run */
445         fflush(stdout);
446         fflush(stderr);
447
448         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
449                 while ((action = xbt_fifo_pop(actions_failed))) {
450                         DEBUG1("** %s failed **", action->name);
451                         xbt_fifo_foreach(action->cond_list, cond_item, cond, smx_cond_t) {
452                                 SIMIX_cond_broadcast(cond);
453                                 SIMIX_unregister_action_to_condition(action, cond);
454                         }
455                         SIMIX_action_destroy(action);
456                 }
457                 while ((action = xbt_fifo_pop(actions_done))) {
458                         DEBUG1("** %s done **",action->name);
459                         xbt_fifo_foreach(action->cond_list, cond_item, cond, smx_cond_t) {
460                                 SIMIX_cond_broadcast(cond);
461                                 SIMIX_unregister_action_to_condition(action, cond);
462                         }
463                         SIMIX_action_destroy(action);
464                 }
465         }
466
467         xbt_fifo_free(actions_failed);
468         xbt_fifo_free(actions_done);
469
470         INFO1("simulation time %g", SIMIX_get_clock());
471
472         smpi_global_destroy();
473
474         SIMIX_clean();
475
476         return 0;
477 }
478
479 void smpi_mpi_land_func(void *x, void *y, void *z)
480 {
481         *(int *)z = *(int *)x && *(int *)y;
482 }
483
484 void smpi_mpi_sum_func(void *x, void *y, void *z)
485 {
486         *(int *)z = *(int *)x + *(int *)y;
487 }
488
489
490 void smpi_mpi_init()
491 {
492         smx_process_t process;
493         smx_host_t host;
494         smx_host_t *hosts;
495         int size;
496
497         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
498         smpi_global->running_hosts_count++;
499         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
500
501         // initialize some local variables
502         process = SIMIX_process_self();
503         host    = SIMIX_host_self();
504         hosts   = SIMIX_host_get_table();
505         size    = SIMIX_host_get_number();
506
507         // node 0 sets the globals
508         if (host == hosts[0]) {
509
510                 smpi_mpi_global                                = xbt_new(s_SMPI_MPI_Global_t, 1);
511
512                 // global communicator
513                 smpi_mpi_global->mpi_comm_world                = xbt_new(smpi_mpi_communicator_t, 1);
514                 smpi_mpi_global->mpi_comm_world->size          = size;
515                 smpi_mpi_global->mpi_comm_world->barrier_count = 0;
516                 smpi_mpi_global->mpi_comm_world->barrier_mutex = SIMIX_mutex_init();
517                 smpi_mpi_global->mpi_comm_world->barrier_cond  = SIMIX_cond_init();
518                 smpi_mpi_global->mpi_comm_world->hosts         = hosts;
519                 smpi_mpi_global->mpi_comm_world->processes     = xbt_new(smx_process_t, size);
520                 smpi_mpi_global->mpi_comm_world->processes[0]  = process;
521
522                 // mpi datatypes
523                 smpi_mpi_global->mpi_byte                      = xbt_new(smpi_mpi_datatype_t, 1);
524                 smpi_mpi_global->mpi_byte->size                = (size_t)1;
525                 smpi_mpi_global->mpi_int                       = xbt_new(smpi_mpi_datatype_t, 1);
526                 smpi_mpi_global->mpi_int->size                 = sizeof(int);
527                 smpi_mpi_global->mpi_double                    = xbt_new(smpi_mpi_datatype_t, 1);
528                 smpi_mpi_global->mpi_double->size              = sizeof(double);
529
530                 // mpi operations
531                 smpi_mpi_global->mpi_land                      = xbt_new(smpi_mpi_op_t, 1);
532                 smpi_mpi_global->mpi_land->func                = smpi_mpi_land_func;
533                 smpi_mpi_global->mpi_sum                       = xbt_new(smpi_mpi_op_t, 1);
534                 smpi_mpi_global->mpi_sum->func                 = smpi_mpi_sum_func;
535
536                 // signal all nodes to perform initialization
537                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
538                 smpi_global->root_ready = 1;
539                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
540                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
541
542         } else {
543
544                 // make sure root is done before own initialization
545                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
546                 if (!smpi_global->root_ready) {
547                         SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
548                 }
549                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
550
551                 smpi_mpi_global->mpi_comm_world->processes[smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world)] = process;
552         }
553
554         // wait for all nodes to signal initializatin complete
555         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
556         smpi_global->ready_process_count++;
557         if (smpi_global->ready_process_count < 3 * size) {
558                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
559         } else {
560                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
561         }
562         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
563
564         return;
565 }
566
567 void smpi_mpi_finalize()
568 {
569         int i;
570
571         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
572         i = --smpi_global->running_hosts_count;
573         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
574
575         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
576         smpi_global->ready_process_count--;
577         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
578
579         if (0 >= i) {
580
581                 // wake up senders/receivers
582                 for (i = 0; i < smpi_mpi_global->mpi_comm_world->size; i++) {
583                         if (SIMIX_process_is_suspended(smpi_global->sender_processes[i])) {
584                                 SIMIX_process_resume(smpi_global->sender_processes[i]);
585                         }
586                         if (SIMIX_process_is_suspended(smpi_global->receiver_processes[i])) {
587                                 SIMIX_process_resume(smpi_global->receiver_processes[i]);
588                         }
589                 }
590
591                 // wait for senders/receivers to exit...
592                 SIMIX_mutex_lock(smpi_global->start_stop_mutex);
593                 if (smpi_global->ready_process_count > 0) {
594                         SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
595                 }
596                 SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
597
598                 SIMIX_mutex_destroy(smpi_mpi_global->mpi_comm_world->barrier_mutex);
599                 SIMIX_cond_destroy(smpi_mpi_global->mpi_comm_world->barrier_cond);
600                 xbt_free(smpi_mpi_global->mpi_comm_world->processes);
601                 xbt_free(smpi_mpi_global->mpi_comm_world);
602
603                 xbt_free(smpi_mpi_global->mpi_byte);
604                 xbt_free(smpi_mpi_global->mpi_int);
605                 xbt_free(smpi_mpi_global->mpi_double);
606
607                 xbt_free(smpi_mpi_global->mpi_land);
608                 xbt_free(smpi_mpi_global->mpi_sum);
609
610                 xbt_free(smpi_mpi_global);
611         }
612
613 }
614
615 // FIXME: could cause trouble with multithreaded procs on same host...
616 void smpi_bench_begin()
617 {
618         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
619         SIMIX_mutex_lock(smpi_global->timers_mutexes[rank]);
620         xbt_os_timer_start(smpi_global->timers[rank]);
621         return;
622 }
623
624 void smpi_bench_end()
625 {
626         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
627         double duration;
628         smx_host_t host;
629         smx_action_t compute_action;
630         smx_mutex_t mutex;
631         smx_cond_t cond;
632
633         xbt_os_timer_stop(smpi_global->timers[rank]);
634
635         duration       = xbt_os_timer_elapsed(smpi_global->timers[rank]);
636         SIMIX_mutex_unlock(smpi_global->timers_mutexes[rank]);
637
638         host           = smpi_mpi_global->mpi_comm_world->hosts[rank];
639         compute_action = SIMIX_action_execute(host, NULL, duration * SMPI_DEFAULT_SPEED);
640         mutex          = SIMIX_mutex_init();
641         cond           = SIMIX_cond_init();
642
643         SIMIX_mutex_lock(mutex);
644         SIMIX_register_action_to_condition(compute_action, cond);
645         SIMIX_cond_wait(cond, mutex);
646         //SIMIX_unregister_action_to_condition(compute_action, cond);
647         SIMIX_mutex_unlock(mutex);
648
649         SIMIX_mutex_destroy(mutex);
650         SIMIX_cond_destroy(cond);
651
652         // FIXME: check for success/failure?
653
654         return;
655 }
656
657 void smpi_barrier(smpi_mpi_communicator_t *comm)
658 {
659
660         SIMIX_mutex_lock(comm->barrier_mutex);
661         if(++comm->barrier_count < comm->size) {
662                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
663         } else {
664                 comm->barrier_count = 0;
665                 SIMIX_cond_broadcast(comm->barrier_cond);
666         }
667         SIMIX_mutex_unlock(comm->barrier_mutex);
668
669         return;
670 }
671
672 // FIXME: smarter algorithm...
673 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
674 {
675         int i;
676         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
677         if (i >= comm->size) i = -1;
678         return i;
679 }
680
681 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
682         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
683 {
684         int retval = MPI_SUCCESS;
685
686         *request = NULL;
687
688         if (0 > count) {
689                 retval = MPI_ERR_COUNT;
690         } else if (NULL == buf) {
691                 retval = MPI_ERR_INTERN;
692         } else if (NULL == datatype) {
693                 retval = MPI_ERR_TYPE;
694         } else if (NULL == comm) {
695                 retval = MPI_ERR_COMM;
696         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
697                 retval = MPI_ERR_RANK;
698         } else if (0 > dst || comm->size <= dst) {
699                 retval = MPI_ERR_RANK;
700         } else if (0 > tag) {
701                 retval = MPI_ERR_TAG;
702         } else {
703                 *request = xbt_mallocator_get(smpi_global->request_mallocator);
704                 (*request)->comm       = comm;
705                 (*request)->src        = src;
706                 (*request)->dst        = dst;
707                 (*request)->tag        = tag;
708                 (*request)->buf        = buf;
709                 (*request)->count      = count;
710                 (*request)->datatype   = datatype;
711         }
712         return retval;
713 }
714
715 int smpi_isend(smpi_mpi_request_t *request)
716 {
717         int retval = MPI_SUCCESS;
718         int rank   = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
719
720         if (NULL != request) {
721                 SIMIX_mutex_lock(smpi_global->pending_send_request_queues_mutexes[rank]);
722                 xbt_fifo_push(smpi_global->pending_send_request_queues[rank], request);
723                 SIMIX_mutex_unlock(smpi_global->pending_send_request_queues_mutexes[rank]);
724         }
725
726         if (SIMIX_process_is_suspended(smpi_global->sender_processes[rank])) {
727                 SIMIX_process_resume(smpi_global->sender_processes[rank]);
728         }
729
730         return retval;
731 }
732
733 int smpi_irecv(smpi_mpi_request_t *request)
734 {
735         int retval = MPI_SUCCESS;
736         int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
737
738         if (NULL != request) {
739                 SIMIX_mutex_lock(smpi_global->pending_recv_request_queues_mutexes[rank]);
740                 xbt_fifo_push(smpi_global->pending_recv_request_queues[rank], request);
741                 SIMIX_mutex_unlock(smpi_global->pending_recv_request_queues_mutexes[rank]);
742         }
743
744         if (SIMIX_process_is_suspended(smpi_global->receiver_processes[rank])) {
745                 SIMIX_process_resume(smpi_global->receiver_processes[rank]);
746         }
747
748         return retval;
749 }
750
751 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
752 {
753         if (NULL != request) {
754                 SIMIX_mutex_lock(request->mutex);
755                 if (!request->completed) {
756                         SIMIX_cond_wait(request->cond, request->mutex);
757                 }
758                 if (NULL != status) {
759                         status->MPI_SOURCE = request->src;
760                 }
761                 SIMIX_mutex_unlock(request->mutex);
762         }
763 }
764
765 // FIXME: move into own file
766 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
767 {
768         double now;
769         int retval = 0;
770         smpi_bench_end();
771         if (NULL == tv) {
772                 retval = -1;
773         } else {
774                 now = SIMIX_get_clock();
775                 tv->tv_sec  = now;
776                 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
777         }
778         smpi_bench_begin();
779         return retval;
780 }
781
782 unsigned int smpi_sleep(unsigned int seconds)
783 {
784         smx_mutex_t mutex;
785         smx_cond_t cond;
786         smx_host_t host;
787         smx_action_t sleep_action;
788
789         smpi_bench_end();
790         host         = SIMIX_host_self();
791         sleep_action = SIMIX_action_sleep(host, seconds);
792         mutex        = SIMIX_mutex_init();
793         cond         = SIMIX_cond_init();
794
795         SIMIX_mutex_lock(mutex);
796         SIMIX_register_action_to_condition(sleep_action, cond);
797         SIMIX_cond_wait(cond, mutex);
798         //SIMIX_unregister_action_to_condition(sleep_action, cond);
799         SIMIX_mutex_unlock(mutex);
800
801         SIMIX_mutex_destroy(mutex);
802         SIMIX_cond_destroy(cond);
803
804         // FIXME: check for success/failure?
805
806         smpi_bench_begin();
807         return 0;
808 }
809
810 void smpi_exit(int status)
811 {
812         smpi_bench_end();
813         SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
814         smpi_global->running_hosts_count--;
815         SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
816         SIMIX_process_kill(SIMIX_process_self());
817         return;
818 }