4 #include "simix/simix.h"
5 #include "xbt/sysdep.h"
6 #include "xbt/xbt_portability.h"
9 smpi_mpi_request_t **smpi_pending_send_requests = NULL;
10 smpi_mpi_request_t **smpi_last_pending_send_requests = NULL;
12 smpi_mpi_request_t **smpi_pending_recv_requests = NULL;
13 smpi_mpi_request_t **smpi_last_pending_recv_requests = NULL;
15 smpi_received_t **smpi_received = NULL;
16 smpi_received_t **smpi_last_received = NULL;
18 smx_process_t *smpi_sender_processes = NULL;
19 smx_process_t *smpi_receiver_processes = NULL;
21 int smpi_running_hosts = 0;
23 smpi_mpi_communicator_t smpi_mpi_comm_world;
25 smpi_mpi_status_t smpi_mpi_status_ignore;
27 smpi_mpi_datatype_t smpi_mpi_byte;
28 smpi_mpi_datatype_t smpi_mpi_int;
29 smpi_mpi_datatype_t smpi_mpi_double;
31 smpi_mpi_op_t smpi_mpi_land;
32 smpi_mpi_op_t smpi_mpi_sum;
34 static xbt_os_timer_t smpi_timer;
35 static int smpi_benchmarking;
36 static double smpi_reference;
/*
 * MPI_LAND reduction kernel for int operands.
 * Stores 1 in *z when both *x and *y are non-zero, 0 otherwise.
 * The signature matches smpi_mpi_op_t.func (assigned in smpi_mpi_init()).
 */
void smpi_mpi_land_func(void *x, void *y, void *z) {
    *(int *)z = *(const int *)x && *(const int *)y;
}
/*
 * MPI_SUM reduction kernel for int operands: *z = *x + *y.
 * The addition is done in unsigned arithmetic, so an overflowing sum is no
 * longer undefined behaviour. The conversion back to int is
 * implementation-defined; common compilers wrap.
 * The signature matches smpi_mpi_op_t.func (assigned in smpi_mpi_init()).
 */
