1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
12 #include "surf/surf.h"
/* Declare "smpi_base" (child of "smpi") as the default XBT logging category
 * for this translation unit; XBT_DEBUG/XBT_WARN below log through it. */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
15 "Logging specific to SMPI (base)");
/* SIMIX matching callback for posted receives.
 * 'a' is the reference (the posted receive request), 'b' is a candidate
 * incoming send request. Returns non-zero when the candidate's source rank
 * and tag satisfy the receive; the wildcards MPI_ANY_SOURCE / MPI_ANY_TAG
 * are only honored on the receive (reference) side. */
17 static int match_recv(void* a, void* b, smx_action_t ignored) {
18 MPI_Request ref = (MPI_Request)a;
19 MPI_Request req = (MPI_Request)b;
/* Defensive: both sides must be real requests before dereferencing. */
21 xbt_assert(ref, "Cannot match recv against null reference");
22 xbt_assert(req, "Cannot match recv against null request");
23 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
24 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* SIMIX matching callback for sends — mirror image of match_recv.
 * Here 'a' (ref) is the send under test and 'b' (req) is a candidate posted
 * receive, so the wildcards are checked on 'req', the receive side. */
27 static int match_send(void* a, void* b,smx_action_t ignored) {
28 MPI_Request ref = (MPI_Request)a;
29 MPI_Request req = (MPI_Request)b;
31 xbt_assert(ref, "Cannot match send against null reference");
32 xbt_assert(req, "Cannot match send against null request");
33 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
34 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocate and initialize a point-to-point request descriptor.
 * NOTE(review): this listing elides several original lines (e.g. the
 * declaration of 'request' and the buf/src/dst/tag/comm field assignments);
 * only the size/action/flags initialization is visible here — confirm the
 * rest against the full source. */
37 static MPI_Request build_request(void *buf, int count,
38 MPI_Datatype datatype, int src, int dst,
39 int tag, MPI_Comm comm, unsigned flags)
43 request = xbt_new(s_smpi_mpi_request_t, 1);
45 // FIXME: this will have to be changed to support non-contiguous datatypes
/* Payload size in bytes: element size times count (contiguous data only). */
46 request->size = smpi_datatype_size(datatype) * count;
/* No SIMIX communication in flight until smpi_mpi_start() is called. */
51 request->action = NULL;
52 request->flags = flags;
/* Drive a trace replay: open the action file at 'path', run the replay
 * (elided lines), then report and reset any leftover per-process action
 * queues. Leftover queued actions after a clean run usually indicate the
 * deployment file spawns fewer processes than the trace expects. */
60 void smpi_action_trace_run(char *path)
64 xbt_dict_cursor_t cursor;
/* Abort with a readable message if the trace file cannot be opened. */
68 action_fp = fopen(path, "r");
69 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
/* Warn about every queue that still holds unconsumed actions. */
73 if (!xbt_dict_is_empty(action_queues)) {
75 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
78 xbt_dict_foreach(action_queues, cursor, name, todo) {
79 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
/* Drop and recreate the dictionary so a subsequent run starts clean. */
85 xbt_dict_free(&action_queues);
86 action_queues = xbt_dict_new_homogeneous(NULL);
/* void*-typed adapter around smpi_mpi_request_free(), so it can be passed
 * to SIMIX as a cleanup callback (see the detached-send path in
 * smpi_mpi_start). */
89 static void smpi_mpi_request_free_voidp(void* request)
91 MPI_Request req = request;
92 smpi_mpi_request_free(&req);
95 /* MPI Low level calls */
/* MPI_Send_init: build a PERSISTENT send request targeting rank 'dst'.
 * The request is only built here; nothing is started until
 * smpi_mpi_start()/smpi_mpi_startall(). */
96 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
97 int dst, int tag, MPI_Comm comm)
100 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
101 comm, PERSISTENT | SEND);
/* MPI_Recv_init: build a PERSISTENT receive request from rank 'src'
 * (or MPI_ANY_SOURCE). Not started until smpi_mpi_start(). */
106 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
107 int src, int tag, MPI_Comm comm)
109 MPI_Request request =
110 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
111 comm, PERSISTENT | RECV);
/* Hand a built request over to SIMIX, actually starting the communication.
 * Receives smaller than the "smpi/async_small_thres" config value are posted
 * on the per-process small-message mailbox (eager path); matching sends go
 * to the peer's small mailbox and — below a 64KiB limit — are detached:
 * the payload is copied so the caller may reuse its buffer immediately.
 * NOTE(review): several lines (else branches, the detach flag, closing
 * braces) are elided from this listing — confirm control flow in the full
 * source. */
