1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
/* Declares the "smpi_base" logging subcategory under the "smpi" category,
 * with the description string below. The XBT_DEBUG / XBT_WARN calls in this
 * file presumably log through it -- confirm against the XBT log docs. */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
13 "Logging specific to SMPI (base)");
/* Matching predicate used for receives.
 *
 * a: reference request (cast to MPI_Request); may carry wildcards.
 * b: candidate request (cast to MPI_Request).
 * The third parameter is not used by the visible code.
 *
 * Returns non-zero when both the source and the tag match, where
 * MPI_ANY_SOURCE / MPI_ANY_TAG on the *reference* match any value.
 * Both pointers are asserted to be non-NULL.
 * Passed as the match callback to simcall_comm_irecv() in smpi_mpi_start(). */
15 static int match_recv(void* a, void* b, smx_action_t ignored) {
16 MPI_Request ref = (MPI_Request)a;
17 MPI_Request req = (MPI_Request)b;
19 xbt_assert(ref, "Cannot match recv against null reference");
20 xbt_assert(req, "Cannot match recv against null request");
// Wildcards are honoured on the reference side only.
21 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
22 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Matching predicate used for sends; mirror image of match_recv().
 *
 * a: reference request (cast to MPI_Request).
 * b: candidate request (cast to MPI_Request); may carry wildcards.
 * The third parameter is not used by the visible code.
 *
 * Returns non-zero when both the source and the tag match, where
 * MPI_ANY_SOURCE / MPI_ANY_TAG on the *candidate* (b) match any value.
 * Both pointers are asserted to be non-NULL. */
25 static int match_send(void* a, void* b,smx_action_t ignored) {
26 MPI_Request ref = (MPI_Request)a;
27 MPI_Request req = (MPI_Request)b;
29 xbt_assert(ref, "Cannot match send against null reference");
30 xbt_assert(req, "Cannot match send against null request");
// Here the wildcards are checked on req, not on ref (contrast with match_recv).
31 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
32 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocates a new request object with xbt_new() and initialises it.
 *
 * The visible assignments set:
 *   - size   = smpi_datatype_size(datatype) * count
 *   - action = NULL (no communication attached yet)
 *   - flags  = flags
 * NOTE(review): buf, src, dst, tag and comm are presumably stored into the
 * request as well (other functions read request->buf/src/dst/tag/comm) --
 * confirm against the complete function body. */
35 static MPI_Request build_request(void *buf, int count,
36 MPI_Datatype datatype, int src, int dst,
37 int tag, MPI_Comm comm, unsigned flags)
41 request = xbt_new(s_smpi_mpi_request_t, 1);
43 // FIXME: this will have to be changed to support non-contiguous datatypes
44 request->size = smpi_datatype_size(datatype) * count;
// A NULL action is what smpi_mpi_start() asserts on before starting a request.
49 request->action = NULL;
50 request->flags = flags;
/* Handles the action trace file `path` and the action_queues dictionary.
 *
 * Visible steps:
 *   1. open `path` for reading into action_fp and assert the open succeeded;
 *   2. if action_queues is not empty, emit a warning and then one XBT_WARN
 *      per remaining entry giving the number of actions still queued;
 *   3. free action_queues and replace it with a fresh, empty dictionary.
 * NOTE(review): what happens between opening the file and the leftover check
 * is not visible here; neither is any fclose() -- confirm. */
58 void smpi_action_trace_run(char *path)
62 xbt_dict_cursor_t cursor;
66 action_fp = fopen(path, "r");
67 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
// Anything left in the queues at this point was never consumed.
71 if (!xbt_dict_is_empty(action_queues)) {
73 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
// Report, per key `name`, how many actions remain in its dynar `todo`.
76 xbt_dict_foreach(action_queues, cursor, name, todo) {
77 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
// Reset the dictionary: free the old one and create a fresh one.
83 xbt_dict_free(&action_queues);
84 action_queues = xbt_dict_new_homogeneous(NULL);
/* void*-typed adapter around smpi_mpi_request_free(), so that it can be used
 * as a generic cleanup callback (it is passed to simcall_comm_isend() in
 * smpi_mpi_start()). The request pointer is copied into a local variable
 * because smpi_mpi_request_free() takes the address of the handle. */
87 static void smpi_mpi_request_free_voidp(void* request)
89 MPI_Request req = request;
90 smpi_mpi_request_free(&req);
93 /* MPI Low level calls */
/* Builds a send request flagged PERSISTENT | SEND.
 * The source is the caller's rank in `comm` (smpi_comm_rank(comm)); the
 * destination, tag, buffer, count and datatype are taken from the arguments.
 * No call starting the communication is visible in this function. */
94 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
95 int dst, int tag, MPI_Comm comm)
98 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
99 comm, PERSISTENT | SEND);
/* Builds a receive request flagged PERSISTENT | RECV.
 * The destination is the caller's rank in `comm` (smpi_comm_rank(comm)); the
 * source, tag, buffer, count and datatype are taken from the arguments.
 * No call starting the communication is visible in this function. */
104 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
105 int src, int tag, MPI_Comm comm)
107 MPI_Request request =
108 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
109 comm, PERSISTENT | RECV);
/* Starts the communication described by `request`.
 *
 * The request must not already have an action attached (asserted).
 *
 * Receive path (request->flags & RECV): posts simcall_comm_irecv() on the
 * calling process's own mailbox with match_recv as the matching predicate,
 * and stores the resulting action in request->action.
 *
 * Send path: targets the mailbox of the destination process (the dst rank is
 * translated through the communicator's group). Messages smaller than
 * 64 KiB take the "eager" route: the payload is copied into a freshly
 * allocated buffer so that the send can be detached. The send is then
 * issued with simcall_comm_isend().
 *
 * Finally the action is tagged with the SMPI tracing category. */
