5 #include "xbt/xbt_os_time.h"
6 #include "xbt/mallocator.h"
// Module-wide SMPI runtime state, shared by every simulated process.
9 // FIXME: move globals into structure...
// Mallocators recycling request / received-message objects; created by the
// root host during MPI initialization and freed in smpi_mpi_finalize().
11 xbt_mallocator_t smpi_request_mallocator = NULL;
12 xbt_mallocator_t smpi_message_mallocator = NULL;
// Per-rank arrays (indexed by rank in smpi_mpi_comm_world, `size` entries each),
// every fifo paired with its own mutex:
// - pending send requests, consumed by that rank's sender process;
14 xbt_fifo_t *smpi_pending_send_requests = NULL;
15 smx_mutex_t *smpi_pending_send_requests_mutex = NULL;
// - pending receive requests, consumed by that rank's receiver process;
17 xbt_fifo_t *smpi_pending_recv_requests = NULL;
18 smx_mutex_t *smpi_pending_recv_requests_mutex = NULL;
// - messages delivered by senders, waiting to be matched by the receiver.
20 xbt_fifo_t *smpi_received_messages = NULL;
21 smx_mutex_t *smpi_received_messages_mutex = NULL;
// Per-rank handles of the sender/receiver helper processes; they are resumed
// when new work is queued for them or at finalize time.
23 smx_process_t *smpi_sender_processes = NULL;
24 smx_process_t *smpi_receiver_processes = NULL;
// Number of hosts that have not yet finalized or exited; the sender/receiver
// loops keep running while it is positive. Guarded by smpi_running_hosts_mutex.
26 int smpi_running_hosts = 0;
// Communicator spanning every SIMIX host (filled in during initialization).
28 smpi_mpi_communicator_t smpi_mpi_comm_world;
30 smpi_mpi_status_t smpi_mpi_status_ignore;
// Built-in datatypes; the visible initialization code only sets their `.size`.
32 smpi_mpi_datatype_t smpi_mpi_byte;
33 smpi_mpi_datatype_t smpi_mpi_int;
34 smpi_mpi_datatype_t smpi_mpi_double;
// Built-in reduction operators over int (logical AND and sum).
36 smpi_mpi_op_t smpi_mpi_land;
37 smpi_mpi_op_t smpi_mpi_sum;
// Benchmarking state used by smpi_bench_begin()/smpi_bench_end(): the host
// wall-clock timer, the "measurement running" flag, and the speed used to
// convert measured seconds into simulated work.
39 static xbt_os_timer_t smpi_timer;
40 static int smpi_benchmarking;
41 static double smpi_reference_speed;
// NOTE(review): smpi_benchmarking_mutex is created during initialization but is
// not used by the benchmarking functions visible in this file.
44 smx_mutex_t smpi_running_hosts_mutex = NULL;
45 smx_mutex_t smpi_benchmarking_mutex = NULL;
// init_mutex/init_cond (created in smpi_run_simulation) drive the startup and
// shutdown handshakes: waiting for the root to publish the globals, the
// all-processes-ready barrier, and waiting for the helpers to exit on finalize.
46 smx_mutex_t init_mutex = NULL;
47 smx_cond_t init_cond = NULL;
// Nonzero once the root host has allocated the globals.
// NOTE(review): the assignment that sets it is not visible in this listing.
49 int smpi_root_ready = 0;
// Compared against 3 * size at startup (presumably one count per main, sender
// and receiver process of each host) and against 0 at shutdown.
50 int smpi_ready_count = 0;
// Default XBT log category for this file, used by DEBUG1/INFO1 below.
52 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
// Number of processes in `comm`.
// NOTE(review): the body of this definition is not visible in this listing.
// NOTE(review): in C99/C11 a non-static `inline` definition, with no non-inline
// declaration of the same function in this translation unit, emits no external
// symbol. Out-of-line calls (e.g. at -O0 or from other files) may then fail to
// link. Consider dropping `inline` unless a header declares the function
// without it.
54 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
59 // FIXME: smarter algorithm?
60 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
64 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
69 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
71 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
74 int inline smpi_mpi_comm_world_rank_self()
76 return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
// Body of the per-host "sender" helper process (registered with SIMIX as
// "smpi_sender"). After the startup handshake it repeatedly:
// - takes requests from this rank's pending-send queue;
// - simulates each transfer with a SIMIX communicate action;
// - delivers a copy of the payload to the destination rank's received-message
//   queue;
// - marks the request completed and wakes the processes waiting on it.
// It suspends itself whenever its queue is empty, and runs while the sampled
// smpi_running_hosts is positive.
// NOTE(review): several lines of this function are not visible in this listing:
// local declarations (self, shost, dhost, rank, size, drank), else-branches,
// smpi_ready_count updates and the return.
79 int smpi_sender(int argc, char **argv)
84 xbt_fifo_t request_queue;
85 smx_mutex_t request_queue_mutex;
87 int running_hosts = 0;
88 smpi_mpi_request_t *request;
90 smx_action_t communicate_action;
91 smpi_received_message_t *scratch;
93 smx_process_t waitproc;
95 self = SIMIX_process_self();
96 shost = SIMIX_host_self();
97 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
99 // make sure root is done before own initialization
// (the root host allocates the per-rank queues read just below)
100 SIMIX_mutex_lock(init_mutex);
101 if (!smpi_root_ready) {
102 SIMIX_cond_wait(init_cond, init_mutex);
104 SIMIX_mutex_unlock(init_mutex);
106 request_queue = smpi_pending_send_requests[rank];
107 request_queue_mutex = smpi_pending_send_requests_mutex[rank];
109 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so smpi_isend() and smpi_mpi_finalize() can resume it.
111 smpi_sender_processes[rank] = self;
113 // wait for all nodes to signal initialization complete
// NOTE(review): smpi_ready_count is presumably incremented on a line not visible
// here. The wait is guarded by `if`, not `while`, so any broadcast on the shared
// init_cond ends it whatever the count is.
114 SIMIX_mutex_lock(init_mutex);
116 if (smpi_ready_count < 3 * size) {
117 SIMIX_cond_wait(init_cond, init_mutex);
119 SIMIX_cond_broadcast(init_cond);
121 SIMIX_mutex_unlock(init_mutex);
// Snapshot of the running-host counter; refreshed at the end of each iteration.
123 SIMIX_mutex_lock(smpi_running_hosts_mutex);
124 running_hosts = smpi_running_hosts;
125 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
127 while (0 < running_hosts) {
129 SIMIX_mutex_lock(request_queue_mutex);
130 request = xbt_fifo_shift(request_queue);
131 SIMIX_mutex_unlock(request_queue_mutex);
// Nothing queued: sleep until smpi_isend() or smpi_mpi_finalize() resumes us.
133 if (NULL == request) {
134 SIMIX_process_suspend(self);
136 SIMIX_mutex_lock(request->mutex);
138 dhost = request->comm->hosts[request->dst];
140 // FIXME: not at all sure I can assume magic just happens here....
// Simulate moving datatype->size * count bytes from this host to the
// destination host. The main loop of smpi_run_simulation() broadcasts
// request->cond once the action is reported done or failed.
141 communicate_action = SIMIX_action_communicate(shost, dhost,
142 "communication", request->datatype->size * request->count * 1.0, -1.0);
144 SIMIX_register_condition_to_action(communicate_action, request->cond);
145 SIMIX_register_action_to_condition(communicate_action, request->cond);
// NOTE(review): the action's outcome (done vs. failed) is not checked; the
// payload below is delivered either way.
147 SIMIX_cond_wait(request->cond, request->mutex);
149 // copy request to appropriate received queue
// The message owns a private copy of the payload in a freshly allocated buf,
// so the sender's buffer is not referenced after this point.
// NOTE(review): the message does not record how many bytes were copied, so the
// receiver cannot bound its copy by the size actually sent.
150 scratch = xbt_mallocator_get(smpi_message_mallocator);
151 scratch->comm = request->comm;
152 scratch->src = request->src;
153 scratch->dst = request->dst;
154 scratch->tag = request->tag;
155 scratch->buf = xbt_malloc(request->datatype->size * request->count);
156 memcpy(scratch->buf, request->buf, request->datatype->size * request->count);
// World rank of the destination host (request->dst indexes request->comm).
157 drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
158 SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
159 xbt_fifo_push(smpi_received_messages[drank], scratch);
160 SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
162 request->completed = 1;
164 // wake up the destination receiver, then every process blocked in smpi_wait() on this request
// NOTE(review): the `do {` paired with the trailing `while` below is not
// visible; the first iteration handles the receiver, later ones the waitlist.
165 waitproc = smpi_receiver_processes[drank];
168 if (SIMIX_process_is_suspended(waitproc)) {
169 SIMIX_process_resume(waitproc);
171 } while(waitproc = xbt_fifo_shift(request->waitlist));
173 SIMIX_mutex_unlock(request->mutex);
// Refresh the snapshot; smpi_mpi_finalize()/smpi_exit() decrement the counter.
176 SIMIX_mutex_lock(smpi_running_hosts_mutex);
177 running_hosts = smpi_running_hosts;
178 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Shutdown. NOTE(review): smpi_ready_count is presumably decremented here on a
// line not visible. When it reaches 0, this wakes smpi_mpi_finalize(), which
// waits for that.
181 SIMIX_mutex_lock(init_mutex);
183 if (smpi_ready_count <= 0) {
184 SIMIX_cond_broadcast(init_cond);
186 SIMIX_mutex_unlock(init_mutex);
// Body of the per-host "receiver" helper process (registered with SIMIX as
// "smpi_receiver"). After the startup handshake it repeatedly:
// - matches this rank's pending receive requests against the messages senders
//   delivered to it;
// - copies the payload of each match into the request buffer and records the
//   actual source;
// - marks the request completed and wakes every process on its waitlist.
// It suspends itself when nothing matches, and runs while the sampled
// smpi_running_hosts is positive.
// NOTE(review): several lines (declarations of self/rank/size/running_hosts,
// loop exits, else-branches, smpi_ready_count updates, the return) are not
// visible in this listing.
191 int smpi_receiver(int argc, char **argv)
195 xbt_fifo_t request_queue;
196 smx_mutex_t request_queue_mutex;
197 xbt_fifo_t message_queue;
198 smx_mutex_t message_queue_mutex;
201 xbt_fifo_item_t request_item, message_item;
202 smpi_mpi_request_t *request;
203 smpi_received_message_t *message;
204 smx_process_t waitproc;
206 self = SIMIX_process_self();
207 rank = smpi_mpi_comm_world_rank_self();
209 // make sure root is done before own initialization
// (the root host allocates the per-rank queues read just below)
210 SIMIX_mutex_lock(init_mutex);
211 if (!smpi_root_ready) {
212 SIMIX_cond_wait(init_cond, init_mutex);
214 SIMIX_mutex_unlock(init_mutex);
216 request_queue = smpi_pending_recv_requests[rank];
217 request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
219 message_queue = smpi_received_messages[rank];
220 message_queue_mutex = smpi_received_messages_mutex[rank];
222 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so senders, smpi_irecv() and smpi_mpi_finalize() can resume it.
223 smpi_receiver_processes[rank] = self;
225 // wait for all nodes to signal initialization complete
// NOTE(review): the wait is guarded by `if`, not `while`; any broadcast on the
// shared init_cond ends it whatever the count is.
226 SIMIX_mutex_lock(init_mutex);
228 if (smpi_ready_count < 3 * size) {
229 SIMIX_cond_wait(init_cond, init_mutex);
231 SIMIX_cond_broadcast(init_cond);
233 SIMIX_mutex_unlock(init_mutex);
235 SIMIX_mutex_lock(smpi_running_hosts_mutex);
236 running_hosts = smpi_running_hosts;
237 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
239 while (0 < running_hosts) {
244 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
// Both queues are locked in the order request queue, then message queue.
246 // FIXME: not the best way to request multiple locks...
247 SIMIX_mutex_lock(request_queue_mutex);
248 SIMIX_mutex_lock(message_queue_mutex);
// O(requests * messages) scan for the first matching pair.
249 for (request_item = xbt_fifo_get_first_item(request_queue);
250 NULL != request_item;
251 request_item = xbt_fifo_get_next_item(request_item)) {
252 request = xbt_fifo_get_item_content(request_item);
253 for (message_item = xbt_fifo_get_first_item(message_queue);
254 NULL != message_item;
255 message_item = xbt_fifo_get_next_item(message_item)) {
256 message = xbt_fifo_get_item_content(message_item);
// Match rule: same communicator, source equal (or MPI_ANY_SOURCE), exact tag.
// NOTE(review): no MPI_ANY_TAG wildcard is handled here.
257 if (request->comm == message->comm &&
258 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
259 request->tag == message->tag) {
// NOTE(review): two things here are not visible. First, the code that leaves
// both loops right after the items are removed. Second, the reset of
// request/message to NULL when nothing matched. Without them the NULL test
// below would see the last items examined.
260 xbt_fifo_remove_item(request_queue, request_item);
261 xbt_fifo_remove_item(message_queue, message_item);
267 SIMIX_mutex_unlock(message_queue_mutex);
268 SIMIX_mutex_unlock(request_queue_mutex);
// No match: sleep until a sender, smpi_irecv() or smpi_mpi_finalize() resumes us.
270 if (NULL == request || NULL == message) {
271 SIMIX_process_suspend(self);
273 SIMIX_mutex_lock(request->mutex);
// NOTE(review): this copies the *receive* size (request->count * datatype size)
// out of message->buf, which smpi_sender allocated with the *send* size. If the
// matched send was smaller, this reads past the end of message->buf.
274 memcpy(request->buf, message->buf, request->count * request->datatype->size);
275 request->src = message->src;
276 request->completed = 1;
// Resume every process blocked in smpi_wait() on this request.
278 while (waitproc = xbt_fifo_shift(request->waitlist)) {
279 if (SIMIX_process_is_suspended(waitproc)) {
280 SIMIX_process_resume(waitproc);
283 SIMIX_mutex_unlock(request->mutex);
// NOTE(review): message->buf (allocated by smpi_sender) is not freed in the
// visible lines before the message goes back to the mallocator. That
// mallocator's callbacks are xbt_free and smpi_do_nothing, so the payload
// appears to leak.
285 xbt_mallocator_release(smpi_message_mallocator, message);
288 SIMIX_mutex_lock(smpi_running_hosts_mutex);
289 running_hosts = smpi_running_hosts;
290 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Shutdown. NOTE(review): smpi_ready_count is presumably decremented here on a
// line not visible; when it reaches 0, this wakes smpi_mpi_finalize().
293 SIMIX_mutex_lock(init_mutex);
295 if (smpi_ready_count <= 0) {
296 SIMIX_cond_broadcast(init_cond);
298 SIMIX_mutex_unlock(init_mutex);
// Simulation entry point. It:
// - initializes SIMIX and creates the shared init handshake primitives;
// - registers the SMPI process functions;
// - passes argv[1] to SIMIX_create_environment and argv[2] to
//   SIMIX_launch_application (presumably the platform and deployment files);
// - runs the simulation kernel until SIMIX_solve returns -1.0.
// For every action reported finished or failed, each condition registered on it
// is broadcast (waking the process blocked on it), and the action is destroyed.
// NOTE(review): argc is not checked before argv[1]/argv[2] are used.
303 int smpi_run_simulation(int argc, char **argv)
305 smx_cond_t cond = NULL;
306 smx_action_t action = NULL;
308 xbt_fifo_t actions_failed = xbt_fifo_new();
309 xbt_fifo_t actions_done = xbt_fifo_new();
311 srand(SMPI_RAND_SEED);
313 SIMIX_global_init(&argc, argv);
// Shared by all processes for the startup/shutdown handshakes.
// NOTE(review): no SIMIX_cond_destroy(init_cond) is visible anywhere in this listing.
315 init_mutex = SIMIX_mutex_init();
316 init_cond = SIMIX_cond_init();
318 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
319 SIMIX_function_register("smpi_sender", smpi_sender);
320 SIMIX_function_register("smpi_receiver", smpi_receiver);
321 SIMIX_create_environment(argv[1]);
322 SIMIX_launch_application(argv[2]);
324 /* SIGINT diagnostics handler (currently disabled) */
325 //signal(SIGINT, inthandler);
// NOTE(review): the statements that follow this comment are not visible in this listing.
327 /* Clean IO before the run */
// Each SIMIX_solve call advances the simulation and fills the done/failed lists.
331 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
332 while (action = xbt_fifo_pop(actions_failed)) {
333 DEBUG1("** %s failed **", action->name);
// Failed actions release their waiters exactly like finished ones.
334 while (cond = xbt_fifo_pop(action->cond_list)) {
335 SIMIX_cond_broadcast(cond);
337 SIMIX_action_destroy(action);
339 while (action = xbt_fifo_pop(actions_done)) {
340 DEBUG1("** %s done **",action->name);
341 while (cond = xbt_fifo_pop(action->cond_list)) {
342 SIMIX_cond_broadcast(cond);
344 SIMIX_action_destroy(action);
347 xbt_fifo_free(actions_failed);
348 xbt_fifo_free(actions_done);
349 INFO1("simulation time %g", SIMIX_get_clock());
/*
 * Logical-AND reduction on ints: stores 1 in *z when both *x and *y are
 * nonzero, otherwise 0. *y is only read when *x is nonzero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
	int result = 0;

	if (*(int *)x != 0 && *(int *)y != 0) {
		result = 1;
	}
	*(int *)z = result;
}
/* Sum reduction on ints: stores *x + *y into *z (all three point to int). */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
	const int *lhs = x;
	const int *rhs = y;
	int *out = z;

	*out = *lhs + *rhs;
}
364 void *smpi_new_request()
366 return xbt_new(smpi_mpi_request_t, 1);
// Free callback of smpi_request_mallocator: releases a request object (NULL is ignored).
// NOTE(review): only the waitlist release is visible here; freeing the request
// itself is presumably on a following line not shown.
// NOTE(review): the mutex and cond that smpi_create_request() creates for every
// request are not destroyed in the visible lines. smpi_create_request() also
// allocates a new waitlist/mutex/cond each time a recycled request is handed
// out, and the reset callback does nothing. Verify those are released
// elsewhere, otherwise they leak.
369 void smpi_free_request(void *pointer) {
370 smpi_mpi_request_t *request = pointer;
371 if (NULL != request) {
372 xbt_fifo_free(request->waitlist);
377 void *smpi_new_message()
379 return xbt_new(smpi_received_message_t, 1);
// No-op callback, passed as the reset function to both xbt_mallocator_new()
// calls, so objects are presumably handed back unchanged when recycled.
// NOTE(review): the body of this definition is not visible in this listing.
382 void smpi_do_nothing(void *pointer)
// NOTE(review): these lines are the body of the per-process MPI initialization
// routine. Its signature (presumably smpi_mpi_init) and some lines are not
// visible in this listing: declarations, branch structure, and the update of
// smpi_root_ready.
// The process running on hosts[0] (the "root") allocates and initializes every
// global: helper-process tables, communicator, datatypes, ops, mallocators,
// per-rank queues and mutexes, and the benchmarking timer. It then wakes the
// other processes, which wait until the root is done. Every process records
// itself in smpi_mpi_comm_world.processes and joins the all-processes-ready
// barrier.
391 smx_process_t process;
396 // initialize some local variables
397 host = SIMIX_host_self();
398 hosts = SIMIX_host_get_table();
399 size = SIMIX_host_get_number();
401 // node 0 sets the globals
402 if (host == hosts[0]) {
405 smpi_sender_processes = xbt_new(smx_process_t, size);
406 smpi_receiver_processes = xbt_new(smx_process_t, size);
409 smpi_running_hosts_mutex = SIMIX_mutex_init();
410 smpi_running_hosts = size;
412 // global communicator
413 smpi_mpi_comm_world.size = size;
414 smpi_mpi_comm_world.barrier = 0;
415 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
416 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
// Shares the SIMIX host table rather than copying it; smpi_mpi_finalize() does not free it.
417 smpi_mpi_comm_world.hosts = hosts;
// Each process fills its own slot. The root runs on hosts[0], which
// smpi_mpi_comm_rank() maps to rank 0.
418 smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
419 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
422 smpi_mpi_byte.size = (size_t)1;
423 smpi_mpi_int.size = sizeof(int);
424 smpi_mpi_double.size = sizeof(double);
427 smpi_mpi_land.func = &smpi_mpi_land_func;
428 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Message objects are freed with plain xbt_free, which does not release message->buf.
431 smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, smpi_free_request, smpi_do_nothing);
432 smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, smpi_do_nothing);
433 smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
434 smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size);
435 smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
436 smpi_pending_recv_requests_mutex = xbt_new(smx_mutex_t, size);
437 smpi_received_messages = xbt_new(xbt_fifo_t, size);
438 smpi_received_messages_mutex = xbt_new(smx_mutex_t, size);
440 for(i = 0; i < size; i++) {
441 smpi_pending_send_requests[i] = xbt_fifo_new();
442 smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init();
443 smpi_pending_recv_requests[i] = xbt_fifo_new();
444 smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init();
445 smpi_received_messages[i] = xbt_fifo_new();
446 smpi_received_messages_mutex[i] = SIMIX_mutex_init();
// Benchmarking state; the reference speed starts at SMPI_DEFAULT_SPEED.
449 smpi_timer = xbt_os_timer_new();
450 smpi_reference_speed = SMPI_DEFAULT_SPEED;
451 smpi_benchmarking = 0;
452 smpi_benchmarking_mutex = SIMIX_mutex_init();
454 // signal all nodes to perform initialization
// NOTE(review): smpi_root_ready is presumably set on a line not visible here,
// between the lock and the broadcast.
455 SIMIX_mutex_lock(init_mutex);
457 SIMIX_cond_broadcast(init_cond);
458 SIMIX_mutex_unlock(init_mutex);
462 // make sure root is done before own initialization
463 SIMIX_mutex_lock(init_mutex);
464 if (!smpi_root_ready) {
465 SIMIX_cond_wait(init_cond, init_mutex);
467 SIMIX_mutex_unlock(init_mutex);
469 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
473 // wait for all nodes to signal initialization complete
// NOTE(review): `if` rather than `while` around SIMIX_cond_wait; any broadcast on
// the shared init_cond ends the wait whatever smpi_ready_count is.
474 SIMIX_mutex_lock(init_mutex);
476 if (smpi_ready_count < 3 * size) {
477 SIMIX_cond_wait(init_cond, init_mutex);
479 SIMIX_cond_broadcast(init_cond);
481 SIMIX_mutex_unlock(init_mutex);
// MPI_Finalize for the calling process: decrements smpi_running_hosts.
// NOTE(review): the guard on `i` is not visible in this listing; presumably
// only the last process to finalize goes on to do the rest.
// That process wakes every sender/receiver helper so they notice the shutdown,
// waits until smpi_ready_count drops to 0, and then destroys/frees the global
// SMPI state.
485 void smpi_mpi_finalize()
489 SIMIX_mutex_lock(smpi_running_hosts_mutex);
490 i = --smpi_running_hosts;
491 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// NOTE(review): the statement executed under init_mutex here is not visible
// (presumably an smpi_ready_count update).
493 SIMIX_mutex_lock(init_mutex);
495 SIMIX_mutex_unlock(init_mutex);
499 // wake up senders/receivers
500 for (i = 0; i < smpi_mpi_comm_world.size; i++) {
501 if (SIMIX_process_is_suspended(smpi_sender_processes[i])) {
502 SIMIX_process_resume(smpi_sender_processes[i]);
504 if (SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
505 SIMIX_process_resume(smpi_receiver_processes[i]);
509 // wait for senders/receivers to exit...
// NOTE(review): guarded by `if`, not `while`; a single broadcast ends the wait.
510 SIMIX_mutex_lock(init_mutex);
511 if (smpi_ready_count > 0) {
512 SIMIX_cond_wait(init_cond, init_mutex);
514 SIMIX_mutex_unlock(init_mutex);
516 SIMIX_mutex_destroy(init_mutex);
517 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
// Messages still queued here are not released individually by xbt_fifo_free.
519 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
520 xbt_fifo_free(smpi_pending_send_requests[i]);
521 SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]);
522 xbt_fifo_free(smpi_pending_recv_requests[i]);
523 SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]);
524 xbt_fifo_free(smpi_received_messages[i]);
525 SIMIX_mutex_destroy(smpi_received_messages_mutex[i]);
528 xbt_mallocator_free(smpi_request_mallocator);
529 xbt_mallocator_free(smpi_message_mallocator);
530 xbt_free(smpi_pending_send_requests);
531 xbt_free(smpi_pending_send_requests_mutex);
532 xbt_free(smpi_pending_recv_requests);
533 xbt_free(smpi_pending_recv_requests_mutex);
534 xbt_free(smpi_received_messages);
535 xbt_free(smpi_received_messages_mutex);
537 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
538 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
539 xbt_free(smpi_mpi_comm_world.processes);
// NOTE(review): not released in the visible lines: init_cond,
// smpi_benchmarking_mutex, smpi_sender_processes, smpi_receiver_processes.
541 xbt_os_timer_free(smpi_timer);
546 void smpi_bench_begin()
548 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
549 smpi_benchmarking = 1;
550 xbt_os_timer_start(smpi_timer);
554 void smpi_bench_end()
558 smx_action_t compute_action;
562 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
563 smpi_benchmarking = 0;
564 xbt_os_timer_stop(smpi_timer);
565 duration = xbt_os_timer_elapsed(smpi_timer);
566 host = SIMIX_host_self();
567 compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
568 mutex = SIMIX_mutex_init();
569 cond = SIMIX_cond_init();
570 SIMIX_mutex_lock(mutex);
571 SIMIX_register_condition_to_action(compute_action, cond);
572 SIMIX_register_action_to_condition(compute_action, cond);
573 SIMIX_cond_wait(cond, mutex);
574 SIMIX_mutex_unlock(mutex);
575 SIMIX_mutex_destroy(mutex);
576 SIMIX_cond_destroy(cond);
577 // FIXME: check for success/failure?
// Blocks the caller until the processes of `comm` have entered the barrier.
// It uses comm->barrier (reset to 0 at init) as the arrival counter, together
// with comm->barrier_mutex/barrier_cond.
// NOTE(review): the lines that update comm->barrier and decide between waiting
// and broadcasting are not visible in this listing. A counter-only barrier whose
// wait is not in a loop cannot tell an unrelated wakeup from a real release. A
// generation counter checked in a `while` loop is the usual robust form.
581 void smpi_barrier(smpi_mpi_communicator_t *comm) {
583 SIMIX_mutex_lock(comm->barrier_mutex);
586 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
589 SIMIX_cond_broadcast(comm->barrier_cond);
591 SIMIX_mutex_unlock(comm->barrier_mutex);
594 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
597 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
598 if (i >= comm->size) i = -1;
// Validates the arguments of a point-to-point request. On success it takes a
// request object from smpi_request_mallocator and initializes every field:
// completed = 0, plus a freshly created mutex, cond and waitlist.
// `src` and `dst` are ranks within `comm`; src may be MPI_ANY_SOURCE.
// Returns MPI_SUCCESS or the first applicable error:
//   MPI_ERR_COUNT  (NOTE(review): its guarding condition is not visible here),
//   MPI_ERR_INTERN for a NULL buf, MPI_ERR_TYPE for a NULL datatype,
//   MPI_ERR_COMM for a NULL comm, MPI_ERR_RANK for an out-of-range src or dst,
//   MPI_ERR_TAG for a negative tag.
// NOTE(review): *request is presumably left untouched on error (the `else`
// guarding the assignments is not visible). A new mutex/cond/waitlist is created
// even when the mallocator hands back a recycled request.
602 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
603 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
605 int retval = MPI_SUCCESS;
610 retval = MPI_ERR_COUNT;
611 } else if (NULL == buf) {
612 retval = MPI_ERR_INTERN;
613 } else if (NULL == datatype) {
614 retval = MPI_ERR_TYPE;
615 } else if (NULL == comm) {
616 retval = MPI_ERR_COMM;
617 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
618 retval = MPI_ERR_RANK;
619 } else if (0 > dst || comm->size <= dst) {
620 retval = MPI_ERR_RANK;
621 } else if (0 > tag) {
622 retval = MPI_ERR_TAG;
624 *request = xbt_mallocator_get(smpi_request_mallocator);
625 (*request)->comm = comm;
626 (*request)->src = src;
627 (*request)->dst = dst;
628 (*request)->tag = tag;
629 (*request)->buf = buf;
630 (*request)->count = count;
631 (*request)->datatype = datatype;
632 (*request)->completed = 0;
633 (*request)->mutex = SIMIX_mutex_init();
634 (*request)->cond = SIMIX_cond_init();
635 (*request)->waitlist = xbt_fifo_new();
640 int smpi_isend(smpi_mpi_request_t *request)
642 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
644 SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
645 xbt_fifo_push(smpi_pending_send_requests[rank], request);
646 SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
648 if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
649 SIMIX_process_resume(smpi_sender_processes[rank]);
653 int smpi_irecv(smpi_mpi_request_t *request)
655 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
657 SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
658 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
659 SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
661 if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
662 SIMIX_process_resume(smpi_receiver_processes[rank]);
// Blocks the calling process until `request` has completed. The sender or
// receiver helper sets request->completed and resumes every process on
// request->waitlist. Afterwards the source rank is reported through `status`,
// unless status is NULL or MPI_STATUS_IGNORE.
// In the visible lines, a NULL request skips all of this.
// NOTE(review): the lines between the enqueue and the suspend are not visible.
// Verify that the suspend only runs when the request was not already completed,
// otherwise nobody resumes the caller. Also check that the request cannot
// complete between the unlock and the suspend: the helper would then see a
// not-yet-suspended process and skip the resume.
666 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
670 self = SIMIX_process_self();
672 if (NULL != request) {
673 SIMIX_mutex_lock(request->mutex);
674 if (!request->completed) {
675 xbt_fifo_push(request->waitlist, self);
678 SIMIX_mutex_unlock(request->mutex);
680 SIMIX_process_suspend(self);
682 if (NULL != status && MPI_STATUS_IGNORE != status) {
683 SIMIX_mutex_lock(request->mutex);
684 status->MPI_SOURCE = request->src;
685 SIMIX_mutex_unlock(request->mutex);
690 // FIXME: move into own file
// gettimeofday() replacement that reports SIMIX's simulated clock instead of
// host wall time.
// NOTE(review): the tv_sec assignment, any NULL checks, the use of `tz` and the
// return value are not visible in this listing.
691 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
699 now = SIMIX_get_clock();
// Fractional part of the simulated time in microseconds; assumes tv_sec already
// holds the whole seconds of `now`.
701 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// sleep() replacement: blocks the calling process for `seconds` of simulated
// time. It uses a SIMIX sleep action on the current host and a private
// mutex/cond pair, which the main loop of smpi_run_simulation() broadcasts when
// the action ends.
// NOTE(review): the local declarations and the return value are not visible in
// this listing. The wait sequence duplicates the one in smpi_bench_end().
707 unsigned int smpi_sleep(unsigned int seconds)
712 smx_action_t sleep_action;
715 host = SIMIX_host_self();
716 sleep_action = SIMIX_action_sleep(host, seconds);
717 mutex = SIMIX_mutex_init();
718 cond = SIMIX_cond_init();
719 SIMIX_mutex_lock(mutex);
720 SIMIX_register_condition_to_action(sleep_action, cond);
721 SIMIX_register_action_to_condition(sleep_action, cond);
722 SIMIX_cond_wait(cond, mutex);
723 SIMIX_mutex_unlock(mutex);
724 SIMIX_mutex_destroy(mutex);
725 SIMIX_cond_destroy(cond);
726 // FIXME: check for success/failure?
// exit() replacement: marks this host as no longer running and kills the
// calling SIMIX process. The sender/receiver helpers stop once the counter
// reaches 0.
// NOTE(review): `status` is not used in the visible lines. Unlike
// smpi_mpi_finalize(), no suspended helper is resumed here, so helpers notice
// the shutdown only when something else wakes them.
731 void smpi_exit(int status)
734 SIMIX_mutex_lock(smpi_running_hosts_mutex);
735 smpi_running_hosts--;
736 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
737 SIMIX_process_kill(SIMIX_process_self());