116 void smpi_mpi_start(MPI_Request request)
/* A request still bound to a SIMIX action must not be restarted. */
121 xbt_assert(!request->action,
122 "Cannot (re)start a non-finished communication");
123 if(request->flags & RECV) {
124 print_request("New recv", request);
/* Small receives use the eager mailbox, large ones the rendezvous one. */
125 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
126 mailbox = smpi_process_mailbox_small();
128 mailbox = smpi_process_mailbox();
130 // FIXME: SIMIX does not yet support non-contiguous datatypes
131 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
133 print_request("New send", request);
135 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
/* Eager path: deliver straight into the receiver's small mailbox. */
136 mailbox = smpi_process_remote_mailbox_small(
137 smpi_group_index(smpi_comm_group(request->comm), request->dst));
139 XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
140 mailbox = smpi_process_remote_mailbox(
141 smpi_group_index(smpi_comm_group(request->comm), request->dst));
143 if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
/* Detached send: clone the user buffer so the caller may reuse it at once.
 * NOTE(review): the malloc result is used unchecked — presumably the
 * simulator aborts elsewhere on OOM; verify. */
144 void *oldbuf = request->buf;
146 request->buf = malloc(request->size);
148 memcpy(request->buf,oldbuf,request->size);
149 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
151 XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
152 mailbox = smpi_process_remote_mailbox(
153 smpi_group_index(smpi_comm_group(request->comm), request->dst));
157 simcall_comm_isend(mailbox, request->size, -1.0,
158 request->buf, request->size,
160 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
162 // detach if msg size < eager/rdv switch limit
166 /* FIXME: detached sends are not traceable (request->action == NULL) */
168 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
/* MPI_Startall: start every request of the array, in order, via
 * smpi_mpi_start(). */
175 void smpi_mpi_startall(int count, MPI_Request * requests)
179 for(i = 0; i < count; i++) {
180 smpi_mpi_start(requests[i]);
/* Release a request and reset the caller's handle to MPI_REQUEST_NULL.
 * NOTE(review): the actual deallocation line is elided from this listing;
 * only the handle reset is visible. */
184 void smpi_mpi_request_free(MPI_Request * request)
187 *request = MPI_REQUEST_NULL;
/* Build a NON_PERSISTENT send request (not yet started). Shared helper for
 * smpi_mpi_isend and the collectives below. */
190 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
191 int dst, int tag, MPI_Comm comm)
193 MPI_Request request =
194 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
195 comm, NON_PERSISTENT | SEND);
/* MPI_Isend: build the non-persistent send and start it immediately. */
200 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
201 int dst, int tag, MPI_Comm comm)
203 MPI_Request request =
204 smpi_isend_init(buf, count, datatype, dst, tag, comm);
206 smpi_mpi_start(request);
/* Build a NON_PERSISTENT receive request (not yet started). Shared helper
 * for smpi_mpi_irecv and the collectives below. */
210 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
211 int src, int tag, MPI_Comm comm)
213 MPI_Request request =
214 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
215 comm, NON_PERSISTENT | RECV);
/* MPI_Irecv: build the non-persistent receive and start it immediately. */
220 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
221 int src, int tag, MPI_Comm comm)
223 MPI_Request request =
224 smpi_irecv_init(buf, count, datatype, src, tag, comm);
226 smpi_mpi_start(request);
/* MPI_Recv (blocking): implemented as irecv + wait. */
230 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
231 int tag, MPI_Comm comm, MPI_Status * status)
235 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
236 smpi_mpi_wait(&request, status);
/* MPI_Send (blocking): implemented as isend + wait, discarding the status. */
241 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
242 int tag, MPI_Comm comm)
246 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
247 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* MPI_Sendrecv: start the send and the receive together (avoiding the
 * deadlock a blocking send-then-recv could cause), wait for both, and
 * report the RECEIVE's status to the caller. */
