5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "simix/private.h"
10 xbt_fifo_t *smpi_pending_send_requests = NULL;
11 xbt_fifo_t *smpi_pending_recv_requests = NULL;
12 xbt_fifo_t *smpi_received_messages = NULL;
14 smx_process_t *smpi_sender_processes = NULL;
15 smx_process_t *smpi_receiver_processes = NULL;
17 int smpi_running_hosts = 0;
19 smpi_mpi_communicator_t smpi_mpi_comm_world;
21 smpi_mpi_status_t smpi_mpi_status_ignore;
23 smpi_mpi_datatype_t smpi_mpi_byte;
24 smpi_mpi_datatype_t smpi_mpi_int;
25 smpi_mpi_datatype_t smpi_mpi_double;
27 smpi_mpi_op_t smpi_mpi_land;
28 smpi_mpi_op_t smpi_mpi_sum;
30 static xbt_os_timer_t smpi_timer;
31 static int smpi_benchmarking;
32 static double smpi_reference_speed;
35 smx_mutex_t smpi_running_hosts_mutex = NULL;
36 smx_mutex_t smpi_benchmarking_mutex = NULL;
37 smx_mutex_t init_mutex = NULL;
38 smx_cond_t init_cond = NULL;
40 int smpi_root_ready = 0;
41 int smpi_ready_count = 0;
43 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
45 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm)
50 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
54 for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
59 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
61 return smpi_mpi_comm_rank(comm, SIMIX_host_self());
/*
 * Per-rank sender helper process, registered as "smpi_sender" in
 * smpi_run_simulation().
 *
 * Start-up: waits until the root has published the shared globals, then
 * records itself in smpi_sender_processes[rank] and joins the start-up
 * rendezvous on init_mutex/init_cond.
 *
 * Main loop: while smpi_running_hosts > 0, it takes the next request from
 * smpi_pending_send_requests[rank].
 *   - If the queue is empty, the process suspends itself; smpi_isend()
 *     resumes it.
 *   - Otherwise it starts a SIMIX "communication" action from this host to
 *     the destination rank's host and waits on request->cond.
 *     smpi_run_simulation() broadcasts that condition when the action is done
 *     or has failed.
 *
 * NOTE(review): several lines of this function are not visible in this view:
 * the declarations of `size` and `self`, the `else` branches, the
 * smpi_ready_count update and the return statement.
 */
64 int smpi_sender(int argc, char **argv)
/* NOTE(review): `rank` is computed here, before the wait on smpi_root_ready
 * below.  smpi_mpi_comm_world.size/hosts are only filled in by the root, so if
 * this process runs first smpi_mpi_comm_rank() returns -1 (size 0), and
 * smpi_sender_processes[rank] is later written out of bounds.  Confirm the
 * scheduling order, or compute the rank after the wait. */
