Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
minor code cleanups. Realized I wasn't deallocating message buffers.
[simgrid.git] / src / smpi / src / smpi_base.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <signal.h>
#include <sys/time.h>
#include "xbt/xbt_os_time.h"
#include "xbt/mallocator.h"
#include "smpi.h"
8
9 // FIXME: move globals into structure...
10
11 xbt_mallocator_t smpi_request_mallocator      = NULL;
12 xbt_mallocator_t smpi_message_mallocator      = NULL;
13
14 xbt_fifo_t *smpi_pending_send_requests        = NULL;
15 smx_mutex_t *smpi_pending_send_requests_mutex = NULL;
16
17 xbt_fifo_t *smpi_pending_recv_requests        = NULL;
18 smx_mutex_t *smpi_pending_recv_requests_mutex = NULL;
19
20 xbt_fifo_t *smpi_received_messages            = NULL;
21 smx_mutex_t *smpi_received_messages_mutex     = NULL;
22
23 smx_process_t *smpi_sender_processes        = NULL;
24 smx_process_t *smpi_receiver_processes      = NULL;
25
26 int smpi_running_hosts = 0;
27
28 smpi_mpi_communicator_t smpi_mpi_comm_world;
29
30 smpi_mpi_status_t smpi_mpi_status_ignore;
31
32 smpi_mpi_datatype_t smpi_mpi_byte;
33 smpi_mpi_datatype_t smpi_mpi_int;
34 smpi_mpi_datatype_t smpi_mpi_double;
35
36 smpi_mpi_op_t smpi_mpi_land;
37 smpi_mpi_op_t smpi_mpi_sum;
38
39 static xbt_os_timer_t smpi_timer;
40 static int smpi_benchmarking;
41 static double smpi_reference_speed;
42
43 // mutexes
44 smx_mutex_t smpi_running_hosts_mutex = NULL;
45 smx_mutex_t smpi_benchmarking_mutex  = NULL;
46 smx_mutex_t init_mutex = NULL;
47 smx_cond_t  init_cond  = NULL;
48
49 int smpi_root_ready  = 0;
50 int smpi_ready_count = 0;
51
52 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
53
54 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) 
55 {
56         return comm->size;
57 }
58
59 // FIXME: smarter algorithm?
60 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
61 {
62         int i;
63
64         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
65
66         return i;
67 }
68
69 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
70 {
71         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
72 }
73
74 int inline smpi_mpi_comm_world_rank_self()
75 {
76         return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
77 }
78
79 int smpi_sender(int argc, char **argv)
80 {
81         smx_process_t self;
82         smx_host_t shost;
83         int rank;
84
85         xbt_fifo_t request_queue;
86         smx_mutex_t request_queue_mutex;
87         int size;
88
89         int running_hosts = 0;
90
91         smpi_mpi_request_t *request;
92
93         smx_host_t dhost;
94
95         smx_action_t communicate_action;
96
97         smpi_received_message_t *message;
98
99         int drank;
100
101         smx_process_t waitproc;
102
103         self  = SIMIX_process_self();
104         shost = SIMIX_host_self();
105         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
106
107         // make sure root is done before own initialization
108         SIMIX_mutex_lock(init_mutex);
109         if (!smpi_root_ready) {
110                 SIMIX_cond_wait(init_cond, init_mutex);
111         }
112         SIMIX_mutex_unlock(init_mutex);
113
114         request_queue       = smpi_pending_send_requests[rank];
115         request_queue_mutex = smpi_pending_send_requests_mutex[rank];
116         size                = smpi_mpi_comm_size(&smpi_mpi_comm_world);
117
118         smpi_sender_processes[rank] = self;
119
120         // wait for all nodes to signal initializatin complete
121         SIMIX_mutex_lock(init_mutex);
122         smpi_ready_count++;
123         if (smpi_ready_count < 3 * size) {
124                 SIMIX_cond_wait(init_cond, init_mutex);
125         } else {
126                 SIMIX_cond_broadcast(init_cond);
127         }
128         SIMIX_mutex_unlock(init_mutex);
129
130         SIMIX_mutex_lock(smpi_running_hosts_mutex);
131         running_hosts = smpi_running_hosts;
132         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
133
134         while (0 < running_hosts) {
135
136                 SIMIX_mutex_lock(request_queue_mutex);
137                 request = xbt_fifo_shift(request_queue);
138                 SIMIX_mutex_unlock(request_queue_mutex);
139
140                 if (NULL == request) {
141                         SIMIX_process_suspend(self);
142                 } else {
143                         SIMIX_mutex_lock(request->mutex);
144
145                         dhost = request->comm->hosts[request->dst];
146
147                         // FIXME: not at all sure I can assume magic just happens here....
148                         communicate_action = SIMIX_action_communicate(shost, dhost,
149                                 "communication", request->datatype->size * request->count * 1.0, -1.0);
150
151                         SIMIX_register_condition_to_action(communicate_action, request->cond);
152                         SIMIX_register_action_to_condition(communicate_action, request->cond);
153
154                         SIMIX_cond_wait(request->cond, request->mutex);
155
156                         // copy request to appropriate received queue
157                         message = xbt_mallocator_get(smpi_message_mallocator);
158                         message->comm = request->comm;
159                         message->src  = request->src;
160                         message->dst  = request->dst;
161                         message->tag  = request->tag;
162                         message->buf  = xbt_malloc(request->datatype->size * request->count);
163                         memcpy(message->buf, request->buf, request->datatype->size * request->count);
164
165                         drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
166
167                         SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
168                         xbt_fifo_push(smpi_received_messages[drank], message);
169                         SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
170
171                         request->completed = 1;
172
173                         // wake up receiver, then any waiting sender
174                         waitproc = smpi_receiver_processes[drank];
175
176                         do {
177                                 if (SIMIX_process_is_suspended(waitproc)) {
178                                         SIMIX_process_resume(waitproc);
179                                 }
180                         } while(waitproc = xbt_fifo_shift(request->waitlist));
181
182                         SIMIX_mutex_unlock(request->mutex);
183                 }
184
185                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
186                 running_hosts = smpi_running_hosts;
187                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
188         }
189
190         SIMIX_mutex_lock(init_mutex);
191         smpi_ready_count--;
192         if (smpi_ready_count <= 0) {
193                 SIMIX_cond_broadcast(init_cond);
194         }
195         SIMIX_mutex_unlock(init_mutex);
196
197         return 0;
198 }
199
200 int smpi_receiver(int argc, char **argv)
201 {
202         smx_process_t self;
203         int rank;
204
205         xbt_fifo_t request_queue;
206         smx_mutex_t request_queue_mutex;
207         xbt_fifo_t message_queue;
208         smx_mutex_t message_queue_mutex;
209         int size;
210
211         int running_hosts;
212
213         smpi_mpi_request_t *request;
214         smpi_received_message_t *message;
215
216         xbt_fifo_item_t request_item;
217         xbt_fifo_item_t message_item;
218
219         smx_process_t waitproc;
220
221         self  = SIMIX_process_self();
222         rank  = smpi_mpi_comm_world_rank_self();
223
224         // make sure root is done before own initialization
225         SIMIX_mutex_lock(init_mutex);
226         if (!smpi_root_ready) {
227                 SIMIX_cond_wait(init_cond, init_mutex);
228         }
229         SIMIX_mutex_unlock(init_mutex);
230
231         request_queue       = smpi_pending_recv_requests[rank];
232         request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
233         message_queue       = smpi_received_messages[rank];
234         message_queue_mutex = smpi_received_messages_mutex[rank];
235         size                = smpi_mpi_comm_size(&smpi_mpi_comm_world);
236
237         smpi_receiver_processes[rank] = self;
238
239         // wait for all nodes to signal initializatin complete
240         SIMIX_mutex_lock(init_mutex);
241         smpi_ready_count++;
242         if (smpi_ready_count < 3 * size) {
243                 SIMIX_cond_wait(init_cond, init_mutex);
244         } else {
245                 SIMIX_cond_broadcast(init_cond);
246         }
247         SIMIX_mutex_unlock(init_mutex);
248
249         SIMIX_mutex_lock(smpi_running_hosts_mutex);
250         running_hosts = smpi_running_hosts;
251         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
252
253         while (0 < running_hosts) {
254
255                 request = NULL;
256                 message = NULL;
257
258                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
259
260                 // FIXME: not the best way to request multiple locks...
261                 SIMIX_mutex_lock(request_queue_mutex);
262                 SIMIX_mutex_lock(message_queue_mutex);
263                 for (request_item = xbt_fifo_get_first_item(request_queue);
264                         NULL != request_item;
265                         request_item = xbt_fifo_get_next_item(request_item)) {
266                         request = xbt_fifo_get_item_content(request_item);
267                         for (message_item = xbt_fifo_get_first_item(message_queue);
268                                 NULL != message_item;
269                                 message_item = xbt_fifo_get_next_item(message_item)) {
270                                 message = xbt_fifo_get_item_content(message_item);
271                                 if (request->comm == message->comm &&
272                                                 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
273                                                 request->tag == message->tag) {
274                                         xbt_fifo_remove_item(request_queue, request_item);
275                                         xbt_fifo_remove_item(message_queue, message_item);
276                                         goto stopsearch;
277                                 }
278                         }
279                 }
280 stopsearch:
281                 SIMIX_mutex_unlock(message_queue_mutex);
282                 SIMIX_mutex_unlock(request_queue_mutex);
283
284                 if (NULL == request || NULL == message) {
285                         SIMIX_process_suspend(self);
286                 } else {
287                         SIMIX_mutex_lock(request->mutex);
288
289                         memcpy(request->buf, message->buf, request->count * request->datatype->size);
290                         request->src = message->src;
291                         request->completed = 1;
292
293                         while (waitproc = xbt_fifo_shift(request->waitlist)) {
294                                 if (SIMIX_process_is_suspended(waitproc)) {
295                                         SIMIX_process_resume(waitproc);
296                                 }
297                         }
298
299                         SIMIX_mutex_unlock(request->mutex);
300
301                         xbt_free(message->buf);
302                         xbt_mallocator_release(smpi_message_mallocator, message);
303                 }
304
305                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
306                 running_hosts = smpi_running_hosts;
307                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
308         }
309
310         SIMIX_mutex_lock(init_mutex);
311         smpi_ready_count--;
312         if (smpi_ready_count <= 0) {
313                 SIMIX_cond_broadcast(init_cond);
314         }
315         SIMIX_mutex_unlock(init_mutex);
316
317         return 0;
318 }
319
320 int smpi_run_simulation(int argc, char **argv)
321 {
322         smx_cond_t   cond           = NULL;
323         smx_action_t action         = NULL;
324
325         xbt_fifo_t   actions_failed = xbt_fifo_new();
326         xbt_fifo_t   actions_done   = xbt_fifo_new();
327
328         srand(SMPI_RAND_SEED);
329
330         SIMIX_global_init(&argc, argv);
331
332         // important globals
333         init_mutex = SIMIX_mutex_init();
334         init_cond  = SIMIX_cond_init();
335
336         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
337         SIMIX_function_register("smpi_sender",         smpi_sender);
338         SIMIX_function_register("smpi_receiver",       smpi_receiver);
339
340         // FIXME: ought to verify these files...
341         SIMIX_create_environment(argv[1]);
342         SIMIX_launch_application(argv[2]);
343
344         /* Prepare to display some more info when dying on Ctrl-C pressing */
345         // FIXME: doesn't work
346         //signal(SIGINT, inthandler);
347
348         /* Clean IO before the run */
349         fflush(stdout);
350         fflush(stderr);
351
352         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
353                 while (action = xbt_fifo_pop(actions_failed)) {
354                         DEBUG1("** %s failed **", action->name);
355                         while (cond = xbt_fifo_pop(action->cond_list)) {
356                                 SIMIX_cond_broadcast(cond);
357                         }
358                         SIMIX_action_destroy(action);
359                 }
360                 while (action = xbt_fifo_pop(actions_done)) {
361                         DEBUG1("** %s done **",action->name);
362                         while (cond = xbt_fifo_pop(action->cond_list)) {
363                                 SIMIX_cond_broadcast(cond);
364                         }
365                         SIMIX_action_destroy(action);
366                 }
367         }
368
369         xbt_fifo_free(actions_failed);
370         xbt_fifo_free(actions_done);
371
372         INFO1("simulation time %g", SIMIX_get_clock());
373
374         SIMIX_clean();
375
376         return 0;
377 }
378
/*
 * Logical-AND reduction callback: treats x, y and z as pointers to int and
 * stores (*x && *y) into *z, i.e. 1 when both inputs are non-zero, else 0.
 * *y is only read when *x is non-zero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;
        int both_true = 0;

        if (*lhs) {
                if (*rhs) {
                        both_true = 1;
                }
        }

        *out = both_true;
}
383
/*
 * Sum reduction callback: treats x, y and z as pointers to int and stores
 * *x + *y into *z. z may alias either input.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;
        int total = *lhs + *rhs;

        *out = total;
}
388
389 void *smpi_new_request()
390 {
391         return xbt_new(smpi_mpi_request_t, 1);
392 }
393
394 void smpi_free_request(void *pointer) {
395         smpi_mpi_request_t *request = pointer;
396
397         if (NULL != request) {
398                 SIMIX_mutex_destroy(request->mutex);
399                 SIMIX_cond_destroy(request->cond);
400                 xbt_fifo_free(request->waitlist);
401                 xbt_free(request);
402         }
403
404         return;
405 }
406
407 void smpi_reset_request(void *pointer) {
408         smpi_mpi_request_t *request = pointer;
409
410         if (NULL != request) {
411                 request->mutex    = SIMIX_mutex_init();
412                 request->cond     = SIMIX_cond_init();
413                 request->waitlist = xbt_fifo_new();
414                 // FIXME: clear waitlist
415         }
416
417         return;
418 }
419
420
421 void *smpi_new_message()
422 {
423         return xbt_new(smpi_received_message_t, 1);
424 }
425
/* No-op callback; used as the reset function of the message mallocator. */
void smpi_do_nothing(void *pointer)
{
        (void)pointer;
}
430
431 void smpi_mpi_init()
432 {
433         int i;
434         int size;
435         smx_process_t process;
436         smx_host_t *hosts;
437         smx_host_t host;
438         double duration;
439
440         // initialize some local variables
441         host  = SIMIX_host_self();
442         hosts = SIMIX_host_get_table();
443         size  = SIMIX_host_get_number();
444
445         // node 0 sets the globals
446         if (host == hosts[0]) {
447
448                 // processes
449                 smpi_sender_processes             = xbt_new(smx_process_t, size);
450                 smpi_receiver_processes           = xbt_new(smx_process_t, size);
451
452                 // running hosts
453                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
454                 smpi_running_hosts                = size;
455
456                 // global communicator
457                 smpi_mpi_comm_world.size          = size;
458                 smpi_mpi_comm_world.barrier       = 0;
459                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
460                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
461                 smpi_mpi_comm_world.hosts         = hosts;
462                 smpi_mpi_comm_world.processes     = xbt_new(smx_process_t, size);
463                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
464
465                 // mpi datatypes
466                 smpi_mpi_byte.size                = (size_t)1;
467                 smpi_mpi_int.size                 = sizeof(int);
468                 smpi_mpi_double.size              = sizeof(double);
469
470                 // mpi operations
471                 smpi_mpi_land.func                = &smpi_mpi_land_func;
472                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
473
474                 // smpi globals
475                 smpi_request_mallocator           = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, smpi_free_request, smpi_reset_request);
476                 smpi_message_mallocator           = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, smpi_free_message, smpi_do_nothing);
477                 smpi_pending_send_requests        = xbt_new(xbt_fifo_t,  size);
478                 smpi_pending_send_requests_mutex  = xbt_new(smx_mutex_t, size);
479                 smpi_pending_recv_requests        = xbt_new(xbt_fifo_t,  size);
480                 smpi_pending_recv_requests_mutex  = xbt_new(smx_mutex_t, size);
481                 smpi_received_messages            = xbt_new(xbt_fifo_t,  size);
482                 smpi_received_messages_mutex      = xbt_new(smx_mutex_t, size);
483
484                 for(i = 0; i < size; i++) {
485                         smpi_pending_send_requests[i]       = xbt_fifo_new();
486                         smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init();
487                         smpi_pending_recv_requests[i]       = xbt_fifo_new();
488                         smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init();
489                         smpi_received_messages[i]           = xbt_fifo_new();
490                         smpi_received_messages_mutex[i]     = SIMIX_mutex_init();
491                 }
492
493                 smpi_timer                      = xbt_os_timer_new();
494                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
495                 smpi_benchmarking               = 0;
496                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
497
498                 // signal all nodes to perform initialization
499                 SIMIX_mutex_lock(init_mutex);
500                 smpi_root_ready = 1;
501                 SIMIX_cond_broadcast(init_cond);
502                 SIMIX_mutex_unlock(init_mutex);
503
504         } else {
505
506                 // make sure root is done before own initialization
507                 SIMIX_mutex_lock(init_mutex);
508                 if (!smpi_root_ready) {
509                         SIMIX_cond_wait(init_cond, init_mutex);
510                 }
511                 SIMIX_mutex_unlock(init_mutex);
512
513                 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
514
515         }
516
517         // wait for all nodes to signal initializatin complete
518         SIMIX_mutex_lock(init_mutex);
519         smpi_ready_count++;
520         if (smpi_ready_count < 3 * size) {
521                 SIMIX_cond_wait(init_cond, init_mutex);
522         } else {
523                 SIMIX_cond_broadcast(init_cond);
524         }
525         SIMIX_mutex_unlock(init_mutex);
526
527 }
528
529 void smpi_mpi_finalize()
530 {
531         int i;
532
533         SIMIX_mutex_lock(smpi_running_hosts_mutex);
534         i = --smpi_running_hosts;
535         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
536
537         SIMIX_mutex_lock(init_mutex);
538         smpi_ready_count--;
539         SIMIX_mutex_unlock(init_mutex);
540
541         if (0 >= i) {
542
543                 // wake up senders/receivers
544                 for (i = 0; i < smpi_mpi_comm_world.size; i++) {
545                         if (SIMIX_process_is_suspended(smpi_sender_processes[i])) {
546                                 SIMIX_process_resume(smpi_sender_processes[i]);
547                         }
548                         if (SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
549                                 SIMIX_process_resume(smpi_receiver_processes[i]);
550                         }
551                 }
552
553                 // wait for senders/receivers to exit...
554                 SIMIX_mutex_lock(init_mutex);
555                 if (smpi_ready_count > 0) {
556                         SIMIX_cond_wait(init_cond, init_mutex);
557                 }
558                 SIMIX_mutex_unlock(init_mutex);
559
560                 SIMIX_mutex_destroy(init_mutex);
561                 SIMIX_cond_destroy(init_cond);
562                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
563
564                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
565                         xbt_fifo_free(smpi_pending_send_requests[i]);
566                         SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]);
567                         xbt_fifo_free(smpi_pending_recv_requests[i]);
568                         SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]);
569                         xbt_fifo_free(smpi_received_messages[i]);
570                         SIMIX_mutex_destroy(smpi_received_messages_mutex[i]);
571                 }
572
573                 xbt_mallocator_free(smpi_request_mallocator);
574                 xbt_mallocator_free(smpi_message_mallocator);
575                 xbt_free(smpi_pending_send_requests);
576                 xbt_free(smpi_pending_send_requests_mutex);
577                 xbt_free(smpi_pending_recv_requests);
578                 xbt_free(smpi_pending_recv_requests_mutex);
579                 xbt_free(smpi_received_messages);
580                 xbt_free(smpi_received_messages_mutex);
581
582                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
583                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
584                 xbt_free(smpi_mpi_comm_world.processes);
585
586                 xbt_os_timer_free(smpi_timer);
587         }
588
589 }
590
591 void smpi_bench_begin()
592 {
593         SIMIX_mutex_lock(smpi_benchmarking_mutex);
594         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
595         smpi_benchmarking = 1;
596         SIMIX_mutex_unlock(smpi_benchmarking_mutex);
597
598         xbt_os_timer_start(smpi_timer);
599
600         return;
601 }
602
603 void smpi_bench_end()
604 {
605         double duration;
606         smx_host_t host;
607         smx_action_t compute_action;
608         smx_mutex_t mutex;
609         smx_cond_t cond;
610
611         SIMIX_mutex_lock(smpi_benchmarking_mutex);
612         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
613         smpi_benchmarking = 0;
614         SIMIX_mutex_lock(smpi_benchmarking_mutex);
615
616         xbt_os_timer_stop(smpi_timer);
617
618         duration       = xbt_os_timer_elapsed(smpi_timer);
619         host           = SIMIX_host_self();
620         compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
621         mutex          = SIMIX_mutex_init();
622         cond           = SIMIX_cond_init();
623
624         SIMIX_mutex_lock(mutex);
625         SIMIX_register_condition_to_action(compute_action, cond);
626         SIMIX_register_action_to_condition(compute_action, cond);
627         SIMIX_cond_wait(cond, mutex);
628         SIMIX_mutex_unlock(mutex);
629         SIMIX_mutex_destroy(mutex);
630         SIMIX_cond_destroy(cond);
631
632         // FIXME: check for success/failure?
633
634         return;
635 }
636
637 void smpi_barrier(smpi_mpi_communicator_t *comm) {
638         int i;
639
640         SIMIX_mutex_lock(comm->barrier_mutex);
641         comm->barrier++;
642         if(i < comm->size) {
643                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
644         } else {
645                 comm->barrier = 0;
646                 SIMIX_cond_broadcast(comm->barrier_cond);
647         }
648         SIMIX_mutex_unlock(comm->barrier_mutex);
649
650         return;
651 }
652
653 // FIXME: smarter algorithm...
654 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
655 {
656         int i;
657         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
658         if (i >= comm->size) i = -1;
659         return i;
660 }
661
662 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
663         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
664 {
665         int retval = MPI_SUCCESS;
666
667         *request = NULL;
668
669         if (0 > count) {
670                 retval = MPI_ERR_COUNT;
671         } else if (NULL == buf) {
672                 retval = MPI_ERR_INTERN;
673         } else if (NULL == datatype) {
674                 retval = MPI_ERR_TYPE;
675         } else if (NULL == comm) {
676                 retval = MPI_ERR_COMM;
677         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
678                 retval = MPI_ERR_RANK;
679         } else if (0 > dst || comm->size <= dst) {
680                 retval = MPI_ERR_RANK;
681         } else if (0 > tag) {
682                 retval = MPI_ERR_TAG;
683         } else {
684                 *request = xbt_mallocator_get(smpi_request_mallocator);
685                 (*request)->comm       = comm;
686                 (*request)->src        = src;
687                 (*request)->dst        = dst;
688                 (*request)->tag        = tag;
689                 (*request)->buf        = buf;
690                 (*request)->count      = count;
691                 (*request)->datatype   = datatype;
692                 (*request)->completed  = 0;
693                 (*request)->mutex      = SIMIX_mutex_init();
694                 (*request)->cond       = SIMIX_cond_init();
695                 (*request)->waitlist   = xbt_fifo_new();
696         }
697         return retval;
698 }
699
700 int smpi_isend(smpi_mpi_request_t *request)
701 {
702         int retval = MPI_SUCCESS;
703         int rank   = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
704
705         if (NULL != request) {
706                 SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
707                 xbt_fifo_push(smpi_pending_send_requests[rank], request);
708                 SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
709         }
710
711         if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
712                 SIMIX_process_resume(smpi_sender_processes[rank]);
713         }
714
715         return retval;
716 }
717
718 int smpi_irecv(smpi_mpi_request_t *request)
719 {
720         int retval = MPI_SUCCESS;
721         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
722
723         if (NULL != request) {
724                 SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
725                 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
726                 SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
727         }
728
729         if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
730                 SIMIX_process_resume(smpi_receiver_processes[rank]);
731         }
732
733         return retval;
734 }
735
736 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
737 {
738         smx_process_t self = SIMIX_process_self();
739         int suspend = 0;
740
741         if (NULL != request) {
742                 SIMIX_mutex_lock(request->mutex);
743                 if (!request->completed) {
744                         xbt_fifo_push(request->waitlist, self);
745                         suspend = 1;
746                 }
747                 SIMIX_mutex_unlock(request->mutex);
748                 if (suspend) {
749                         SIMIX_process_suspend(self);
750                 }
751                 if (NULL != status && MPI_STATUS_IGNORE != status) {
752                         SIMIX_mutex_lock(request->mutex);
753                         status->MPI_SOURCE = request->src;
754                         SIMIX_mutex_unlock(request->mutex);
755                 }
756         }
757 }
758
// FIXME: move into own file
/*
 * gettimeofday() replacement that reports the simulated clock.
 *
 * The benchmarking interval is closed before and reopened after the query.
 * `tz` is not used. Returns 0 on success, -1 when `tv` is NULL.
 */
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
        int retval = -1;

        smpi_bench_end();

        if (NULL != tv) {
                double now = SIMIX_get_clock();

                // whole seconds first, then the remainder as microseconds
                tv->tv_sec  = now;
                tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
                retval = 0;
        }

        smpi_bench_begin();

        return retval;
}
775
776 unsigned int smpi_sleep(unsigned int seconds)
777 {
778         smx_mutex_t mutex;
779         smx_cond_t cond;
780         smx_host_t host;
781         smx_action_t sleep_action;
782
783         smpi_bench_end();
784         host         = SIMIX_host_self();
785         sleep_action = SIMIX_action_sleep(host, seconds);
786         mutex        = SIMIX_mutex_init();
787         cond         = SIMIX_cond_init();
788         SIMIX_mutex_lock(mutex);
789         SIMIX_register_condition_to_action(sleep_action, cond);
790         SIMIX_register_action_to_condition(sleep_action, cond);
791         SIMIX_cond_wait(cond, mutex);
792         SIMIX_mutex_unlock(mutex);
793         SIMIX_mutex_destroy(mutex);
794         SIMIX_cond_destroy(cond);
795         // FIXME: check for success/failure?
796         smpi_bench_begin();
797         return 0;
798 }
799
800 void smpi_exit(int status)
801 {
802         smpi_bench_end();
803         SIMIX_mutex_lock(smpi_running_hosts_mutex);
804         smpi_running_hosts--;
805         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
806         SIMIX_process_kill(SIMIX_process_self());
807         return;
808 }