void smpi_mpi_sum_func(void *x, void *y, void *z) {
    unsigned int a = (unsigned int)*(const int *)x;
    unsigned int b = (unsigned int)*(const int *)y;
    *(int *)z = (int)(a + b);
}
// smpi_mpi_init: per-rank MPI start-up.
// Visible behaviour:
// - Locates this host in the SIMIX host table (the index is the rank).
// - Rank 0 fills in smpi_mpi_comm_world, the predefined datatypes and ops,
//   the per-rank queues and the benchmark timer.
// - Rank 0 then handshakes with every host's sender/receiver helper on
//   SEND_SYNC_PORT / RECV_SYNC_PORT.
// - Rank 0 sends "READY" to ranks 1..size-1 on MPI_PORT.
// - A rank that receives READY on MPI_PORT records its process in
//   smpi_mpi_comm_world.processes[rank].
// - Every rank finishes in smpi_barrier().
// NOTE(review): the embedded numbering skips lines (e.g. 47-53, 55-56, 62-63,
// 65-66, 117-119, 123-125, 127). The local declarations, the `rank`
// assignment, the presumed smpi_running_hosts increment after the mutex
// comment, and the rank-0 / other-rank branch structure are not visible
// here. Confirm against the full file.
46 void smpi_mpi_init() {
54 // will eventually need mutex
57 // initialize some local variables
58 size = SIMIX_host_get_number();
59 host = SIMIX_host_self();
60 hosts = SIMIX_host_get_table();
// linear search: i ends at this host's index (or at size if not found)
61 for(i = 0; i < size && host != hosts[i]; i++);
64 // node 0 sets the globals
67 // global communicator
68 smpi_mpi_comm_world.id = 0;
69 smpi_mpi_comm_world.size = size;
70 smpi_mpi_comm_world.barrier = 0;
71 smpi_mpi_comm_world.hosts = hosts;
// NOTE(review): sized with m_process_t, but the array holds smx_process_t
// values (SIMIX_process_self()). This only works while both are pointer
// types of the same size.
72 smpi_mpi_comm_world.processes = xbt_malloc(sizeof(m_process_t) * size);
73 smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
76 smpi_mpi_byte.size = (size_t)1;
77 smpi_mpi_int.size = sizeof(int);
78 smpi_mpi_double.size = sizeof(double);
81 smpi_mpi_land.func = &smpi_mpi_land_func;
82 smpi_mpi_sum.func = &smpi_mpi_sum_func;
// Per-rank queue heads/tails, all emptied in the loop below.
85 smpi_pending_send_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
86 smpi_last_pending_send_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
87 smpi_pending_recv_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
88 smpi_last_pending_recv_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
89 smpi_received = xbt_malloc(sizeof(smpi_received_t*) * size);
90 smpi_last_received = xbt_malloc(sizeof(smpi_received_t*) * size);
// Helper-process tables are not cleared here; each sender/receiver helper
// writes its own slot when it starts.
91 smpi_sender_processes = xbt_malloc(sizeof(m_process_t) * size);
92 smpi_receiver_processes = xbt_malloc(sizeof(m_process_t) * size);
93 for(i = 0; i < size; i++) {
94 smpi_pending_send_requests[i] = NULL;
95 smpi_last_pending_send_requests[i] = NULL;
96 smpi_pending_recv_requests[i] = NULL;
97 smpi_last_pending_recv_requests[i] = NULL;
98 smpi_received[i] = NULL;
99 smpi_last_received[i] = NULL;
101 smpi_timer = xbt_os_timer_new();
102 smpi_reference = DEFAULT_POWER;
103 smpi_benchmarking = 0;
// Handshake with each host's helpers: send READY, then block until the
// helper echoes a task back on the same port.
105 // tell send/recv nodes to begin
106 for(i = 0; i < size; i++) {
107 mtask = MSG_task_create("READY", 0, 0, NULL);
108 MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
110 MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
111 MSG_task_destroy(mtask);
112 mtask = MSG_task_create("READY", 0, 0, NULL);
113 MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
115 MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
116 MSG_task_destroy(mtask);
// Release ranks 1..size-1, which block on MPI_PORT below.
120 for(i = 1; i < size; i++) {
121 mtask = MSG_task_create("READY", 0, 0, NULL);
122 MSG_task_put(mtask, hosts[i], MPI_PORT);
126 // everyone needs to wait for node 0 to finish
128 MSG_task_get(&mtask, MPI_PORT);
129 MSG_task_destroy(mtask);
130 smpi_mpi_comm_world.processes[rank] = SIMIX_process_self();
133 // now that mpi_comm_world_processes is set, it's safe to set a barrier
134 smpi_barrier(&smpi_mpi_comm_world);
137 void smpi_mpi_finalize() {
139 smpi_running_hosts--;
140 if (0 <= smpi_running_hosts) {
141 for(i = 0; i < smpi_mpi_comm_world.size; i++) {
142 if(SIMIX_process_is_suspended(smpi_sender_processes[i])) {
143 SIMIX_process_resume(smpi_sender_processes[i]);
145 if(SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
146 SIMIX_process_resume(smpi_receiver_processes[i]);
150 xbt_free(smpi_mpi_comm_world.processes);
151 xbt_free(smpi_pending_send_requests);
152 xbt_free(smpi_last_pending_send_requests);
153 xbt_free(smpi_pending_recv_requests);
154 xbt_free(smpi_last_pending_recv_requests);
155 xbt_free(smpi_received);
156 xbt_free(smpi_last_received);
157 xbt_free(smpi_sender_processes);
158 xbt_free(smpi_receiver_processes);
159 xbt_os_timer_free(smpi_timer);
163 void smpi_complete(smpi_mpi_request_t *request) {
164 smpi_waitlist_node_t *current, *next;
165 request->completed = 1;
166 request->next = NULL;
167 current = request->waitlist;
168 while(NULL != current) {
169 if(SIMIX_process_is_suspended(current->process)) {
170 SIMIX_process_resume(current->process);
172 next = current->next;
176 request->waitlist = NULL;
179 int smpi_host_rank_self() {
180 return smpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
183 void smpi_isend(smpi_mpi_request_t *sendreq) {
184 int rank = smpi_host_rank_self();
185 if (NULL == smpi_last_pending_send_requests[rank]) {
186 smpi_pending_send_requests[rank] = sendreq;
188 smpi_last_pending_send_requests[rank]->next = sendreq;
190 smpi_last_pending_send_requests[rank] = sendreq;
191 if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
192 SIMIX_process_resume(smpi_sender_processes[rank]);
// smpi_match_requests: pair the pending receive requests of `rank` with
// messages already queued in smpi_received[rank].
// For each pending request, the received list is scanned for an entry with:
// - the same communicator id,
// - the same tag,
// - a matching source (or MPI_ANY_SOURCE on the request).
// On a match:
// - the request and the received node are both unlinked from their queues,
//   and the tail pointers are fixed up;
// - count * datatype->size bytes are copied into the request buffer;
// - the request is completed if fwdthrough == rank;
// - otherwise it is re-sent to (rank + 1) % comm->size via smpi_isend() for
//   forwarding.
// NOTE(review): the numbering skips lines (e.g. 199-200, 202-203, 206, 208,
// 213-216, 253-254, 266-268). The initialisation of prequest/crequest/
// preceived/match/dsize and part of the loop-advance logic are not visible.
// Confirm against the full file.
// NOTE(review): smpi_complete() sets crequest->next = NULL, so the line-269
// advance ends the outer loop after a completed match. A forwarded request
// may still carry its old receive-queue `next` when handed to smpi_isend(),
// unless a line not shown clears it.
// NOTE(review): nothing visible checks that the received payload holds at
// least dsize bytes before the memcpy.
196 void smpi_match_requests(int rank) {
197 smpi_mpi_request_t *frequest, *prequest, *crequest;
198 smpi_received_t *freceived, *preceived, *creceived;
201 frequest = smpi_pending_recv_requests[rank];
204 while(NULL != crequest) {
205 freceived = smpi_received[rank];
207 creceived = freceived;
209 while(NULL != creceived && !match) {
210 if(crequest->comm->id == creceived->commid &&
211 (MPI_ANY_SOURCE == crequest->src || crequest->src == creceived->src) &&
212 crequest->tag == creceived->tag) {
217 // pull the request from the queue
218 if(NULL == prequest) {
219 frequest = crequest->next;
220 smpi_pending_recv_requests[rank] = frequest;
222 prequest->next = crequest->next;
// keep the tail pointer valid when the unlinked node was the tail
224 if(crequest == smpi_last_pending_recv_requests[rank]) {
225 smpi_last_pending_recv_requests[rank] = prequest;
228 // pull the received data from the queue
229 if(NULL == preceived) {
230 freceived = creceived->next;
231 smpi_received[rank] = freceived;
233 preceived->next = creceived->next;
235 if(creceived == smpi_last_received[rank]) {
236 smpi_last_received[rank] = preceived;
239 // for when request->src is any source
240 crequest->src = creceived->src;
242 // calculate data size
243 dsize = crequest->count * crequest->datatype->size;
245 // copy data to buffer
246 memcpy(crequest->buf, creceived->data, dsize);
249 crequest->fwdthrough = creceived->fwdthrough;
251 // get rid of received data node, no longer needed
252 xbt_free(creceived->data);
// Final destination reached: complete. Otherwise forward the message onward
// from this rank to the next rank in the ring.
255 if (crequest->fwdthrough == rank) {
256 smpi_complete(crequest);
258 crequest->src = rank;
259 crequest->dst = (rank + 1) % crequest->comm->size;
260 smpi_isend(crequest);
264 preceived = creceived;
265 creceived = creceived->next;
269 crequest = crequest->next;
273 void smpi_bench_begin() {
274 xbt_assert0(!smpi_benchmarking, "Already benchmarking");
275 smpi_benchmarking = 1;
276 xbt_os_timer_start(smpi_timer);
280 void smpi_bench_end() {
281 m_task_t ctask = NULL;
283 xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
284 smpi_benchmarking = 0;
285 xbt_os_timer_stop(smpi_timer);
286 duration = xbt_os_timer_elapsed(smpi_timer);
287 ctask = MSG_task_create("computation", duration * smpi_reference, 0 , NULL);
288 MSG_task_execute(ctask);
289 MSG_task_destroy(ctask);
293 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
294 int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request) {
295 int retval = MPI_SUCCESS;
297 if (NULL == buf && 0 < count) {
298 retval = MPI_ERR_INTERN;
299 } else if (0 > count) {
300 retval = MPI_ERR_COUNT;
301 } else if (NULL == datatype) {
302 retval = MPI_ERR_TYPE;
303 } else if (NULL == comm) {
304 retval = MPI_ERR_COMM;
305 } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
306 retval = MPI_ERR_RANK;
307 } else if (0 > dst || comm->size <= dst) {
308 retval = MPI_ERR_RANK;
309 } else if (0 > tag) {
310 retval = MPI_ERR_TAG;
312 *request = xbt_malloc(sizeof(smpi_mpi_request_t));
313 (*request)->buf = buf;
314 (*request)->count = count;
315 (*request)->datatype = datatype;
316 (*request)->src = src;
317 (*request)->dst = dst;
318 (*request)->tag = tag;
319 (*request)->comm = comm;
320 (*request)->completed = 0;
321 (*request)->fwdthrough = dst;
322 (*request)->waitlist = NULL;
323 (*request)->next = NULL;
// smpi_barrier: synchronise the processes of `comm`.
// While comm->barrier is below comm->size, the caller suspends itself.
// Otherwise (presumably the last process to arrive), it resumes every
// suspended process listed in comm->processes.
// NOTE(review): lines 329-330 and 333-334 are missing from this listing.
// Presumably they declare `i`, increment comm->barrier, and reset it in the
// releasing branch. Confirm, since the "count arrivals" reading above
// depends on it.
// NOTE(review): comm->processes must be fully populated before the first
// call (smpi_mpi_init() only calls this after setting its own slot).
328 void smpi_barrier(smpi_mpi_communicator_t *comm) {
331 if(comm->barrier < comm->size) {
332 SIMIX_process_suspend(SIMIX_process_self());
335 for(i = 0; i < comm->size; i++) {
336 if (SIMIX_process_is_suspended(comm->processes[i])) {
337 SIMIX_process_resume(comm->processes[i]);
343 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host) {
345 for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
346 if (i >= comm->size) i = -1;
350 void smpi_irecv(smpi_mpi_request_t *recvreq) {
351 int rank = smpi_host_rank_self();
352 if (NULL == smpi_pending_recv_requests[rank]) {
353 smpi_pending_recv_requests[rank] = recvreq;
354 } else if (NULL != smpi_last_pending_recv_requests[rank]) {
355 smpi_last_pending_recv_requests[rank]->next = recvreq;
356 } else { // can't happen!
357 fprintf(stderr, "smpi_pending_recv_requests not null while smpi_last_pending_recv_requests null!\n");
359 smpi_last_pending_recv_requests[rank] = recvreq;
360 smpi_match_requests(rank);
361 if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
362 SIMIX_process_resume(smpi_receiver_processes[rank]);
366 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) {
367 smpi_waitlist_node_t *waitnode, *current;
368 if (NULL != request) {
369 if (!request->completed) {
370 waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
371 waitnode->process = SIMIX_process_self();
372 waitnode->next = NULL;
373 if (NULL == request->waitlist) {
374 request->waitlist = waitnode;
376 for(current = request->waitlist; NULL != current->next; current = current->next);
377 current->next = waitnode;
379 SIMIX_process_suspend(waitnode->process);
381 if (NULL != status && MPI_STATUS_IGNORE != status) {
382 status->MPI_SOURCE = request->src;
387 void smpi_wait_all(int count, smpi_mpi_request_t **requests, smpi_mpi_status_t *statuses) {
389 for (i = 0; i < count; i++) {
390 smpi_wait(requests[i], &statuses[i]);
394 void smpi_wait_all_nostatus(int count, smpi_mpi_request_t **requests) {
396 for (i = 0; i < count; i++) {
397 smpi_wait(requests[i], MPI_STATUS_IGNORE);
// smpi_sender: per-host helper process that drains
// smpi_pending_send_requests[rank].
// Start-up:
// - waits for a task on SEND_SYNC_PORT (rank 0's "READY");
// - records itself in smpi_sender_processes[rank];
// - puts a task back to the source host on SEND_SYNC_PORT.
// Main loop (runs while smpi_running_hosts > 0):
// - look at the head request;
// - copy its payload into an MSG task whose name encodes
//   comm/src/dst/tag/ft;
// - put the task to comm->hosts[dst] on MPI_PORT.
// A request with dst == fwdthrough is dequeued and completed after the put.
// Otherwise the request is a broadcast forward: it stays at the head and
// `dst` advances.
// NOTE(review): the numbering skips lines (403-408, 410, 412-414, 416, 418,
// 420-421, 433-435, 439-440, 450-452, 459, 461-463, 465-469). The
// declarations, the branch structure around fc/ft, and the idle path are not
// visible. Presumably the process suspends when the queue is empty, until
// smpi_isend() resumes it. Confirm.
// NOTE(review): taskname is filled with unbounded sprintf and its size is
// not visible here; consider snprintf(taskname, sizeof taskname, ...).
401 int smpi_sender(int argc, char *argv[]) {
402 smx_process_t process;
409 smpi_mpi_request_t *sendreq;
411 process = SIMIX_process_self();
415 MSG_task_get(&mtask, SEND_SYNC_PORT);
417 rank = smpi_host_rank_self();
419 smpi_sender_processes[rank] = process;
422 MSG_task_put(mtask, MSG_task_get_source(mtask), SEND_SYNC_PORT);
424 while (0 < smpi_running_hosts) {
425 sendreq = smpi_pending_send_requests[rank];
426 if (NULL != sendreq) {
428 // pull from queue if not a fwd or no more to fwd
429 if (sendreq->dst == sendreq->fwdthrough) {
430 smpi_pending_send_requests[rank] = sendreq->next;
431 if(sendreq == smpi_last_pending_send_requests[rank]) {
432 smpi_last_pending_send_requests[rank] = NULL;
// fc = half the ring distance from dst to fwdthrough; ft = dst + fc.
// The receiver at dst is told to forward through ft, and this sender then
// continues with dst + fc + 1 (line 460). When dst == fwdthrough, fc is 0
// and ft is dst.
436 fc = ((sendreq->fwdthrough - sendreq->dst + sendreq->comm->size) % sendreq->comm->size) / 2;
437 ft = (sendreq->dst + fc) % sendreq->comm->size;
438 //printf("node %d sending broadcast to node %d through node %d\n", rank, sendreq->dst, ft);
441 // create task to send
// This name format must stay in sync with the sscanf in smpi_receiver.
442 sprintf(taskname, "comm:%d,src:%d,dst:%d,tag:%d,ft:%d", sendreq->comm->id, sendreq->src, sendreq->dst, sendreq->tag, ft);
443 dsize = sendreq->count * sendreq->datatype->size;
// The payload is copied so the task owns its data independently of the
// caller's buffer.
444 data = xbt_malloc(dsize);
445 memcpy(data, sendreq->buf, dsize);
446 mtask = MSG_task_create(taskname, 0, dsize, data);
448 // figure out which host to send it to
449 dhost = sendreq->comm->hosts[sendreq->dst];
// NOTE(review): debug output; whether it is guarded (lines 450-452 and 454
// are not shown) cannot be told from here.
453 printf("host %s attempting to send to host %s\n", SIMIX_host_get_name(SIMIX_host_self()), SIMIX_host_get_name(dhost));
455 MSG_task_put(mtask, dhost, MPI_PORT);
457 if (sendreq->dst == sendreq->fwdthrough) {
458 smpi_complete(sendreq);
460 sendreq->dst = (sendreq->dst + fc + 1) % sendreq->comm->size;
464 SIMIX_process_suspend(process);
// smpi_receiver: per-host helper process that accepts messages on MPI_PORT.
// Start-up mirrors smpi_sender, using RECV_SYNC_PORT and
// smpi_receiver_processes[rank].
// Main loop (runs while smpi_running_hosts > 0): only when a receive request
// is pending does it take one task from MPI_PORT and do the following:
// - parse the comm/src/dst/tag/ft fields out of the task name;
// - append the payload to smpi_received[rank];
// - destroy the task;
// - run smpi_match_requests(rank).
// NOTE(review): the numbering skips lines (472, 474, 476, 478-480, 482, 484,
// 487-488, 490, 494-496, 499, 501, 503, 508, 511, 513, 515, 517, 519-520,
// 522-526). The declarations and the idle path are not visible. Presumably
// the process suspends when no receive is pending, until smpi_irecv()
// resumes it. Confirm.
// NOTE(review): the sscanf return value is not checked, so a malformed task
// name would leave `received` fields unset.
470 int smpi_receiver(int argc, char **argv) {
471 smx_process_t process;
473 smpi_received_t *received;
475 smpi_mpi_request_t *recvreq;
477 process = SIMIX_process_self();
481 MSG_task_get(&mtask, RECV_SYNC_PORT);
483 rank = smpi_host_rank_self();
485 // potential race condition...
486 smpi_receiver_processes[rank] = process;
489 MSG_task_put(mtask, MSG_task_get_source(mtask), RECV_SYNC_PORT);
491 while (0 < smpi_running_hosts) {
492 recvreq = smpi_pending_recv_requests[rank];
493 if (NULL != recvreq) {
// NOTE(review): debug output; whether it is guarded (lines 494-496 are not
// shown) cannot be told from here.
497 printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
498 SIMIX_host_get_name(SIMIX_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
500 MSG_task_get(&mtask, MPI_PORT);
502 received = xbt_malloc(sizeof(smpi_received_t));
// Must agree with the sprintf format used in smpi_sender.
504 sscanf(MSG_task_get_name(mtask), "comm:%d,src:%d,dst:%d,tag:%d,ft:%d",
505 &received->commid, &received->src, &received->dst, &received->tag, &received->fwdthrough);
// Ownership of the payload moves to the received node; smpi_match_requests
// frees it after copying.
506 received->data = MSG_task_get_data(mtask);
507 received->next = NULL;
509 if (NULL == smpi_last_received[rank]) {
510 smpi_received[rank] = received;
512 smpi_last_received[rank]->next = received;
514 smpi_last_received[rank] = received;
516 MSG_task_destroy(mtask);
518 smpi_match_requests(rank);
521 SIMIX_process_suspend(process);
527 // FIXME: move into own file
// smpi_gettimeofday: gettimeofday() replacement that reports the simulated
// clock (SIMIX_get_clock()) instead of wall-clock time. tv_usec is computed
// as (now - tv_sec) * 1e6.
// NOTE(review): lines 529-534 and 536 are not in this listing. Presumably
// they declare `now` and set tv->tv_sec from it; confirm. `tz` is not used
// in the visible lines, and the return value is not visible.
528 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz) {
535 now = SIMIX_get_clock();
537 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
// smpi_sleep: sleep() replacement that advances simulated time by executing
// a "sleep" task with compute amount seconds * DEFAULT_POWER.
// NOTE(review): this equals `seconds` of simulated time only if the host's
// power is DEFAULT_POWER (assumption, not shown here).
// NOTE(review): lines 545 and 549-552, including the return statement, are
// not in this listing. Confirm against the full file.
543 unsigned int smpi_sleep(unsigned int seconds) {
544 m_task_t task = NULL;
546 task = MSG_task_create("sleep", seconds * DEFAULT_POWER, 0, NULL);
547 MSG_task_execute(task);
548 MSG_task_destroy(task);
// smpi_exit: exit() replacement. Decrements the live-rank counter and kills
// the calling SIMIX process. `status` is not used in the visible lines.
// NOTE(review): unlike smpi_mpi_finalize(), nothing visible here wakes the
// helper processes or frees the global tables. Line 554 is not shown.
// Confirm against the full file.
553 void smpi_exit(int status) {
555 smpi_running_hosts--;
556 SIMIX_process_kill(SIMIX_process_self());