5 #include "xbt/xbt_os_time.h"
6 #include "xbt/mallocator.h"
9 // FIXME: move globals into structure...
// Object pools from which request records and received-message records are drawn.
11 xbt_mallocator_t smpi_request_mallocator = NULL;
12 xbt_mallocator_t smpi_message_mallocator = NULL;
// Send requests queued by smpi_isend(): one FIFO and one guarding mutex per rank.
14 xbt_fifo_t *smpi_pending_send_requests = NULL;
15 smx_mutex_t *smpi_pending_send_requests_mutex = NULL;
// Receive requests queued by smpi_irecv(): one FIFO and one guarding mutex per rank.
17 xbt_fifo_t *smpi_pending_recv_requests = NULL;
18 smx_mutex_t *smpi_pending_recv_requests_mutex = NULL;
// Messages pushed by a sender process and not yet matched with a receive
// request: one FIFO and one guarding mutex per destination rank.
20 xbt_fifo_t *smpi_received_messages = NULL;
21 smx_mutex_t *smpi_received_messages_mutex = NULL;
// Sender / receiver helper process of each rank. Each helper stores itself in
// its slot so that other code can resume it while it is suspended.
23 smx_process_t *smpi_sender_processes = NULL;
24 smx_process_t *smpi_receiver_processes = NULL;
// Count of hosts that have neither finalized nor exited; the helper loops keep
// running while it is positive.
26 int smpi_running_hosts = 0;
// The world communicator, filled in by the first host of the host table.
28 smpi_mpi_communicator_t smpi_mpi_comm_world;
// NOTE(review): presumably the object behind MPI_STATUS_IGNORE; it is not
// referenced by name in the visible code - confirm.
30 smpi_mpi_status_t smpi_mpi_status_ignore;
// Built-in datatypes; only their size field is assigned in the visible code.
32 smpi_mpi_datatype_t smpi_mpi_byte;
33 smpi_mpi_datatype_t smpi_mpi_int;
34 smpi_mpi_datatype_t smpi_mpi_double;
// Built-in reduction operators; their func field points at the logical-AND and
// integer-sum helpers defined in this file.
36 smpi_mpi_op_t smpi_mpi_land;
37 smpi_mpi_op_t smpi_mpi_sum;
// Benchmarking state: timer, "currently benchmarking" flag, reference speed.
// NOTE(review): smpi_reference_speed is assigned but never read in the visible code.
39 static xbt_os_timer_t smpi_timer;
40 static int smpi_benchmarking;
41 static double smpi_reference_speed;
// Guards smpi_running_hosts.
44 smx_mutex_t smpi_running_hosts_mutex = NULL;
// NOTE(review): created during initialisation but never locked in the visible code.
45 smx_mutex_t smpi_benchmarking_mutex = NULL;
// Mutex / condition pair shared by the start-up and shutdown handshakes.
46 smx_mutex_t init_mutex = NULL;
47 smx_cond_t init_cond = NULL;
// Handshake state tested under init_mutex: "root has initialised" flag and a
// ready counter compared against 3 * size at start-up and against 0 at shutdown.
// NOTE(review): the statements that modify these two variables are not visible
// in this listing.
49 int smpi_root_ready = 0;
50 int smpi_ready_count = 0;
52 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
// Size accessor for a communicator.
// NOTE(review): the body is not visible in this listing; presumably it returns
// comm->size - confirm.
// NOTE(review): `inline` without `static`/`extern` yields no external
// definition under C99 inline semantics; check how other files link to this.
54 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
// Looks up `host` in comm->hosts and yields its index (the rank).
// NOTE(review): the declaration of `i` and the return statement are not visible
// in this listing; presumably `i` is returned.
59 // FIXME: smarter algorithm?
60 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
// Scans from the last index downward and stops at the first match or when i
// reaches 0. hosts[0] is never compared, so a host that is absent from the
// table leaves i == 0, indistinguishable from rank 0.
64 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
69 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
71 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
74 int inline smpi_mpi_comm_world_rank_self()
76 return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
// Sender helper process of one host: takes requests from this rank's
// pending-send queue, simulates each transfer as a SIMIX communication action
// and then pushes a message record onto the destination rank's
// received-messages queue.
// NOTE(review): this listing omits short lines (braces, some declarations and
// statements, the return); the comments describe only the visible statements.
79 int smpi_sender(int argc, char **argv)
84 xbt_fifo_t request_queue;
85 smx_mutex_t request_queue_mutex;
87 int running_hosts = 0;
88 smpi_mpi_request_t *request;
90 smx_action_t communicate_action;
91 smpi_received_message_t *scratch;
93 smx_process_t waitproc;
95 self = SIMIX_process_self();
96 shost = SIMIX_host_self();
97 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
99 // make sure root is done before own initialization
// The flag is tested once with `if`; it is not re-checked after the wait returns.
100 SIMIX_mutex_lock(init_mutex);
101 if (!smpi_root_ready) {
102 SIMIX_cond_wait(init_cond, init_mutex);
104 SIMIX_mutex_unlock(init_mutex);
// The per-rank queues exist only once the root has allocated them, hence the
// wait above.
106 request_queue = smpi_pending_send_requests[rank];
107 request_queue_mutex = smpi_pending_send_requests_mutex[rank];
109 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so smpi_isend() and smpi_mpi_finalize() can resume it.
111 smpi_sender_processes[rank] = self;
113 // wait for all nodes to signal initializatin complete
// NOTE(review): the update of smpi_ready_count and the branch structure around
// the wait / broadcast pair are not visible; presumably the broadcast is the
// else-branch taken by the last participant to arrive.
114 SIMIX_mutex_lock(init_mutex);
116 if (smpi_ready_count < 3 * size) {
117 SIMIX_cond_wait(init_cond, init_mutex);
119 SIMIX_cond_broadcast(init_cond);
121 SIMIX_mutex_unlock(init_mutex);
// Take a snapshot of the shared counter under its mutex; the loop below tests
// the snapshot, not the global.
123 SIMIX_mutex_lock(smpi_running_hosts_mutex);
124 running_hosts = smpi_running_hosts;
125 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
127 while (0 < running_hosts) {
// Dequeue the oldest pending send request, if any.
129 SIMIX_mutex_lock(request_queue_mutex);
130 request = xbt_fifo_shift(request_queue);
131 SIMIX_mutex_unlock(request_queue_mutex);
// Nothing to send: sleep until smpi_isend() or smpi_mpi_finalize() resumes us.
133 if (NULL == request) {
134 SIMIX_process_suspend(self);
136 SIMIX_mutex_lock(request->mutex);
// Destination host = entry `dst` of the request's communicator host table.
138 dhost = request->comm->hosts[request->dst];
140 // FIXME: not at all sure I can assume magic just happens here....
// The simulated payload is datatype->size * count, converted to double.
// NOTE(review): the meaning of the trailing -1.0 is not visible here.
141 communicate_action = SIMIX_action_communicate(shost, dhost,
142 "communication", request->datatype->size * request->count * 1.0, -1.0);
// Link action and condition in both directions; the simulation loop broadcasts
// every condition in a finished action's cond_list, which ends the wait below.
144 SIMIX_register_condition_to_action(communicate_action, request->cond);
145 SIMIX_register_action_to_condition(communicate_action, request->cond);
147 SIMIX_cond_wait(request->cond, request->mutex);
149 // copy request to appropriate received queue
// Only the envelope and the buffer pointer are copied; the payload stays in
// the sender's buffer until the receiver memcpy()s from it.
150 scratch = xbt_mallocator_get(smpi_message_mallocator);
151 scratch->comm = request->comm;
152 scratch->src = request->src;
153 scratch->dst = request->dst;
154 scratch->tag = request->tag;
155 scratch->buf = request->buf;
// The destination rank is looked up in the world communicator, although dhost
// was taken from request->comm.
156 drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
157 SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
158 xbt_fifo_push(smpi_received_messages[drank], scratch);
159 SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
161 request->completed = 1;
163 // wake up receiver, then any waiting sender
// First candidate is the destination's receiver process, then every process
// shifted off the request's waitlist; each is resumed only if suspended.
// NOTE(review): smpi_create_request() sets waitlist to NULL - confirm that
// xbt_fifo_shift() tolerates a NULL fifo.
164 waitproc = smpi_receiver_processes[drank];
167 if (SIMIX_process_is_suspended(waitproc)) {
168 SIMIX_process_resume(waitproc);
170 } while(waitproc = xbt_fifo_shift(request->waitlist));
172 SIMIX_mutex_unlock(request->mutex);
// Refresh the snapshot that controls the loop.
175 SIMIX_mutex_lock(smpi_running_hosts_mutex);
176 running_hosts = smpi_running_hosts;
177 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Shutdown handshake: broadcast init_cond once the ready counter is no longer
// positive; smpi_mpi_finalize() waits on that condition.
// NOTE(review): the statement that changes smpi_ready_count here is not visible.
180 SIMIX_mutex_lock(init_mutex);
182 if (smpi_ready_count <= 0) {
183 SIMIX_cond_broadcast(init_cond);
185 SIMIX_mutex_unlock(init_mutex);
// Receiver helper process of one host: pairs pending receive requests of this
// rank with messages delivered by sender processes, copies the payload into
// the request's buffer and wakes the processes waiting on the request.
// NOTE(review): this listing omits short lines (braces, some declarations and
// statements, the return); the comments describe only the visible statements.
190 int smpi_receiver(int argc, char **argv)
194 xbt_fifo_t request_queue;
195 smx_mutex_t request_queue_mutex;
196 xbt_fifo_t message_queue;
197 smx_mutex_t message_queue_mutex;
200 xbt_fifo_item_t request_item, message_item;
201 smpi_mpi_request_t *request;
202 smpi_received_message_t *message;
203 smx_process_t waitproc;
205 self = SIMIX_process_self();
206 rank = smpi_mpi_comm_world_rank_self();
208 // make sure root is done before own initialization
// The flag is tested once with `if`; it is not re-checked after the wait returns.
209 SIMIX_mutex_lock(init_mutex);
210 if (!smpi_root_ready) {
211 SIMIX_cond_wait(init_cond, init_mutex);
213 SIMIX_mutex_unlock(init_mutex);
// Cache this rank's two queues and their mutexes.
215 request_queue = smpi_pending_recv_requests[rank];
216 request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
218 message_queue = smpi_received_messages[rank];
219 message_queue_mutex = smpi_received_messages_mutex[rank];
221 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so senders, smpi_irecv() and smpi_mpi_finalize() can
// resume it.
222 smpi_receiver_processes[rank] = self;
224 // wait for all nodes to signal initializatin complete
// NOTE(review): the update of smpi_ready_count and the branch structure around
// the wait / broadcast pair are not visible in this listing.
225 SIMIX_mutex_lock(init_mutex);
227 if (smpi_ready_count < 3 * size) {
228 SIMIX_cond_wait(init_cond, init_mutex);
230 SIMIX_cond_broadcast(init_cond);
232 SIMIX_mutex_unlock(init_mutex);
// Take a snapshot of the shared counter under its mutex.
234 SIMIX_mutex_lock(smpi_running_hosts_mutex);
235 running_hosts = smpi_running_hosts;
236 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
238 while (0 < running_hosts) {
243 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
245 // FIXME: not the best way to request multiple locks...
// Lock order: request queue first, then message queue (released in reverse).
246 SIMIX_mutex_lock(request_queue_mutex);
247 SIMIX_mutex_lock(message_queue_mutex);
// Nested scan: for every pending request, walk every delivered message.
248 for (request_item = xbt_fifo_get_first_item(request_queue);
249 NULL != request_item;
250 request_item = xbt_fifo_get_next_item(request_item)) {
251 request = xbt_fifo_get_item_content(request_item);
252 for (message_item = xbt_fifo_get_first_item(message_queue);
253 NULL != message_item;
254 message_item = xbt_fifo_get_next_item(message_item)) {
255 message = xbt_fifo_get_item_content(message_item);
// A pair matches when the communicators are identical, the tags are equal and
// the request either accepts any source or names the message's source.
256 if (request->comm == message->comm &&
257 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
258 request->tag == message->tag) {
// Unlink both items from their queues.
// NOTE(review): how the two loops are left after a match is not visible.
259 xbt_fifo_remove_item(request_queue, request_item);
260 xbt_fifo_remove_item(message_queue, message_item);
266 SIMIX_mutex_unlock(message_queue_mutex);
267 SIMIX_mutex_unlock(request_queue_mutex);
// No pair available: sleep until resumed.
// NOTE(review): where request / message are reset to NULL (start of the
// iteration or after an unsuccessful scan) is not visible - confirm.
269 if (NULL == request || NULL == message) {
270 SIMIX_process_suspend(self);
// Deliver: the copy length comes from the receive request (count * datatype
// size), not from the message; message->buf is the pointer the sender stored.
272 SIMIX_mutex_lock(request->mutex);
273 memcpy(request->buf, message->buf, request->count * request->datatype->size);
// Record the actual source (relevant when the request used MPI_ANY_SOURCE).
274 request->src = message->src;
275 request->completed = 1;
// Resume every suspended process queued on the request's waitlist.
277 while (waitproc = xbt_fifo_shift(request->waitlist)) {
278 if (SIMIX_process_is_suspended(waitproc)) {
279 SIMIX_process_resume(waitproc);
282 SIMIX_mutex_unlock(request->mutex);
// Return the message record to its pool.
284 xbt_mallocator_release(smpi_message_mallocator, message);
// Refresh the snapshot that controls the loop.
287 SIMIX_mutex_lock(smpi_running_hosts_mutex);
288 running_hosts = smpi_running_hosts;
289 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Shutdown handshake: broadcast init_cond once the ready counter is no longer
// positive; smpi_mpi_finalize() waits on that condition.
// NOTE(review): the statement that changes smpi_ready_count here is not visible.
292 SIMIX_mutex_lock(init_mutex);
294 if (smpi_ready_count <= 0) {
295 SIMIX_cond_broadcast(init_cond);
297 SIMIX_mutex_unlock(init_mutex);
// Entry point of the simulation: initialises SIMIX, registers the simulated
// process functions, loads the two files named by argv[1] and argv[2], then
// runs the solver loop until SIMIX_solve() returns -1.0.
// NOTE(review): argv[1] / argv[2] are used without a visible argc check, and
// the return statement is not visible in this listing.
302 int smpi_run_simulation(int argc, char **argv)
304 smx_cond_t cond = NULL;
305 smx_action_t action = NULL;
// Filled by SIMIX_solve() on every round with the actions that failed / finished.
307 xbt_fifo_t actions_failed = xbt_fifo_new();
308 xbt_fifo_t actions_done = xbt_fifo_new();
// Fixed seed for rand().
310 srand(SMPI_RAND_SEED);
312 SIMIX_global_init(&argc, argv);
// Handshake primitives must exist before any simulated process runs.
314 init_mutex = SIMIX_mutex_init();
315 init_cond = SIMIX_cond_init();
// Names under which the simulated process functions are registered.
317 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
318 SIMIX_function_register("smpi_sender", smpi_sender);
319 SIMIX_function_register("smpi_receiver", smpi_receiver);
// NOTE(review): presumably argv[1] is the platform description and argv[2] the
// deployment description - confirm against the SIMIX API.
320 SIMIX_create_environment(argv[1]);
321 SIMIX_launch_application(argv[2]);
323 /* Prepare to display some more info when dying on Ctrl-C pressing */
324 //signal(SIGINT, inthandler);
326 /* Clean IO before the run */
// Main loop: after each solver round, wake every process blocked on a
// condition attached to a failed or finished action, then destroy the action.
330 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
331 while (action = xbt_fifo_pop(actions_failed)) {
332 DEBUG1("** %s failed **", action->name);
333 while (cond = xbt_fifo_pop(action->cond_list)) {
334 SIMIX_cond_broadcast(cond);
336 SIMIX_action_destroy(action);
338 while (action = xbt_fifo_pop(actions_done)) {
339 DEBUG1("** %s done **",action->name);
340 while (cond = xbt_fifo_pop(action->cond_list)) {
341 SIMIX_cond_broadcast(cond);
343 SIMIX_action_destroy(action);
// Release the two work lists and log the final simulated clock.
346 xbt_fifo_free(actions_failed);
347 xbt_fifo_free(actions_done);
348 INFO1("simulation time %g", SIMIX_get_clock());
// Reduction kernel for logical AND on int operands.
//
// x, y: pointers to the two int inputs.
// z:    pointer to the int that receives 1 when both inputs are non-zero,
//       0 otherwise. z may alias x or y.
//
// `&&` short-circuits, so *y is not read when *x is zero.
void smpi_mpi_land_func(void *x, void *y, void *z)
{
	*(int *)z = *(int *)x && *(int *)y;
}
// Reduction kernel for integer addition.
//
// x, y: pointers to the two int inputs.
// z:    pointer to the int that receives *x + *y. z may alias x or y.
//
// Plain int arithmetic: the caller is responsible for keeping the sum within
// the range of int.
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
	*(int *)z = *(int *)x + *(int *)y;
}
363 void *smpi_new_request()
365 return xbt_new(smpi_mpi_request_t, 1);
368 void *smpi_new_message()
370 return xbt_new(smpi_received_message_t, 1);
// Callback passed as the last argument of both xbt_mallocator_new() calls in
// this file.
// NOTE(review): the body is not visible in this listing; presumably it is
// empty, as the name says - confirm.
373 void smpi_do_nothing(void *pointer)
// Body of the per-process MPI initialisation routine.
// NOTE(review): the function header and several short lines (braces, `else`,
// some declarations and assignments) are missing from this listing; presumably
// this is smpi_mpi_init(). Comments describe only the visible statements.
382 smx_process_t process;
387 // initialize some local variables
388 host = SIMIX_host_self();
389 hosts = SIMIX_host_get_table();
390 size = SIMIX_host_get_number();
392 // node 0 sets the globals
// "Node 0" is the host stored first in the SIMIX host table.
393 if (host == hosts[0]) {
// One slot per host for the sender / receiver helper processes.
396 smpi_sender_processes = xbt_new(smx_process_t, size);
397 smpi_receiver_processes = xbt_new(smx_process_t, size);
// Every host counts as running until it finalizes or exits.
400 smpi_running_hosts_mutex = SIMIX_mutex_init();
401 smpi_running_hosts = size;
403 // global communicator
// The communicator borrows the SIMIX host table (no copy is made) and owns a
// freshly allocated process table; the root registers itself at index 0.
404 smpi_mpi_comm_world.size = size;
405 smpi_mpi_comm_world.barrier = 0;
406 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
407 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
408 smpi_mpi_comm_world.hosts = hosts;
409 smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
410 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// Element sizes of the built-in datatypes.
413 smpi_mpi_byte.size = (size_t)1;
414 smpi_mpi_int.size = sizeof(int);
415 smpi_mpi_double.size = sizeof(double);
// Kernels of the built-in reduction operators.
418 smpi_mpi_land.func = &smpi_mpi_land_func;
419 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Object pools: records are created by smpi_new_request / smpi_new_message and
// destroyed with xbt_free; smpi_do_nothing is the remaining callback.
422 smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, smpi_do_nothing);
423 smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, smpi_do_nothing);
// Per-rank queue arrays and the arrays of mutexes guarding them.
424 smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
425 smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size);
426 smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
427 smpi_pending_recv_requests_mutex = xbt_new(smx_mutex_t, size);
428 smpi_received_messages = xbt_new(xbt_fifo_t, size);
429 smpi_received_messages_mutex = xbt_new(smx_mutex_t, size);
// Create the three queues and their mutexes for every rank.
431 for(i = 0; i < size; i++) {
432 smpi_pending_send_requests[i] = xbt_fifo_new();
433 smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init();
434 smpi_pending_recv_requests[i] = xbt_fifo_new();
435 smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init();
436 smpi_received_messages[i] = xbt_fifo_new();
437 smpi_received_messages_mutex[i] = SIMIX_mutex_init();
// Benchmarking state.
440 smpi_timer = xbt_os_timer_new();
441 smpi_reference_speed = SMPI_DEFAULT_SPEED;
442 smpi_benchmarking = 0;
443 smpi_benchmarking_mutex = SIMIX_mutex_init();
445 // signal all nodes to perform initialization
// NOTE(review): the store to smpi_root_ready that the waiters test is not
// visible in this listing; presumably it sits between the lock and the broadcast.
446 SIMIX_mutex_lock(init_mutex);
448 SIMIX_cond_broadcast(init_cond);
449 SIMIX_mutex_unlock(init_mutex);
453 // make sure root is done before own initialization
// NOTE(review): presumably this is the non-root branch (the `else` line is not
// visible). The flag is tested once with `if` and not re-checked after the wait.
454 SIMIX_mutex_lock(init_mutex);
455 if (!smpi_root_ready) {
456 SIMIX_cond_wait(init_cond, init_mutex);
458 SIMIX_mutex_unlock(init_mutex);
// Register the calling process at its own rank in the world communicator.
460 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
464 // wait for all nodes to signal initializatin complete
// Same start-up rendezvous as in smpi_sender() / smpi_receiver(); the threshold
// is 3 * size.
// NOTE(review): the update of smpi_ready_count and the branch structure around
// the wait / broadcast pair are not visible in this listing.
465 SIMIX_mutex_lock(init_mutex);
467 if (smpi_ready_count < 3 * size) {
468 SIMIX_cond_wait(init_cond, init_mutex);
470 SIMIX_cond_broadcast(init_cond);
472 SIMIX_mutex_unlock(init_mutex);
// Called by a host that leaves MPI: decrements the running-host counter, wakes
// the helper processes, waits for the shutdown handshake and releases the
// global resources.
// NOTE(review): short lines (braces, the declaration of `i`, any guard that
// restricts the teardown to the last host) are missing from this listing.
476 void smpi_mpi_finalize()
// `i` receives the number of hosts still running after this one left.
480 SIMIX_mutex_lock(smpi_running_hosts_mutex);
481 i = --smpi_running_hosts;
482 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// NOTE(review): the statement executed inside this critical section is not visible.
484 SIMIX_mutex_lock(init_mutex);
486 SIMIX_mutex_unlock(init_mutex);
490 // wake up senders/receivers
// A suspended helper re-reads smpi_running_hosts once resumed, which lets its
// loop terminate when the counter has reached zero.
491 for (i = 0; i < smpi_mpi_comm_world.size; i++) {
492 if (SIMIX_process_is_suspended(smpi_sender_processes[i])) {
493 SIMIX_process_resume(smpi_sender_processes[i]);
495 if (SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
496 SIMIX_process_resume(smpi_receiver_processes[i]);
500 // wait for senders/receivers to exit...
// The helpers broadcast init_cond once smpi_ready_count is no longer positive.
// The counter is tested once with `if` and not re-checked after the wait.
501 SIMIX_mutex_lock(init_mutex);
502 if (smpi_ready_count > 0) {
503 SIMIX_cond_wait(init_cond, init_mutex);
505 SIMIX_mutex_unlock(init_mutex);
// Teardown of the globals created during initialisation.
// NOTE(review): init_cond, smpi_benchmarking_mutex, smpi_sender_processes and
// smpi_receiver_processes are not released in the visible lines - confirm
// whether that is intentional.
507 SIMIX_mutex_destroy(init_mutex);
508 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
// Per-rank queues and their mutexes.
510 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
511 xbt_fifo_free(smpi_pending_send_requests[i]);
512 SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]);
513 xbt_fifo_free(smpi_pending_recv_requests[i]);
514 SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]);
515 xbt_fifo_free(smpi_received_messages[i]);
516 SIMIX_mutex_destroy(smpi_received_messages_mutex[i]);
// Pools, then the arrays that held the queues and mutexes.
519 xbt_mallocator_free(smpi_request_mallocator);
520 xbt_mallocator_free(smpi_message_mallocator);
521 xbt_free(smpi_pending_send_requests);
522 xbt_free(smpi_pending_send_requests_mutex);
523 xbt_free(smpi_pending_recv_requests);
524 xbt_free(smpi_pending_recv_requests_mutex);
525 xbt_free(smpi_received_messages);
526 xbt_free(smpi_received_messages_mutex);
// World communicator: barrier primitives and process table. The hosts pointer
// is the SIMIX host table and is not freed here.
528 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
529 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
530 xbt_free(smpi_mpi_comm_world.processes);
532 xbt_os_timer_free(smpi_timer);
// Starts timing a stretch of real computation: asserts that no measurement is
// already running, sets the flag and starts the shared timer.
// NOTE(review): the flag and the timer are file-level statics shared by all
// simulated processes, and smpi_benchmarking_mutex is not taken in the visible
// lines.
537 void smpi_bench_begin()
539 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
540 smpi_benchmarking = 1;
541 xbt_os_timer_start(smpi_timer);
// Stops the measurement started by smpi_bench_begin() and charges the elapsed
// time to the simulated host as an execute action, blocking the caller until
// that action completes.
// NOTE(review): the declarations of host, duration, mutex and cond are not
// visible in this listing.
545 void smpi_bench_end()
549 smx_action_t compute_action;
553 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
554 smpi_benchmarking = 0;
555 xbt_os_timer_stop(smpi_timer);
556 duration = xbt_os_timer_elapsed(smpi_timer);
557 host = SIMIX_host_self();
// Amount of simulated work = elapsed timer value * SMPI_DEFAULT_SPEED.
// NOTE(review): the constant is used here rather than smpi_reference_speed,
// which is initialised to the same constant but never read - confirm intent.
558 compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
// Temporary mutex / condition pair used only to block until the action ends;
// the simulation loop broadcasts every condition attached to a finished action.
559 mutex = SIMIX_mutex_init();
560 cond = SIMIX_cond_init();
561 SIMIX_mutex_lock(mutex);
562 SIMIX_register_condition_to_action(compute_action, cond);
563 SIMIX_register_action_to_condition(compute_action, cond);
564 SIMIX_cond_wait(cond, mutex);
565 SIMIX_mutex_unlock(mutex);
566 SIMIX_mutex_destroy(mutex);
567 SIMIX_cond_destroy(cond);
568 // FIXME: check for success/failure?
// Barrier over the processes of `comm`, built on the communicator's
// barrier_mutex / barrier_cond pair.
// NOTE(review): the lines that update the barrier counter and choose between
// waiting and broadcasting are not visible in this listing; presumably the
// last arrival broadcasts and every other caller waits - confirm.
572 void smpi_barrier(smpi_mpi_communicator_t *comm) {
574 SIMIX_mutex_lock(comm->barrier_mutex);
577 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
580 SIMIX_cond_broadcast(comm->barrier_cond);
582 SIMIX_mutex_unlock(comm->barrier_mutex);
// Looks up `host` in comm->hosts by scanning upward from index 0; an unknown
// host is mapped to -1 (unlike smpi_mpi_comm_rank(), which ends at index 0).
// NOTE(review): the declaration of `i` and the return statement are not visible
// in this listing; presumably `i` is returned.
585 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
588 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
// The scan ran off the end: no entry matched.
589 if (i >= comm->size) i = -1;
// Validates the arguments of a point-to-point operation and, when they are
// acceptable, takes a request record from the pool, fills it in and stores it
// in *request.
// NOTE(review): short lines (braces, the first condition of the chain, the
// final `else`, the return) are missing from this listing; presumably retval
// is returned.
593 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
594 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
596 int retval = MPI_SUCCESS;
// Validation chain; the first failing test decides the error code.
// NOTE(review): the condition guarding MPI_ERR_COUNT is not visible.
601 retval = MPI_ERR_COUNT;
602 } else if (NULL == buf) {
603 retval = MPI_ERR_INTERN;
604 } else if (NULL == datatype) {
605 retval = MPI_ERR_TYPE;
606 } else if (NULL == comm) {
607 retval = MPI_ERR_COMM;
// src must be MPI_ANY_SOURCE or lie in [0, comm->size).
608 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
609 retval = MPI_ERR_RANK;
// dst must lie in [0, comm->size).
610 } else if (0 > dst || comm->size <= dst) {
611 retval = MPI_ERR_RANK;
612 } else if (0 > tag) {
613 retval = MPI_ERR_TAG;
// All tests passed: build the request. *request is written only on this path.
615 *request = xbt_mallocator_get(smpi_request_mallocator);
616 (*request)->comm = comm;
617 (*request)->src = src;
618 (*request)->dst = dst;
619 (*request)->tag = tag;
620 (*request)->buf = buf;
621 (*request)->count = count;
622 (*request)->datatype = datatype;
623 (*request)->completed = 0;
// A fresh mutex / condition pair is created for every request.
624 (*request)->mutex = SIMIX_mutex_init();
625 (*request)->cond = SIMIX_cond_init();
// NOTE(review): smpi_wait() pushes onto this list and the helper processes
// shift from it, yet no fifo is created here - confirm that the xbt_fifo calls
// accept NULL, or allocate the list.
626 (*request)->waitlist = NULL;
// Posts a non-blocking send: appends `request` to the calling rank's
// pending-send queue and resumes that rank's sender process if it is suspended.
// NOTE(review): the return statement is not visible in this listing.
631 int smpi_isend(smpi_mpi_request_t *request)
// The queue is selected by the caller's rank in the world communicator.
633 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
635 SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
636 xbt_fifo_push(smpi_pending_send_requests[rank], request);
637 SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
// The sender suspends itself when its queue is empty; wake it up.
639 if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
640 SIMIX_process_resume(smpi_sender_processes[rank]);
// Posts a non-blocking receive: appends `request` to the calling rank's
// pending-receive queue and resumes that rank's receiver process if it is
// suspended.
// NOTE(review): the return statement is not visible in this listing.
644 int smpi_irecv(smpi_mpi_request_t *request)
// The queue is selected by the caller's rank in the world communicator.
646 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
648 SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
649 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
650 SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
// The receiver suspends itself when it finds no matching pair; wake it up.
652 if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
653 SIMIX_process_resume(smpi_receiver_processes[rank]);
// Blocks the calling process until `request` has completed and optionally
// reports the message source through `status`.
// NOTE(review): short lines (braces, the declaration of `self`, possibly a flag
// controlling the suspend) are missing from this listing, so the exact
// condition under which the caller suspends cannot be read from here.
657 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
661 self = SIMIX_process_self();
663 if (NULL != request) {
// Under the request lock: if it is still in flight, queue the caller on the
// request's waitlist so that the helper process resumes it on completion.
// NOTE(review): smpi_create_request() initialises waitlist to NULL - confirm
// that xbt_fifo_push() accepts that.
664 SIMIX_mutex_lock(request->mutex);
665 if (!request->completed) {
666 xbt_fifo_push(request->waitlist, self);
669 SIMIX_mutex_unlock(request->mutex);
// The suspend happens after the lock has been released.
671 SIMIX_process_suspend(self);
// Report the source recorded in the request (the receiver process overwrites
// request->src with the actual sender), unless the caller passed NULL or
// MPI_STATUS_IGNORE.
673 if (NULL != status && MPI_STATUS_IGNORE != status) {
674 SIMIX_mutex_lock(request->mutex);
675 status->MPI_SOURCE = request->src;
676 SIMIX_mutex_unlock(request->mutex);
// gettimeofday() replacement that reports the simulated clock.
// NOTE(review): short lines are missing from this listing: the declaration of
// `now`, the assignment of tv->tv_sec, any benchmarking bracket and the
// return. `tz` is not used in the visible lines.
681 // FIXME: move into own file
682 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
// Current simulated time as reported by SIMIX.
690 now = SIMIX_get_clock();
// Fractional part of `now` (relative to tv->tv_sec), scaled by 1e6.
692 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// sleep() replacement: creates a SIMIX sleep action of `seconds` on the
// calling host and blocks the caller until that action completes.
// NOTE(review): the declarations of host, mutex and cond, any benchmarking
// bracket and the return statement are not visible in this listing.
698 unsigned int smpi_sleep(unsigned int seconds)
703 smx_action_t sleep_action;
706 host = SIMIX_host_self();
707 sleep_action = SIMIX_action_sleep(host, seconds);
// Temporary mutex / condition pair used only to block until the action ends;
// the simulation loop broadcasts every condition attached to a finished action.
708 mutex = SIMIX_mutex_init();
709 cond = SIMIX_cond_init();
710 SIMIX_mutex_lock(mutex);
711 SIMIX_register_condition_to_action(sleep_action, cond);
712 SIMIX_register_action_to_condition(sleep_action, cond);
713 SIMIX_cond_wait(cond, mutex);
714 SIMIX_mutex_unlock(mutex);
715 SIMIX_mutex_destroy(mutex);
716 SIMIX_cond_destroy(cond);
717 // FIXME: check for success/failure?
// exit() replacement for a simulated process: removes the host from the
// running-host count, then kills the calling process.
// `status` is not used in the visible lines.
// NOTE(review): unlike smpi_mpi_finalize(), the visible lines neither wake the
// helper processes nor release any global resource - confirm intent.
722 void smpi_exit(int status)
725 SIMIX_mutex_lock(smpi_running_hosts_mutex);
726 smpi_running_hosts--;
727 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
728 SIMIX_process_kill(SIMIX_process_self());