114 void smpi_mpi_start(MPI_Request request)
119 xbt_assert(!request->action,
120 "Cannot (re)start a non-finished communication");
121 if(request->flags & RECV) {
122 print_request("New recv", request);
123 mailbox = smpi_process_mailbox();
124 // FIXME: SIMIX does not yet support non-contiguous datatypes
125 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
127 print_request("New send", request);
128 mailbox = smpi_process_remote_mailbox(
129 smpi_group_index(smpi_comm_group(request->comm), request->dst));
130 // FIXME: SIMIX does not yet support non-contiguous datatypes
132 if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
133 void *oldbuf = request->buf;
// NOTE(review): the malloc() result is handed to memcpy() without a NULL
// check in the visible code.
135 request->buf = malloc(request->size);
136 memcpy(request->buf,oldbuf,request->size);
137 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
139 XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
142 simcall_comm_isend(mailbox, request->size, -1.0,
143 request->buf, request->size,
145 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
147 // detach if msg size < eager/rdv switch limit
151 /* FIXME: detached sends are not traceable (request->action == NULL) */
153 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
/* Starts each of the first `count` requests of `requests`, in index order,
 * by calling smpi_mpi_start() on it. */
158 void smpi_mpi_startall(int count, MPI_Request * requests)
162 for(i = 0; i < count; i++) {
163 smpi_mpi_start(requests[i]);
/* Releases a request handle: on return *request is MPI_REQUEST_NULL.
 * NOTE(review): the deallocation of the request object itself is presumably
 * done before the handle is reset -- confirm against the full body. */
167 void smpi_mpi_request_free(MPI_Request * request)
170 *request = MPI_REQUEST_NULL;
/* Builds a send request flagged NON_PERSISTENT | SEND.
 * The source is the caller's rank in `comm`; everything else comes from the
 * arguments. finish_wait() frees requests carrying NON_PERSISTENT. */
173 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
174 int dst, int tag, MPI_Comm comm)
176 MPI_Request request =
177 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
178 comm, NON_PERSISTENT | SEND);
/* Non-blocking send: creates a non-persistent send request with
 * smpi_isend_init() and immediately starts it with smpi_mpi_start(). */
183 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
184 int dst, int tag, MPI_Comm comm)
186 MPI_Request request =
187 smpi_isend_init(buf, count, datatype, dst, tag, comm);
189 smpi_mpi_start(request);
/* Builds a receive request flagged NON_PERSISTENT | RECV.
 * The destination is the caller's rank in `comm`; everything else comes from
 * the arguments. finish_wait() frees requests carrying NON_PERSISTENT. */
