5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "simix/private.h"
// File-scope state shared by the SMPI simulated-main, sender and receiver
// processes. The heap-backed globals below start out NULL and are allocated
// by the root host (hosts[0]) in the initialization routine further down.
10 // FIXME: move globals into structure...
// Pool of smpi_mpi_request_t objects (xbt mallocator).
12 xbt_mallocator_t smpi_request_mallocator = NULL;
// Per-rank FIFO arrays, indexed by rank in smpi_mpi_comm_world:
//  - requests queued by smpi_isend() and drained by smpi_sender()
13 xbt_fifo_t *smpi_pending_send_requests = NULL;
//  - requests queued by smpi_irecv()
14 xbt_fifo_t *smpi_pending_recv_requests = NULL;
//  - copies of sent requests pushed by smpi_sender() onto the destination rank
15 xbt_fifo_t *smpi_received_messages = NULL;
// Per-rank handles of the sender/receiver helper processes, so that
// smpi_isend()/smpi_irecv() can resume them while they are suspended.
17 smx_process_t *smpi_sender_processes = NULL;
18 smx_process_t *smpi_receiver_processes = NULL;
// Number of hosts that have not yet finalized/exited; guarded by
// smpi_running_hosts_mutex. The sender/receiver loops run while it is > 0.
20 int smpi_running_hosts = 0;
// MPI_COMM_WORLD equivalent; filled in by the root host during init.
22 smpi_mpi_communicator_t smpi_mpi_comm_world;
// NOTE(review): presumably the object behind MPI_STATUS_IGNORE — confirm.
24 smpi_mpi_status_t smpi_mpi_status_ignore;
// Built-in datatypes; only their .size field is set during init.
26 smpi_mpi_datatype_t smpi_mpi_byte;
27 smpi_mpi_datatype_t smpi_mpi_int;
28 smpi_mpi_datatype_t smpi_mpi_double;
// Built-in reduction operators; their .func field is set during init.
30 smpi_mpi_op_t smpi_mpi_land;
31 smpi_mpi_op_t smpi_mpi_sum;
// Wall-clock timer and "currently benchmarking" flag used by
// smpi_bench_begin()/smpi_bench_end().
33 static xbt_os_timer_t smpi_timer;
34 static int smpi_benchmarking;
// Reference speed; initialized to SMPI_DEFAULT_SPEED during init.
35 static double smpi_reference_speed;
// Guards smpi_running_hosts.
38 smx_mutex_t smpi_running_hosts_mutex = NULL;
// NOTE(review): created during init but never locked in this excerpt.
39 smx_mutex_t smpi_benchmarking_mutex = NULL;
// Mutex/condition pair used for the start-up handshake: processes first wait
// for smpi_root_ready, then for smpi_ready_count to reach 3 * size.
40 smx_mutex_t init_mutex = NULL;
41 smx_cond_t init_cond = NULL;
// Non-zero once the root host has set up the globals (the assignment is not
// visible in this excerpt — TODO confirm where it happens).
43 int smpi_root_ready = 0;
// Start-up rendezvous counter compared against 3 * size. Presumably it is
// incremented once by each of the three per-host processes (simulated main,
// sender, receiver); the increment itself is not visible here.
44 int smpi_ready_count = 0;
// Default xbt log category for this file (used by DEBUG1/INFO1 below).
46 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
// Size accessor for `comm`. Its body is not part of this excerpt; it
// presumably returns comm->size — confirm.
// NOTE(review): a non-static `inline` definition in a .c file follows C99
// inline semantics under -std=c99/c11 and may not emit an external symbol.
// Consider `static inline` (or a plain function) if other translation units
// call it.
48 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
// Maps `host` to its rank in `comm` with a linear scan of comm->hosts,
// walking from the last index down.
// NOTE(review): the loop stops at i == 0 whether or not hosts[0] matches, so
// a host that is not in `comm` also ends with i == 0. The return statement is
// not visible in this excerpt. If it returns i, unknown hosts silently map to
// rank 0 (compare smpi_comm_rank(), which reports -1).
53 // FIXME: smarter algorithm?
54 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
58 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
63 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
65 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
68 int inline smpi_mpi_comm_world_rank_self()
70 return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self())
// Per-host helper process that drains this rank's smpi_pending_send_requests
// queue. For each request it:
//  - starts a SIMIX communication action from this host to the destination
//    host and waits on request->cond until the action signals it;
//  - pushes a copy of the request onto the destination rank's
//    smpi_received_messages queue;
//  - marks the request completed and resumes every suspended waiter.
// It suspends itself when its queue is empty and loops while
// smpi_running_hosts is > 0.
// NOTE(review): several lines of this function (local declarations, braces,
// `else` branches) are not part of this excerpt.
73 // FIXME: messages are actually smaller than requests, use them instead?
74 int smpi_sender(int argc, char **argv)
79 xbt_fifo_t request_queue;
81 int running_hosts = 0;
82 smpi_mpi_request_t *request;
84 smx_action_t communicate_action;
85 smpi_mpi_request_t *scratch;
87 smx_process_t waitproc;
89 self = SIMIX_process_self();
90 shost = SIMIX_host_self();
91 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
93 // make sure root is done before own initialization
94 SIMIX_mutex_lock(init_mutex);
95 if (!smpi_root_ready) {
96 SIMIX_cond_wait(init_cond, init_mutex);
98 SIMIX_mutex_unlock(init_mutex);
100 request_queue = smpi_pending_send_requests[rank];
101 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
102 smpi_sender_processes[rank] = self;
104 // wait for all nodes to signal initialization complete
105 SIMIX_mutex_lock(init_mutex);
// NOTE(review): the wait is guarded by `if`, not `while`, so a wake-up from
// any other broadcast on init_cond does not re-check smpi_ready_count.
107 if (smpi_ready_count < 3 * size) {
108 SIMIX_cond_wait(init_cond, init_mutex);
110 SIMIX_cond_broadcast(init_cond);
112 SIMIX_mutex_unlock(init_mutex);
114 SIMIX_mutex_lock(smpi_running_hosts_mutex);
115 running_hosts = smpi_running_hosts;
116 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
118 while (0 < running_hosts) {
121 request = xbt_fifo_shift(request_queue);
// Nothing to send: sleep until smpi_isend() resumes this process.
123 if (NULL == request) {
124 SIMIX_process_suspend(self);
126 SIMIX_mutex_lock(request->mutex);
128 dhost = request->comm->hosts[request->dst];
130 // FIXME: not at all sure I can assume magic just happens here....
// Simulated transfer of datatype->size * count bytes; rate bound -1.0.
131 communicate_action = SIMIX_action_communicate(shost, dhost,
132 "communication", request->datatype->size * request->count * 1.0, -1.0);
134 SIMIX_register_condition_to_action(communicate_action, request->cond);
135 SIMIX_register_action_to_condition(communicate_action, request->cond);
// Woken when smpi_run_simulation() broadcasts the action's conditions.
137 SIMIX_cond_wait(request->cond, request->mutex);
139 // copy request to appropriate received queue
140 scratch = xbt_mallocator_get(smpi_request_mallocator);
// NOTE(review): a whole-struct copy, so `scratch` shares request's mutex,
// cond and waitlist pointers.
141 memcpy(scratch, request, sizeof *scratch);
142 drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
143 xbt_fifo_push(smpi_received_messages[drank], scratch);
145 request->completed = 1;
// Resume every process that registered itself in smpi_wait().
147 while ((waitproc = xbt_fifo_shift(request->waitlist))) {
148 if (SIMIX_process_is_suspended(waitproc)) {
149 SIMIX_process_resume(waitproc);
153 SIMIX_mutex_unlock(request->mutex);
156 SIMIX_mutex_lock(smpi_running_hosts_mutex);
157 running_hosts = smpi_running_hosts;
158 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Per-host helper process for incoming data. When a pending receive request
// is matched with a received message, it:
//  - copies the payload into the request's buffer;
//  - records the source rank;
//  - marks the request completed and resumes every suspended waiter.
// It loops while smpi_running_hosts is > 0.
// FIXME: matching requests against messages is not implemented yet (see
// below). `request` and `message` therefore stay NULL, and the process just
// suspends until smpi_irecv() resumes it. They are initialized so this path
// never reads indeterminate pointers.
// NOTE(review): several lines of this function (declarations, braces,
// `else` branches) are not part of this excerpt.
164 int smpi_receiver(int argc, char **argv)
168 xbt_fifo_t request_queue;
169 xbt_fifo_t message_queue;
172 smpi_mpi_request_t *message = NULL;
173 smpi_mpi_request_t *request = NULL;
174 smx_process_t waitproc;
176 self = SIMIX_process_self();
177 rank = smpi_mpi_comm_world_rank_self();
179 // make sure root is done before own initialization
180 SIMIX_mutex_lock(init_mutex);
181 if (!smpi_root_ready) {
182 SIMIX_cond_wait(init_cond, init_mutex);
184 SIMIX_mutex_unlock(init_mutex);
// The global is named smpi_pending_recv_requests; the former
// `smpi_pending_receive_requests` identifier was undeclared.
186 request_queue = smpi_pending_recv_requests[rank];
187 message_queue = smpi_received_messages[rank];
188 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
189 smpi_receiver_processes[rank] = self;
191 // wait for all nodes to signal initialization complete
192 SIMIX_mutex_lock(init_mutex);
// NOTE(review): `if`-guarded wait; a stray broadcast on init_cond is not
// re-checked against smpi_ready_count.
194 if (smpi_ready_count < 3 * size) {
195 SIMIX_cond_wait(init_cond, init_mutex);
197 SIMIX_cond_broadcast(init_cond);
199 SIMIX_mutex_unlock(init_mutex);
201 SIMIX_mutex_lock(smpi_running_hosts_mutex);
202 running_hosts = smpi_running_hosts;
203 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
205 while (0 < running_hosts) {
207 // FIXME: search for received messages and requests
// Nothing matched: sleep until smpi_irecv() resumes this process.
209 if (NULL == request) {
210 SIMIX_process_suspend(self);
212 SIMIX_mutex_lock(request->mutex);
// The request's type field is `datatype` (as set by smpi_create_request()).
// NOTE(review): copies request->count elements. If the matched message is
// shorter, this over-reads message->buf; clamp to the message size once
// matching exists.
213 memcpy(request->buf, message->buf, request->count * request->datatype->size);
214 request->src = message->src;
215 request->completed = 1;
// Resume every process that registered itself in smpi_wait().
217 while ((waitproc = xbt_fifo_shift(request->waitlist))) {
218 if (SIMIX_process_is_suspended(waitproc)) {
219 SIMIX_process_resume(waitproc);
223 SIMIX_mutex_unlock(request->mutex);
// The message copy was taken from the pool by smpi_sender(); return it.
224 xbt_mallocator_release(smpi_request_mallocator, message);
227 SIMIX_mutex_lock(smpi_running_hosts_mutex);
228 running_hosts = smpi_running_hosts;
229 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
235 int smpi_run_simulation(int argc, char **argv)
237 smx_cond_t cond = NULL;
238 smx_action_t action = NULL;
240 xbt_fifo_t actions_failed = xbt_fifo_new();
241 xbt_fifo_t actions_done = xbt_fifo_new();
243 srand(SMPI_RAND_SEED);
245 SIMIX_global_init(&argc, argv);
247 init_mutex = SIMIX_mutex_init();
248 init_cond = SIMIX_cond_init();
250 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
251 SIMIX_function_register("smpi_sender", smpi_sender);
252 SIMIX_function_register("smpi_receiver", smpi_receiver);
253 SIMIX_create_environment(argv[1]);
254 SIMIX_launch_application(argv[2]);
256 /* Prepare to display some more info when dying on Ctrl-C pressing */
257 //signal(SIGINT, inthandler);
259 /* Clean IO before the run */
263 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
264 while (action = xbt_fifo_pop(actions_failed)) {
265 DEBUG1("** %s failed **", action->name);
266 while (cond = xbt_fifo_pop(action->cond_list)) {
267 SIMIX_cond_broadcast(cond);
269 SIMIX_action_destroy(action);
271 while (action = xbt_fifo_pop(actions_done)) {
272 DEBUG1("** %s done **",action->name);
273 while (cond = xbt_fifo_pop(action->cond_list)) {
274 SIMIX_cond_broadcast(cond);
276 SIMIX_action_destroy(action);
279 xbt_fifo_free(actions_failed);
280 xbt_fifo_free(actions_done);
281 INFO1("simulation time %g", SIMIX_get_clock());
// Logical-AND reduction kernel on ints (installed as smpi_mpi_land.func).
// Stores 1 in *z when both *x and *y are non-zero, otherwise 0.
// *y is not read when *x is zero.
void smpi_mpi_land_func(void *x, void *y, void *z)
{
  const int *lhs = x;
  const int *rhs = y;
  int *result = z;

  if (*lhs == 0) {
    *result = 0;
  } else {
    *result = (*rhs != 0);
  }
}
// Integer sum reduction kernel (installed as smpi_mpi_sum.func): *z = *x + *y.
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
  const int lhs = *(const int *)x;
  const int rhs = *(const int *)y;

  *(int *)z = lhs + rhs;
}
296 smpi_mpi_request_t *smpi_new_request()
298 return xbt_new(smpi_mpi_request_t, 1);
// Body of the per-host MPI initialization routine. Its signature and the
// declarations of host/hosts/size/i are not part of this excerpt.
// - The process running on hosts[0] allocates and initializes every shared
//   global, then broadcasts init_cond.
// - The other path waits for the root, then records its process in
//   smpi_mpi_comm_world.processes. The branch keyword between the two paths
//   is elided; presumably an `else` — confirm.
// - Finally every process joins the smpi_ready_count rendezvous.
// NOTE(review): `process` is not used in the visible lines.
305 smx_process_t process;
310 // initialize some local variables
311 host = SIMIX_host_self();
312 hosts = SIMIX_host_get_table();
313 size = SIMIX_host_get_number();
315 // node 0 sets the globals
316 if (host == hosts[0]) {
// One sender and one receiver helper handle per rank.
319 smpi_sender_processes = xbt_new(smx_process_t, size);
320 smpi_receiver_processes = xbt_new(smx_process_t, size);
323 smpi_running_hosts_mutex = SIMIX_mutex_init();
324 smpi_running_hosts = size;
326 // global communicator
327 smpi_mpi_comm_world.size = size;
328 smpi_mpi_comm_world.barrier = 0;
329 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
330 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
// NOTE(review): aliases the table returned by SIMIX_host_get_table();
// ownership of that table is not visible here.
331 smpi_mpi_comm_world.hosts = hosts;
332 smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
333 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// Built-in datatype sizes, in bytes.
336 smpi_mpi_byte.size = (size_t)1;
337 smpi_mpi_int.size = sizeof(int);
338 smpi_mpi_double.size = sizeof(double);
// Built-in reduction operators.
341 smpi_mpi_land.func = &smpi_mpi_land_func;
342 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Request pool (no reset callback) and the per-rank queues.
345 smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
346 smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
347 smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
348 smpi_received_messages = xbt_new(xbt_fifo_t, size);
350 for(i = 0; i < size; i++) {
351 smpi_pending_send_requests[i] = xbt_fifo_new();
352 smpi_pending_recv_requests[i] = xbt_fifo_new();
353 smpi_received_messages[i] = xbt_fifo_new();
// Benchmarking state used by smpi_bench_begin()/smpi_bench_end().
356 smpi_timer = xbt_os_timer_new();
357 smpi_reference_speed = SMPI_DEFAULT_SPEED;
358 smpi_benchmarking = 0;
359 smpi_benchmarking_mutex = SIMIX_mutex_init();
361 // signal all nodes to perform initialization
// NOTE(review): smpi_root_ready is presumably set to 1 on the elided line
// between the lock and the broadcast — confirm.
362 SIMIX_mutex_lock(init_mutex);
364 SIMIX_cond_broadcast(init_cond);
365 SIMIX_mutex_unlock(init_mutex);
369 // make sure root is done before own initialization
370 SIMIX_mutex_lock(init_mutex);
371 if (!smpi_root_ready) {
372 SIMIX_cond_wait(init_cond, init_mutex);
374 SIMIX_mutex_unlock(init_mutex);
// Register this host's simulated-main process at its rank.
376 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
380 // wait for all nodes to signal initialization complete
381 SIMIX_mutex_lock(init_mutex);
383 if (smpi_ready_count < 3 * size) {
384 SIMIX_cond_wait(init_cond, init_mutex);
386 SIMIX_cond_broadcast(init_cond);
388 SIMIX_mutex_unlock(init_mutex);
// Called by each host when it finalizes.
// - Decrements smpi_running_hosts under its mutex; `i` receives the new count.
// - Tears down the shared state. The guard around the teardown is elided in
//   this excerpt; presumably it runs only for the last host, when i reaches 0
//   — confirm.
// NOTE(review): the visible teardown does not release smpi_sender_processes,
// smpi_receiver_processes, smpi_benchmarking_mutex or init_mutex/init_cond
// (unless elided lines do).
// NOTE(review): the sender/receiver loops lock smpi_running_hosts_mutex on
// every iteration, so destroying it while they still run is unsafe.
392 void smpi_mpi_finalize()
396 SIMIX_mutex_lock(smpi_running_hosts_mutex);
397 i = --smpi_running_hosts;
398 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
402 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
// `i` is reused here as a plain loop index over all ranks.
404 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
405 xbt_fifo_free(smpi_pending_send_requests[i]);
406 xbt_fifo_free(smpi_pending_recv_requests[i]);
407 xbt_fifo_free(smpi_received_messages[i]);
410 xbt_mallocator_free(smpi_request_mallocator);
411 xbt_free(smpi_pending_send_requests);
412 xbt_free(smpi_pending_recv_requests);
413 xbt_free(smpi_received_messages);
415 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
416 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
417 xbt_free(smpi_mpi_comm_world.processes);
419 xbt_os_timer_free(smpi_timer);
424 void smpi_bench_begin()
426 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
427 smpi_benchmarking = 1;
428 xbt_os_timer_start(smpi_timer);
// Stops the timer started by smpi_bench_begin(). The measured wall-clock
// duration is charged to the calling host as a simulated computation of
// duration * smpi_reference_speed units. The caller blocks on a private
// mutex/condition until smpi_run_simulation() reports the action finished.
// NOTE(review): the local declarations (duration, host, mutex, cond) and the
// closing lines are not part of this excerpt.
432 void smpi_bench_end()
436 smx_action_t compute_action;
440 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
441 smpi_benchmarking = 0;
442 xbt_os_timer_stop(smpi_timer);
443 duration = xbt_os_timer_elapsed(smpi_timer);
444 host = SIMIX_host_self();
// Scale by the configured reference speed (initialized to SMPI_DEFAULT_SPEED
// during init) rather than the hard-coded default, so the setting is honored.
445 compute_action = SIMIX_action_execute(host, "computation", duration * smpi_reference_speed);
446 mutex = SIMIX_mutex_init();
447 cond = SIMIX_cond_init();
448 SIMIX_mutex_lock(mutex);
449 SIMIX_register_condition_to_action(compute_action, cond);
450 SIMIX_register_action_to_condition(compute_action, cond);
451 SIMIX_cond_wait(cond, mutex);
452 SIMIX_mutex_unlock(mutex);
453 SIMIX_mutex_destroy(mutex);
454 SIMIX_cond_destroy(cond);
455 // FIXME: check for success/failure?
// Barrier over `comm`, built on comm->barrier_mutex and comm->barrier_cond.
// The counter update and the branch between the wait and the broadcast are
// elided in this excerpt. Presumably it increments comm->barrier, waits while
// fewer than comm->size processes have arrived, and the last arrival resets
// the counter and broadcasts — confirm.
459 void smpi_barrier(smpi_mpi_communicator_t *comm) {
461 SIMIX_mutex_lock(comm->barrier_mutex);
464 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
467 SIMIX_cond_broadcast(comm->barrier_cond);
469 SIMIX_mutex_unlock(comm->barrier_mutex);
// Linear search for `host` in comm->hosts. `i` ends as the host's rank, or -1
// when the host is not a member of `comm`. The return statement is not
// visible in this excerpt; presumably `return i;`.
// NOTE(review): duplicates smpi_mpi_comm_rank(), which scans in the opposite
// direction and does not produce -1 for unknown hosts. Consider keeping one.
472 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
475 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
476 if (i >= comm->size) i = -1;
// Validates the arguments of a point-to-point request. On success it takes a
// request object from smpi_request_mallocator and fills it in (*request).
// retval starts as MPI_SUCCESS and is overwritten with an MPI_ERR_* code on
// invalid input. The first condition (the one yielding MPI_ERR_COUNT), the
// `else` that leads to the allocation, and the return are elided here.
// NOTE(review): waitlist is set to NULL, yet smpi_wait() pushes onto
// request->waitlist and the sender/receiver helpers shift from it. The
// mutex and cond fields are not set here either. Confirm where they are
// initialized.
// NOTE(review): a NULL buffer is reported as MPI_ERR_INTERN; the MPI standard
// code for this is MPI_ERR_BUFFER.
480 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
481 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
483 int retval = MPI_SUCCESS;
488 retval = MPI_ERR_COUNT;
489 } else if (NULL == buf) {
490 retval = MPI_ERR_INTERN;
491 } else if (NULL == datatype) {
492 retval = MPI_ERR_TYPE;
493 } else if (NULL == comm) {
494 retval = MPI_ERR_COMM;
// src may be MPI_ANY_SOURCE; otherwise it must be a valid rank in comm.
495 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
496 retval = MPI_ERR_RANK;
497 } else if (0 > dst || comm->size <= dst) {
498 retval = MPI_ERR_RANK;
499 } else if (0 > tag) {
500 retval = MPI_ERR_TAG;
502 *request = xbt_mallocator_get(smpi_request_mallocator);
503 (*request)->buf = buf;
504 (*request)->count = count;
505 (*request)->datatype = datatype;
506 (*request)->src = src;
507 (*request)->dst = dst;
508 (*request)->tag = tag;
509 (*request)->comm = comm;
510 (*request)->completed = 0;
511 (*request)->waitlist = NULL;
// Non-blocking send: queues `request` on the calling rank's pending-send
// queue and resumes that rank's sender helper if it is suspended (the helper
// suspends itself when its queue is empty).
// NOTE(review): the closing lines, including any return statement, are not
// part of this excerpt.
516 int smpi_isend(smpi_mpi_request_t *request)
518 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
520 xbt_fifo_push(smpi_pending_send_requests[rank], request);
// The helper handles are SIMIX processes (smx_process_t), so use the SIMIX
// process API, as the helpers themselves do, not the MSG one.
522 if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
523 SIMIX_process_resume(smpi_sender_processes[rank]);
// Non-blocking receive: queues `request` on the calling rank's pending-receive
// queue and resumes that rank's receiver helper if it is suspended (the helper
// suspends itself when it has nothing to process).
// NOTE(review): the closing lines, including any return statement, are not
// part of this excerpt.
527 int smpi_irecv(smpi_mpi_request_t *request)
529 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
531 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
// The helper handles are SIMIX processes (smx_process_t), so use the SIMIX
// process API, as the helpers themselves do, not the MSG one.
533 if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
534 SIMIX_process_resume(smpi_receiver_processes[rank]);
// Waits for `request` to complete, then fills status->MPI_SOURCE unless
// `status` is NULL or MPI_STATUS_IGNORE.
// If the request is not yet completed, the caller adds itself to
// request->waitlist. The sender/receiver helpers resume every suspended
// process on that list when the request completes. The suspend call itself
// is on an elided line — presumably SIMIX_process_suspend(self); confirm.
// NOTE(review): the nesting between the two `if` blocks is elided. If the
// status block sits outside `if (NULL != request)`, a NULL request with a
// real status dereferences NULL.
538 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
542 self = SIMIX_process_self();
544 if (NULL != request) {
545 SIMIX_mutex_lock(request->mutex);
546 if (!request->completed) {
547 xbt_fifo_push(request->waitlist, self);
550 SIMIX_mutex_unlock(request->mutex);
554 if (NULL != status && MPI_STATUS_IGNORE != status) {
555 SIMIX_mutex_lock(request->mutex);
556 status->MPI_SOURCE = request->src;
557 SIMIX_mutex_unlock(request->mutex);
// gettimeofday() replacement that reports simulated time (SIMIX_get_clock())
// instead of host time. tv_usec is set to the fractional part of `now` in
// microseconds. This assumes tv->tv_sec was already set to the whole seconds
// on an elided line — confirm. Handling of `tz` and the return value are not
// visible in this excerpt.
562 // FIXME: move into own file
563 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
571 now = SIMIX_get_clock();
573 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// sleep() replacement that uses simulated time. It starts a SIMIX sleep action
// of `seconds` on the caller's host and blocks on a private mutex/condition
// registered with that action. smpi_run_simulation() broadcasts the
// condition (and destroys the action) once the action finishes or fails.
// The local declarations and the return value are not visible in this
// excerpt.
// NOTE(review): the wait-on-action sequence duplicates smpi_bench_end();
// a shared helper would remove the copy.
579 unsigned int smpi_sleep(unsigned int seconds)
584 smx_action_t sleep_action;
587 host = SIMIX_host_self();
588 sleep_action = SIMIX_action_sleep(host, seconds);
589 mutex = SIMIX_mutex_init();
590 cond = SIMIX_cond_init();
591 SIMIX_mutex_lock(mutex);
592 SIMIX_register_condition_to_action(sleep_action, cond);
593 SIMIX_register_action_to_condition(sleep_action, cond);
594 SIMIX_cond_wait(cond, mutex);
595 SIMIX_mutex_unlock(mutex);
596 SIMIX_mutex_destroy(mutex);
597 SIMIX_cond_destroy(cond);
598 // FIXME: check for success/failure?
// exit() replacement. It decrements smpi_running_hosts under its mutex, then
// kills the calling SIMIX process. `status` is not used in the visible lines.
// The rest of the function lies beyond this excerpt.
603 void smpi_exit(int status)
606 SIMIX_mutex_lock(smpi_running_hosts_mutex);
607 smpi_running_hosts--;
608 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
609 SIMIX_process_kill(SIMIX_process_self());