5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "simix/private.h"
// Allocator for smpi_mpi_request_t objects; built by the root node's
// initialization code and released in smpi_mpi_finalize.
10 xbt_mallocator_t smpi_request_mallocator = NULL;
// Arrays of FIFOs with one entry per rank of the world communicator.
11 xbt_fifo_t *smpi_pending_send_requests = NULL;
12 xbt_fifo_t *smpi_pending_recv_requests = NULL;
13 xbt_fifo_t *smpi_received_messages = NULL;
// Per-rank handles of the helper processes; smpi_sender and smpi_receiver each
// store their own process handle at their rank.
15 smx_process_t *smpi_sender_processes = NULL;
16 smx_process_t *smpi_receiver_processes = NULL;
// Count of hosts still running; set to the host count at initialization and
// decremented by smpi_mpi_finalize and smpi_exit under smpi_running_hosts_mutex.
18 int smpi_running_hosts = 0;
// The global communicator spanning every host of the simulation.
20 smpi_mpi_communicator_t smpi_mpi_comm_world;
22 smpi_mpi_status_t smpi_mpi_status_ignore;
// Built-in datatypes; only their .size field is filled in by the visible code.
24 smpi_mpi_datatype_t smpi_mpi_byte;
25 smpi_mpi_datatype_t smpi_mpi_int;
26 smpi_mpi_datatype_t smpi_mpi_double;
// Built-in reduction operators, bound to smpi_mpi_land_func / smpi_mpi_sum_func.
28 smpi_mpi_op_t smpi_mpi_land;
29 smpi_mpi_op_t smpi_mpi_sum;
// Benchmarking state: the timer is started in smpi_bench_begin and stopped in
// smpi_bench_end; smpi_benchmarking is 1 between those two calls.
31 static xbt_os_timer_t smpi_timer;
32 static int smpi_benchmarking;
// Assigned SMPI_DEFAULT_SPEED at initialization; no read of it is visible here.
33 static double smpi_reference_speed;
36 smx_mutex_t smpi_running_hosts_mutex = NULL;
// NOTE(review): initialized by the root node, but no lock/unlock of this mutex
// is visible in this part of the file -- confirm it is still needed.
37 smx_mutex_t smpi_benchmarking_mutex = NULL;
// init_mutex/init_cond coordinate the start-up handshake between the root node
// and the other processes.
38 smx_mutex_t init_mutex = NULL;
39 smx_cond_t init_cond = NULL;
// Start-up handshake state, read under init_mutex. The statements that modify
// these two variables are not visible in this view.
41 int smpi_root_ready = 0;
42 int smpi_ready_count = 0;
// Default logging category for this file.
44 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
// Size accessor for a communicator. Only the signature is visible in this
// view; presumably it returns comm->size -- TODO confirm against the full file.
46 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
// Looks up the index of `host` in comm->hosts.
// The declaration of `i` and the return statement are not visible in this view.
51 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
// Scans from the last entry downwards; the empty loop body leaves `i` at the
// matching index, or at 0 when the scan runs out.
// NOTE(review): because the loop stops at i > 0 without testing hosts[0], a
// host that is absent from the communicator ends with i == 0, the same value
// as a real match at rank 0. smpi_comm_rank in this file yields -1 instead.
55 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
// Returns the rank, within `comm`, of the host the calling process runs on.
60 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
62 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
// Entry point of the per-host sender process (registered with SIMIX under the
// name "smpi_sender"). After the start-up handshake it repeatedly takes a
// request from this rank's pending-send queue, runs a simulated communication
// to the destination host, and queues a copy of the request on the
// destination's received-messages queue. It loops while smpi_running_hosts
// is positive.
// Several lines of this function (braces, some declarations, the return) are
// not visible in this view.
65 int smpi_sender(int argc, char **argv)
69 int running_hosts = 0;
70 smpi_mpi_request_t *request;
72 smx_host_t shost, dhost;
73 smx_action_t communicate_action;
74 smpi_mpi_request_t *scratch;
77 self = SIMIX_process_self();
78 shost = SIMIX_host_self();
80 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
82 // make sure root is done before own initialization
83 SIMIX_mutex_lock(init_mutex);
// NOTE(review): the predicate is tested once with `if`; it is not re-checked
// after the wait returns. Confirm SIMIX condition variables cannot wake a
// waiter before smpi_root_ready is set.
84 if (!smpi_root_ready) {
85 SIMIX_cond_wait(init_cond, init_mutex);
87 SIMIX_mutex_unlock(init_mutex);
89 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so smpi_isend can resume it.
90 smpi_sender_processes[rank] = self;
92 // wait for all nodes to signal initialization complete
93 SIMIX_mutex_lock(init_mutex);
// The threshold is 3 * size; presumably three processes per host take part in
// the handshake (main, sender, receiver) -- TODO confirm. The update of
// smpi_ready_count is not visible in this view.
95 if (smpi_ready_count < 3 * size) {
96 SIMIX_cond_wait(init_cond, init_mutex);
98 SIMIX_cond_broadcast(init_cond);
100 SIMIX_mutex_unlock(init_mutex);
// Take a snapshot of the running-host count under its mutex.
102 SIMIX_mutex_lock(smpi_running_hosts_mutex);
103 running_hosts = smpi_running_hosts;
104 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
106 while (0 < running_hosts) {
108 request = xbt_fifo_shift(smpi_pending_send_requests[rank]);
// Nothing queued: suspend until smpi_isend resumes this process.
// NOTE(review): the lines between the suspend and the request handling are not
// visible; presumably an else branch keeps a NULL request from being
// dereferenced below -- confirm.
110 if (NULL == request) {
111 SIMIX_process_suspend(self);
113 SIMIX_mutex_lock(request->mutex);
115 dhost = request->comm->hosts[request->dst];
117 // FIXME: not at all sure I can assume magic just happens here....
// The simulated payload size is datatype size times element count, as a double.
118 communicate_action = SIMIX_action_communicate(shost, dhost,
119 "communication", request->datatype->size * request->count * 1.0, -1.0);
// Tie the action and the request's condition together, then block until the
// condition is signalled (smpi_run_simulation broadcasts the conditions of
// finished and failed actions).
121 SIMIX_register_condition_to_action(communicate_action, request->cond);
122 SIMIX_register_action_to_condition(communicate_action, request->cond);
124 SIMIX_cond_wait(request->cond, request->mutex);
126 // copy request to appropriate received queue
127 scratch = xbt_mallocator_get(smpi_request_mallocator);
// NOTE(review): smpi_mpi_request_t is used as a type name in this function,
// and C requires parentheses when sizeof is applied to a type name:
// sizeof(smpi_mpi_request_t). As written this should not compile.
128 memcpy(scratch, request, sizeof smpi_mpi_request_t);
// NOTE(review): every other call in this file passes &smpi_mpi_comm_world;
// the definition of MPI_COMM_WORLD is not visible here -- confirm it expands
// to a smpi_mpi_communicator_t pointer.
129 drank = smpi_mpi_comm_rank(MPI_COMM_WORLD, dhost);
130 xbt_fifo_push(smpi_received_messages[drank], scratch);
132 request->completed = 1;
134 SIMIX_mutex_unlock(request->mutex);
// Refresh the snapshot for the loop condition.
137 SIMIX_mutex_lock(smpi_running_hosts_mutex);
138 running_hosts = smpi_running_hosts;
139 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Entry point of the per-host receiver process (registered with SIMIX under
// the name "smpi_receiver"). It performs the same start-up handshake as
// smpi_sender and then loops while smpi_running_hosts is positive.
// The local declarations, braces, and most of the request handling are not
// visible in this view.
145 int smpi_receiver(int argc, char **argv)
153 dhost = SIMIX_host_self();
154 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
155 self = SIMIX_process_self();
157 // make sure root is done before own initialization
158 SIMIX_mutex_lock(init_mutex);
// NOTE(review): single `if` test around the wait; the predicate is not
// re-checked after wake-up -- confirm this is safe with SIMIX conditions.
159 if (!smpi_root_ready) {
160 SIMIX_cond_wait(init_cond, init_mutex);
162 SIMIX_mutex_unlock(init_mutex);
164 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this process so smpi_irecv can resume it.
165 smpi_receiver_processes[rank] = SIMIX_process_self();
167 // wait for all nodes to signal initialization complete
168 SIMIX_mutex_lock(init_mutex);
170 if (smpi_ready_count < 3 * size) {
171 SIMIX_cond_wait(init_cond, init_mutex);
173 SIMIX_cond_broadcast(init_cond);
175 SIMIX_mutex_unlock(init_mutex);
// Take a snapshot of the running-host count under its mutex.
177 SIMIX_mutex_lock(smpi_running_hosts_mutex);
178 running_hosts = smpi_running_hosts;
179 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
181 while (0 < running_hosts) {
// NOTE(review): this takes from smpi_pending_send_requests, the same queue
// smpi_sender consumes, while smpi_irecv pushes onto
// smpi_pending_recv_requests. It looks like the recv queue was intended --
// confirm.
183 request = xbt_fifo_shift(smpi_pending_send_requests[rank]);
// Nothing queued: suspend until resumed.
// NOTE(review): the lines after the suspend are not visible; presumably an
// else branch guards the dereference of request below -- confirm.
185 if (NULL == request) {
186 SIMIX_process_suspend(self);
188 SIMIX_mutex_lock(request->mutex);
// Refresh the snapshot for the loop condition.
191 SIMIX_mutex_lock(smpi_running_hosts_mutex);
192 running_hosts = smpi_running_hosts;
193 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Sets up SIMIX, registers the SMPI process entry points, loads the platform
// (argv[1]) and the deployment (argv[2]), then drives the simulation loop
// until SIMIX_solve returns -1.0, and finally logs the simulated clock.
// NOTE(review): argv[1] and argv[2] are used without any visible check of
// argc -- confirm that callers validate the argument count.
199 int smpi_run_simulation(int argc, char **argv)
201 smx_cond_t cond = NULL;
202 smx_action_t action = NULL;
204 xbt_fifo_t actions_failed = xbt_fifo_new();
205 xbt_fifo_t actions_done = xbt_fifo_new();
207 srand(SMPI_RAND_SEED);
209 SIMIX_global_init(&argc, argv);
// The handshake primitives must exist before any simulated process starts.
211 init_mutex = SIMIX_mutex_init();
212 init_cond = SIMIX_cond_init();
214 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
215 SIMIX_function_register("smpi_sender", smpi_sender);
216 SIMIX_function_register("smpi_receiver", smpi_receiver);
217 SIMIX_create_environment(argv[1]);
218 SIMIX_launch_application(argv[2]);
220 /* Prepare to display some more info when dying on Ctrl-C pressing */
221 //signal(SIGINT, inthandler);
223 /* Clean IO before the run */
// Each SIMIX_solve round fills the two FIFOs with the actions that failed and
// the actions that finished during that round.
227 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
// The assignments inside the while conditions are intentional: each loop pops
// until the FIFO yields NULL.
228 while (action = xbt_fifo_pop(actions_failed)) {
229 DEBUG1("** %s failed **", action->name);
// Wake every process blocked on a condition attached to this action.
230 while (cond = xbt_fifo_pop(action->cond_list)) {
231 SIMIX_cond_broadcast(cond);
233 SIMIX_action_destroy(action);
235 while (action = xbt_fifo_pop(actions_done)) {
236 DEBUG1("** %s done **",action->name);
237 while (cond = xbt_fifo_pop(action->cond_list)) {
238 SIMIX_cond_broadcast(cond);
240 SIMIX_action_destroy(action);
243 xbt_fifo_free(actions_failed);
244 xbt_fifo_free(actions_done);
245 INFO1("simulation time %g", SIMIX_get_clock());
// Logical-AND reduction: stores (*x && *y) into *z.
// All three pointers are read and written as int, whatever the caller's
// datatype is.
250 void smpi_mpi_land_func(void *x, void *y, void *z)
252 *(int *)z = *(int *)x && *(int *)y;
// Sum reduction: stores (*x + *y) into *z.
// All three pointers are read and written as int, whatever the caller's
// datatype is.
255 void smpi_mpi_sum_func(void *x, void *y, void *z)
257 *(int *)z = *(int *)x + *(int *)y;
// Allocates one smpi_mpi_request_t with xbt_new. It is passed to
// xbt_mallocator_new as the object-creation callback for
// smpi_request_mallocator.
260 smpi_mpi_request_t *smpi_new_request()
262 return xbt_new(smpi_mpi_request_t, 1);
// Body of the per-process initialization routine. Its function header, some
// local declarations, and the braces/else lines are not visible in this view.
// The process running on hosts[0] (the root) builds all global state and then
// signals the others; every other process waits for the root and registers
// itself in the world communicator. All processes then wait for the global
// ready count.
// NOTE(review): no use of `process` is visible in this view -- confirm it is
// needed.
269 smx_process_t process;
274 // initialize some local variables
275 host = SIMIX_host_self();
276 hosts = SIMIX_host_get_table();
277 size = SIMIX_host_get_number();
279 // node 0 sets the globals
280 if (host == hosts[0]) {
// Per-rank slots filled in later by smpi_sender / smpi_receiver.
283 smpi_sender_processes = xbt_new(smx_process_t, size);
284 smpi_receiver_processes = xbt_new(smx_process_t, size);
287 smpi_running_hosts_mutex = SIMIX_mutex_init();
288 smpi_running_hosts = size;
290 // global communicator
291 smpi_mpi_comm_world.size = size;
292 smpi_mpi_comm_world.barrier = 0;
293 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
294 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
// The communicator uses the SIMIX host table directly, so a rank is an index
// into that table.
295 smpi_mpi_comm_world.hosts = hosts;
296 smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
297 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// Element sizes of the built-in datatypes.
300 smpi_mpi_byte.size = (size_t)1;
301 smpi_mpi_int.size = sizeof(int);
302 smpi_mpi_double.size = sizeof(double);
// Reduction callbacks of the built-in operators.
305 smpi_mpi_land.func = &smpi_mpi_land_func;
306 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Request allocator and the three per-rank FIFO arrays.
309 smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
310 smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
311 smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
312 smpi_received_messages = xbt_new(xbt_fifo_t, size);
314 for(i = 0; i < size; i++) {
315 smpi_pending_send_requests[i] = xbt_fifo_new();
316 smpi_pending_recv_requests[i] = xbt_fifo_new();
317 smpi_received_messages[i] = xbt_fifo_new();
// Benchmarking state.
320 smpi_timer = xbt_os_timer_new();
321 smpi_reference_speed = SMPI_DEFAULT_SPEED;
322 smpi_benchmarking = 0;
323 smpi_benchmarking_mutex = SIMIX_mutex_init();
325 // signal all nodes to perform initialization
// The statement between the lock and the broadcast is not visible in this
// view; presumably it sets smpi_root_ready -- TODO confirm.
326 SIMIX_mutex_lock(init_mutex);
328 SIMIX_cond_broadcast(init_cond);
329 SIMIX_mutex_unlock(init_mutex);
333 // make sure root is done before own initialization
334 SIMIX_mutex_lock(init_mutex);
// NOTE(review): single `if` test around the wait; the predicate is not
// re-checked after wake-up -- confirm this is safe with SIMIX conditions.
335 if (!smpi_root_ready) {
336 SIMIX_cond_wait(init_cond, init_mutex);
338 SIMIX_mutex_unlock(init_mutex);
// Register the calling process at its own rank in the world communicator.
340 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
344 // wait for all nodes to signal initialization complete
345 SIMIX_mutex_lock(init_mutex);
// The update of smpi_ready_count is not visible in this view.
347 if (smpi_ready_count < 3 * size) {
348 SIMIX_cond_wait(init_cond, init_mutex);
350 SIMIX_cond_broadcast(init_cond);
352 SIMIX_mutex_unlock(init_mutex);
// Marks the calling host as finished by decrementing smpi_running_hosts, and
// contains the teardown of the global SMPI state.
// The lines between the decrement and the teardown are not visible in this
// view; presumably the teardown runs only for the last host to finalize
// (when the decremented count reaches zero) -- TODO confirm.
356 void smpi_mpi_finalize()
360 SIMIX_mutex_lock(smpi_running_hosts_mutex);
// `i` holds the count of hosts still running after this one.
361 i = --smpi_running_hosts;
362 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
366 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
// Free each per-rank FIFO before the arrays that hold them.
368 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
369 xbt_fifo_free(smpi_pending_send_requests[i]);
370 xbt_fifo_free(smpi_pending_recv_requests[i]);
371 xbt_fifo_free(smpi_received_messages[i]);
374 xbt_mallocator_free(smpi_request_mallocator);
375 xbt_free(smpi_pending_send_requests);
376 xbt_free(smpi_pending_recv_requests);
377 xbt_free(smpi_received_messages);
379 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
380 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
381 xbt_free(smpi_mpi_comm_world.processes);
383 xbt_os_timer_free(smpi_timer);
// NOTE(review): smpi_sender_processes, smpi_receiver_processes,
// smpi_benchmarking_mutex, init_mutex and init_cond are not released in the
// visible lines -- confirm whether that is handled elsewhere.
// Starts timing a stretch of user code: asserts that no benchmark is already
// in progress, sets the flag, and starts smpi_timer.
388 void smpi_bench_begin()
390 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
391 smpi_benchmarking = 1;
392 xbt_os_timer_start(smpi_timer);
// Stops the benchmark timer and injects the measured time into the simulation
// as a compute action on the current host, blocking until the action's
// condition is signalled.
// The declarations of duration, host, mutex and cond are not visible in this
// view.
396 void smpi_bench_end()
400 smx_action_t compute_action;
404 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
405 smpi_benchmarking = 0;
406 xbt_os_timer_stop(smpi_timer);
407 duration = xbt_os_timer_elapsed(smpi_timer);
408 host = SIMIX_host_self();
// The amount of simulated work is the elapsed time scaled by
// SMPI_DEFAULT_SPEED.
// NOTE(review): smpi_reference_speed holds the same constant but is not used
// here -- confirm which one is intended.
409 compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
// A private mutex/condition pair is created only to wait for this one action.
410 mutex = SIMIX_mutex_init();
411 cond = SIMIX_cond_init();
412 SIMIX_mutex_lock(mutex);
413 SIMIX_register_condition_to_action(compute_action, cond);
414 SIMIX_register_action_to_condition(compute_action, cond);
415 SIMIX_cond_wait(cond, mutex);
416 SIMIX_mutex_unlock(mutex);
417 SIMIX_mutex_destroy(mutex);
418 SIMIX_cond_destroy(cond);
419 // FIXME: check for success/failure?
// Barrier on a communicator, built from comm->barrier_mutex and
// comm->barrier_cond. Only the lock, wait, broadcast and unlock calls are
// visible in this view; the arrival counting and the branch that chooses
// between waiting and broadcasting are on lines not shown (presumably using
// comm->barrier -- TODO confirm).
423 void smpi_barrier(smpi_mpi_communicator_t *comm) {
425 SIMIX_mutex_lock(comm->barrier_mutex);
428 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
431 SIMIX_cond_broadcast(comm->barrier_cond);
433 SIMIX_mutex_unlock(comm->barrier_mutex);
// Finds the index of `host` in comm->hosts by scanning forward from 0.
// `i` is left at -1 when the host is not part of the communicator.
// The declaration of `i` and the return statement are not visible in this view.
436 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
// The empty loop body leaves `i` at the first matching index, or at
// comm->size if there is none.
439 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
440 if (i >= comm->size) i = -1;
// Validates the arguments of a point-to-point operation and, when they are
// acceptable, obtains a request object from smpi_request_mallocator and fills
// it in, storing it through `request`.
// `retval` starts as MPI_SUCCESS and is replaced by the error code of the
// first failing check. The first check of the chain (the one yielding
// MPI_ERR_COUNT), the else line before the allocation, and the return
// statement are not visible in this view.
444 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
445 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
447 int retval = MPI_SUCCESS;
452 retval = MPI_ERR_COUNT;
453 } else if (NULL == buf) {
454 retval = MPI_ERR_INTERN;
455 } else if (NULL == datatype) {
456 retval = MPI_ERR_TYPE;
457 } else if (NULL == comm) {
458 retval = MPI_ERR_COMM;
// src may be the wildcard MPI_ANY_SOURCE; otherwise it must be a valid rank.
459 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
460 retval = MPI_ERR_RANK;
// dst has no wildcard and must always be a valid rank.
461 } else if (0 > dst || comm->size <= dst) {
462 retval = MPI_ERR_RANK;
463 } else if (0 > tag) {
464 retval = MPI_ERR_TAG;
466 *request = xbt_mallocator_get(smpi_request_mallocator);
467 (*request)->buf = buf;
468 (*request)->count = count;
469 (*request)->datatype = datatype;
470 (*request)->src = src;
471 (*request)->dst = dst;
472 (*request)->tag = tag;
473 (*request)->comm = comm;
474 (*request)->completed = 0;
// NOTE(review): waitlist is set to NULL here, yet smpi_wait pushes onto
// request->waitlist with xbt_fifo_push -- confirm where the FIFO is created.
// Likewise, smpi_sender uses request->mutex and request->cond, and no
// initialization of either is visible in this function.
475 (*request)->waitlist = NULL;
// Queues `request` on the calling rank's pending-send FIFO and resumes that
// rank's sender process if it is currently suspended.
// The return statement is not visible in this view.
480 int smpi_isend(smpi_mpi_request_t *request)
482 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
484 xbt_fifo_push(smpi_pending_send_requests[rank], request);
// NOTE(review): smpi_sender_processes holds smx_process_t handles and the
// rest of this file uses the SIMIX_process_* API; these two calls use MSG_*
// names -- confirm they are the intended functions.
486 if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
487 MSG_process_resume(smpi_sender_processes[rank]);
// Queues `request` on the calling rank's pending-receive FIFO and resumes that
// rank's receiver process if it is currently suspended.
// The return statement is not visible in this view.
491 int smpi_irecv(smpi_mpi_request_t *request)
493 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
495 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
// NOTE(review): smpi_receiver_processes holds smx_process_t handles and the
// rest of this file uses the SIMIX_process_* API; these two calls use MSG_*
// names -- confirm they are the intended functions.
497 if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
498 MSG_process_resume(smpi_receiver_processes[rank]);
// Waits for `request` and reports its source rank through `status`.
// A NULL request is ignored. `status` is filled in only when it is neither
// NULL nor MPI_STATUS_IGNORE.
// The declaration of `self` and the closing lines are not visible in this view.
502 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
506 if (NULL != request) {
507 if (!request->completed) {
// Register the caller on the request's wait list.
508 self = SIMIX_process_self();
509 xbt_fifo_push(request->waitlist, self);
// NOTE(review): the closing brace comes before the suspend call, so the
// suspend also runs when the request is already completed, and in that case
// `self` has not been assigned by the visible code. The name also differs from
// SIMIX_process_suspend used elsewhere in this file. Confirm the intended
// placement and function.
510 } SIMIX_suspend(self);
511 if (NULL != status && MPI_STATUS_IGNORE != status) {
512 status->MPI_SOURCE = request->src;
517 // FIXME: move into own file
// gettimeofday-style function that reports the simulated clock
// (SIMIX_get_clock) instead of wall-clock time.
// The declaration of `now`, the assignment of tv->tv_sec, and the return
// statement are not visible in this view. No use of `tz` is visible either.
518 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
526 now = SIMIX_get_clock();
// Microseconds are the part of `now` left after subtracting tv->tv_sec,
// scaled to 1e6. This relies on tv->tv_sec having been set from `now`
// beforehand (presumably on the line not shown -- TODO confirm).
528 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// sleep-style function that advances simulated time: it creates a sleep
// action of `seconds` on the current host and blocks until the action's
// condition is signalled.
// The declarations of host, mutex and cond and the return statement are not
// visible in this view.
534 unsigned int smpi_sleep(unsigned int seconds)
539 smx_action_t sleep_action;
542 host = SIMIX_host_self();
543 sleep_action = SIMIX_action_sleep(host, seconds);
// A private mutex/condition pair is created only to wait for this one action.
544 mutex = SIMIX_mutex_init();
545 cond = SIMIX_cond_init();
546 SIMIX_mutex_lock(mutex);
547 SIMIX_register_condition_to_action(sleep_action, cond);
548 SIMIX_register_action_to_condition(sleep_action, cond);
549 SIMIX_cond_wait(cond, mutex);
550 SIMIX_mutex_unlock(mutex);
551 SIMIX_mutex_destroy(mutex);
552 SIMIX_cond_destroy(cond);
553 // FIXME: check for success/failure?
// exit-style function for a simulated process: removes the caller from the
// running-host count and kills the calling SIMIX process.
// No use of `status` is visible in this view.
558 void smpi_exit(int status)
// Decrement under the mutex so the sender/receiver loops see a consistent
// count.
561 SIMIX_mutex_lock(smpi_running_hosts_mutex);
562 smpi_running_hosts--;
563 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
564 SIMIX_process_kill(SIMIX_process_self());