5 #include "xbt/xbt_os_time.h"
6 #include "xbt/mallocator.h"
9 // FIXME: move globals into structure...
11 xbt_mallocator_t smpi_request_mallocator = NULL;
12 xbt_mallocator_t smpi_message_mallocator = NULL;
14 xbt_fifo_t *smpi_pending_send_requests = NULL;
15 smx_mutex_t *smpi_pending_send_requests_mutex = NULL;
17 xbt_fifo_t *smpi_pending_recv_requests = NULL;
18 smx_mutex_t *smpi_pending_recv_requests_mutex = NULL;
20 xbt_fifo_t *smpi_received_messages = NULL;
21 smx_mutex_t *smpi_received_messages_mutex = NULL;
23 smx_process_t *smpi_sender_processes = NULL;
24 smx_process_t *smpi_receiver_processes = NULL;
26 int smpi_running_hosts = 0;
28 smpi_mpi_communicator_t smpi_mpi_comm_world;
30 smpi_mpi_status_t smpi_mpi_status_ignore;
32 smpi_mpi_datatype_t smpi_mpi_byte;
33 smpi_mpi_datatype_t smpi_mpi_int;
34 smpi_mpi_datatype_t smpi_mpi_double;
36 smpi_mpi_op_t smpi_mpi_land;
37 smpi_mpi_op_t smpi_mpi_sum;
39 static xbt_os_timer_t smpi_timer;
40 static int smpi_benchmarking;
41 static double smpi_reference_speed;
44 smx_mutex_t smpi_running_hosts_mutex = NULL;
45 smx_mutex_t smpi_benchmarking_mutex = NULL;
46 smx_mutex_t init_mutex = NULL;
47 smx_cond_t init_cond = NULL;
49 int smpi_root_ready = 0;
50 int smpi_ready_count = 0;
52 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
// Accessor for the number of ranks in a communicator. The body is not visible
// in this listing; presumably it returns comm->size — confirm.
// NOTE(review): under C99 inline rules, if every file-scope declaration of this
// function in the translation unit carries `inline` without `extern`, this is
// only an inline definition and no external symbol is emitted. Confirm that a
// non-inline prototype is visible (e.g. in the header), or drop `inline`, to
// avoid undefined-reference link errors.
54 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
59 // FIXME: smarter algorithm?
60 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
64 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
69 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
71 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
74 int inline smpi_mpi_comm_world_rank_self()
76 return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
// Per-host sender process (registered as "smpi_sender" by smpi_run_simulation).
// Drains this host's pending-send queue. For each request it:
//   1. simulates the transfer to the destination host;
//   2. queues a message descriptor on the destination rank's received-message
//      queue (the descriptor points at the sender's buffer; no copy is made);
//   3. marks the request completed;
//   4. resumes the destination receiver and every process on the request's
//      waitlist.
// When the queue is empty it suspends itself. It loops while
// smpi_running_hosts > 0.
// NOTE(review): several lines of this function (some local declarations,
// braces, the smpi_ready_count update, the `do` of the wake-up loop, the
// return) are not visible in this listing.
79 int smpi_sender(int argc, char **argv)
84 xbt_fifo_t request_queue;
85 smx_mutex_t request_queue_mutex;
87 int running_hosts = 0;
88 smpi_mpi_request_t *request;
90 smx_action_t communicate_action;
91 smpi_received_message_t *scratch;
93 smx_process_t waitproc;
95 self = SIMIX_process_self();
96 shost = SIMIX_host_self();
97 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
99 // make sure root is done before own initialization
100 SIMIX_mutex_lock(init_mutex);
// NOTE(review): a single `if` around SIMIX_cond_wait does not re-check the
// predicate after waking. init_cond is also broadcast by the start-up barrier
// below, so a `while` loop would be the robust form — confirm against the
// hidden closing line before changing it.
101 if (!smpi_root_ready) {
102 SIMIX_cond_wait(init_cond, init_mutex);
104 SIMIX_mutex_unlock(init_mutex);
106 request_queue = smpi_pending_send_requests[rank];
107 request_queue_mutex = smpi_pending_send_requests_mutex[rank];
109 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so smpi_isend() can resume it when work is queued.
111 smpi_sender_processes[rank] = self;
113 // wait for all nodes to signal initialization complete
114 SIMIX_mutex_lock(init_mutex);
// Start-up barrier: released once smpi_ready_count reaches 3 * size. The
// increment of smpi_ready_count is not visible in this listing. The same
// `if`-vs-`while` caveat as above applies.
116 if (smpi_ready_count < 3 * size) {
117 SIMIX_cond_wait(init_cond, init_mutex);
119 SIMIX_cond_broadcast(init_cond);
121 SIMIX_mutex_unlock(init_mutex);
123 SIMIX_mutex_lock(smpi_running_hosts_mutex);
124 running_hosts = smpi_running_hosts;
125 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
127 while (0 < running_hosts) {
129 SIMIX_mutex_lock(request_queue_mutex);
130 request = xbt_fifo_shift(request_queue);
131 SIMIX_mutex_unlock(request_queue_mutex);
// Nothing to send: sleep until smpi_isend() resumes this process.
133 if (NULL == request) {
134 SIMIX_process_suspend(self);
136 SIMIX_mutex_lock(request->mutex);
138 dhost = request->comm->hosts[request->dst];
// Simulate moving datatype->size * count bytes from shost to dhost. The
// request's cond is registered on the action; smpi_run_simulation broadcasts it
// when the action finishes or fails. The result of the action is not checked.
140 // FIXME: not at all sure I can assume magic just happens here....
141 communicate_action = SIMIX_action_communicate(shost, dhost,
142 "communication", request->datatype->size * request->count * 1.0, -1.0);
144 SIMIX_register_condition_to_action(communicate_action, request->cond);
145 SIMIX_register_action_to_condition(communicate_action, request->cond);
147 SIMIX_cond_wait(request->cond, request->mutex);
149 // copy request to appropriate received queue
150 scratch = xbt_mallocator_get(smpi_message_mallocator);
151 scratch->comm = request->comm;
152 scratch->src = request->src;
153 scratch->dst = request->dst;
154 scratch->tag = request->tag;
// The descriptor refers to the sender's own buffer; the receiver copies from it.
155 scratch->buf = request->buf;
// Queues are indexed by MPI_COMM_WORLD rank, so translate the destination host.
156 drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
157 SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
158 xbt_fifo_push(smpi_received_messages[drank], scratch);
159 SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
161 request->completed = 1;
// The first pass of the do/while below (its `do` line is not visible) resumes
// the destination's receiver. Later passes resume every process queued on
// request->waitlist (e.g. by smpi_wait).
163 // wake up receiver, then any waiting sender
164 waitproc = smpi_receiver_processes[drank];
167 if (SIMIX_process_is_suspended(waitproc)) {
168 SIMIX_process_resume(waitproc);
170 } while(waitproc = xbt_fifo_shift(request->waitlist));
172 SIMIX_mutex_unlock(request->mutex);
175 SIMIX_mutex_lock(smpi_running_hosts_mutex);
176 running_hosts = smpi_running_hosts;
177 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Per-host receiver process (registered as "smpi_receiver" by
// smpi_run_simulation). Pairs this rank's pending receive requests with
// received message descriptors. A pair matches on the same communicator, the
// same tag, and either the same source or an MPI_ANY_SOURCE request. For each
// match it:
//   1. copies the payload from the sender's buffer into the request buffer;
//   2. records the actual source;
//   3. marks the request completed and resumes the request's waiters;
//   4. returns the descriptor to smpi_message_mallocator.
// It suspends itself when no match is available and loops while
// smpi_running_hosts > 0.
// NOTE(review): some lines (local declarations, braces, the tail of the
// matching loop, the return) are not visible in this listing.
183 int smpi_receiver(int argc, char **argv)
187 xbt_fifo_t request_queue;
188 smx_mutex_t request_queue_mutex;
189 xbt_fifo_t message_queue;
190 smx_mutex_t message_queue_mutex;
193 xbt_fifo_item_t request_item, message_item;
194 smpi_mpi_request_t *request;
195 smpi_received_message_t *message;
196 smx_process_t waitproc;
198 self = SIMIX_process_self();
199 rank = smpi_mpi_comm_world_rank_self();
201 // make sure root is done before own initialization
202 SIMIX_mutex_lock(init_mutex);
// NOTE(review): `if` rather than `while` around SIMIX_cond_wait; the predicate
// is not re-checked after waking.
203 if (!smpi_root_ready) {
204 SIMIX_cond_wait(init_cond, init_mutex);
206 SIMIX_mutex_unlock(init_mutex);
208 request_queue = smpi_pending_recv_requests[rank];
209 request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
211 message_queue = smpi_received_messages[rank];
212 message_queue_mutex = smpi_received_messages_mutex[rank];
214 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so senders and smpi_irecv() can resume it.
215 smpi_receiver_processes[rank] = self;
217 // wait for all nodes to signal initialization complete
218 SIMIX_mutex_lock(init_mutex);
// Start-up barrier shared with the sender and initialisation code; released
// once smpi_ready_count reaches 3 * size (the increment is not visible here).
220 if (smpi_ready_count < 3 * size) {
221 SIMIX_cond_wait(init_cond, init_mutex);
223 SIMIX_cond_broadcast(init_cond);
225 SIMIX_mutex_unlock(init_mutex);
227 SIMIX_mutex_lock(smpi_running_hosts_mutex);
228 running_hosts = smpi_running_hosts;
229 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
231 while (0 < running_hosts) {
// Quadratic search: every pending request is compared against every received
// message, with both queue locks held.
233 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
235 // FIXME: not the best way to request multiple locks...
236 SIMIX_mutex_lock(request_queue_mutex);
237 SIMIX_mutex_lock(message_queue_mutex);
238 for (request_item = xbt_fifo_get_first_item(request_queue);
239 NULL != request_item;
240 request_item = xbt_fifo_get_next_item(request_item)) {
241 request = xbt_fifo_get_item_content(request_item);
242 for (message_item = xbt_fifo_get_first_item(message_queue);
243 NULL != message_item;
244 message_item = xbt_fifo_get_next_item(message_item)) {
245 message = xbt_fifo_get_item_content(message_item);
// The tag must match exactly; no wildcard tag is handled here.
246 if (request->comm == message->comm &&
247 (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
248 request->tag == message->tag) {
249 xbt_fifo_remove_item(request_queue, request_item);
250 xbt_fifo_remove_item(message_queue, message_item);
256 SIMIX_mutex_unlock(message_queue_mutex);
257 SIMIX_mutex_unlock(request_queue_mutex);
// NOTE(review): `request` and `message` still hold the last items examined by
// the loops above. Unless the hidden lines that end the match reset them when
// nothing matched, an unmatched pair reaches the else branch below. Consider
// setting both to NULL before the search and assigning them only on a match.
259 if (NULL == request || NULL == message) {
260 SIMIX_process_suspend(self);
262 SIMIX_mutex_lock(request->mutex);
// Copies from the sender's buffer (the descriptor's buf is the sender's
// request->buf). The size comes from the receiving request alone. NOTE(review):
// there is no visible check that the sender's buffer is at least this large.
263 memcpy(request->buf, message->buf, request->count * request->datatype->size);
// Report the real source (matters for MPI_ANY_SOURCE requests).
264 request->src = message->src;
265 request->completed = 1;
267 while (waitproc = xbt_fifo_shift(request->waitlist)) {
268 if (SIMIX_process_is_suspended(waitproc)) {
269 SIMIX_process_resume(waitproc);
272 SIMIX_mutex_unlock(request->mutex);
274 xbt_mallocator_release(smpi_message_mallocator, message);
277 SIMIX_mutex_lock(smpi_running_hosts_mutex);
278 running_hosts = smpi_running_hosts;
279 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Simulator driver. It:
//   1. initialises SIMIX and creates the start-up mutex/condition;
//   2. registers the three process functions;
//   3. passes argv[1] to SIMIX_create_environment and argv[2] to
//      SIMIX_launch_application;
//   4. steps SIMIX_solve() until it returns -1.0;
//   5. logs the final simulated clock.
// NOTE(review): argc is not checked before argv[1]/argv[2] are read in the
// visible lines; the return statement is not visible in this listing.
285 int smpi_run_simulation(int argc, char **argv)
287 smx_cond_t cond = NULL;
288 smx_action_t action = NULL;
290 xbt_fifo_t actions_failed = xbt_fifo_new();
291 xbt_fifo_t actions_done = xbt_fifo_new();
// Seed rand() with SMPI_RAND_SEED (presumably a fixed constant).
293 srand(SMPI_RAND_SEED);
295 SIMIX_global_init(&argc, argv);
// Created before the application is launched: the sender, receiver and
// per-process initialisation code all lock init_mutex and wait on init_cond.
297 init_mutex = SIMIX_mutex_init();
298 init_cond = SIMIX_cond_init();
300 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
301 SIMIX_function_register("smpi_sender", smpi_sender);
302 SIMIX_function_register("smpi_receiver", smpi_receiver);
303 SIMIX_create_environment(argv[1]);
304 SIMIX_launch_application(argv[2]);
306 /* Prepare to display some more info when dying on Ctrl-C pressing */
307 //signal(SIGINT, inthandler);
// NOTE(review): the statements under this comment are not visible here.
309 /* Clean IO before the run */
// Each pass advances the simulation. Every action that ended (failed or done)
// broadcasts each condition registered on it, which wakes processes blocked on
// it (e.g. a sender waiting for its transfer), and is then destroyed.
// The assignments inside the while conditions are intentional.
313 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
314 while (action = xbt_fifo_pop(actions_failed)) {
315 DEBUG1("** %s failed **", action->name);
316 while (cond = xbt_fifo_pop(action->cond_list)) {
317 SIMIX_cond_broadcast(cond);
319 SIMIX_action_destroy(action);
321 while (action = xbt_fifo_pop(actions_done)) {
322 DEBUG1("** %s done **",action->name);
323 while (cond = xbt_fifo_pop(action->cond_list)) {
324 SIMIX_cond_broadcast(cond);
326 SIMIX_action_destroy(action);
329 xbt_fifo_free(actions_failed);
330 xbt_fifo_free(actions_done);
331 INFO1("simulation time %g", SIMIX_get_clock());
/*
 * Logical-AND reduction operator on ints: stores 1 in *z when both *x and *y
 * are non-zero, otherwise 0.  Like `&&`, *y is only read when *x is non-zero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
  int *result = z;

  if (*(int *)x == 0) {
    *result = 0;
  } else {
    *result = (*(int *)y != 0);
  }
}
/* Sum reduction operator on ints: *z = *x + *y.  z may alias x or y. */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
  const int *lhs = x;
  const int *rhs = y;
  int *dst = z;

  *dst = *lhs + *rhs;
}
346 void *smpi_new_request()
348 return xbt_new(smpi_mpi_request_t, 1);
351 void *smpi_new_message()
353 return xbt_new(smpi_received_message_t, 1);
// Body of the per-process MPI initialisation routine (presumably smpi_mpi_init).
// Its signature and some local declarations are not visible in this listing.
// The process on hosts[0] (the "root") allocates and initialises every global,
// then broadcasts init_cond. Every process then:
//   1. waits for the root;
//   2. records itself in smpi_mpi_comm_world.processes;
//   3. joins the start-up barrier shared with the sender and receiver processes.
// NOTE(review): the local `process` is not used in the visible lines.
360 smx_process_t process;
365 // initialize some local variables
366 host = SIMIX_host_self();
367 hosts = SIMIX_host_get_table();
368 size = SIMIX_host_get_number();
370 // node 0 sets the globals
371 if (host == hosts[0]) {
// Per-rank helper handles; smpi_sender / smpi_receiver fill in their own slot.
374 smpi_sender_processes = xbt_new(smx_process_t, size);
375 smpi_receiver_processes = xbt_new(smx_process_t, size);
378 smpi_running_hosts_mutex = SIMIX_mutex_init();
379 smpi_running_hosts = size;
// The host table is stored directly as the rank -> host map.
381 // global communicator
382 smpi_mpi_comm_world.size = size;
383 smpi_mpi_comm_world.barrier = 0;
384 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
385 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
386 smpi_mpi_comm_world.hosts = hosts;
387 smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
388 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// Datatype sizes are in bytes.
391 smpi_mpi_byte.size = (size_t)1;
392 smpi_mpi_int.size = sizeof(int);
393 smpi_mpi_double.size = sizeof(double);
396 smpi_mpi_land.func = &smpi_mpi_land_func;
397 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Object pools: new objects come from smpi_new_request / smpi_new_message and
// are released with xbt_free.
400 smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, &smpi_new_request, &xbt_free, NULL);
401 smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, &smpi_new_message, &xbt_free, NULL);
// One queue and one guarding mutex per rank for each of the three queue kinds.
402 smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
403 smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size);
404 smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
405 smpi_pending_recv_requests_mutex = xbt_new(smx_mutex_t, size);
406 smpi_received_messages = xbt_new(xbt_fifo_t, size);
407 smpi_received_messages_mutex = xbt_new(smx_mutex_t, size);
409 for(i = 0; i < size; i++) {
410 smpi_pending_send_requests[i] = xbt_fifo_new();
411 smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init();
412 smpi_pending_recv_requests[i] = xbt_fifo_new();
413 smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init();
414 smpi_received_messages[i] = xbt_fifo_new();
415 smpi_received_messages_mutex[i] = SIMIX_mutex_init();
418 smpi_timer = xbt_os_timer_new();
419 smpi_reference_speed = SMPI_DEFAULT_SPEED;
420 smpi_benchmarking = 0;
421 smpi_benchmarking_mutex = SIMIX_mutex_init();
// NOTE(review): smpi_root_ready is not set to 1 in any visible line. The other
// processes' waits test it, so a hidden assignment before this broadcast is
// presumably required — confirm.
423 // signal all nodes to perform initialization
424 SIMIX_mutex_lock(init_mutex);
426 SIMIX_cond_broadcast(init_cond);
427 SIMIX_mutex_unlock(init_mutex);
431 // make sure root is done before own initialization
432 SIMIX_mutex_lock(init_mutex);
// NOTE(review): `if` rather than `while`; the predicate is not re-checked after waking.
433 if (!smpi_root_ready) {
434 SIMIX_cond_wait(init_cond, init_mutex);
436 SIMIX_mutex_unlock(init_mutex);
// Every process, root included, records itself at its world rank.
438 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
442 // wait for all nodes to signal initialization complete
443 SIMIX_mutex_lock(init_mutex);
// Released once smpi_ready_count reaches 3 * size (the increment is not visible here).
445 if (smpi_ready_count < 3 * size) {
446 SIMIX_cond_wait(init_cond, init_mutex);
448 SIMIX_cond_broadcast(init_cond);
450 SIMIX_mutex_unlock(init_mutex);
// Called by each process when it finishes. It decrements smpi_running_hosts
// and then tears down the globals created during initialisation. That teardown
// presumably runs only for the last caller; the guarding condition (which
// likely tests `i`) is not visible in this listing.
// NOTE(review):
//   - The sender/receiver loops lock smpi_running_hosts_mutex and may be
//     suspended on empty queues. Confirm they have exited before the mutex is
//     destroyed.
//   - smpi_sender_processes, smpi_receiver_processes, smpi_benchmarking_mutex,
//     init_mutex and init_cond are not released in the visible lines.
454 void smpi_mpi_finalize()
458 SIMIX_mutex_lock(smpi_running_hosts_mutex);
459 i = --smpi_running_hosts;
460 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
464 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
466 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
467 xbt_fifo_free(smpi_pending_send_requests[i]);
468 SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]);
469 xbt_fifo_free(smpi_pending_recv_requests[i]);
470 SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]);
471 xbt_fifo_free(smpi_received_messages[i]);
472 SIMIX_mutex_destroy(smpi_received_messages_mutex[i]);
475 xbt_mallocator_free(smpi_request_mallocator);
476 xbt_mallocator_free(smpi_message_mallocator);
477 xbt_free(smpi_pending_send_requests);
478 xbt_free(smpi_pending_send_requests_mutex);
479 xbt_free(smpi_pending_recv_requests);
480 xbt_free(smpi_pending_recv_requests_mutex);
481 xbt_free(smpi_received_messages);
482 xbt_free(smpi_received_messages_mutex);
484 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
485 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
486 xbt_free(smpi_mpi_comm_world.processes);
488 xbt_os_timer_free(smpi_timer);
493 void smpi_bench_begin()
495 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
496 smpi_benchmarking = 1;
497 xbt_os_timer_start(smpi_timer);
501 void smpi_bench_end()
505 smx_action_t compute_action;
509 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
510 smpi_benchmarking = 0;
511 xbt_os_timer_stop(smpi_timer);
512 duration = xbt_os_timer_elapsed(smpi_timer);
513 host = SIMIX_host_self();
514 compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
515 mutex = SIMIX_mutex_init();
516 cond = SIMIX_cond_init();
517 SIMIX_mutex_lock(mutex);
518 SIMIX_register_condition_to_action(compute_action, cond);
519 SIMIX_register_action_to_condition(compute_action, cond);
520 SIMIX_cond_wait(cond, mutex);
521 SIMIX_mutex_unlock(mutex);
522 SIMIX_mutex_destroy(mutex);
523 SIMIX_cond_destroy(cond);
524 // FIXME: check for success/failure?
// Barrier over `comm`, built on comm->barrier_mutex / comm->barrier_cond.
// NOTE(review): the arrival counting (presumably on comm->barrier, which
// initialisation sets to 0) and the branch choosing between waiting and
// broadcasting are not visible in this listing. If the wait is guarded by a
// single `if`, the count is not re-checked after waking.
528 void smpi_barrier(smpi_mpi_communicator_t *comm) {
530 SIMIX_mutex_lock(comm->barrier_mutex);
// Presumably taken by every arrival except the last: wait to be released.
533 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
// Presumably taken by the last arrival: release all waiters.
536 SIMIX_cond_broadcast(comm->barrier_cond);
538 SIMIX_mutex_unlock(comm->barrier_mutex);
541 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
544 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
545 if (i >= comm->size) i = -1;
// Validate the arguments of a point-to-point request and, if they are valid,
// take a request object from smpi_request_mallocator and initialise it.
// Returns MPI_SUCCESS, or the code for the first failed check: MPI_ERR_COUNT,
// MPI_ERR_INTERN (NULL buf), MPI_ERR_TYPE, MPI_ERR_COMM, MPI_ERR_RANK
// (src outside comm and not MPI_ANY_SOURCE, or dst outside comm), MPI_ERR_TAG
// (negative tag). *request is presumably written only on success.
// NOTE(review): the count check's condition and the if/else scaffolding are
// not visible in this listing.
549 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
550 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
552 int retval = MPI_SUCCESS;
557 retval = MPI_ERR_COUNT;
558 } else if (NULL == buf) {
559 retval = MPI_ERR_INTERN;
560 } else if (NULL == datatype) {
561 retval = MPI_ERR_TYPE;
562 } else if (NULL == comm) {
563 retval = MPI_ERR_COMM;
564 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
565 retval = MPI_ERR_RANK;
566 } else if (0 > dst || comm->size <= dst) {
567 retval = MPI_ERR_RANK;
568 } else if (0 > tag) {
569 retval = MPI_ERR_TAG;
571 *request = xbt_mallocator_get(smpi_request_mallocator);
572 (*request)->comm = comm;
573 (*request)->src = src;
574 (*request)->dst = dst;
575 (*request)->tag = tag;
576 (*request)->buf = buf;
577 (*request)->count = count;
578 (*request)->datatype = datatype;
579 (*request)->completed = 0;
580 (*request)->mutex = SIMIX_mutex_init();
581 (*request)->cond = SIMIX_cond_init();
// The waitlist must be a real fifo. smpi_wait() appends waiting processes with
// xbt_fifo_push(), which needs an existing fifo, so a NULL list made the first
// wait on an incomplete request crash.
// NOTE(review): the fifo, mutex and cond are created per request; confirm they
// are freed where the request is returned to the mallocator.
582 (*request)->waitlist = xbt_fifo_new();
587 int smpi_isend(smpi_mpi_request_t *request)
589 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
591 SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
592 xbt_fifo_push(smpi_pending_send_requests[rank], request);
593 SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
595 if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
596 MSG_process_resume(smpi_sender_processes[rank]);
600 int smpi_irecv(smpi_mpi_request_t *request)
602 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
604 SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
605 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
606 SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
608 if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
609 MSG_process_resume(smpi_receiver_processes[rank]);
// Intended to block until `request` has been completed by a sender/receiver
// helper, then report the source rank in `status` unless status is NULL or
// MPI_STATUS_IGNORE. A NULL request skips the waiting step.
// NOTE(review): the caller's suspension after enqueueing itself is not visible
// in this listing. Confirm it happens only after request->mutex is released.
613 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
617 self = SIMIX_process_self();
619 if (NULL != request) {
620 SIMIX_mutex_lock(request->mutex);
621 if (!request->completed) {
// Register as a waiter; the helper that completes the request resumes every
// process on this list. Requires request->waitlist to be a live fifo, because
// xbt_fifo_push() cannot append to NULL. Confirm how the request was created.
622 xbt_fifo_push(request->waitlist, self);
625 SIMIX_mutex_unlock(request->mutex);
// NOTE(review): it is not visible whether this check is still inside the
// NULL-request guard. If it is not, a NULL request is dereferenced here.
629 if (NULL != status && MPI_STATUS_IGNORE != status) {
630 SIMIX_mutex_lock(request->mutex);
631 status->MPI_SOURCE = request->src;
632 SIMIX_mutex_unlock(request->mutex);
// gettimeofday() replacement that reports simulated time from SIMIX_get_clock()
// instead of the host clock.
// NOTE(review): the tv_sec assignment, the return value and any use of tz are
// not visible in this listing. tv is dereferenced with no visible NULL check.
637 // FIXME: move into own file
638 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
646 now = SIMIX_get_clock();
// Fractional part of the simulated time in microseconds, truncated when
// converted to tv_usec. Relies on tv_sec already holding the whole seconds,
// presumably set on a hidden line above.
648 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// Simulated sleep: runs a SIMIX sleep action of `seconds` on the calling host
// and blocks the caller until the simulation loop reports that the action
// ended. A failed action wakes the caller the same way as a finished one.
// NOTE(review): local declarations and the return statement are not visible in
// this listing. The mutex/cond/action pattern duplicates smpi_bench_end.
654 unsigned int smpi_sleep(unsigned int seconds)
659 smx_action_t sleep_action;
662 host = SIMIX_host_self();
663 sleep_action = SIMIX_action_sleep(host, seconds);
// Temporary mutex/cond pair registered on the action. The simulation loop
// broadcasts every condition of an action once that action ends.
664 mutex = SIMIX_mutex_init();
665 cond = SIMIX_cond_init();
666 SIMIX_mutex_lock(mutex);
667 SIMIX_register_condition_to_action(sleep_action, cond);
668 SIMIX_register_action_to_condition(sleep_action, cond);
669 SIMIX_cond_wait(cond, mutex);
670 SIMIX_mutex_unlock(mutex);
671 SIMIX_mutex_destroy(mutex);
672 SIMIX_cond_destroy(cond);
673 // FIXME: check for success/failure?
// Terminates the calling simulated process. It decrements the global
// running-host count (read by the sender/receiver loops) and then kills itself.
// NOTE(review): `status` is not used in the visible lines, and the end of this
// definition lies beyond this listing.
678 void smpi_exit(int status)
681 SIMIX_mutex_lock(smpi_running_hosts_mutex);
682 smpi_running_hosts--;
683 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
684 SIMIX_process_kill(SIMIX_process_self());