1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
14 "Logging specific to SMPI (base)");
16 static int match_recv(void* a, void* b, smx_action_t ignored) {
17 MPI_Request ref = (MPI_Request)a;
18 MPI_Request req = (MPI_Request)b;
20 xbt_assert(ref, "Cannot match recv against null reference");
21 xbt_assert(req, "Cannot match recv against null request");
22 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
23 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
26 static int match_send(void* a, void* b,smx_action_t ignored) {
27 MPI_Request ref = (MPI_Request)a;
28 MPI_Request req = (MPI_Request)b;
30 xbt_assert(ref, "Cannot match send against null reference");
31 xbt_assert(req, "Cannot match send against null request");
32 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
33 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocates a new request object and fills it from the arguments.
 * The size field is computed as the datatype size multiplied by count, the
 * action handle starts out NULL (no communication in flight yet) and the
 * flags are stored as given.
 * NOTE(review): the assignments of buf, src, dst, tag and comm are presumably
 * part of this function as well - confirm against the full definition. */
36 static MPI_Request build_request(void *buf, int count,
37 MPI_Datatype datatype, int src, int dst,
38 int tag, MPI_Comm comm, unsigned flags)
// One s_smpi_mpi_request_t is allocated per call.
42 request = xbt_new(s_smpi_mpi_request_t, 1);
44 // FIXME: this will have to be changed to support non-contiguous datatypes
45 request->size = smpi_datatype_size(datatype) * count;
// A NULL action marks the request as not started (smpi_mpi_start asserts on it).
50 request->action = NULL;
51 request->flags = flags;
/* Runs a trace read from the file at 'path'.
 * The file is opened for reading and the open is asserted to have succeeded.
 * Afterwards, if action_queues is not empty, one warning is logged per
 * remaining queue with its name and the number of actions left, and the
 * dictionary is then freed and recreated empty.
 * NOTE(review): confirm that action_fp is closed on every path. */
59 void smpi_action_trace_run(char *path)
63 xbt_dict_cursor_t cursor;
67 action_fp = fopen(path, "r");
68 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
// Leftover queues mean some recorded actions were never consumed.
72 if (!xbt_dict_is_empty(action_queues)) {
74 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
77 xbt_dict_foreach(action_queues, cursor, name, todo) {
78 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
// Reset the queue dictionary to a fresh, empty one.
84 xbt_dict_free(&action_queues);
85 action_queues = xbt_dict_new_homogeneous(NULL);
88 static void smpi_mpi_request_free_voidp(void* request)
90 MPI_Request req = request;
91 smpi_mpi_request_free(&req);
94 /* MPI Low level calls */
95 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
96 int dst, int tag, MPI_Comm comm)
99 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
100 comm, PERSISTENT | SEND);
105 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
106 int src, int tag, MPI_Comm comm)
108 MPI_Request request =
109 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
110 comm, PERSISTENT | RECV);
/* Starts the communication described by 'request'.
 * The request must not already have an action attached (asserted).
 * - Receive requests (RECV flag set) post an asynchronous receive on the
 *   calling process's own mailbox, matched with match_recv.
 * - Other requests post an asynchronous send to the mailbox of the process
 *   found by looking request->dst up in the communicator's group.
 *   Messages smaller than 64 KiB are sent from a private copy of the buffer
 *   (eager mode, detached send); larger ones are sent from the user buffer. */