193 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
194 int src, int tag, MPI_Comm comm)
196 MPI_Request request =
197 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
198 comm, NON_PERSISTENT | RECV);
/* Non-blocking receive: creates a non-persistent receive request with
 * smpi_irecv_init() and immediately starts it with smpi_mpi_start(). */
203 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
204 int src, int tag, MPI_Comm comm)
206 MPI_Request request =
207 smpi_irecv_init(buf, count, datatype, src, tag, comm);
209 smpi_mpi_start(request);
/* Blocking receive, implemented as smpi_mpi_irecv() followed by
 * smpi_mpi_wait() on the resulting request. `status` is forwarded to
 * smpi_mpi_wait() as is. */
213 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
214 int tag, MPI_Comm comm, MPI_Status * status)
218 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
219 smpi_mpi_wait(&request, status);
/* Blocking send, implemented as smpi_mpi_isend() followed by
 * smpi_mpi_wait() on the resulting request; no status is collected
 * (MPI_STATUS_IGNORE). */
222 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
223 int tag, MPI_Comm comm)
227 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
228 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* Combined send + receive.
 *
 * Creates one non-persistent send request (to `dst`, tag `sendtag`) and one
 * non-persistent receive request (from `src`, tag `recvtag`), starts both,
 * then waits for both to complete. If `status` is not MPI_STATUS_IGNORE it
 * receives a copy of stats[1], which per the in-code comment is the status of
 * the receive. */
