5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "simix/private.h"
// NOTE(review): every line of this listing starts with a bare number (apparently the
// line number in the original file) and several lines (braces, some declarations)
// are absent; the text has to be restored from the real source before it compiles.
//
// Global SMPI runtime state shared by all simulated hosts. The process running on
// hosts[0] allocates and initializes most of it during MPI initialization, and
// smpi_mpi_finalize() releases it.
10 // FIXME: move globals into structure...
// Pools of records: requests (allocated by smpi_new_request) and messages handed
// from a sender process to a receiver process.
12 xbt_mallocator_t smpi_request_mallocator = NULL;
13 xbt_mallocator_t smpi_message_mallocator = NULL;
// Per-rank FIFOs, indexed by rank in smpi_mpi_comm_world:
//  - pending_send: filled by smpi_isend(), drained by that rank's smpi_sender();
//  - pending_recv: filled by smpi_irecv(), read by that rank's smpi_receiver();
//  - received_messages: filled by the sender processes (keyed by destination
//    rank), read by that rank's smpi_receiver().
14 xbt_fifo_t *smpi_pending_send_requests = NULL;
15 xbt_fifo_t *smpi_pending_recv_requests = NULL;
16 xbt_fifo_t *smpi_received_messages = NULL;
// Per-rank handles of the sender/receiver daemon processes, recorded so that
// smpi_isend()/smpi_irecv() can resume them when new work is queued.
18 smx_process_t *smpi_sender_processes = NULL;
19 smx_process_t *smpi_receiver_processes = NULL;
// Number of hosts that have not yet finalized or exited; protected by
// smpi_running_hosts_mutex. The sender/receiver loops keep running while it is > 0.
21 int smpi_running_hosts = 0;
// MPI_COMM_WORLD: all hosts of the platform, where rank i corresponds to hosts[i].
23 smpi_mpi_communicator_t smpi_mpi_comm_world;
// NOTE(review): no use of this object is visible in this section.
25 smpi_mpi_status_t smpi_mpi_status_ignore;
// Predefined datatypes; their .size field is set during initialization.
27 smpi_mpi_datatype_t smpi_mpi_byte;
28 smpi_mpi_datatype_t smpi_mpi_int;
29 smpi_mpi_datatype_t smpi_mpi_double;
// Predefined reduction operations; .func is set to smpi_mpi_land_func /
// smpi_mpi_sum_func during initialization.
31 smpi_mpi_op_t smpi_mpi_land;
32 smpi_mpi_op_t smpi_mpi_sum;
// Benchmarking state for smpi_bench_begin()/smpi_bench_end(): a wall-clock timer,
// a "currently benchmarking" flag, and a reference speed that initialization sets
// to SMPI_DEFAULT_SPEED.
34 static xbt_os_timer_t smpi_timer;
35 static int smpi_benchmarking;
36 static double smpi_reference_speed;
// Synchronization objects. init_mutex/init_cond (created in smpi_run_simulation),
// together with smpi_root_ready and smpi_ready_count, implement the start-up
// rendezvous: processes first wait until the root host has set up the globals,
// then until smpi_ready_count reaches 3 * size. The statements that set
// smpi_root_ready and increment smpi_ready_count are not visible in this listing.
39 smx_mutex_t smpi_running_hosts_mutex = NULL;
40 smx_mutex_t smpi_benchmarking_mutex = NULL;
41 smx_mutex_t init_mutex = NULL;
42 smx_cond_t init_cond = NULL;
44 int smpi_root_ready = 0;
45 int smpi_ready_count = 0;
// Default log category ("smpi") used by DEBUG1/INFO1 in this file.
47 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
// Number of ranks in comm.
// NOTE(review): the body is not visible in this listing; presumably it returns
// comm->size. Under C99 rules a non-static `inline` definition only provides an
// external definition if some declaration in this translation unit omits `inline`
// or uses `extern`; confirm how the header declares it.
49 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
// Rank of host in comm, found by a linear scan of comm->hosts from the last entry
// downwards.
// NOTE(review): the loop stops at index 0 without comparing hosts[0], so a host
// that is not a member of comm ends the scan at the same index as hosts[0]
// (presumably rank 0; the `return` line is not visible here). smpi_comm_rank()
// in this file returns -1 for the not-found case instead.
54 // FIXME: smarter algorithm?
55 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
59 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
64 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
66 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
69 int inline smpi_mpi_comm_world_rank_self()
71 return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self())
// Per-host "sender" daemon (registered under the name "smpi_sender").
// After the start-up rendezvous it repeatedly takes the next request queued by
// smpi_isend() for this host and simulates the transfer with a SIMIX communicate
// action. It then pushes a message record onto the destination rank's
// smpi_received_messages FIFO, marks the request completed, and resumes any
// suspended processes waiting on it. The loop runs while smpi_running_hosts > 0.
// NOTE(review): several lines of this function (local declarations such as
// self/shost/dhost/rank/drank/size, closing braces, and whatever follows the
// suspend call) are not present in this listing.
74 // FIXME: messages are actually smaller than requests, use them instead?
75 int smpi_sender(int argc, char **argv)
80 xbt_fifo_t request_queue;
82 int running_hosts = 0;
83 smpi_mpi_request_t *request;
85 smx_action_t communicate_action;
86 smpi_mpi_request_t *scratch;
88 smx_process_t waitproc;
90 self = SIMIX_process_self();
91 shost = SIMIX_host_self();
92 rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
94 // make sure root is done before own initialization
95 SIMIX_mutex_lock(init_mutex);
96 if (!smpi_root_ready) {
97 SIMIX_cond_wait(init_cond, init_mutex);
99 SIMIX_mutex_unlock(init_mutex);
// Per-rank setup: find this rank's send queue and publish this process so that
// smpi_isend() can resume it.
101 request_queue = smpi_pending_send_requests[rank];
102 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
103 smpi_sender_processes[rank] = self;
105 // wait for all nodes to signal initialization complete
106 SIMIX_mutex_lock(init_mutex);
// 3 * size: presumably one arrival per process kind (main, sender, receiver) on
// each host. NOTE(review): an `if` rather than a `while` around the wait means
// the count is not re-checked after waking up.
108 if (smpi_ready_count < 3 * size) {
109 SIMIX_cond_wait(init_cond, init_mutex);
// Propagate the wake-up to any other process still waiting at this point.
111 SIMIX_cond_broadcast(init_cond);
113 SIMIX_mutex_unlock(init_mutex);
115 SIMIX_mutex_lock(smpi_running_hosts_mutex);
116 running_hosts = smpi_running_hosts;
117 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
119 while (0 < running_hosts) {
122 request = xbt_fifo_shift(request_queue);
// Nothing queued: sleep until smpi_isend() resumes this process.
// NOTE(review): the line after the suspend is not visible; unless it closes an
// `if/else`, request->mutex below is dereferenced while request is NULL.
124 if (NULL == request) {
125 SIMIX_process_suspend(self);
127 SIMIX_mutex_lock(request->mutex);
129 dhost = request->comm->hosts[request->dst];
// Simulate moving datatype->size * count units from this host to the destination
// host; the process blocks on request->cond until the main loop in
// smpi_run_simulation() broadcasts the action's conditions.
// NOTE(review): that loop broadcasts for failed actions too, so a failed transfer
// is treated exactly like a successful one below.
131 // FIXME: not at all sure I can assume magic just happens here....
132 communicate_action = SIMIX_action_communicate(shost, dhost,
133 "communication", request->datatype->size * request->count * 1.0, -1.0);
135 SIMIX_register_condition_to_action(communicate_action, request->cond);
136 SIMIX_register_action_to_condition(communicate_action, request->cond);
138 SIMIX_cond_wait(request->cond, request->mutex);
140 // copy request to appropriate received queue
// NOTE(review): the message keeps a pointer to the sender's buffer (buf) rather
// than a copy; the receiver copies the data later, after this request has already
// been marked completed, so the sender must not reuse buf in the meantime.
// count and datatype are not copied into the message.
141 scratch = xbt_mallocator_get(smpi_message_mallocator);
142 scratch->comm = request->comm;
143 scratch->src = request->src;
144 scratch->dst = request->dst;
145 scratch->tag = request->tag;
146 scratch->buf = request->buf;
147 drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
148 xbt_fifo_push(smpi_received_messages[drank], scratch);
150 request->completed = 1;
// Resume every waiter that is currently suspended (see smpi_wait()).
// NOTE(review): smpi_create_request() sets waitlist to NULL; verify that
// xbt_fifo_shift tolerates a NULL fifo or that the waitlist is created elsewhere.
152 while(waitproc = xbt_fifo_shift(request->waitlist)) {
153 if (SIMIX_process_is_suspended(waitproc)) {
154 SIMIX_process_resume(waitproc);
158 SIMIX_mutex_unlock(request->mutex);
// Re-read the number of running hosts to decide whether to keep serving.
161 SIMIX_mutex_lock(smpi_running_hosts_mutex);
162 running_hosts = smpi_running_hosts;
163 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Per-host "receiver" daemon (registered under the name "smpi_receiver").
// After the start-up rendezvous it matches messages delivered to this rank
// (smpi_received_messages[rank]) with receive requests posted by smpi_irecv()
// (smpi_pending_recv_requests[rank]). It copies the payload into the request's
// buffer, marks the request completed, resumes suspended waiters, and returns the
// message record to its pool. The loop runs while smpi_running_hosts > 0.
// Fixed here: the queue global is smpi_pending_recv_requests (there is no
// smpi_pending_receive_requests), the `reqeust` typo, and the datatype field name
// (`datatype`, as in smpi_create_request and smpi_sender).
// NOTE(review): local declarations (self, rank, size, running_hosts), closing
// braces and the matching step under the FIXME are not present in this listing;
// no assignment to `request` or `message` is visible.
169 int smpi_receiver(int argc, char **argv)
173 xbt_fifo_t request_queue;
174 xbt_fifo_t message_queue;
177 smpi_mpi_request_t *message;
178 smpi_mpi_request_t *request;
179 smx_process_t waitproc;
181 self = SIMIX_process_self();
182 rank = smpi_mpi_comm_world_rank_self();
184 // make sure root is done before own initialization
185 SIMIX_mutex_lock(init_mutex);
186 if (!smpi_root_ready) {
187 SIMIX_cond_wait(init_cond, init_mutex);
189 SIMIX_mutex_unlock(init_mutex);
// Per-rank setup: find this rank's queues and publish this process so that
// smpi_irecv() can resume it.
191 request_queue = smpi_pending_recv_requests[rank];
192 message_queue = smpi_received_messages[rank];
193 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
194 smpi_receiver_processes[rank] = self;
196 // wait for all nodes to signal initialization complete
197 SIMIX_mutex_lock(init_mutex);
199 if (smpi_ready_count < 3 * size) {
200 SIMIX_cond_wait(init_cond, init_mutex);
202 SIMIX_cond_broadcast(init_cond);
204 SIMIX_mutex_unlock(init_mutex);
206 SIMIX_mutex_lock(smpi_running_hosts_mutex);
207 running_hosts = smpi_running_hosts;
208 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
210 while (0 < running_hosts) {
212 // FIXME: search for received messages and requests
213 // use stupid algorithm for now
// Nothing to match: sleep until smpi_irecv() resumes this process.
// NOTE(review): as in smpi_sender, the line after the suspend is not visible.
215 if (NULL == request) {
216 SIMIX_process_suspend(self);
218 SIMIX_mutex_lock(request->mutex);
// NOTE(review): the copy length comes from the receive request; the message does
// not record how much the sender sent, so a shorter send is over-read here.
219 memcpy(request->buf, message->buf, request->count * request->datatype->size);
220 request->src = message->src;
221 request->completed = 1;
// Resume every waiter that is currently suspended (see smpi_wait()).
223 while ((waitproc = xbt_fifo_shift(request->waitlist))) {
224 if (SIMIX_process_is_suspended(waitproc)) {
225 SIMIX_process_resume(waitproc);
229 SIMIX_mutex_unlock(request->mutex);
230 xbt_mallocator_release(smpi_message_mallocator, message);
// Re-read the number of running hosts to decide whether to keep serving.
233 SIMIX_mutex_lock(smpi_running_hosts_mutex);
234 running_hosts = smpi_running_hosts;
235 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Entry point of the simulator.
// - Initializes SIMIX and seeds rand() with SMPI_RAND_SEED.
// - Creates the start-up rendezvous mutex/condition.
// - Registers the three process functions and builds the environment from
//   argv[1] and the application from argv[2].
// - Drives the simulation: after every SIMIX_solve() step, it broadcasts the
//   conditions attached to each finished or failed action and destroys the action.
// NOTE(review): argv[1]/argv[2] (presumably the platform and deployment files)
// are indexed without checking argc. Some lines (IO cleanup, the return
// statement) are not present in this listing.
241 int smpi_run_simulation(int argc, char **argv)
243 smx_cond_t cond = NULL;
244 smx_action_t action = NULL;
246 xbt_fifo_t actions_failed = xbt_fifo_new();
247 xbt_fifo_t actions_done = xbt_fifo_new();
249 srand(SMPI_RAND_SEED);
251 SIMIX_global_init(&argc, argv);
// Created before any simulated process starts, since all of them use it.
253 init_mutex = SIMIX_mutex_init();
254 init_cond = SIMIX_cond_init();
256 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
257 SIMIX_function_register("smpi_sender", smpi_sender);
258 SIMIX_function_register("smpi_receiver", smpi_receiver);
259 SIMIX_create_environment(argv[1]);
260 SIMIX_launch_application(argv[2]);
262 /* Prepare to display some more info when dying on Ctrl-C pressing */
// NOTE(review): disabled; no SIGINT handler is installed.
263 //signal(SIGINT, inthandler);
265 /* Clean IO before the run */
// Main loop: runs until SIMIX_solve() returns -1.0.
269 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
// Failed actions wake their waiters exactly like successful ones, so waiters
// cannot tell the two apart.
270 while (action = xbt_fifo_pop(actions_failed)) {
271 DEBUG1("** %s failed **", action->name);
272 while (cond = xbt_fifo_pop(action->cond_list)) {
273 SIMIX_cond_broadcast(cond);
275 SIMIX_action_destroy(action);
277 while (action = xbt_fifo_pop(actions_done)) {
278 DEBUG1("** %s done **",action->name);
279 while (cond = xbt_fifo_pop(action->cond_list)) {
280 SIMIX_cond_broadcast(cond);
282 SIMIX_action_destroy(action);
285 xbt_fifo_free(actions_failed);
286 xbt_fifo_free(actions_done);
287 INFO1("simulation time %g", SIMIX_get_clock());
/*
 * Reduction kernel for MPI_LAND on ints: *z = (*x && *y), i.e. 1 when both
 * operands are non-zero and 0 otherwise. y is only read when *x is non-zero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
	const int *lhs = x;
	const int *rhs = y;
	int *out = z;

	*out = (*lhs != 0) && (*rhs != 0);
}
/*
 * Reduction kernel for MPI_SUM on ints: *z = *x + *y.
 * Both operands are read before z is written, so z may alias x or y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
	int lhs = *(const int *)x;
	int rhs = *(const int *)y;

	*(int *)z = lhs + rhs;
}
302 smpi_mpi_request_t *smpi_new_request()
304 return xbt_new(smpi_mpi_request_t, 1);
// Per-process MPI initialization (body of what is presumably smpi_mpi_init(); the
// function header is not visible in this listing).
// The process on hosts[0] allocates and fills in all SMPI globals, then broadcasts
// init_cond. Every other process waits for that, then records its own process
// handle in smpi_mpi_comm_world.processes. Finally, all processes wait until
// smpi_ready_count reaches 3 * size.
// NOTE(review): the lines that set smpi_root_ready, increment smpi_ready_count
// and separate the root/non-root branches are not present in this listing.
311 smx_process_t process;
316 // initialize some local variables
317 host = SIMIX_host_self();
318 hosts = SIMIX_host_get_table();
319 size = SIMIX_host_get_number();
321 // node 0 sets the globals
322 if (host == hosts[0]) {
// Per-rank handles of the sender/receiver daemons (filled in by those daemons).
325 smpi_sender_processes = xbt_new(smx_process_t, size);
326 smpi_receiver_processes = xbt_new(smx_process_t, size);
// Every host counts as running until it finalizes or exits.
329 smpi_running_hosts_mutex = SIMIX_mutex_init();
330 smpi_running_hosts = size;
332 // global communicator
// Rank i is hosts[i]; the root records its own process at rank 0.
333 smpi_mpi_comm_world.size = size;
334 smpi_mpi_comm_world.barrier = 0;
335 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
336 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
337 smpi_mpi_comm_world.hosts = hosts;
338 smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
339 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// Predefined datatypes: element sizes.
342 smpi_mpi_byte.size = (size_t)1;
343 smpi_mpi_int.size = sizeof(int);
344 smpi_mpi_double.size = sizeof(double);
// Predefined reduction operations.
347 smpi_mpi_land.func = &smpi_mpi_land_func;
348 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Record pools and one FIFO of each kind per rank. Pool entries are released
// with xbt_free, so nothing else a request might own is torn down by the pool.
351 smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
352 smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, NULL);
353 smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
354 smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
355 smpi_received_messages = xbt_new(xbt_fifo_t, size);
357 for(i = 0; i < size; i++) {
358 smpi_pending_send_requests[i] = xbt_fifo_new();
359 smpi_pending_recv_requests[i] = xbt_fifo_new();
360 smpi_received_messages[i] = xbt_fifo_new();
// Benchmarking state.
363 smpi_timer = xbt_os_timer_new();
364 smpi_reference_speed = SMPI_DEFAULT_SPEED;
365 smpi_benchmarking = 0;
366 smpi_benchmarking_mutex = SIMIX_mutex_init();
368 // signal all nodes to perform initialization
369 SIMIX_mutex_lock(init_mutex);
371 SIMIX_cond_broadcast(init_cond);
372 SIMIX_mutex_unlock(init_mutex);
376 // make sure root is done before own initialization
377 SIMIX_mutex_lock(init_mutex);
378 if (!smpi_root_ready) {
379 SIMIX_cond_wait(init_cond, init_mutex);
381 SIMIX_mutex_unlock(init_mutex);
383 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
387 // wait for all nodes to signal initialization complete
388 SIMIX_mutex_lock(init_mutex);
// 3 * size: presumably one arrival per process kind (main, sender, receiver)
// on each host.
390 if (smpi_ready_count < 3 * size) {
391 SIMIX_cond_wait(init_cond, init_mutex);
393 SIMIX_cond_broadcast(init_cond);
395 SIMIX_mutex_unlock(init_mutex);
// Marks the calling host as finished by decrementing smpi_running_hosts under its
// mutex, then releases the global SMPI state allocated during initialization.
// NOTE(review): lines 406-408 of the original are missing here. Presumably they
// guard the teardown so that only the last host to finalize (i == 0) runs it;
// confirm.
// NOTE(review): the sender/receiver daemons re-lock smpi_running_hosts_mutex after
// each unit of work; destroying it while a daemon can still be resumed would
// leave that daemon locking a destroyed mutex.
// NOTE(review): smpi_sender_processes, smpi_receiver_processes and
// smpi_benchmarking_mutex are not released in the lines shown.
399 void smpi_mpi_finalize()
403 SIMIX_mutex_lock(smpi_running_hosts_mutex);
404 i = --smpi_running_hosts;
405 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
409 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
// Per-rank FIFOs, then the arrays and pools that held them.
411 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
412 xbt_fifo_free(smpi_pending_send_requests[i]);
413 xbt_fifo_free(smpi_pending_recv_requests[i]);
414 xbt_fifo_free(smpi_received_messages[i]);
417 xbt_mallocator_free(smpi_request_mallocator);
418 xbt_mallocator_free(smpi_message_mallocator);
419 xbt_free(smpi_pending_send_requests);
420 xbt_free(smpi_pending_recv_requests);
421 xbt_free(smpi_received_messages);
// Communicator resources created by the root during initialization.
423 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
424 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
425 xbt_free(smpi_mpi_comm_world.processes);
427 xbt_os_timer_free(smpi_timer);
432 void smpi_bench_begin()
434 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
435 smpi_benchmarking = 1;
436 xbt_os_timer_start(smpi_timer);
440 void smpi_bench_end()
444 smx_action_t compute_action;
448 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
449 smpi_benchmarking = 0;
450 xbt_os_timer_stop(smpi_timer);
451 duration = xbt_os_timer_elapsed(smpi_timer);
452 host = SIMIX_host_self();
453 compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
454 mutex = SIMIX_mutex_init();
455 cond = SIMIX_cond_init();
456 SIMIX_mutex_lock(mutex);
457 SIMIX_register_condition_to_action(compute_action, cond);
458 SIMIX_register_action_to_condition(compute_action, cond);
459 SIMIX_cond_wait(cond, mutex);
460 SIMIX_mutex_unlock(mutex);
461 SIMIX_mutex_destroy(mutex);
462 SIMIX_cond_destroy(cond);
463 // FIXME: check for success/failure?
// Blocks the calling process until the barrier on comm releases it, using
// comm->barrier_mutex and comm->barrier_cond.
// NOTE(review): the lines that update the comm->barrier counter (initialized to 0
// during initialization) and choose between waiting and broadcasting are not
// present in this listing; presumably the last of comm->size arrivals broadcasts
// and resets the counter.
467 void smpi_barrier(smpi_mpi_communicator_t *comm) {
469 SIMIX_mutex_lock(comm->barrier_mutex);
472 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
475 SIMIX_cond_broadcast(comm->barrier_cond);
477 SIMIX_mutex_unlock(comm->barrier_mutex);
480 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
483 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
484 if (i >= comm->size) i = -1;
// Validates the arguments of a point-to-point operation. On success it takes a
// request from smpi_request_mallocator, fills it in, and stores it in *request.
// Returns MPI_SUCCESS or the first matching MPI_ERR_* code.
// src may be MPI_ANY_SOURCE; dst must be a valid rank of comm; tag must be >= 0.
// NOTE(review): the first condition (guarding MPI_ERR_COUNT) and the `else` that
// presumably limits allocation to the success path are not present in this listing.
// NOTE(review): buf == NULL is rejected even though MPI permits it when count is 0.
// NOTE(review): waitlist is set to NULL and mutex/cond are not set here, yet
// smpi_wait(), smpi_sender() and smpi_receiver() lock request->mutex and use
// request->waitlist; verify where those fields are initialized.
488 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
489 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
491 int retval = MPI_SUCCESS;
496 retval = MPI_ERR_COUNT;
497 } else if (NULL == buf) {
498 retval = MPI_ERR_INTERN;
499 } else if (NULL == datatype) {
500 retval = MPI_ERR_TYPE;
501 } else if (NULL == comm) {
502 retval = MPI_ERR_COMM;
503 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
504 retval = MPI_ERR_RANK;
505 } else if (0 > dst || comm->size <= dst) {
506 retval = MPI_ERR_RANK;
507 } else if (0 > tag) {
508 retval = MPI_ERR_TAG;
510 *request = xbt_mallocator_get(smpi_request_mallocator);
511 (*request)->buf = buf;
512 (*request)->count = count;
513 (*request)->datatype = datatype;
514 (*request)->src = src;
515 (*request)->dst = dst;
516 (*request)->tag = tag;
517 (*request)->comm = comm;
518 (*request)->completed = 0;
519 (*request)->waitlist = NULL;
524 int smpi_isend(smpi_mpi_request_t *request)
526 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
528 xbt_fifo_push(smpi_pending_send_requests[rank], request);
530 if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
531 MSG_process_resume(smpi_sender_processes[rank]);
535 int smpi_irecv(smpi_mpi_request_t *request)
537 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
539 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
541 if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
542 MSG_process_resume(smpi_receiver_processes[rank]);
// Waits for request to complete. If it has not completed yet, the calling process
// adds itself to request->waitlist; the sender/receiver daemon resumes suspended
// waiters when it completes the request. Afterwards it copies the request's
// source rank into status, unless status is NULL or MPI_STATUS_IGNORE.
// NOTE(review): the lines after the waitlist push (presumably unlocking and
// suspending) are not present in this listing. If the process unlocks before it
// suspends, a completion in between finds it not suspended and does not resume it.
// NOTE(review): if the status block is outside the `NULL != request` check (the
// closing brace is not visible), a NULL request is dereferenced there.
546 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
550 self = SIMIX_process_self();
552 if (NULL != request) {
553 SIMIX_mutex_lock(request->mutex);
554 if (!request->completed) {
555 xbt_fifo_push(request->waitlist, self);
558 SIMIX_mutex_unlock(request->mutex);
562 if (NULL != status && MPI_STATUS_IGNORE != status) {
563 SIMIX_mutex_lock(request->mutex);
564 status->MPI_SOURCE = request->src;
565 SIMIX_mutex_unlock(request->mutex);
// gettimeofday() replacement that reports the simulated clock (SIMIX_get_clock())
// instead of the real one.
// tv_usec is the fractional part of the clock, converted to microseconds and
// truncated when stored into the integer field.
// NOTE(review): the line that sets tv->tv_sec (presumably from `now`), any NULL
// check on tv, the use of tz and the return value are not present in this listing.
570 // FIXME: move into own file
571 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
579 now = SIMIX_get_clock();
581 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// sleep() replacement for simulated processes. It creates a SIMIX sleep action of
// `seconds` on the current host and blocks on a private mutex/condition pair
// until that action's condition is signalled.
// NOTE(review): the local declarations (host, mutex, cond) and the return
// statement are not present in this listing.
587 unsigned int smpi_sleep(unsigned int seconds)
592 smx_action_t sleep_action;
595 host = SIMIX_host_self();
596 sleep_action = SIMIX_action_sleep(host, seconds);
597 mutex = SIMIX_mutex_init();
598 cond = SIMIX_cond_init();
599 SIMIX_mutex_lock(mutex);
600 SIMIX_register_condition_to_action(sleep_action, cond);
601 SIMIX_register_action_to_condition(sleep_action, cond);
602 SIMIX_cond_wait(cond, mutex);
603 SIMIX_mutex_unlock(mutex);
604 SIMIX_mutex_destroy(mutex);
605 SIMIX_cond_destroy(cond);
606 // FIXME: check for success/failure?
// exit() replacement for simulated processes. It decrements smpi_running_hosts
// under its mutex and kills the calling SIMIX process.
// NOTE(review): `status` is not used in the lines shown, and unlike
// smpi_mpi_finalize() no global state is released here. The rest of the function
// lies beyond this listing.
611 void smpi_exit(int status)
614 SIMIX_mutex_lock(smpi_running_hosts_mutex);
615 smpi_running_hosts--;
616 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
617 SIMIX_process_kill(SIMIX_process_self());