4 #include "simix/simix.h"
5 #include "xbt/sysdep.h"
6 #include "xbt/xbt_portability.h"
/* Per-rank message queues, all indexed by rank and allocated with one slot
 * per host in smpi_mpi_init() (released in smpi_mpi_finalize()). Each queue is
 * a singly linked list with a head array and a "last" (tail) array so that
 * entries can be appended without walking the list. */
/* Send requests waiting to be transmitted by the rank's sender process. */
9 smpi_mpi_request_t **smpi_pending_send_requests = NULL;
10 smpi_mpi_request_t **smpi_last_pending_send_requests = NULL;
/* Receive requests not yet matched with an incoming message. */
12 smpi_mpi_request_t **smpi_pending_recv_requests = NULL;
13 smpi_mpi_request_t **smpi_last_pending_recv_requests = NULL;
/* Incoming messages not yet matched with a receive request. */
15 smpi_received_t **smpi_received = NULL;
16 smpi_received_t **smpi_last_received = NULL;
/* SIMIX process of each rank's sender / receiver helper; each helper stores
 * itself here during its startup handshake (smpi_sender(), smpi_receiver()). */
18 smx_process_t *smpi_sender_processes = NULL;
19 smx_process_t *smpi_receiver_processes = NULL;
/* Number of hosts between smpi_mpi_init() and smpi_mpi_finalize()/smpi_exit();
 * the sender and receiver helper loops keep running while it is positive.
 * NOTE(review): updated without synchronization ("will eventually need mutex"
 * in smpi_mpi_init()). */
21 int smpi_running_hosts = 0;
/* MPI_COMM_WORLD; its fields are filled in by smpi_mpi_init(). */
23 smpi_mpi_communicator_t smpi_mpi_comm_world;
/* Storage presumably backing MPI_STATUS_IGNORE — confirm in the header. */
25 smpi_mpi_status_t smpi_mpi_status_ignore;
/* Predefined datatypes; their element sizes are set in smpi_mpi_init(). */
27 smpi_mpi_datatype_t smpi_mpi_byte;
28 smpi_mpi_datatype_t smpi_mpi_int;
29 smpi_mpi_datatype_t smpi_mpi_double;
/* Predefined reduction operations; their callbacks are set in smpi_mpi_init(). */
31 smpi_mpi_op_t smpi_mpi_land;
32 smpi_mpi_op_t smpi_mpi_sum;
/* Benchmarking state used by smpi_bench_begin()/smpi_bench_end():
 * - a wall-clock timer;
 * - a flag telling whether a benchmarked section is open;
 * - the factor (set to DEFAULT_POWER in smpi_mpi_init()) that converts the
 *   measured seconds into an amount of simulated computation. */