66 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
68 int running_hosts = 0;
69 smpi_mpi_request_t *request;
71 smx_host_t shost, dhost;
72 smx_action_t communicate_action;
74 self = SIMIX_process_self();
75 shost = SIMIX_host_self();
77 // make sure root is done before own initialization
78 SIMIX_mutex_lock(init_mutex);
79 if (!smpi_root_ready) {
80 SIMIX_cond_wait(init_cond, init_mutex);
82 SIMIX_mutex_unlock(init_mutex);
84 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this helper so smpi_isend() can resume it when work is queued.
85 smpi_sender_processes[rank] = self;
87 // wait for all nodes to signal initialization complete
// 3 * size: presumably one arrival per rank from each of smpi_sender(),
// smpi_receiver() and the per-rank initialisation, which all run this same
// rendezvous; confirm.
// NOTE(review): the wait is a single `if`, and init_cond is also broadcast for
// the root-ready signal.  A wake-up therefore does not guarantee that
// smpi_ready_count has reached 3 * size; consider re-checking in a loop.
88 SIMIX_mutex_lock(init_mutex);
90 if (smpi_ready_count < 3 * size) {
91 SIMIX_cond_wait(init_cond, init_mutex);
93 SIMIX_cond_broadcast(init_cond);
95 SIMIX_mutex_unlock(init_mutex);
97 SIMIX_mutex_lock(smpi_running_hosts_mutex);
98 running_hosts = smpi_running_hosts;
99 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
// Serve send requests until every rank has finalised or exited.
101 while (0 < running_hosts) {
103 request = xbt_fifo_shift(smpi_pending_send_requests[rank]);
105 if (NULL == request) {
// Queue empty: sleep until smpi_isend() queues a request and resumes us.
106 SIMIX_process_suspend(self);
// NOTE(review): request->mutex and request->cond are not initialised by
// smpi_create_request(); confirm where they are set before use here.
108 SIMIX_mutex_lock(request->mutex);
110 dhost = request->comm->hosts[request->dst];
112 // FIXME: not at all sure I can assume magic just happens here....
// Payload size is datatype size times element count (presumably bytes).
// The meaning of the trailing -1.0 argument is not visible here; confirm
// against the SIMIX_action_communicate() documentation.
113 communicate_action = SIMIX_action_communicate(shost, dhost,
114 "communication", request->datatype->size * request->count * 1.0, -1.0);
// Link the condition and the action in both directions.  When the action
// finishes or fails, smpi_run_simulation() then broadcasts request->cond and
// releases the wait below.
116 SIMIX_register_condition_to_action(communicate_action, request->cond);
117 SIMIX_register_action_to_condition(communicate_action, request->cond);
119 SIMIX_cond_wait(request->cond, request->mutex);
121 // fixme, create new request, copy over to
122 // should be malloc and memcpy
124 SIMIX_mutex_unlock(request->mutex);
// Re-read the running-rank count for the loop condition.
127 SIMIX_mutex_lock(smpi_running_hosts_mutex);
128 running_hosts = smpi_running_hosts;
129 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
/*
 * Per-rank receiver helper process, registered as "smpi_receiver" in
 * smpi_run_simulation().
 *
 * In this view it only performs the start-up handshake:
 *   - waits for the root to publish the globals;
 *   - records itself in smpi_receiver_processes[rank], where smpi_irecv()
 *     looks it up to resume it;
 *   - joins the start-up rendezvous.
 * No loop over smpi_pending_recv_requests is visible here.
 *
 * NOTE(review): not visible in this view: the `size` declaration, the `else`
 * branch, the smpi_ready_count update and the return statement.
 */
135 int smpi_receiver(int argc, char **argv)
/* NOTE(review): `rank` is computed before the wait on smpi_root_ready below.
 * smpi_mpi_comm_world is only filled in by the root, so if this process runs
 * first the rank can be -1 (size 0) and smpi_receiver_processes[rank] is
 * written out of bounds.  Confirm the scheduling order, or compute the rank
 * after the wait. */
137 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
140 // make sure root is done before own initialization
141 SIMIX_mutex_lock(init_mutex);
142 if (!smpi_root_ready) {
143 SIMIX_cond_wait(init_cond, init_mutex);
145 SIMIX_mutex_unlock(init_mutex);
147 size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
// Publish this helper so smpi_irecv() can resume it.
148 smpi_receiver_processes[rank] = SIMIX_process_self();
150 // wait for all nodes to signal initialization complete
// NOTE(review): single `if` around the wait on a condition that is also
// broadcast for the root-ready signal; a wake-up does not prove that
// smpi_ready_count reached 3 * size.
151 SIMIX_mutex_lock(init_mutex);
153 if (smpi_ready_count < 3 * size) {
154 SIMIX_cond_wait(init_cond, init_mutex);
156 SIMIX_cond_broadcast(init_cond);
158 SIMIX_mutex_unlock(init_mutex);
/*
 * Entry point of the SMPI simulation.
 *
 * Setup:
 *   - argv[1] is passed to SIMIX_create_environment() and argv[2] to
 *     SIMIX_launch_application() (presumably the platform and deployment
 *     files; confirm).  Neither access is checked against argc.
 *   - The process functions smpi_simulated_main, smpi_sender and
 *     smpi_receiver are registered under their own names before the
 *     application is launched.
 *   - init_mutex/init_cond, used by the start-up handshake, are created here.
 *
 * Main loop: runs SIMIX_solve() until it returns -1.0.  After each step,
 * every failed or completed action has each condition in its cond_list popped
 * and broadcast, and is then destroyed.  That broadcast is what wakes
 * processes blocked in SIMIX_cond_wait() on a condition registered with the
 * action.
 *
 * NOTE(review): init_mutex/init_cond are not destroyed anywhere in this view.
 * The end of the function (any cleanup and the return value) is not visible.
 */
163 int smpi_run_simulation(int argc, char **argv)
165 smx_cond_t cond = NULL;
166 smx_action_t action = NULL;
168 xbt_fifo_t actions_failed = xbt_fifo_new();
169 xbt_fifo_t actions_done = xbt_fifo_new();
// Fixed seed: rand() sequences are the same on every run.
171 srand(SMPI_RAND_SEED);
173 SIMIX_global_init(&argc, argv);
175 init_mutex = SIMIX_mutex_init();
176 init_cond = SIMIX_cond_init();
178 SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
179 SIMIX_function_register("smpi_sender", smpi_sender);
180 SIMIX_function_register("smpi_receiver", smpi_receiver);
181 SIMIX_create_environment(argv[1]);
182 SIMIX_launch_application(argv[2]);
184 /* Prepare to display more information when interrupted with Ctrl-C (currently disabled) */
185 //signal(SIGINT, inthandler);
187 /* Clean IO before the run */
// NOTE(review): the statements this comment refers to are not visible here.
// The assignments inside the while conditions below are intentional: each
// loop runs until xbt_fifo_pop() returns NULL.
191 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
192 while (action = xbt_fifo_pop(actions_failed)) {
193 DEBUG1("** %s failed **", action->name);
194 while (cond = xbt_fifo_pop(action->cond_list)) {
195 SIMIX_cond_broadcast(cond);
197 SIMIX_action_destroy(action);
199 while (action = xbt_fifo_pop(actions_done)) {
200 DEBUG1("** %s done **",action->name);
201 while (cond = xbt_fifo_pop(action->cond_list)) {
202 SIMIX_cond_broadcast(cond);
204 SIMIX_action_destroy(action);
207 xbt_fifo_free(actions_failed);
208 xbt_fifo_free(actions_done);
209 INFO1("simulation time %g", SIMIX_get_clock());
/*
 * Logical-AND reduction operator on ints: *z = (*x && *y).
 * Like C's &&, the second operand is not read when the first is zero.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
  const int *lhs = x;
  const int *rhs = y;
  int *out = z;

  *out = (*lhs != 0) && (*rhs != 0);
}
/*
 * Sum reduction operator on ints: *z = *x + *y.
 * Both operands are read before z is written, so z may alias x or y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
  const int *lhs = x;
  const int *rhs = y;
  int *out = z;

  *out = *lhs + *rhs;
}
/*
 * NOTE(review): the signature and opening lines of this function are not
 * visible in this view.  It is presumably the per-rank MPI initialisation
 * routine; confirm.
 *
 * Root branch: the process on the first host (hosts[0]) allocates and
 * initialises every shared SMPI global:
 *   - the per-rank process tables and request/message queues;
 *   - the running-host counter;
 *   - smpi_mpi_comm_world;
 *   - the predefined datatypes and operators;
 *   - the benchmarking state.
 * It then broadcasts init_cond.
 *
 * Second part: waits for smpi_root_ready, records the calling process in
 * smpi_mpi_comm_world.processes[rank], and joins the start-up rendezvous.
 * The branch structure between the two parts is not visible, so confirm
 * whether the root also runs the second part.
 */
// NOTE(review): `process` is not used in the visible lines.
228 smx_process_t process;
233 // initialize some local variables
234 host = SIMIX_host_self();
235 hosts = SIMIX_host_get_table();
236 size = SIMIX_host_get_number();
238 // node 0 sets the globals
239 if (host == hosts[0]) {
// Per-rank helper-process tables, filled by smpi_sender()/smpi_receiver().
242 smpi_sender_processes = xbt_new0(smx_process_t, size);
243 smpi_receiver_processes = xbt_new0(smx_process_t, size);
246 smpi_running_hosts_mutex = SIMIX_mutex_init();
247 smpi_running_hosts = size;
249 // global communicator
250 smpi_mpi_comm_world.size = size;
251 smpi_mpi_comm_world.barrier = 0;
252 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
253 smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
// A rank is the index of its host in this table (see smpi_mpi_comm_rank()).
// NOTE(review): ownership of the table returned by SIMIX_host_get_table() is
// not visible; smpi_mpi_finalize() does not free it.
254 smpi_mpi_comm_world.hosts = hosts;
255 smpi_mpi_comm_world.processes = xbt_new0(smx_process_t, size);
// The root records itself as rank 0.
256 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
259 smpi_mpi_byte.size = (size_t)1;
260 smpi_mpi_int.size = sizeof(int);
261 smpi_mpi_double.size = sizeof(double);
264 smpi_mpi_land.func = &smpi_mpi_land_func;
265 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// One queue of each kind per rank.
268 smpi_pending_send_requests = xbt_new0(xbt_fifo_t, size);
269 smpi_pending_recv_requests = xbt_new0(xbt_fifo_t, size);
270 smpi_received_messages = xbt_new0(xbt_fifo_t, size);
272 for(i = 0; i < size; i++) {
273 smpi_pending_send_requests[i] = xbt_fifo_new();
274 smpi_pending_recv_requests[i] = xbt_fifo_new();
275 smpi_received_messages[i] = xbt_fifo_new();
// Benchmarking state used by smpi_bench_begin()/smpi_bench_end().
278 smpi_timer = xbt_os_timer_new();
279 smpi_reference_speed = SMPI_DEFAULT_SPEED;
280 smpi_benchmarking = 0;
281 smpi_benchmarking_mutex = SIMIX_mutex_init();
283 // signal all nodes to perform initialization
// NOTE(review): the statement between the lock and the broadcast is not
// visible.  It presumably sets smpi_root_ready, which the waiters below test;
// confirm.
284 SIMIX_mutex_lock(init_mutex);
286 SIMIX_cond_broadcast(init_cond);
287 SIMIX_mutex_unlock(init_mutex);
291 // make sure root is done before own initialization
292 SIMIX_mutex_lock(init_mutex);
293 if (!smpi_root_ready) {
294 SIMIX_cond_wait(init_cond, init_mutex);
296 SIMIX_mutex_unlock(init_mutex);
// The rank is computed only after the root-ready wait, so
// smpi_mpi_comm_world.size/hosts are already filled in.
298 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
302 // wait for all nodes to signal initialization complete
// NOTE(review): the smpi_ready_count update is not visible.  The wait is a
// single `if` on a condition that is also broadcast for root-ready; consider
// re-checking in a loop.
303 SIMIX_mutex_lock(init_mutex);
305 if (smpi_ready_count < 3 * size) {
306 SIMIX_cond_wait(init_cond, init_mutex);
308 SIMIX_cond_broadcast(init_cond);
310 SIMIX_mutex_unlock(init_mutex);
/*
 * Called by each rank when it finishes.
 *
 * It decrements smpi_running_hosts under its mutex; smpi_sender() loops stop
 * once that count reaches 0.  The guard around the teardown below is not
 * visible in this view.  Presumably only the last rank (new count 0) releases
 * the shared state; confirm.
 *
 * NOTE(review): in this view the teardown does not free
 * smpi_sender_processes or smpi_receiver_processes, and does not destroy
 * smpi_benchmarking_mutex, although all three are allocated during start-up.
 * It also destroys smpi_running_hosts_mutex while smpi_sender() processes may
 * still lock it to re-read the count; confirm the ordering.
 */
314 void smpi_mpi_finalize()
318 SIMIX_mutex_lock(smpi_running_hosts_mutex);
// i receives the updated running-rank count.
319 i = --smpi_running_hosts;
320 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
324 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
// Release the per-rank queues, then the tables that held them.
326 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
327 xbt_fifo_free(smpi_pending_send_requests[i]);
328 xbt_fifo_free(smpi_pending_recv_requests[i]);
329 xbt_fifo_free(smpi_received_messages[i]);
332 xbt_free(smpi_pending_send_requests);
333 xbt_free(smpi_pending_recv_requests);
334 xbt_free(smpi_received_messages);
336 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
337 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
338 xbt_free(smpi_mpi_comm_world.processes);
340 xbt_os_timer_free(smpi_timer);
345 void smpi_bench_begin()
347 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
348 smpi_benchmarking = 1;
349 xbt_os_timer_start(smpi_timer);
353 void smpi_bench_end()
357 smx_action_t compute_action;
361 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
362 smpi_benchmarking = 0;
363 xbt_os_timer_stop(smpi_timer);
364 duration = xbt_os_timer_elapsed(smpi_timer);
365 host = SIMIX_host_self();
366 compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
367 mutex = SIMIX_mutex_init();
368 cond = SIMIX_cond_init();
369 SIMIX_mutex_lock(mutex);
370 SIMIX_register_condition_to_action(compute_action, cond);
371 SIMIX_register_action_to_condition(compute_action, cond);
372 SIMIX_cond_wait(cond, mutex);
373 SIMIX_mutex_unlock(mutex);
374 SIMIX_mutex_destroy(mutex);
375 SIMIX_cond_destroy(cond);
376 // FIXME: check for success/failure?
/*
 * Barrier over `comm`, built on comm->barrier_mutex and comm->barrier_cond.
 *
 * Waiting processes block on barrier_cond; one arrival broadcasts it to
 * release them.  The lines that update comm->barrier and choose between
 * waiting and broadcasting are not visible in this view.  Presumably the
 * counter is incremented and the arrival that reaches comm->size broadcasts
 * and resets it; confirm.
 *
 * NOTE(review): if the wait is guarded by a single `if` rather than a loop
 * re-checking the counter, an unrelated or spurious wake-up releases a waiter
 * early; confirm.
 */
380 void smpi_barrier(smpi_mpi_communicator_t *comm) {
382 SIMIX_mutex_lock(comm->barrier_mutex);
385 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
388 SIMIX_cond_broadcast(comm->barrier_cond);
390 SIMIX_mutex_unlock(comm->barrier_mutex);
393 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
396 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
397 if (i >= comm->size) i = -1;
/*
 * Validate the arguments of a point-to-point operation and, if they pass,
 * allocate a request describing it.
 *
 * Returns MPI_SUCCESS, or the code of the first failing check:
 *   MPI_ERR_COUNT  - count check (exact condition not visible here)
 *   MPI_ERR_INTERN - buf is NULL
 *   MPI_ERR_TYPE   - datatype is NULL
 *   MPI_ERR_COMM   - comm is NULL
 *   MPI_ERR_RANK   - src is neither MPI_ANY_SOURCE nor in [0, comm->size),
 *                    or dst is outside [0, comm->size)
 *   MPI_ERR_TAG    - tag is negative
 * A check preceding the count check is not visible in this view.
 *
 * On success *request points to a new request allocated with xbt_new0.  The
 * request holds the given buffer, count, datatype, ranks, tag and
 * communicator, with completed = 0 and waitlist = NULL.  *request is
 * presumably left untouched on error (the `else` line is not visible).
 *
 * NOTE(review): the request's `mutex` and `cond` fields, which smpi_sender()
 * locks and waits on, are not set here.  `waitlist` is left NULL although
 * smpi_wait() pushes onto it.  Confirm where these are meant to be
 * initialised.
 */
401 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
402 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
404 int retval = MPI_SUCCESS;
409 retval = MPI_ERR_COUNT;
410 } else if (NULL == buf) {
411 retval = MPI_ERR_INTERN;
412 } else if (NULL == datatype) {
413 retval = MPI_ERR_TYPE;
414 } else if (NULL == comm) {
415 retval = MPI_ERR_COMM;
416 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
417 retval = MPI_ERR_RANK;
418 } else if (0 > dst || comm->size <= dst) {
419 retval = MPI_ERR_RANK;
420 } else if (0 > tag) {
421 retval = MPI_ERR_TAG;
423 *request = xbt_new0(smpi_mpi_request_t, 1);
424 (*request)->buf = buf;
425 (*request)->count = count;
426 (*request)->datatype = datatype;
427 (*request)->src = src;
428 (*request)->dst = dst;
429 (*request)->tag = tag;
430 (*request)->comm = comm;
431 (*request)->completed = 0;
432 (*request)->waitlist = NULL;
437 int smpi_isend(smpi_mpi_request_t *request)
439 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
441 xbt_fifo_push(smpi_pending_send_requests[rank], request);
443 if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
444 MSG_process_resume(smpi_sender_processes[rank]);
448 int smpi_irecv(smpi_mpi_request_t *request)
450 int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
452 xbt_fifo_push(smpi_pending_recv_requests[rank], request);
454 if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
455 MSG_process_resume(smpi_receiver_processes[rank]);
459 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
463 if (NULL != request) {
464 if (!request->completed) {
465 self = SIMIX_process_self();
466 xbt_fifo_push(request->waitlist, self);
467 } SIMIX_suspend(self);
468 if (NULL != status && MPI_STATUS_IGNORE != status) {
469 status->MPI_SOURCE = request->src;
474 // FIXME: move into own file
/*
 * gettimeofday() replacement that reports simulated time: `tv` is filled from
 * SIMIX_get_clock() instead of the host clock.  `tz` is not used in the
 * visible lines.
 *
 * NOTE(review): not visible in this view: the declaration of `now`, the
 * assignment of tv->tv_sec (presumably the whole seconds of `now`), any NULL
 * check on `tv`, and the return statement.  tv_usec below is the remainder
 * after tv->tv_sec, scaled to microseconds.
 */
475 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
483 now = SIMIX_get_clock();
485 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
/*
 * sleep() replacement in simulated time.
 *
 * Creates a SIMIX sleep action of `seconds` on the current host, then blocks
 * the calling process on a private mutex/cond pair registered with that
 * action.  The wait ends when smpi_run_simulation() broadcasts the condition,
 * which it does once the action is done or has failed.
 *
 * NOTE(review): the local declarations, two further lines before the first
 * statement, and the return statement are not visible in this view.
 */
491 unsigned int smpi_sleep(unsigned int seconds)
496 smx_action_t sleep_action;
499 host = SIMIX_host_self();
500 sleep_action = SIMIX_action_sleep(host, seconds);
501 mutex = SIMIX_mutex_init();
502 cond = SIMIX_cond_init();
503 SIMIX_mutex_lock(mutex);
504 SIMIX_register_condition_to_action(sleep_action, cond);
505 SIMIX_register_action_to_condition(sleep_action, cond);
506 SIMIX_cond_wait(cond, mutex);
507 SIMIX_mutex_unlock(mutex);
508 SIMIX_mutex_destroy(mutex);
509 SIMIX_cond_destroy(cond);
510 // FIXME: check for success/failure?
515 void smpi_exit(int status)
518 SIMIX_mutex_lock(smpi_running_hosts_mutex);
519 smpi_running_hosts--;
520 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
521 SIMIX_process_kill(SIMIX_process_self());