231 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
232 int dst, int sendtag, void *recvbuf, int recvcount,
233 MPI_Datatype recvtype, int src, int recvtag,
234 MPI_Comm comm, MPI_Status * status)
236 MPI_Request requests[2];
240 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
242 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
// Both requests are started together and then awaited together.
243 smpi_mpi_startall(2, requests);
244 smpi_mpi_waitall(2, requests, stats);
245 if(status != MPI_STATUS_IGNORE) {
246 // Copy receive status
247 memcpy(status, &stats[1], sizeof(MPI_Status));
/* Returns status->count divided by the size of `datatype` (integer division).
 * finish_wait() stores the request size in bytes into status->count, so the
 * result is a number of `datatype` elements.
 * NOTE(review): `status` is dereferenced unconditionally, and a datatype
 * whose smpi_datatype_size() is 0 would cause a division by zero. */
251 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
253 return status->count / smpi_datatype_size(datatype);
/* Common completion step for a finished request.
 *
 * If `status` is not MPI_STATUS_IGNORE, fills it from the request: source,
 * tag, MPI_SUCCESS as error code, and the request size as count.
 * Then logs the request and, when it is flagged NON_PERSISTENT, frees it via
 * smpi_mpi_request_free() (which resets *request to MPI_REQUEST_NULL). */
256 static void finish_wait(MPI_Request * request, MPI_Status * status)
258 MPI_Request req = *request;
260 if(status != MPI_STATUS_IGNORE) {
261 status->MPI_SOURCE = req->src;
262 status->MPI_TAG = req->tag;
263 status->MPI_ERROR = MPI_SUCCESS;
264 // FIXME: really this should just contain the count of receive-type blocks,
266 status->count = req->size;
268 print_request("Finishing", req);
// Only non-persistent requests are freed here.
269 if(req->flags & NON_PERSISTENT) {
270 smpi_mpi_request_free(request);
/* Tests a single request for completion.
 *
 * Visible logic: a request without an attached action is special-cased
 * (the outcome of that branch is not visible here); otherwise the action is
 * polled with simcall_comm_test() and the result stored in `flag`, and
 * smpi_mpi_wait() is called to finalise the request.
 * NOTE(review): presumably smpi_mpi_wait() is only reached when `flag` is
 * set -- confirm against the full body. */
276 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
279 if ((*request)->action == NULL)
282 flag = simcall_comm_test((*request)->action);
284 smpi_mpi_wait(request, status);
/* Tests whether any of `count` requests has completed.
 *
 * *index starts out as MPI_UNDEFINED. The actions of all requests that have
 * one are collected into a temporary dynar and polled together with
 * simcall_comm_testany(). When that call reports something other than
 * MPI_UNDEFINED, the request at *index is finalised via smpi_mpi_wait().
 * The temporary dynar is freed before returning.
 * `map` is an int array of `count` entries allocated alongside the dynar;
 * its use is not visible here -- presumably it maps dynar positions back to
 * request indices (confirm).
 * NOTE(review): unlike smpi_mpi_waitany(), requests[i] is dereferenced
 * without first checking it against MPI_REQUEST_NULL. */
289 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
296 *index = MPI_UNDEFINED;
299 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
300 map = xbt_new(int, count);
302 for(i = 0; i < count; i++) {
// Only requests with an attached action can be polled.
303 if(requests[i]->action) {
304 xbt_dynar_push(comms, &requests[i]->action);
310 i = simcall_comm_testany(comms);
311 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
312 if(i != MPI_UNDEFINED) {
314 smpi_mpi_wait(&requests[*index], status);
319 xbt_dynar_free(&comms);
/* Waits for the completion of *request.
 *
 * When the request has an attached action, blocks in simcall_comm_wait()
 * (called with -1.0 as its second argument) and then runs finish_wait(),
 * which fills `status` and frees non-persistent requests.
 * When there is no action (a detached send, per the in-code comments),
 * finish_wait() is not called. */
324 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
326 print_request("Waiting", *request);
327 if ((*request)->action != NULL) { // this is not a detached send
328 simcall_comm_wait((*request)->action, -1.0);
329 finish_wait(request, status);
331 // FIXME for a detached send, finish_wait is not called:
/* Waits until one of `count` requests completes.
 *
 * `index` starts out as MPI_UNDEFINED. The actions of all requests that are
 * neither MPI_REQUEST_NULL nor action-less are collected into a temporary
 * dynar and passed to simcall_comm_waitany(). When that call reports
 * something other than MPI_UNDEFINED, the request at `index` is finalised
 * with finish_wait(). The temporary dynar is freed before returning.
 * `map` is an int array of `count` entries allocated alongside the dynar;
 * its use is not visible here -- presumably it maps dynar positions back to
 * request indices (confirm). */
334 int smpi_mpi_waitany(int count, MPI_Request requests[],
341 index = MPI_UNDEFINED;
343 // Wait for a request to complete
344 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
345 map = xbt_new(int, count);
347 XBT_DEBUG("Wait for one of");
348 for(i = 0; i < count; i++) {
// Skip freed handles and requests without an action (e.g. detached sends).
349 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
350 print_request(" ", requests[i]);
351 xbt_dynar_push(comms, &requests[i]->action);
357 i = simcall_comm_waitany(comms);
358 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
359 if (i != MPI_UNDEFINED) {
361 finish_wait(&requests[index], status);
365 xbt_dynar_free(&comms);
/* Waits for all `count` requests.
 *
 * A local status `stat` is used as scratch space unless the caller passed
 * MPI_STATUS_IGNORE. The loop runs `count` times; in it both a direct
 * smpi_mpi_wait() on requests[c] and a smpi_mpi_waitany() over the whole
 * array are visible (the condition selecting between them is not visible
 * here). MPI_UNDEFINED from smpi_mpi_waitany() is special-cased. When the
 * caller wants statuses, the scratch status is copied into status[index]. */
370 void smpi_mpi_waitall(int count, MPI_Request requests[],
// pstat is either MPI_STATUS_IGNORE or the address of the local scratch status.
375 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
377 for(c = 0; c < count; c++) {
379 smpi_mpi_wait(&requests[c], pstat);
382 index = smpi_mpi_waitany(count, requests, pstat);
383 if(index == MPI_UNDEFINED) {
387 if(status != MPI_STATUS_IGNORE) {
388 memcpy(&status[index], pstat, sizeof(*pstat));
/* Collects the indices of completed requests.
 *
 * Calls smpi_mpi_testany() up to `incount` times over the whole array; each
 * time it reports a completion, the completed index is stored at
 * indices[count].
 * NOTE(review): `count` is presumably incremented after each store and
 * returned -- confirm against the full body. */
393 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
399 for(i = 0; i < incount; i++) {
400 if(smpi_mpi_testany(incount, requests, &index, status)) {
401 indices[count] = index;
/* Broadcast of `count` elements of `datatype` from `root`, delegated to
 * nary_tree_bcast() with an arity of 4. */
408 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
411 // arity=2 would be a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
412 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* Barrier over `comm`, delegated to nary_tree_barrier() with an arity of 4. */
415 void smpi_mpi_barrier(MPI_Comm comm)
417 // arity=2 would be a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
418 nary_tree_barrier(comm, 4);
/* Gather: every rank contributes `sendcount` elements, collected at `root`.
 *
 * Two paths are visible (the branch condition itself is not visible here;
 * presumably rank != root vs. rank == root):
 *   - sender path: a blocking smpi_mpi_send() of sendbuf to `root`;
 *   - root path: the root's own contribution is copied locally into slot
 *     `root` of recvbuf, then one non-persistent receive per source rank is
 *     created, each targeting recvbuf + src * recvcount * recvext; all
 *     receives are started together and awaited together.
 * The requests array holds size - 1 entries.
 * All messages use the internal tag `system_tag` (666). */
421 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
422 void *recvbuf, int recvcount, MPI_Datatype recvtype,
423 int root, MPI_Comm comm)
425 int system_tag = 666;
426 int rank, size, src, index;
427 MPI_Aint lb = 0, recvext = 0;
428 MPI_Request *requests;
430 rank = smpi_comm_rank(comm);
431 size = smpi_comm_size(comm);
433 // Send buffer to root
434 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
436 // FIXME: check for errors
// recvext (extent of recvtype) is the stride unit for slots in recvbuf.
437 smpi_datatype_extent(recvtype, &lb, &recvext);
438 // Local copy from root
439 smpi_datatype_copy(sendbuf, sendcount, sendtype,
440 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
441 // Receive buffers from senders
442 requests = xbt_new(MPI_Request, size - 1);
444 for(src = 0; src < size; src++) {
446 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
448 src, system_tag, comm);
452 // Wait for completion of irecv's.
453 smpi_mpi_startall(size - 1, requests);
454 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Variable-count gather: like smpi_mpi_gather(), but each source rank has
 * its own element count (recvcounts[src]) and its own displacement
 * (displs[src], expressed in units of the recvtype extent) in recvbuf.
 *
 * Two paths are visible (the branch condition itself is not visible here;
 * presumably rank != root vs. rank == root):
 *   - sender path: a blocking smpi_mpi_send() of sendbuf to `root`;
 *   - root path: local copy of the root's own data to
 *     recvbuf + displs[root] * recvext, then one non-persistent receive per
 *     source, all started together and awaited together.
 * The requests array holds size - 1 entries.
 * All messages use the internal tag `system_tag` (666). */
459 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
460 void *recvbuf, int *recvcounts, int *displs,
461 MPI_Datatype recvtype, int root, MPI_Comm comm)
463 int system_tag = 666;
464 int rank, size, src, index;
465 MPI_Aint lb = 0, recvext = 0;
466 MPI_Request *requests;
468 rank = smpi_comm_rank(comm);
469 size = smpi_comm_size(comm);
471 // Send buffer to root
472 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
474 // FIXME: check for errors
475 smpi_datatype_extent(recvtype, &lb, &recvext);
476 // Local copy from root
477 smpi_datatype_copy(sendbuf, sendcount, sendtype,
478 (char *)recvbuf + displs[root] * recvext,
479 recvcounts[root], recvtype);
480 // Receive buffers from senders
481 requests = xbt_new(MPI_Request, size - 1);
483 for(src = 0; src < size; src++) {
486 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
487 recvcounts[src], recvtype, src, system_tag, comm);
491 // Wait for completion of irecv's.
492 smpi_mpi_startall(size - 1, requests);
493 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* All-gather: every rank ends up with every rank's contribution.
 *
 * The caller's own data is copied locally into slot `rank` of recvbuf. Then
 * non-persistent sends (of sendbuf) and receives (into
 * recvbuf + other * recvcount * recvext) are created towards the other
 * ranks; the requests array has room for 2 * (size - 1) of them. They are
 * all started together and awaited together.
 * All messages use the internal tag `system_tag` (666). */
498 void smpi_mpi_allgather(void *sendbuf, int sendcount,
499 MPI_Datatype sendtype, void *recvbuf,
500 int recvcount, MPI_Datatype recvtype,
503 int system_tag = 666;
504 int rank, size, other, index;
505 MPI_Aint lb = 0, recvext = 0;
506 MPI_Request *requests;
508 rank = smpi_comm_rank(comm);
509 size = smpi_comm_size(comm);
510 // FIXME: check for errors
511 smpi_datatype_extent(recvtype, &lb, &recvext);
512 // Local copy from self
513 smpi_datatype_copy(sendbuf, sendcount, sendtype,
514 (char *)recvbuf + rank * recvcount * recvext, recvcount,
516 // Send/Recv buffers to/from others;
// One send and one receive per peer, hence 2 * (size - 1) requests.
517 requests = xbt_new(MPI_Request, 2 * (size - 1));
519 for(other = 0; other < size; other++) {
522 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
525 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
526 recvcount, recvtype, other,
531 // Wait for completion of all comms.
532 smpi_mpi_startall(2 * (size - 1), requests);
533 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Variable-count all-gather: like smpi_mpi_allgather(), but the data of rank
 * r occupies recvcounts[r] elements at displacement displs[r] (in units of
 * the recvtype extent) in recvbuf.
 *
 * The caller's own data is copied locally to its slot. Then non-persistent
 * sends (of sendbuf) and receives are created towards the other ranks; the
 * requests array has room for 2 * (size - 1) of them. They are all started
 * together and awaited together.
 * All messages use the internal tag `system_tag` (666). */
537 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
538 MPI_Datatype sendtype, void *recvbuf,
539 int *recvcounts, int *displs,
540 MPI_Datatype recvtype, MPI_Comm comm)
542 int system_tag = 666;
543 int rank, size, other, index;
544 MPI_Aint lb = 0, recvext = 0;
545 MPI_Request *requests;
547 rank = smpi_comm_rank(comm);
548 size = smpi_comm_size(comm);
549 // FIXME: check for errors
550 smpi_datatype_extent(recvtype, &lb, &recvext);
551 // Local copy from self
552 smpi_datatype_copy(sendbuf, sendcount, sendtype,
553 (char *)recvbuf + displs[rank] * recvext,
554 recvcounts[rank], recvtype);
555 // Send buffers to others;
556 requests = xbt_new(MPI_Request, 2 * (size - 1));
558 for(other = 0; other < size; other++) {
561 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
565 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
566 recvtype, other, system_tag, comm);
570 // Wait for completion of all comms.
571 smpi_mpi_startall(2 * (size - 1), requests);
572 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatter: `root` distributes consecutive slices of sendbuf, `sendcount`
 * elements per rank.
 *
 * Two paths are visible (the branch condition itself is not visible here;
 * presumably rank != root vs. rank == root):
 *   - receiver path: a blocking smpi_mpi_recv() from `root` into recvbuf;
 *   - root path: the root's own slice (at sendbuf + root * sendcount *
 *     sendext) is copied locally into recvbuf, then one non-persistent send
 *     per destination is created from sendbuf + dst * sendcount * sendext;
 *     all sends are started together and awaited together.
 * The requests array holds size - 1 entries.
 * All messages use the internal tag `system_tag` (666). */
576 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
577 void *recvbuf, int recvcount, MPI_Datatype recvtype,
578 int root, MPI_Comm comm)
580 int system_tag = 666;
581 int rank, size, dst, index;
582 MPI_Aint lb = 0, sendext = 0;
583 MPI_Request *requests;
585 rank = smpi_comm_rank(comm);
586 size = smpi_comm_size(comm);
588 // Recv buffer from root
589 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
592 // FIXME: check for errors
// sendext (extent of sendtype) is the stride unit for slices of sendbuf.
593 smpi_datatype_extent(sendtype, &lb, &sendext);
594 // Local copy from root
595 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
596 sendcount, sendtype, recvbuf, recvcount, recvtype);
597 // Send buffers to receivers
598 requests = xbt_new(MPI_Request, size - 1);
600 for(dst = 0; dst < size; dst++) {
602 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
603 sendcount, sendtype, dst,
608 // Wait for completion of isend's.
609 smpi_mpi_startall(size - 1, requests);
610 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Variable-count scatter: like smpi_mpi_scatter(), but the slice for rank r
 * has sendcounts[r] elements and starts at displacement displs[r] (in units
 * of the sendtype extent) in sendbuf.
 *
 * Two paths are visible (the branch condition itself is not visible here;
 * presumably rank != root vs. rank == root):
 *   - receiver path: a blocking smpi_mpi_recv() from `root` into recvbuf;
 *   - root path: local copy of the root's own slice into recvbuf, then one
 *     non-persistent send per destination, all started together and awaited
 *     together.
 * The requests array holds size - 1 entries.
 * All messages use the internal tag `system_tag` (666). */
615 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
616 MPI_Datatype sendtype, void *recvbuf, int recvcount,
617 MPI_Datatype recvtype, int root, MPI_Comm comm)
619 int system_tag = 666;
620 int rank, size, dst, index;
621 MPI_Aint lb = 0, sendext = 0;
622 MPI_Request *requests;
624 rank = smpi_comm_rank(comm);
625 size = smpi_comm_size(comm);
627 // Recv buffer from root
628 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
631 // FIXME: check for errors
632 smpi_datatype_extent(sendtype, &lb, &sendext);
633 // Local copy from root
634 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
635 sendtype, recvbuf, recvcount, recvtype);
636 // Send buffers to receivers
637 requests = xbt_new(MPI_Request, size - 1);
639 for(dst = 0; dst < size; dst++) {
642 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
643 sendtype, dst, system_tag, comm);
647 // Wait for completion of isend's.
648 smpi_mpi_startall(size - 1, requests);
649 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduce: combines `count` elements of `datatype` from every rank with `op`,
 * leaving the result in recvbuf at `root`.
 *
 * Two paths are visible (the branch condition itself is not visible here;
 * presumably rank != root vs. rank == root):
 *   - sender path: a blocking smpi_mpi_send() of sendbuf to `root`;
 *   - root path: recvbuf is seeded with a copy of the root's own sendbuf;
 *     one temporary buffer of count * dataext bytes and one non-persistent
 *     receive are created per source; the receives are started together and
 *     then, for each request completed by smpi_mpi_waitany(),
 *     smpi_op_apply() is called with that request's temporary buffer and
 *     recvbuf. The temporary buffers are freed at the end.
 * NOTE(review): contributions are folded in the order smpi_mpi_waitany()
 * returns them -- confirm this is acceptable for non-commutative ops.
 * All messages use the internal tag `system_tag` (666). */
654 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
655 MPI_Datatype datatype, MPI_Op op, int root,
658 int system_tag = 666;
659 int rank, size, src, index;
660 MPI_Aint lb = 0, dataext = 0;
661 MPI_Request *requests;
664 rank = smpi_comm_rank(comm);
665 size = smpi_comm_size(comm);
667 // Send buffer to root
668 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
670 // FIXME: check for errors
671 smpi_datatype_extent(datatype, &lb, &dataext);
672 // Local copy from root
673 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
674 // Receive buffers from senders
675 //TODO: make a MPI_barrier here ?
676 requests = xbt_new(MPI_Request, size - 1);
677 tmpbufs = xbt_new(void *, size - 1);
679 for(src = 0; src < size; src++) {
681 // FIXME: possibly overkill if we have contiguous/noncontiguous data
683 tmpbufs[index] = xbt_malloc(count * dataext);
685 smpi_irecv_init(tmpbufs[index], count, datatype, src,
690 // Wait for completion of irecv's.
691 smpi_mpi_startall(size - 1, requests);
// Fold each contribution into recvbuf as soon as its receive completes.
692 for(src = 0; src < size - 1; src++) {
693 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
694 if(index == MPI_UNDEFINED) {
697 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Release the per-source temporary buffers.
699 for(index = 0; index < size - 1; index++) {
700 xbt_free(tmpbufs[index]);
/* All-reduce, implemented as a reduce to rank 0 followed by a broadcast of
 * recvbuf from rank 0 to every rank of `comm`. */
707 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
708 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
710 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
711 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
/* Scan (prefix reduction) over `comm`.
 *
 * recvbuf is seeded with a copy of the caller's own sendbuf. The caller then
 * sets up one non-persistent receive (into a temporary buffer of
 * count * dataext bytes) from every rank below its own, and one
 * non-persistent send of sendbuf to every rank above its own -- size - 1
 * requests in total, with `rank` temporary buffers. All requests are started
 * together; as smpi_mpi_waitany() reports completions, smpi_op_apply() folds
 * received buffers into recvbuf. Per the in-code comment, request indices
 * below `rank` are the receives. The temporary buffers are freed at the end.
 * NOTE(review): contributions are folded in the order smpi_mpi_waitany()
 * returns them -- confirm this is acceptable for non-commutative ops.
 * All messages use the internal tag `system_tag` (666). */
714 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
715 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
717 int system_tag = 666;
718 int rank, size, other, index;
719 MPI_Aint lb = 0, dataext = 0;
720 MPI_Request *requests;
723 rank = smpi_comm_rank(comm);
724 size = smpi_comm_size(comm);
726 // FIXME: check for errors
727 smpi_datatype_extent(datatype, &lb, &dataext);
729 // Local copy from self
730 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
732 // Send/Recv buffers to/from others;
733 requests = xbt_new(MPI_Request, size - 1);
734 tmpbufs = xbt_new(void *, rank);
// Receives: one per lower-ranked process.
736 for(other = 0; other < rank; other++) {
737 // FIXME: possibly overkill if we have contiguous/noncontiguous data
739 tmpbufs[index] = xbt_malloc(count * dataext);
741 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
// Sends: one per higher-ranked process.
745 for(other = rank + 1; other < size; other++) {
747 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
750 // Wait for completion of all comms.
751 smpi_mpi_startall(size - 1, requests);
752 for(other = 0; other < size - 1; other++) {
753 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
754 if(index == MPI_UNDEFINED) {
758 // #Request is below rank: it's a irecv
759 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Release the temporary receive buffers.
762 for(index = 0; index < rank; index++) {
763 xbt_free(tmpbufs[index]);