34 static xbt_os_timer_t smpi_timer;
35 static int smpi_benchmarking;
36 static double smpi_reference;
38 int smpi_run_simulation(int *argc, char **argv) {
39 smx_cond_t cond = NULL;
40 smx_action_t smx_action;
41 xbt_fifo_t actions_done = xbt_fifo_new();
42 xbt_fifo_t actions_failed = xbt_fifo_new();
45 SIMIX_global_init(&argc, argv);
46 SIMIX_function_register("smpi_main", smpi_main);
47 SIMIX_function_register("smpi_sender", smpi_sender);
48 SIMIX_function_register("smpi_receiver", smpi_receiver);
49 SIMIX_create_environment(argv[1]);
50 SIMIX_launch_application(argv[2]);
52 /* Prepare to display some more info when dying on Ctrl-C pressing */
53 signal(SIGINT,inthandler);
55 /* Clean IO before the run */
59 //surf_solve(); /* Takes traces into account. Returns 0.0 */
60 /* xbt_fifo_size(msg_global->process_to_run) */
62 while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
64 while ( (smx_action = xbt_fifo_pop(actions_failed)) ) {
67 DEBUG1("** %s failed **",smx_action->name);
68 while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) {
69 SIMIX_cond_broadcast(cond);
71 /* action finished, destroy it */
72 // SIMIX_action_destroy(smx_action);
75 while ( (smx_action = xbt_fifo_pop(actions_done)) ) {
77 DEBUG1("** %s done **",smx_action->name);
78 while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) {
79 SIMIX_cond_broadcast(cond);
81 /* action finished, destroy it */
82 //SIMIX_action_destroy(smx_action);
85 xbt_fifo_free(actions_failed);
86 xbt_fifo_free(actions_done);
87 INFO1("simulation time %g", SIMIX_get_clock());
/*
 * Reduction callback for logical AND on ints: *z = (*x && *y), i.e. 1 when
 * both operands are non-zero and 0 otherwise. As with the && operator, *y is
 * only read when *x is non-zero. z may alias x or y.
 */
void smpi_mpi_land_func(void *x, void *y, void *z) {
  const int *lhs = x;
  const int *rhs = y;
  int *result = z;

  *result = (*lhs != 0) ? (*rhs != 0) : 0;
}
/*
 * Reduction callback for integer sum: *z = *x + *y. z may alias x or y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z) {
  const int *lhs = x;
  const int *rhs = y;
  int *result = z;

  *result = *lhs + *rhs;
}
/*
 * Per-process MPI initialization.
 *
 * Every calling process increments smpi_running_hosts and finds its rank by
 * locating SIMIX_host_self() in the SIMIX host table. According to the
 * comments below, node 0 then:
 *   - fills in MPI_COMM_WORLD (id 0, one entry per host, barrier counter 0),
 *     the predefined datatype sizes and the reduction callbacks;
 *   - allocates the per-rank queues and the sender/receiver process tables,
 *     clears the queues, and creates the benchmarking timer;
 *   - performs a READY handshake with every host's sender process
 *     (SEND_SYNC_PORT) and receiver process (RECV_SYNC_PORT);
 *   - sends READY on MPI_PORT to every other rank.
 * The other ranks wait for that READY on MPI_PORT. Each process then records
 * itself in MPI_COMM_WORLD.processes[rank] and enters a barrier.
 *
 * NOTE(review): the local declarations, the assignment of `rank`, and the
 *   rank-0 / non-rank-0 conditionals are not visible in this view. The split
 *   described above is inferred from the comments and loop bounds — confirm.
 */
100 void smpi_mpi_init() {
108 // will eventually need mutex
109 smpi_running_hosts++;
111 // initialize some local variables
112 size = SIMIX_host_get_number();
113 host = SIMIX_host_self();
114 hosts = SIMIX_host_get_table();
// Linear search for this host; i ends at size if the host is not in the table.
115 for(i = 0; i < size && host != hosts[i]; i++);
118 // node 0 sets the globals
121 // global communicator
122 smpi_mpi_comm_world.id = 0;
123 smpi_mpi_comm_world.size = size;
124 smpi_mpi_comm_world.barrier = 0;
125 smpi_mpi_comm_world.hosts = hosts;
// NOTE(review): sized with sizeof(m_process_t) although the slots hold SIMIX
// processes (SIMIX_process_self() is stored below). Writing
// sizeof *smpi_mpi_comm_world.processes would tie the size to the element type.
126 smpi_mpi_comm_world.processes = xbt_malloc(sizeof(m_process_t) * size);
127 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
130 smpi_mpi_byte.size = (size_t)1;
131 smpi_mpi_int.size = sizeof(int);
132 smpi_mpi_double.size = sizeof(double);
135 smpi_mpi_land.func = &smpi_mpi_land_func;
136 smpi_mpi_sum.func = &smpi_mpi_sum_func;
139 smpi_pending_send_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
140 smpi_last_pending_send_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
141 smpi_pending_recv_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
142 smpi_last_pending_recv_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
143 smpi_received = xbt_malloc(sizeof(smpi_received_t*) * size);
144 smpi_last_received = xbt_malloc(sizeof(smpi_received_t*) * size);
// NOTE(review): these two tables are declared as smx_process_t * but sized
// with sizeof(m_process_t). They are not cleared in the loop below. Each
// entry is written by the corresponding smpi_sender()/smpi_receiver() before
// it answers the READY handshake further down.
145 smpi_sender_processes = xbt_malloc(sizeof(m_process_t) * size);
146 smpi_receiver_processes = xbt_malloc(sizeof(m_process_t) * size);
147 for(i = 0; i < size; i++) {
148 smpi_pending_send_requests[i] = NULL;
149 smpi_last_pending_send_requests[i] = NULL;
150 smpi_pending_recv_requests[i] = NULL;
151 smpi_last_pending_recv_requests[i] = NULL;
152 smpi_received[i] = NULL;
153 smpi_last_received[i] = NULL;
155 smpi_timer = xbt_os_timer_new();
// Factor applied to measured seconds in smpi_bench_end().
156 smpi_reference = DEFAULT_POWER;
157 smpi_benchmarking = 0;
159 // tell send/recv nodes to begin
// For each host: send READY to its sender helper and wait for the task to be
// sent back, then do the same with its receiver helper.
160 for(i = 0; i < size; i++) {
161 mtask = MSG_task_create("READY", 0, 0, NULL);
162 MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
164 MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
165 MSG_task_destroy(mtask);
166 mtask = MSG_task_create("READY", 0, 0, NULL);
167 MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
169 MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
170 MSG_task_destroy(mtask);
// Release ranks 1 .. size-1 (presumably waiting in the MSG_task_get on
// MPI_PORT below).
174 for(i = 1; i < size; i++) {
175 mtask = MSG_task_create("READY", 0, 0, NULL);
176 MSG_task_put(mtask, hosts[i], MPI_PORT);
180 // everyone needs to wait for node 0 to finish
182 MSG_task_get(&mtask, MPI_PORT);
183 MSG_task_destroy(mtask);
184 smpi_mpi_comm_world.processes[rank] = SIMIX_process_self();
187 // now that mpi_comm_world_processes is set, it's safe to set a barrier
188 smpi_barrier(&smpi_mpi_comm_world);
/*
 * Per-process MPI shutdown.
 *
 * Decrements smpi_running_hosts and resumes every suspended sender/receiver
 * helper process, so that their loops (which run while smpi_running_hosts is
 * positive) re-evaluate the counter. Under a condition not fully visible in
 * this view, it then releases the global tables allocated in smpi_mpi_init().
 *
 * NOTE(review): after `smpi_running_hosts--` the test `0 <= smpi_running_hosts`
 *   holds on every call, unless the counter was decremented more often than it
 *   was incremented (smpi_exit() also decrements it). The lines that would show
 *   whether the xbt_free() calls sit in an `else` of that test are not visible:
 *     - if they do, the tables are only freed when the counter goes negative,
 *       which may be never;
 *     - if they do not, every rank frees the same globals (double free), and
 *       resumed helpers of still-running ranks may read the freed queues.
 *   The likely intent is to resume the helpers and free the tables only when
 *   the last host finalizes (smpi_running_hosts == 0) — TODO confirm.
 */
191 void smpi_mpi_finalize() {
193 smpi_running_hosts--;
194 if (0 <= smpi_running_hosts) {
195 for(i = 0; i < smpi_mpi_comm_world.size; i++) {
196 if(SIMIX_process_is_suspended(smpi_sender_processes[i])) {
197 SIMIX_process_resume(smpi_sender_processes[i]);
199 if(SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
200 SIMIX_process_resume(smpi_receiver_processes[i]);
204 xbt_free(smpi_mpi_comm_world.processes);
205 xbt_free(smpi_pending_send_requests);
206 xbt_free(smpi_last_pending_send_requests);
207 xbt_free(smpi_pending_recv_requests);
208 xbt_free(smpi_last_pending_recv_requests);
209 xbt_free(smpi_received);
210 xbt_free(smpi_last_received);
211 xbt_free(smpi_sender_processes);
212 xbt_free(smpi_receiver_processes);
213 xbt_os_timer_free(smpi_timer);
/*
 * Marks `request` as completed and wakes the processes waiting on it.
 *
 * The function performs these steps:
 *   - sets request->completed;
 *   - clears request->next, detaching it from any queue link;
 *   - walks the wait list built by smpi_wait(), resuming each recorded
 *     process that is currently suspended;
 *   - finally empties the wait list.
 * Because `next` is cleared here, a caller that iterates a queue through
 * request->next after completing a request stops at that request.
 *
 * NOTE(review): the lines that advance `current` and release each wait-list
 *   node are not visible in this view. smpi_wait() allocates the nodes with
 *   xbt_malloc(), so presumably they are freed here — confirm.
 */
217 void smpi_complete(smpi_mpi_request_t *request) {
218 smpi_waitlist_node_t *current, *next;
219 request->completed = 1;
220 request->next = NULL;
221 current = request->waitlist;
222 while(NULL != current) {
223 if(SIMIX_process_is_suspended(current->process)) {
224 SIMIX_process_resume(current->process);
226 next = current->next;
230 request->waitlist = NULL;
233 int smpi_host_rank_self() {
234 return smpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
237 void smpi_isend(smpi_mpi_request_t *sendreq) {
238 int rank = smpi_host_rank_self();
239 if (NULL == smpi_last_pending_send_requests[rank]) {
240 smpi_pending_send_requests[rank] = sendreq;
242 smpi_last_pending_send_requests[rank]->next = sendreq;
244 smpi_last_pending_send_requests[rank] = sendreq;
245 if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
246 SIMIX_process_resume(smpi_sender_processes[rank]);
/*
 * Pairs the pending receive requests of `rank` with buffered incoming
 * messages.
 *
 * For each pending request, in queue order, it scans the received-message
 * queue from its head. It looks for a message with the same communicator id,
 * the same tag, and a matching source (the request's src, or any source when
 * the request uses MPI_ANY_SOURCE). Tags are compared exactly.
 *
 * On a match it:
 *   - unlinks both the request and the message from their queues, fixing up
 *     the tail pointers;
 *   - copies the payload into the request buffer and frees the payload;
 *   - completes the request when this rank is its forward-through target, or
 *     re-sends it towards (rank + 1) % comm size otherwise.
 *
 * NOTE(review): several lines (local declarations, initialization of
 *   prequest/crequest/preceived/match, the match-flag update and some `else`
 *   branches) are not visible in this view.
 */
250 void smpi_match_requests(int rank) {
251 smpi_mpi_request_t *frequest, *prequest, *crequest;
252 smpi_received_t *freceived, *preceived, *creceived;
255 frequest = smpi_pending_recv_requests[rank];
258 while(NULL != crequest) {
259 freceived = smpi_received[rank];
261 creceived = freceived;
263 while(NULL != creceived && !match) {
264 if(crequest->comm->id == creceived->commid &&
265 (MPI_ANY_SOURCE == crequest->src || crequest->src == creceived->src) &&
266 crequest->tag == creceived->tag) {
271 // pull the request from the queue
272 if(NULL == prequest) {
273 frequest = crequest->next;
274 smpi_pending_recv_requests[rank] = frequest;
276 prequest->next = crequest->next;
278 if(crequest == smpi_last_pending_recv_requests[rank]) {
279 smpi_last_pending_recv_requests[rank] = prequest;
282 // pull the received data from the queue
283 if(NULL == preceived) {
284 freceived = creceived->next;
285 smpi_received[rank] = freceived;
287 preceived->next = creceived->next;
289 if(creceived == smpi_last_received[rank]) {
290 smpi_last_received[rank] = preceived;
293 // for when request->src is any source
294 crequest->src = creceived->src;
296 // calculate data size
297 dsize = crequest->count * crequest->datatype->size;
299 // copy data to buffer
// NOTE(review): copies the size described by the *request*. The received node
// carries no payload length in the visible code, so a shorter incoming
// message would be over-read here.
300 memcpy(crequest->buf, creceived->data, dsize);
303 crequest->fwdthrough = creceived->fwdthrough;
305 // get rid of received data node, no longer needed
306 xbt_free(creceived->data);
// This rank is the final target: complete the request. Otherwise the request
// is presumably re-sent to the next rank as part of a forwarded (broadcast)
// message — confirm.
309 if (crequest->fwdthrough == rank) {
310 smpi_complete(crequest);
312 crequest->src = rank;
313 crequest->dst = (rank + 1) % crequest->comm->size;
314 smpi_isend(crequest);
318 preceived = creceived;
319 creceived = creceived->next;
// NOTE(review): advances through crequest->next even after a match. On a
// completed match smpi_complete() has just set next = NULL, so the scan stops
// there. On a forward, the link still points into the receive queue while
// the request now sits on the send queue — confirm this is intended.
323 crequest = crequest->next;
/*
 * Opens a benchmarked section of real (host-executed) code.
 *
 * Asserts that no section is already open, marks one as open, and starts the
 * wall-clock timer. smpi_bench_end() closes the section and charges the
 * elapsed time to the simulation.
 */
327 void smpi_bench_begin() {
328 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
329 smpi_benchmarking = 1;
330 xbt_os_timer_start(smpi_timer);
334 void smpi_bench_end() {
335 m_task_t ctask = NULL;
337 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
338 smpi_benchmarking = 0;
339 xbt_os_timer_stop(smpi_timer);
340 duration = xbt_os_timer_elapsed(smpi_timer);
341 ctask = MSG_task_create("computation", duration * smpi_reference, 0 , NULL);
342 MSG_task_execute(ctask);
343 MSG_task_destroy(ctask);
347 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
348 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request) {
349 int retval = MPI_SUCCESS;
351 if (NULL == buf && 0 < count) {
352 retval = MPI_ERR_INTERN;
353 } else if (0 > count) {
354 retval = MPI_ERR_COUNT;
355 } else if (NULL == datatype) {
356 retval = MPI_ERR_TYPE;
357 } else if (NULL == comm) {
358 retval = MPI_ERR_COMM;
359 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
360 retval = MPI_ERR_RANK;
361 } else if (0 > dst || comm->size <= dst) {
362 retval = MPI_ERR_RANK;
363 } else if (0 > tag) {
364 retval = MPI_ERR_TAG;
366 *request = xbt_malloc(sizeof(smpi_mpi_request_t));
367 (*request)->buf = buf;
368 (*request)->count = count;
369 (*request)->datatype = datatype;
370 (*request)->src = src;
371 (*request)->dst = dst;
372 (*request)->tag = tag;
373 (*request)->comm = comm;
374 (*request)->completed = 0;
375 (*request)->fwdthrough = dst;
376 (*request)->waitlist = NULL;
377 (*request)->next = NULL;
/*
 * Barrier over `comm`, driven by the shared comm->barrier counter.
 *
 * A process that finds the counter below comm->size suspends itself.
 * Otherwise, it resumes every suspended process of the communicator.
 *
 * NOTE(review): the lines that increment and reset comm->barrier, and the
 *   `else` between the two branches, are not visible in this view.
 * NOTE(review): the resume loop wakes any suspended member of `comm`,
 *   regardless of why that process was suspended.
 */
382 void smpi_barrier(smpi_mpi_communicator_t *comm) {
385 if(comm->barrier < comm->size) {
386 SIMIX_process_suspend(SIMIX_process_self());
389 for(i = 0; i < comm->size; i++) {
390 if (SIMIX_process_is_suspended(comm->processes[i])) {
391 SIMIX_process_resume(comm->processes[i]);
397 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host) {
399 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
400 if (i >= comm->size) i = -1;
404 void smpi_irecv(smpi_mpi_request_t *recvreq) {
405 int rank = smpi_host_rank_self();
406 if (NULL == smpi_pending_recv_requests[rank]) {
407 smpi_pending_recv_requests[rank] = recvreq;
408 } else if (NULL != smpi_last_pending_recv_requests[rank]) {
409 smpi_last_pending_recv_requests[rank]->next = recvreq;
410 } else { // can't happen!
411 fprintf(stderr, "smpi_pending_recv_requests not null while smpi_last_pending_recv_requests null!\n");
413 smpi_last_pending_recv_requests[rank] = recvreq;
414 smpi_match_requests(rank);
415 if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
416 SIMIX_process_resume(smpi_receiver_processes[rank]);
420 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) {
421 smpi_waitlist_node_t *waitnode, *current;
422 if (NULL != request) {
423 if (!request->completed) {
424 waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
425 waitnode->process = SIMIX_process_self();
426 waitnode->next = NULL;
427 if (NULL == request->waitlist) {
428 request->waitlist = waitnode;
430 for(current = request->waitlist; NULL != current->next; current = current->next);
431 current->next = waitnode;
433 SIMIX_process_suspend(waitnode->process);
435 if (NULL != status && MPI_STATUS_IGNORE != status) {
436 status->MPI_SOURCE = request->src;
441 void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses) {
443 for (i = 0; i < count; i++) {
444 smpi_wait(requests[i], &statuses[i]);
448 void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests) {
450 for (i = 0; i < count; i++) {
451 smpi_wait(requests[i], MPI_STATUS_IGNORE);
/*
 * Body of the per-host sender helper process (registered as "smpi_sender").
 *
 * Startup: it waits for the READY task on SEND_SYNC_PORT, records itself in
 * smpi_sender_processes[rank], and sends the task back to its source.
 *
 * Main loop: while smpi_running_hosts > 0, if this rank's send queue is
 * non-empty, it takes the head request and:
 *   - encodes the envelope (comm id, src, dst, tag, forward target ft) in the
 *     task name;
 *   - copies count * datatype->size payload bytes into a new MSG task;
 *   - puts the task on MPI_PORT of the destination host.
 *
 * A request whose dst equals fwdthrough is popped from the queue and
 * completed. Otherwise it stays at the head with dst advanced, so the
 * remaining range of ranks is covered by later iterations. Presumably, when
 * the queue is empty, the process suspends itself until smpi_isend() or
 * smpi_mpi_finalize() resumes it.
 *
 * NOTE(review): local declarations and several branch lines are not visible
 *   in this view.
 */
455 int smpi_sender(int argc, char *argv[]) {
456 smx_process_t process;
463 smpi_mpi_request_t *sendreq;
465 process = SIMIX_process_self();
469 MSG_task_get(&mtask, SEND_SYNC_PORT);
471 rank = smpi_host_rank_self();
473 smpi_sender_processes[rank] = process;
476 MSG_task_put(mtask, MSG_task_get_source(mtask), SEND_SYNC_PORT);
478 while (0 < smpi_running_hosts) {
479 sendreq = smpi_pending_send_requests[rank];
480 if (NULL != sendreq) {
482 // pull from queue if not a fwd or no more to fwd
// NOTE(review): follows sendreq->next. The queue is only sound if every
// enqueued request has next == NULL when it is appended as the tail.
483 if (sendreq->dst == sendreq->fwdthrough) {
484 smpi_pending_send_requests[rank] = sendreq->next;
485 if(sendreq == smpi_last_pending_send_requests[rank]) {
486 smpi_last_pending_send_requests[rank] = NULL;
// fc is half the (modular) distance from dst to fwdthrough. The receiver at
// dst is told to forward through ft = dst + fc, and this sender later
// continues with dst + fc + 1.
490 fc = ((sendreq->fwdthrough - sendreq->dst + sendreq->comm->size) % sendreq->comm->size) / 2;
491 ft = (sendreq->dst + fc) % sendreq->comm->size;
492 //printf("node %d sending broadcast to node %d through node %d\n", rank, sendreq->dst, ft);
495 // create task to send
// NOTE(review): unbounded sprintf. If `taskname` is a fixed-size array,
// snprintf(taskname, sizeof taskname, ...) would rule out overflow.
496 sprintf(taskname, "comm:%d,src:%d,dst:%d,tag:%d,ft:%d", sendreq->comm->id, sendreq->src, sendreq->dst, sendreq->tag, ft);
497 dsize = sendreq->count * sendreq->datatype->size;
498 data = xbt_malloc(dsize);
499 memcpy(data, sendreq->buf, dsize);
500 mtask = MSG_task_create(taskname, 0, dsize, data);
502 // figure out which host to send it to
503 dhost = sendreq->comm->hosts[sendreq->dst];
// NOTE(review): debug output on every send, unless guarded by lines not
// visible here.
507 printf("host %s attempting to send to host %s\n", SIMIX_host_get_name(SIMIX_host_self()), SIMIX_host_get_name(dhost));
509 MSG_task_put(mtask, dhost, MPI_PORT);
511 if (sendreq->dst == sendreq->fwdthrough) {
512 smpi_complete(sendreq);
514 sendreq->dst = (sendreq->dst + fc + 1) % sendreq->comm->size;
518 SIMIX_process_suspend(process);
/*
 * Body of the per-host receiver helper process (registered as
 * "smpi_receiver").
 *
 * Startup: it waits for the READY task on RECV_SYNC_PORT, records itself in
 * smpi_receiver_processes[rank], and sends the task back to its source.
 *
 * Main loop: while smpi_running_hosts > 0 and this rank has a pending receive
 * request, it:
 *   - takes the next task from MPI_PORT;
 *   - decodes the envelope that smpi_sender() encoded in the task name;
 *   - appends a node holding the task's data pointer to this rank's received
 *     queue, then destroys the task;
 *   - tries to match pending requests.
 * Presumably it otherwise suspends itself until smpi_irecv() or
 * smpi_mpi_finalize() resumes it.
 *
 * NOTE(review): local declarations and several branch lines are not visible
 *   in this view.
 */
524 int smpi_receiver(int argc, char **argv) {
525 smx_process_t process;
527 smpi_received_t *received;
529 smpi_mpi_request_t *recvreq;
531 process = SIMIX_process_self();
535 MSG_task_get(&mtask, RECV_SYNC_PORT);
537 rank = smpi_host_rank_self();
539 // potential race condition...
540 smpi_receiver_processes[rank] = process;
543 MSG_task_put(mtask, MSG_task_get_source(mtask), RECV_SYNC_PORT);
545 while (0 < smpi_running_hosts) {
546 recvreq = smpi_pending_recv_requests[rank];
547 if (NULL != recvreq) {
// Debug output; the condition guarding it is not visible in this view.
551 printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
552 SIMIX_host_get_name(SIMIX_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
554 MSG_task_get(&mtask, MPI_PORT);
556 received = xbt_malloc(sizeof(smpi_received_t));
// NOTE(review): the sscanf() return value is not checked. A task name that
// does not follow the sender's format leaves some fields uninitialized.
558 sscanf(MSG_task_get_name(mtask), "comm:%d,src:%d,dst:%d,tag:%d,ft:%d",
559 &received->commid, &received->src, &received->dst, &received->tag, &received->fwdthrough);
560 received->data = MSG_task_get_data(mtask);
561 received->next = NULL;
563 if (NULL == smpi_last_received[rank]) {
564 smpi_received[rank] = received;
566 smpi_last_received[rank]->next = received;
568 smpi_last_received[rank] = received;
570 MSG_task_destroy(mtask);
572 smpi_match_requests(rank);
575 SIMIX_process_suspend(process);
581 // FIXME: move into own file
/*
 * gettimeofday() replacement that reports simulated time. It reads the SIMIX
 * clock (seconds, as a double) and splits it into tv_sec / tv_usec.
 *
 * NOTE(review): the lines that declare `now`, handle a NULL `tv`, use `tz`,
 *   assign tv->tv_sec and return are not visible in this view. tv_usec is
 *   computed as (now - tv_sec) * 1e6, so tv_sec must already hold the
 *   truncated value of `now` for tv_usec to fall in [0, 1e6) — confirm.
 */
582 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) {
589 now = SIMIX_get_clock();
591 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
/*
 * sleep() replacement. It executes a "sleep" computation task of
 * seconds * DEFAULT_POWER on the current host, so simulated time advances by
 * about `seconds` — assuming the host's power equals DEFAULT_POWER, confirm.
 *
 * NOTE(review): the return statement (and any lines around the task) are not
 *   visible in this view.
 * NOTE(review): if DEFAULT_POWER is an integer constant, seconds * DEFAULT_POWER
 *   is computed in integer arithmetic and may wrap; check its definition.
 */
597 unsigned int smpi_sleep(unsigned int seconds) {
598 m_task_t task = NULL;
600 task = MSG_task_create("sleep", seconds * DEFAULT_POWER, 0, NULL);
601 MSG_task_execute(task);
602 MSG_task_destroy(task);
/*
 * exit() replacement: decrements smpi_running_hosts and kills the calling
 * SIMIX process. `status` is not used in the visible lines.
 *
 * NOTE(review): unlike smpi_mpi_finalize(), the visible lines do not resume
 *   the sender/receiver helper processes. Those helpers only re-check
 *   smpi_running_hosts when they are resumed.
 */
607 void smpi_exit(int status) {
609 smpi_running_hosts--;
610 SIMIX_process_kill(SIMIX_process_self());