250 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
251 int dst, int sendtag, void *recvbuf, int recvcount,
252 MPI_Datatype recvtype, int src, int recvtag,
253 MPI_Comm comm, MPI_Status * status)
255 MPI_Request requests[2];
/* requests[0] = send, requests[1] = receive (assignments partly elided). */
259 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
261 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
262 smpi_mpi_startall(2, requests);
263 smpi_mpi_waitall(2, requests, stats);
264 if(status != MPI_STATUS_IGNORE) {
265 // Copy receive status
266 memcpy(status, &stats[1], sizeof(MPI_Status));
/* MPI_Get_count: convert the byte count stored in the status back into a
 * number of 'datatype' elements (integer division; a partial trailing
 * element is truncated). */
270 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
272 return status->count / smpi_datatype_size(datatype);
/* Common completion path for wait/test: fill in the caller's status from
 * the finished request, then free the request if it is non-persistent
 * (which also resets *request to MPI_REQUEST_NULL). */
275 static void finish_wait(MPI_Request * request, MPI_Status * status)
277 MPI_Request req = *request;
279 if(status != MPI_STATUS_IGNORE) {
280 status->MPI_SOURCE = req->src;
281 status->MPI_TAG = req->tag;
282 status->MPI_ERROR = MPI_SUCCESS;
283 // FIXME: really this should just contain the count of receive-type blocks,
/* Byte count; smpi_mpi_get_count() divides it back into elements. */
285 status->count = req->size;
287 print_request("Finishing", req);
/* Persistent requests survive completion so they can be restarted. */
288 if(req->flags & NON_PERSISTENT) {
289 smpi_mpi_request_free(request);
/* MPI_Test: poll the underlying SIMIX action; if it has completed, finish
 * the request via smpi_mpi_wait (which cannot block then).
 * NOTE(review): the branches around the NULL-action case and the flag test
 * are elided from this listing — confirm against the full source. */
295 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
298 if ((*request)->action == NULL)
301 flag = simcall_comm_test((*request)->action);
303 smpi_mpi_wait(request, status);
/* MPI_Testany: collect the SIMIX actions of all active requests into a
 * dynar, poll them at once via simcall_comm_testany, and — when one has
 * completed — finish it through smpi_mpi_wait. 'map' translates the dynar
 * position back to the caller's request index. */
308 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
315 *index = MPI_UNDEFINED;
318 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
319 map = xbt_new(int, count);
/* Only requests with a live SIMIX action can be tested. */
321 for(i = 0; i < count; i++) {
322 if(requests[i]->action) {
323 xbt_dynar_push(comms, &requests[i]->action);
329 i = simcall_comm_testany(comms);
330 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
331 if(i != MPI_UNDEFINED) {
/* Completed action found: this wait returns immediately and fills status. */
333 smpi_mpi_wait(&requests[*index], status);
338 xbt_dynar_free(&comms);
/* MPI_Wait: block on the request's SIMIX action, then run the common
 * completion path. A NULL action means a detached send: there is nothing
 * to wait on, and (per the FIXME below) finish_wait is skipped. */
343 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
345 print_request("Waiting", *request);
346 if ((*request)->action != NULL) { // this is not a detached send
347 simcall_comm_wait((*request)->action, -1.0);
348 finish_wait(request, status);
350 // FIXME for a detached send, finish_wait is not called:
/* MPI_Waitany: gather the live SIMIX actions of all non-NULL requests,
 * block until one completes (simcall_comm_waitany), map the winning dynar
 * slot back to the caller's index via 'map', and finish that request.
 * Returns the index, or MPI_UNDEFINED when no request was active. */
353 int smpi_mpi_waitany(int count, MPI_Request requests[],
360 index = MPI_UNDEFINED;
362 // Wait for a request to complete
363 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
364 map = xbt_new(int, count);
366 XBT_DEBUG("Wait for one of");
/* Skip MPI_REQUEST_NULL entries and detached sends (action == NULL). */
367 for(i = 0; i < count; i++) {
368 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
369 print_request("Waiting any ", requests[i]);
370 xbt_dynar_push(comms, &requests[i]->action);
376 i = simcall_comm_waitany(comms);
378 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
379 if (i != MPI_UNDEFINED) {
381 finish_wait(&requests[index], status);
386 xbt_dynar_free(&comms);
/* MPI_Waitall: repeatedly call smpi_mpi_waitany until it reports
 * MPI_UNDEFINED (no active request left), copying each completion's status
 * into the caller's array when statuses are requested. 'pstat' is a
 * scratch status (or MPI_STATUS_IGNORE, propagated). */
391 void smpi_mpi_waitall(int count, MPI_Request requests[],
396 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
/* NOTE(review): this listing elides the condition guarding this first
 * per-request wait loop — confirm its purpose in the full source. */
398 for(c = 0; c < count; c++) {
400 smpi_mpi_wait(&requests[c], pstat);
403 index = smpi_mpi_waitany(count, requests, pstat);
404 if(index == MPI_UNDEFINED) {
408 if(status != MPI_STATUS_IGNORE) {
409 memcpy(&status[index], pstat, sizeof(*pstat));
/* MPI_Waitsome: poll up to 'incount' times with testany, recording the
 * index of each completed request into 'indices'. NOTE(review): the
 * increment of 'count' and the return statement are elided from this
 * listing — confirm the exact contract in the full source. */
414 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
420 for(i = 0; i < incount; i++) {
421 if(smpi_mpi_testany(incount, requests, &index, status)) {
422 indices[count] = index;
/* MPI_Bcast: delegate to the n-ary tree broadcast with arity 4. */
429 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
432 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
433 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* MPI_Barrier: delegate to the n-ary tree barrier with arity 4. */
436 void smpi_mpi_barrier(MPI_Comm comm)
438 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
439 nary_tree_barrier(comm, 4);
/* MPI_Gather, flat (non-tree) algorithm: every non-root rank sends its
 * buffer to root with a private system tag; root copies its own block
 * locally and posts one irecv per other rank, at offset
 * src * recvcount * extent into recvbuf.
 * NOTE(review): the rank==root / else branch lines are elided from this
 * listing — the send belongs to non-root ranks and the copy/irecv part to
 * root; confirm in the full source. */
442 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
443 void *recvbuf, int recvcount, MPI_Datatype recvtype,
444 int root, MPI_Comm comm)
446 int system_tag = 666;
447 int rank, size, src, index;
448 MPI_Aint lb = 0, recvext = 0;
449 MPI_Request *requests;
451 rank = smpi_comm_rank(comm);
452 size = smpi_comm_size(comm);
454 // Send buffer to root
455 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
457 // FIXME: check for errors
458 smpi_datatype_extent(recvtype, &lb, &recvext);
459 // Local copy from root
460 smpi_datatype_copy(sendbuf, sendcount, sendtype,
461 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
462 // Receive buffers from senders
463 requests = xbt_new(MPI_Request, size - 1);
465 for(src = 0; src < size; src++) {
467 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
469 src, system_tag, comm);
473 // Wait for completion of irecv's.
474 smpi_mpi_startall(size - 1, requests);
475 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Gatherv: like smpi_mpi_gather, but root places rank src's data at
 * displs[src] * extent and receives recvcounts[src] elements from each.
 * NOTE(review): branch lines (rank==root vs others) are elided from this
 * listing, as in smpi_mpi_gather — confirm in the full source. */
480 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
481 void *recvbuf, int *recvcounts, int *displs,
482 MPI_Datatype recvtype, int root, MPI_Comm comm)
484 int system_tag = 666;
485 int rank, size, src, index;
486 MPI_Aint lb = 0, recvext = 0;
487 MPI_Request *requests;
489 rank = smpi_comm_rank(comm);
490 size = smpi_comm_size(comm);
492 // Send buffer to root
493 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
495 // FIXME: check for errors
496 smpi_datatype_extent(recvtype, &lb, &recvext);
497 // Local copy from root
498 smpi_datatype_copy(sendbuf, sendcount, sendtype,
499 (char *)recvbuf + displs[root] * recvext,
500 recvcounts[root], recvtype);
501 // Receive buffers from senders
502 requests = xbt_new(MPI_Request, size - 1);
504 for(src = 0; src < size; src++) {
507 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
508 recvcounts[src], recvtype, src, system_tag, comm);
512 // Wait for completion of irecv's.
513 smpi_mpi_startall(size - 1, requests);
514 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Allgather, flat algorithm: each rank copies its own block into place,
 * then exchanges pairwise with every other rank — one isend of its own
 * buffer plus one irecv into the peer's slot (other * recvcount * extent),
 * i.e. 2*(size-1) requests started and awaited together. */
519 void smpi_mpi_allgather(void *sendbuf, int sendcount,
520 MPI_Datatype sendtype, void *recvbuf,
521 int recvcount, MPI_Datatype recvtype,
524 int system_tag = 666;
525 int rank, size, other, index;
526 MPI_Aint lb = 0, recvext = 0;
527 MPI_Request *requests;
529 rank = smpi_comm_rank(comm);
530 size = smpi_comm_size(comm);
531 // FIXME: check for errors
532 smpi_datatype_extent(recvtype, &lb, &recvext);
533 // Local copy from self
534 smpi_datatype_copy(sendbuf, sendcount, sendtype,
535 (char *)recvbuf + rank * recvcount * recvext, recvcount,
537 // Send/Recv buffers to/from others;
538 requests = xbt_new(MPI_Request, 2 * (size - 1));
540 for(other = 0; other < size; other++) {
543 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
546 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
547 recvcount, recvtype, other,
552 // Wait for completion of all comms.
553 smpi_mpi_startall(2 * (size - 1), requests);
554 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* MPI_Allgatherv: like smpi_mpi_allgather but with per-rank counts and
 * displacements — rank r's block lands at displs[r] * extent and is
 * recvcounts[r] elements long. 2*(size-1) requests in total. */
558 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
559 MPI_Datatype sendtype, void *recvbuf,
560 int *recvcounts, int *displs,
561 MPI_Datatype recvtype, MPI_Comm comm)
563 int system_tag = 666;
564 int rank, size, other, index;
565 MPI_Aint lb = 0, recvext = 0;
566 MPI_Request *requests;
568 rank = smpi_comm_rank(comm);
569 size = smpi_comm_size(comm);
570 // FIXME: check for errors
571 smpi_datatype_extent(recvtype, &lb, &recvext);
572 // Local copy from self
573 smpi_datatype_copy(sendbuf, sendcount, sendtype,
574 (char *)recvbuf + displs[rank] * recvext,
575 recvcounts[rank], recvtype);
576 // Send buffers to others;
577 requests = xbt_new(MPI_Request, 2 * (size - 1));
579 for(other = 0; other < size; other++) {
582 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
586 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
587 recvtype, other, system_tag, comm);
591 // Wait for completion of all comms.
592 smpi_mpi_startall(2 * (size - 1), requests);
593 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* MPI_Scatter, flat algorithm: non-root ranks block-receive their slice
 * from root; root copies its own slice locally and isends the slice at
 * dst * sendcount * extent to each other rank.
 * NOTE(review): the rank==root / else branch lines are elided from this
 * listing — confirm in the full source. */
597 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
598 void *recvbuf, int recvcount, MPI_Datatype recvtype,
599 int root, MPI_Comm comm)
601 int system_tag = 666;
602 int rank, size, dst, index;
603 MPI_Aint lb = 0, sendext = 0;
604 MPI_Request *requests;
606 rank = smpi_comm_rank(comm);
607 size = smpi_comm_size(comm);
609 // Recv buffer from root
610 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
613 // FIXME: check for errors
614 smpi_datatype_extent(sendtype, &lb, &sendext);
615 // Local copy from root
616 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
617 sendcount, sendtype, recvbuf, recvcount, recvtype);
618 // Send buffers to receivers
619 requests = xbt_new(MPI_Request, size - 1);
621 for(dst = 0; dst < size; dst++) {
623 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
624 sendcount, sendtype, dst,
629 // Wait for completion of isend's.
630 smpi_mpi_startall(size - 1, requests);
631 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Scatterv: like smpi_mpi_scatter, but each rank dst gets
 * sendcounts[dst] elements taken from displs[dst] * extent in sendbuf.
 * NOTE(review): branch lines (rank==root vs others) are elided from this
 * listing — confirm in the full source. */
636 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
637 MPI_Datatype sendtype, void *recvbuf, int recvcount,
638 MPI_Datatype recvtype, int root, MPI_Comm comm)
640 int system_tag = 666;
641 int rank, size, dst, index;
642 MPI_Aint lb = 0, sendext = 0;
643 MPI_Request *requests;
645 rank = smpi_comm_rank(comm);
646 size = smpi_comm_size(comm);
648 // Recv buffer from root
649 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
652 // FIXME: check for errors
653 smpi_datatype_extent(sendtype, &lb, &sendext);
654 // Local copy from root
655 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
656 sendtype, recvbuf, recvcount, recvtype);
657 // Send buffers to receivers
658 requests = xbt_new(MPI_Request, size - 1);
660 for(dst = 0; dst < size; dst++) {
663 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
664 sendtype, dst, system_tag, comm);
668 // Wait for completion of isend's.
669 smpi_mpi_startall(size - 1, requests);
670 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Reduce, flat algorithm: non-root ranks send their contribution to
 * root; root seeds recvbuf with its own data, posts one irecv per other
 * rank into a private temporary buffer, and folds each contribution into
 * recvbuf with smpi_op_apply as soon as its receive completes (waitany).
 * WARNING (inherited from the flat scheme): applying 'op' in completion
 * order makes the result order-dependent for non-commutative operations.
 * NOTE(review): rank==root branch lines are elided from this listing. */
675 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
676 MPI_Datatype datatype, MPI_Op op, int root,
679 int system_tag = 666;
680 int rank, size, src, index;
681 MPI_Aint lb = 0, dataext = 0;
682 MPI_Request *requests;
685 rank = smpi_comm_rank(comm);
686 size = smpi_comm_size(comm);
688 // Send buffer to root
689 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
691 // FIXME: check for errors
692 smpi_datatype_extent(datatype, &lb, &dataext);
693 // Local copy from root
694 if (sendbuf && recvbuf)
695 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
696 // Receive buffers from senders
697 //TODO: make a MPI_barrier here ?
698 requests = xbt_new(MPI_Request, size - 1);
699 tmpbufs = xbt_new(void *, size - 1);
701 for(src = 0; src < size; src++) {
703 // FIXME: possibly overkill we we have contiguous/noncontiguous data
/* One scratch buffer per sender so contributions never clobber each other. */
705 tmpbufs[index] = xbt_malloc(count * dataext);
707 smpi_irecv_init(tmpbufs[index], count, datatype, src,
712 // Wait for completion of irecv's.
713 smpi_mpi_startall(size - 1, requests);
714 for(src = 0; src < size - 1; src++) {
715 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
716 XBT_VERB("finished waiting any request with index %d", index);
717 if(index == MPI_UNDEFINED) {
720 if(op) /* op can be MPI_OP_NULL that does nothing */
/* Fold this sender's contribution into the running result in recvbuf. */
721 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
/* Release all scratch buffers once every contribution is folded in. */
723 for(index = 0; index < size - 1; index++) {
724 xbt_free(tmpbufs[index]);
/* MPI_Allreduce: reduce to rank 0, then broadcast the result from rank 0. */
731 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
732 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
734 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
735 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
/* MPI_Scan (inclusive prefix reduction): each rank seeds recvbuf with its
 * own data, receives the contribution of every LOWER rank (0..rank-1) into
 * per-sender scratch buffers, sends its own data to every HIGHER rank
 * (rank+1..size-1), then folds received contributions into recvbuf as they
 * complete. Total of size-1 requests per rank (rank receives + size-1-rank
 * sends). NOTE(review): this function continues past the end of this
 * listing (cleanup/return elided); the index/branch bookkeeping lines are
 * also elided — confirm against the full source. */
738 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
739 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
741 int system_tag = 666;
742 int rank, size, other, index;
743 MPI_Aint lb = 0, dataext = 0;
744 MPI_Request *requests;
747 rank = smpi_comm_rank(comm);
748 size = smpi_comm_size(comm);
750 // FIXME: check for errors
751 smpi_datatype_extent(datatype, &lb, &dataext);
753 // Local copy from self
754 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
756 // Send/Recv buffers to/from others;
757 requests = xbt_new(MPI_Request, size - 1);
/* One scratch buffer per lower-ranked contributor. */
758 tmpbufs = xbt_new(void *, rank);
760 for(other = 0; other < rank; other++) {
761 // FIXME: possibly overkill we we have contiguous/noncontiguous data
763 tmpbufs[index] = xbt_malloc(count * dataext);
765 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
/* Higher-ranked peers only need our raw contribution. */
769 for(other = rank + 1; other < size; other++) {
771 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
774 // Wait for completion of all comms.
775 smpi_mpi_startall(size - 1, requests);
776 for(other = 0; other < size - 1; other++) {
777 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
778 if(index == MPI_UNDEFINED) {
782 // #Request is below rank: it's a irecv
/* Fold a lower rank's contribution into the prefix result. */
783 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
786 for(index = 0; index < rank; index++) {
787 xbt_free(tmpbufs[index]);