115 void smpi_mpi_start(MPI_Request request)
120 xbt_assert(!request->action,
121 "Cannot (re)start a non-finished communication");
122 if(request->flags & RECV) {
123 print_request("New recv", request);
124 mailbox = smpi_process_mailbox();
125 // FIXME: SIMIX does not yet support non-contiguous datatypes
126 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
128 print_request("New send", request);
129 mailbox = smpi_process_remote_mailbox(
130 smpi_group_index(smpi_comm_group(request->comm), request->dst));
131 // FIXME: SIMIX does not yet support non-contiguous datatypes
133 if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
134 void *oldbuf = request->buf;
// NOTE(review): the malloc result is used by memcpy without a NULL check.
// NOTE(review): the copy replaces request->buf; confirm who releases it.
136 request->buf = malloc(request->size);
137 memcpy(request->buf,oldbuf,request->size);
138 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
140 XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
// request->size is passed twice: as the second argument and again after the buffer.
143 simcall_comm_isend(mailbox, request->size, -1.0,
144 request->buf, request->size,
146 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
148 // detach if msg size < eager/rdv switch limit
152 /* FIXME: detached sends are not traceable (request->action == NULL) */
154 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
159 void smpi_mpi_startall(int count, MPI_Request * requests)
163 for(i = 0; i < count; i++) {
164 smpi_mpi_start(requests[i]);
/* Releases a request handle: on return the caller's handle is reset to
 * MPI_REQUEST_NULL so that it cannot be reused by accident.
 * NOTE(review): the request memory is presumably released before the reset -
 * confirm against the full definition. */
168 void smpi_mpi_request_free(MPI_Request * request)
171 *request = MPI_REQUEST_NULL;
174 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
175 int dst, int tag, MPI_Comm comm)
177 MPI_Request request =
178 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
179 comm, NON_PERSISTENT | SEND);
184 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
185 int dst, int tag, MPI_Comm comm)
187 MPI_Request request =
188 smpi_isend_init(buf, count, datatype, dst, tag, comm);
190 smpi_mpi_start(request);
194 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
195 int src, int tag, MPI_Comm comm)
197 MPI_Request request =
198 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
199 comm, NON_PERSISTENT | RECV);
204 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
205 int src, int tag, MPI_Comm comm)
207 MPI_Request request =
208 smpi_irecv_init(buf, count, datatype, src, tag, comm);
210 smpi_mpi_start(request);
214 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
215 int tag, MPI_Comm comm, MPI_Status * status)
219 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
220 smpi_mpi_wait(&request, status);
223 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
224 int tag, MPI_Comm comm)
228 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
229 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
232 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
233 int dst, int sendtag, void *recvbuf, int recvcount,
234 MPI_Datatype recvtype, int src, int recvtag,
235 MPI_Comm comm, MPI_Status * status)
237 MPI_Request requests[2];
241 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
243 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
244 smpi_mpi_startall(2, requests);
245 smpi_mpi_waitall(2, requests, stats);
246 if(status != MPI_STATUS_IGNORE) {
247 // Copy receive status
248 memcpy(status, &stats[1], sizeof(MPI_Status));
252 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
254 return status->count / smpi_datatype_size(datatype);
/* Bookkeeping performed once a request's communication has completed.
 * Unless 'status' is MPI_STATUS_IGNORE, it is filled from the request
 * (source, tag, MPI_SUCCESS as error code, and the request size as count).
 * Requests flagged NON_PERSISTENT are then released through
 * smpi_mpi_request_free(), which resets the caller's handle. */
257 static void finish_wait(MPI_Request * request, MPI_Status * status)
259 MPI_Request req = *request;
261 if(status != MPI_STATUS_IGNORE) {
262 status->MPI_SOURCE = req->src;
263 status->MPI_TAG = req->tag;
264 status->MPI_ERROR = MPI_SUCCESS;
265 // FIXME: really this should just contain the count of receive-type blocks,
// The count stored here is the request's size field, not an element count.
267 status->count = req->size;
269 print_request("Finishing", req);
270 if(req->flags & NON_PERSISTENT) {
271 smpi_mpi_request_free(request);
/* Non-blocking completion test for a single request.
 * A request without an action is handled as a special case; otherwise the
 * action is polled with simcall_comm_test() and smpi_mpi_wait() is used to
 * finalize the request.
 * NOTE(review): presumably the wait only runs when the poll reports
 * completion, and the poll result is the return value - confirm. */
277 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
280 if ((*request)->action == NULL)
283 flag = simcall_comm_test((*request)->action);
285 smpi_mpi_wait(request, status);
/* Non-blocking test for completion of any request of the array.
 * *index starts as MPI_UNDEFINED.  The actions of all requests that have one
 * are collected into a dynar and polled with simcall_comm_testany(); when the
 * result is not MPI_UNDEFINED, the request at *index is finalized through
 * smpi_mpi_wait().
 * NOTE(review): requests[i] is dereferenced without a MPI_REQUEST_NULL check,
 * unlike smpi_mpi_waitany - confirm that callers never pass null entries. */
290 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
297 *index = MPI_UNDEFINED;
300 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
// 'map' has one int per request; presumably it maps dynar positions back to
// request indices - TODO confirm.
301 map = xbt_new(int, count);
303 for(i = 0; i < count; i++) {
304 if(requests[i]->action) {
305 xbt_dynar_push(comms, &requests[i]->action);
311 i = simcall_comm_testany(comms);
312 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
313 if(i != MPI_UNDEFINED) {
315 smpi_mpi_wait(&requests[*index], status);
320 xbt_dynar_free(&comms);
/* Blocks until the communication attached to *request has completed.
 * When the request has an action, the action is waited for with a timeout of
 * -1.0 and finish_wait() then fills 'status' and may release the request.
 * A request without an action (a detached send) is not waited for. */
325 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
327 print_request("Waiting", *request);
328 if ((*request)->action != NULL) { // this is not a detached send
329 simcall_comm_wait((*request)->action, -1.0);
330 finish_wait(request, status);
332 // FIXME for a detached send, finish_wait is not called:
/* Blocks until one request of the array has completed.
 * 'index' starts as MPI_UNDEFINED.  The actions of every request that is
 * neither MPI_REQUEST_NULL nor action-less are collected into a dynar and
 * handed to simcall_comm_waitany(); when the result is not MPI_UNDEFINED the
 * request at 'index' is finalized with finish_wait(). */
335 int smpi_mpi_waitany(int count, MPI_Request requests[],
342 index = MPI_UNDEFINED;
344 // Wait for a request to complete
345 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
// 'map' has one int per request; presumably it maps dynar positions back to
// request indices - TODO confirm.
346 map = xbt_new(int, count);
348 XBT_DEBUG("Wait for one of");
349 for(i = 0; i < count; i++) {
// Null requests and requests without an action are left out of the wait set.
350 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
351 print_request(" ", requests[i]);
352 xbt_dynar_push(comms, &requests[i]->action);
358 i = simcall_comm_waitany(comms);
359 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
360 if (i != MPI_UNDEFINED) {
362 finish_wait(&requests[index], status);
366 xbt_dynar_free(&comms);
/* Waits for the requests of the array to complete.
 * Each completion is reported into a scratch status (or ignored when
 * 'status' is MPI_STATUS_IGNORE); unless statuses are ignored, the scratch
 * status is copied into status[index] for the request that completed.
 * Both smpi_mpi_wait() on requests[c] and smpi_mpi_waitany() on the whole
 * array appear in the loop.
 * NOTE(review): presumably the two calls are alternatives chosen per request,
 * and an MPI_UNDEFINED result from waitany ends the loop - confirm. */
371 void smpi_mpi_waitall(int count, MPI_Request requests[],
// pstat is either the ignore marker or the address of the local scratch status.
376 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
378 for(c = 0; c < count; c++) {
380 smpi_mpi_wait(&requests[c], pstat);
383 index = smpi_mpi_waitany(count, requests, pstat);
384 if(index == MPI_UNDEFINED) {
388 if(status != MPI_STATUS_IGNORE) {
389 memcpy(&status[index], pstat, sizeof(*pstat));
/* Collects the indices of requests that have completed.
 * smpi_mpi_testany() is called on the whole array up to 'incount' times;
 * each time it reports a completion, the index it produced is stored at
 * indices[count].
 * NOTE(review): 'count' is presumably incremented after each store and
 * returned to the caller - confirm against the full definition. */
394 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
400 for(i = 0; i < incount; i++) {
401 if(smpi_mpi_testany(incount, requests, &index, status)) {
402 indices[count] = index;
409 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
412 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
413 nary_tree_bcast(buf, count, datatype, root, comm, 4);
416 void smpi_mpi_barrier(MPI_Comm comm)
418 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
419 nary_tree_barrier(comm, 4);
/* Gathers one fixed-size block from every process into recvbuf.
 * Two paths are present: a blocking send of sendbuf to 'root', and a
 * collecting path that copies the local contribution into recvbuf at block
 * position 'root', posts receives into block position 'src' for the other
 * ranks, then starts them and waits for all of them.
 * Block positions are computed as rank * recvcount * extent(recvtype).
 * All messages use the internal tag 666.
 * NOTE(review): the paths are presumably selected by comparing rank with
 * root, and the loop presumably skips src == root since only size - 1
 * requests are allocated - confirm. */
422 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
423 void *recvbuf, int recvcount, MPI_Datatype recvtype,
424 int root, MPI_Comm comm)
426 int system_tag = 666;
427 int rank, size, src, index;
428 MPI_Aint lb = 0, recvext = 0;
429 MPI_Request *requests;
431 rank = smpi_comm_rank(comm);
432 size = smpi_comm_size(comm);
434 // Send buffer to root
435 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
437 // FIXME: check for errors
438 smpi_datatype_extent(recvtype, &lb, &recvext);
439 // Local copy from root
440 smpi_datatype_copy(sendbuf, sendcount, sendtype,
441 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
442 // Receive buffers from senders
443 requests = xbt_new(MPI_Request, size - 1);
445 for(src = 0; src < size; src++) {
447 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
449 src, system_tag, comm);
453 // Wait for completion of irecv's.
454 smpi_mpi_startall(size - 1, requests);
455 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Gathers variable-size blocks from every process into recvbuf.
 * Same structure as the fixed-size gather, except that the block of rank r
 * is placed at displs[r] * extent(recvtype) and holds recvcounts[r]
 * elements.  All messages use the internal tag 666.
 * NOTE(review): the sending and collecting paths are presumably selected by
 * comparing rank with root, and the loop presumably skips src == root since
 * only size - 1 requests are allocated - confirm. */
460 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
461 void *recvbuf, int *recvcounts, int *displs,
462 MPI_Datatype recvtype, int root, MPI_Comm comm)
464 int system_tag = 666;
465 int rank, size, src, index;
466 MPI_Aint lb = 0, recvext = 0;
467 MPI_Request *requests;
469 rank = smpi_comm_rank(comm);
470 size = smpi_comm_size(comm);
472 // Send buffer to root
473 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
475 // FIXME: check for errors
476 smpi_datatype_extent(recvtype, &lb, &recvext);
477 // Local copy from root
478 smpi_datatype_copy(sendbuf, sendcount, sendtype,
479 (char *)recvbuf + displs[root] * recvext,
480 recvcounts[root], recvtype);
481 // Receive buffers from senders
482 requests = xbt_new(MPI_Request, size - 1);
484 for(src = 0; src < size; src++) {
487 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
488 recvcounts[src], recvtype, src, system_tag, comm);
492 // Wait for completion of irecv's.
493 smpi_mpi_startall(size - 1, requests);
494 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Gathers one fixed-size block from every process into every process.
 * The local contribution is copied into recvbuf at block position 'rank';
 * then a send of sendbuf to, and a receive into block position 'other'
 * from, the other ranks are created, started together and waited for.
 * Block positions are computed as rank * recvcount * extent(recvtype).
 * All messages use the internal tag 666.
 * NOTE(review): the loop presumably skips other == rank, since the request
 * array holds 2 * (size - 1) entries - confirm. */
499 void smpi_mpi_allgather(void *sendbuf, int sendcount,
500 MPI_Datatype sendtype, void *recvbuf,
501 int recvcount, MPI_Datatype recvtype,
504 int system_tag = 666;
505 int rank, size, other, index;
506 MPI_Aint lb = 0, recvext = 0;
507 MPI_Request *requests;
509 rank = smpi_comm_rank(comm);
510 size = smpi_comm_size(comm);
511 // FIXME: check for errors
512 smpi_datatype_extent(recvtype, &lb, &recvext);
513 // Local copy from self
514 smpi_datatype_copy(sendbuf, sendcount, sendtype,
515 (char *)recvbuf + rank * recvcount * recvext, recvcount,
517 // Send/Recv buffers to/from others;
518 requests = xbt_new(MPI_Request, 2 * (size - 1));
520 for(other = 0; other < size; other++) {
523 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
526 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
527 recvcount, recvtype, other,
532 // Wait for completion of all comms.
533 smpi_mpi_startall(2 * (size - 1), requests);
534 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Gathers variable-size blocks from every process into every process.
 * Same structure as the fixed-size allgather, except that the block of rank
 * r is placed at displs[r] * extent(recvtype) and holds recvcounts[r]
 * elements.  All messages use the internal tag 666.
 * NOTE(review): the loop presumably skips other == rank, since the request
 * array holds 2 * (size - 1) entries - confirm. */
538 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
539 MPI_Datatype sendtype, void *recvbuf,
540 int *recvcounts, int *displs,
541 MPI_Datatype recvtype, MPI_Comm comm)
543 int system_tag = 666;
544 int rank, size, other, index;
545 MPI_Aint lb = 0, recvext = 0;
546 MPI_Request *requests;
548 rank = smpi_comm_rank(comm);
549 size = smpi_comm_size(comm);
550 // FIXME: check for errors
551 smpi_datatype_extent(recvtype, &lb, &recvext);
552 // Local copy from self
553 smpi_datatype_copy(sendbuf, sendcount, sendtype,
554 (char *)recvbuf + displs[rank] * recvext,
555 recvcounts[rank], recvtype);
556 // Send buffers to others;
557 requests = xbt_new(MPI_Request, 2 * (size - 1));
559 for(other = 0; other < size; other++) {
562 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
566 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
567 recvtype, other, system_tag, comm);
571 // Wait for completion of all comms.
572 smpi_mpi_startall(2 * (size - 1), requests);
573 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Distributes fixed-size blocks of sendbuf from 'root' to every process.
 * Two paths are present: a blocking receive of recvbuf from 'root', and a
 * distributing path that copies block 'root' of sendbuf into the local
 * recvbuf, creates sends of block 'dst' to the other ranks, then starts them
 * and waits for all of them.
 * Block positions are computed as rank * sendcount * extent(sendtype).
 * All messages use the internal tag 666.
 * NOTE(review): the paths are presumably selected by comparing rank with
 * root, and the loop presumably skips dst == root since only size - 1
 * requests are allocated - confirm. */
577 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
578 void *recvbuf, int recvcount, MPI_Datatype recvtype,
579 int root, MPI_Comm comm)
581 int system_tag = 666;
582 int rank, size, dst, index;
583 MPI_Aint lb = 0, sendext = 0;
584 MPI_Request *requests;
586 rank = smpi_comm_rank(comm);
587 size = smpi_comm_size(comm);
589 // Recv buffer from root
590 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
593 // FIXME: check for errors
594 smpi_datatype_extent(sendtype, &lb, &sendext);
595 // Local copy from root
596 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
597 sendcount, sendtype, recvbuf, recvcount, recvtype);
598 // Send buffers to receivers
599 requests = xbt_new(MPI_Request, size - 1);
601 for(dst = 0; dst < size; dst++) {
603 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
604 sendcount, sendtype, dst,
609 // Wait for completion of isend's.
610 smpi_mpi_startall(size - 1, requests);
611 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Distributes variable-size blocks of sendbuf from 'root' to every process.
 * Same structure as the fixed-size scatter, except that the block for rank r
 * starts at displs[r] * extent(sendtype) and holds sendcounts[r] elements.
 * All messages use the internal tag 666.
 * NOTE(review): the receiving and distributing paths are presumably selected
 * by comparing rank with root, and the loop presumably skips dst == root
 * since only size - 1 requests are allocated - confirm. */
616 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
617 MPI_Datatype sendtype, void *recvbuf, int recvcount,
618 MPI_Datatype recvtype, int root, MPI_Comm comm)
620 int system_tag = 666;
621 int rank, size, dst, index;
622 MPI_Aint lb = 0, sendext = 0;
623 MPI_Request *requests;
625 rank = smpi_comm_rank(comm);
626 size = smpi_comm_size(comm);
628 // Recv buffer from root
629 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
632 // FIXME: check for errors
633 smpi_datatype_extent(sendtype, &lb, &sendext);
634 // Local copy from root
635 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
636 sendtype, recvbuf, recvcount, recvtype);
637 // Send buffers to receivers
638 requests = xbt_new(MPI_Request, size - 1);
640 for(dst = 0; dst < size; dst++) {
643 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
644 sendtype, dst, system_tag, comm);
648 // Wait for completion of isend's.
649 smpi_mpi_startall(size - 1, requests);
650 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduces the sendbuf contributions of all processes into recvbuf with 'op'.
 * Two paths are present: a blocking send of sendbuf to 'root', and a
 * reducing path that copies the local sendbuf into recvbuf, receives the
 * other contributions into temporary buffers of count * extent(datatype)
 * bytes each, and applies 'op' to each temporary buffer and recvbuf as soon
 * as smpi_mpi_waitany() reports it complete - that is, in completion order
 * rather than rank order.  The temporary buffers are freed afterwards.
 * All messages use the internal tag 666.
 * NOTE(review): the paths are presumably selected by comparing rank with
 * root, and the loop presumably skips src == root since only size - 1
 * requests and buffers are allocated - confirm. */
655 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
656 MPI_Datatype datatype, MPI_Op op, int root,
659 int system_tag = 666;
660 int rank, size, src, index;
661 MPI_Aint lb = 0, dataext = 0;
662 MPI_Request *requests;
665 rank = smpi_comm_rank(comm);
666 size = smpi_comm_size(comm);
668 // Send buffer to root
669 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
671 // FIXME: check for errors
672 smpi_datatype_extent(datatype, &lb, &dataext);
673 // Local copy from root
674 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
675 // Receive buffers from senders
676 //TODO: make a MPI_barrier here ?
677 requests = xbt_new(MPI_Request, size - 1);
678 tmpbufs = xbt_new(void *, size - 1);
680 for(src = 0; src < size; src++) {
682 // FIXME: possibly overkill we we have contiguous/noncontiguous data
684 tmpbufs[index] = xbt_malloc(count * dataext);
686 smpi_irecv_init(tmpbufs[index], count, datatype, src,
691 // Wait for completion of irecv's.
692 smpi_mpi_startall(size - 1, requests);
// One waitany per expected contribution; 'src' only counts iterations here.
693 for(src = 0; src < size - 1; src++) {
694 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
695 if(index == MPI_UNDEFINED) {
698 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
700 for(index = 0; index < size - 1; index++) {
701 xbt_free(tmpbufs[index]);
708 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
709 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
711 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
712 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
715 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
716 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
718 int system_tag = 666;
719 int rank, size, other, index;
720 MPI_Aint lb = 0, dataext = 0;
721 MPI_Request *requests;
724 rank = smpi_comm_rank(comm);
725 size = smpi_comm_size(comm);
727 // FIXME: check for errors
728 smpi_datatype_extent(datatype, &lb, &dataext);
730 // Local copy from self
731 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
733 // Send/Recv buffers to/from others;
734 requests = xbt_new(MPI_Request, size - 1);
735 tmpbufs = xbt_new(void *, rank);
737 for(other = 0; other < rank; other++) {
738 // FIXME: possibly overkill we we have contiguous/noncontiguous data
740 tmpbufs[index] = xbt_malloc(count * dataext);
742 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
746 for(other = rank + 1; other < size; other++) {
748 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
751 // Wait for completion of all comms.
752 smpi_mpi_startall(size - 1, requests);
753 for(other = 0; other < size - 1; other++) {
754 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
755 if(index == MPI_UNDEFINED) {
759 // #Request is below rank: it's a irecv
760 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
763 for(index = 0; index < rank; index++) {
764 xbt_free(tmpbufs[index]);