1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
12 #include "surf/surf.h"
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
15 "Logging specific to SMPI (base)");
/* SIMIX matching callback for the receive side: decides whether a posted
 * receive (a = reference) matches an incoming send request (b).
 * Wildcards (MPI_ANY_SOURCE / MPI_ANY_TAG) are honored on the receive side. */
17 static int match_recv(void* a, void* b, smx_action_t ignored) {
18 MPI_Request ref = (MPI_Request)a;
19 MPI_Request req = (MPI_Request)b;
21 xbt_assert(ref, "Cannot match recv against null reference");
22 xbt_assert(req, "Cannot match recv against null request");
/* Match if the receive's src/tag are wildcards or equal to the sender's. */
23 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
24 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* SIMIX matching callback for the send side: decides whether an outgoing
 * send (a = reference) matches a pending receive request (b).
 * Note the asymmetry with match_recv: here the wildcards are checked on
 * req (the receive request pulled from the mailbox), not on ref. */
27 static int match_send(void* a, void* b,smx_action_t ignored) {
28 MPI_Request ref = (MPI_Request)a;
29 MPI_Request req = (MPI_Request)b;
31 xbt_assert(ref, "Cannot match send against null reference");
32 xbt_assert(req, "Cannot match send against null request");
33 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
34 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocates and initializes an s_smpi_mpi_request_t describing one
 * point-to-point communication (src -> dst, given tag, on comm).
 * flags combines PERSISTENT/NON_PERSISTENT with SEND/RECV (see callers).
 * The request is created with no SIMIX action attached (action = NULL);
 * smpi_mpi_start() later binds it to an actual simcall.
 * NOTE(review): several field initializations are elided in this view
 * (buf, src, dst, tag, comm, ... and the final return) — confirm against
 * the full source. */
37 static MPI_Request build_request(void *buf, int count,
38 MPI_Datatype datatype, int src, int dst,
39 int tag, MPI_Comm comm, unsigned flags)
43 request = xbt_new(s_smpi_mpi_request_t, 1);
45 // FIXME: this will have to be changed to support non-contiguous datatypes
/* Size in bytes, assuming a contiguous datatype. */
46 request->size = smpi_datatype_size(datatype) * count;
51 request->action = NULL;
52 request->flags = flags;
/* Replays a time-independent action trace from the file at `path`.
 * Opens the trace, runs it (replay machinery elided in this view), then
 * warns about any per-process action queues that were not fully consumed —
 * usually a sign that the deployment file lists fewer processes than the
 * trace expects. Finally resets the global action_queues dictionary. */
60 void smpi_action_trace_run(char *path)
64 xbt_dict_cursor_t cursor;
68 action_fp = fopen(path, "r");
/* NOTE(review): the second format argument (presumably strerror(errno))
 * is elided in this view. */
69 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
73 if (!xbt_dict_is_empty(action_queues)) {
75 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
/* Report how many actions remain queued for each process name. */
78 xbt_dict_foreach(action_queues, cursor, name, todo) {
79 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
/* Drop the old dictionary and start fresh for the next replay. */
85 xbt_dict_free(&action_queues);
86 action_queues = xbt_dict_new_homogeneous(NULL);
/* void*-typed adapter around smpi_mpi_request_free(), suitable as a
 * cleanup callback (used by smpi_mpi_start when posting detached sends). */
89 static void smpi_mpi_request_free_voidp(void* request)
91 MPI_Request req = request;
92 smpi_mpi_request_free(&req);
95 /* MPI Low level calls */
/* MPI_Send_init: builds a persistent send request (not started yet).
 * The source rank is the caller's rank in comm. */
96 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
97 int dst, int tag, MPI_Comm comm)
100 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
101 comm, PERSISTENT | SEND);
/* MPI_Recv_init: builds a persistent receive request (not started yet).
 * The destination rank is the caller's rank in comm. */
106 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
107 int src, int tag, MPI_Comm comm)
109 MPI_Request request =
110 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
111 comm, PERSISTENT | RECV);
/* Starts (or restarts, for persistent requests) a communication by posting
 * the corresponding SIMIX irecv/isend.
 * Receive side: small messages are expected on the per-process "small"
 * mailbox (eager protocol), larger ones on the regular mailbox.
 * Send side: small messages go detached to the remote small mailbox, and
 * below 64 KiB the user buffer is copied so the sender can return at once;
 * larger messages use the regular (rendezvous) mailbox. */
116 void smpi_mpi_start(MPI_Request request)
/* A request that still has a live SIMIX action is in flight. */
121 xbt_assert(!request->action,
122 "Cannot (re)start a non-finished communication");
123 if(request->flags & RECV) {
124 print_request("New recv", request);
/* Eager threshold: small receives are posted on the dedicated mailbox. */
125 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
126 mailbox = smpi_process_mailbox_small();
128 mailbox = smpi_process_mailbox();
130 // FIXME: SIMIX does not yet support non-contiguous datatypes
131 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
133 print_request("New send", request);
135 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
136 mailbox = smpi_process_remote_mailbox_small(
137 smpi_group_index(smpi_comm_group(request->comm), request->dst));
139 XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
140 mailbox = smpi_process_remote_mailbox(
141 smpi_group_index(smpi_comm_group(request->comm), request->dst));
143 if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
/* Detached send: copy the payload so the caller may reuse its buffer
 * immediately. NOTE(review): the copy allocated here is presumably
 * released via smpi_mpi_request_free_voidp — confirm in full source. */
144 void *oldbuf = request->buf;
146 request->buf = malloc(request->size);
147 memcpy(request->buf,oldbuf,request->size);
148 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
150 XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
151 mailbox = smpi_process_remote_mailbox(
152 smpi_group_index(smpi_comm_group(request->comm), request->dst));
/* NOTE(review): intermediate arguments of this simcall (match callback,
 * detach flag) are elided in this view. */
156 simcall_comm_isend(mailbox, request->size, -1.0,
157 request->buf, request->size,
159 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
161 // detach if msg size < eager/rdv switch limit
165 /* FIXME: detached sends are not traceable (request->action == NULL) */
167 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
/* MPI_Startall: starts `count` previously initialized requests in order. */
174 void smpi_mpi_startall(int count, MPI_Request * requests)
178 for(i = 0; i < count; i++) {
179 smpi_mpi_start(requests[i]);
/* MPI_Request_free: releases the request and resets the caller's handle
 * to MPI_REQUEST_NULL. NOTE(review): the actual deallocation (xbt_free or
 * similar) is elided in this view — only the handle reset is visible. */
183 void smpi_mpi_request_free(MPI_Request * request)
186 *request = MPI_REQUEST_NULL;
/* Builds a non-persistent send request (not started); the counterpart of
 * smpi_mpi_send_init for one-shot isend operations. */
189 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
190 int dst, int tag, MPI_Comm comm)
192 MPI_Request request =
193 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
194 comm, NON_PERSISTENT | SEND);
/* MPI_Isend: builds a non-persistent send request and starts it at once. */
199 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
200 int dst, int tag, MPI_Comm comm)
202 MPI_Request request =
203 smpi_isend_init(buf, count, datatype, dst, tag, comm);
205 smpi_mpi_start(request);
/* Builds a non-persistent receive request (not started); the counterpart
 * of smpi_mpi_recv_init for one-shot irecv operations. */
209 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
210 int src, int tag, MPI_Comm comm)
212 MPI_Request request =
213 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
214 comm, NON_PERSISTENT | RECV);
/* MPI_Irecv: builds a non-persistent receive request and starts it. */
219 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
220 int src, int tag, MPI_Comm comm)
222 MPI_Request request =
223 smpi_irecv_init(buf, count, datatype, src, tag, comm);
225 smpi_mpi_start(request);
/* MPI_Recv (blocking): implemented as irecv + wait. */
229 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
230 int tag, MPI_Comm comm, MPI_Status * status)
234 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
235 smpi_mpi_wait(&request, status);
/* MPI_Send (blocking): implemented as isend + wait; no status returned. */
240 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
241 int tag, MPI_Comm comm)
245 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
246 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* MPI_Sendrecv: posts the send and the receive simultaneously (avoiding
 * the deadlock of two blocking calls), waits for both, and reports the
 * receive's status to the caller. */
249 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
250 int dst, int sendtag, void *recvbuf, int recvcount,
251 MPI_Datatype recvtype, int src, int recvtag,
252 MPI_Comm comm, MPI_Status * status)
254 MPI_Request requests[2];
/* NOTE(review): the assignments requests[0]= / requests[1]= and the
 * declaration of stats[2] are elided in this view. */
258 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
260 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
261 smpi_mpi_startall(2, requests);
262 smpi_mpi_waitall(2, requests, stats);
263 if(status != MPI_STATUS_IGNORE) {
264 // Copy receive status
/* stats[1] corresponds to the receive request. */
265 memcpy(status, &stats[1], sizeof(MPI_Status));
/* MPI_Get_count: number of whole datatype elements received, derived from
 * the byte count stored in the status by finish_wait(). */
269 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
271 return status->count / smpi_datatype_size(datatype);
/* Completes a finished communication: fills in the caller's MPI_Status
 * (source, tag, error, byte count) and frees the request unless it is
 * persistent (persistent requests survive for the next MPI_Start). */
274 static void finish_wait(MPI_Request * request, MPI_Status * status)
276 MPI_Request req = *request;
278 if(status != MPI_STATUS_IGNORE) {
279 status->MPI_SOURCE = req->src;
280 status->MPI_TAG = req->tag;
281 status->MPI_ERROR = MPI_SUCCESS;
282 // FIXME: really this should just contain the count of receive-type blocks,
/* Stored in bytes; smpi_mpi_get_count() divides by the datatype size. */
284 status->count = req->size;
286 print_request("Finishing", req);
287 if(req->flags & NON_PERSISTENT) {
288 smpi_mpi_request_free(request);
/* MPI_Test: non-blocking completion check. A request with no SIMIX action
 * (e.g. a detached send) is treated specially (branch body elided in this
 * view); otherwise the SIMIX action is tested and, if it completed, the
 * request is finalized via smpi_mpi_wait (which cannot block then). */
294 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
297 if ((*request)->action == NULL)
300 flag = simcall_comm_test((*request)->action);
302 smpi_mpi_wait(request, status);
/* MPI_Testany: tests whether any of `count` requests has completed.
 * Builds a dynar of the live SIMIX actions (with `map` translating the
 * dynar position back to the original request index), asks SIMIX which
 * one, if any, is done, and finalizes that request. *index is
 * MPI_UNDEFINED when none completed. */
307 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
314 *index = MPI_UNDEFINED;
317 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
318 map = xbt_new(int, count);
/* Collect only requests that actually have a SIMIX action in flight. */
320 for(i = 0; i < count; i++) {
321 if(requests[i]->action) {
322 xbt_dynar_push(comms, &requests[i]->action);
328 i = simcall_comm_testany(comms);
329 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
330 if(i != MPI_UNDEFINED) {
/* NOTE(review): *index = map[i] is elided in this view — the wait below
 * relies on it having been set. */
332 smpi_mpi_wait(&requests[*index], status);
337 xbt_dynar_free(&comms);
/* MPI_Wait: blocks until the request's SIMIX action completes, then
 * finalizes it (status fill-in + free for non-persistent requests).
 * Detached sends have no action and fall through without finish_wait. */
342 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
344 print_request("Waiting", *request);
345 if ((*request)->action != NULL) { // this is not a detached send
346 simcall_comm_wait((*request)->action, -1.0);
347 finish_wait(request, status);
349 // FIXME for a detached send, finish_wait is not called:
/* MPI_Waitany: blocks until one of `count` requests completes and returns
 * its index (MPI_UNDEFINED if there was nothing to wait for). Same
 * comms/map construction as smpi_mpi_testany, but using the blocking
 * simcall_comm_waitany. */
352 int smpi_mpi_waitany(int count, MPI_Request requests[],
359 index = MPI_UNDEFINED;
361 // Wait for a request to complete
362 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
363 map = xbt_new(int, count);
365 XBT_DEBUG("Wait for one of");
/* Skip null requests and detached sends (action == NULL). */
366 for(i = 0; i < count; i++) {
367 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
368 print_request("Waiting any ", requests[i]);
369 xbt_dynar_push(comms, &requests[i]->action);
375 i = simcall_comm_waitany(comms);
377 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
378 if (i != MPI_UNDEFINED) {
/* NOTE(review): index = map[i] is elided in this view — finish_wait
 * relies on it having been set. */
380 finish_wait(&requests[index], status);
385 xbt_dynar_free(&comms);
/* MPI_Waitall: waits for all `count` requests. A single scratch status
 * (pstat) is reused per completion and copied into the caller's status
 * array when statuses are requested. The visible loops suggest a direct
 * per-request wait path and a waitany-driven path (selection logic
 * partially elided in this view). */
390 void smpi_mpi_waitall(int count, MPI_Request requests[],
395 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
397 for(c = 0; c < count; c++) {
399 smpi_mpi_wait(&requests[c], pstat);
402 index = smpi_mpi_waitany(count, requests, pstat);
403 if(index == MPI_UNDEFINED) {
407 if(status != MPI_STATUS_IGNORE) {
408 memcpy(&status[index], pstat, sizeof(*pstat));
/* MPI_Waitsome: collects the indices of completed requests by repeatedly
 * calling testany, recording each completed index into `indices`.
 * NOTE(review): the increment of `count` and the final return are elided
 * in this view. */
413 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
419 for(i = 0; i < incount; i++) {
420 if(smpi_mpi_testany(incount, requests, &index, status)) {
421 indices[count] = index;
/* MPI_Bcast: delegates to an n-ary tree broadcast with arity 4. */
428 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
431 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
432 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* MPI_Barrier: delegates to an n-ary tree barrier with arity 4. */
435 void smpi_mpi_barrier(MPI_Comm comm)
437 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
438 nary_tree_barrier(comm, 4);
/* MPI_Gather (flat algorithm): non-root ranks send their buffer to root;
 * root copies its own contribution locally, posts one irecv per other
 * rank into the proper recvbuf slice, and waits for all of them.
 * NOTE(review): the rank!=root / rank==root branching and the src!=root
 * guard inside the loop are elided in this view. */
441 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
442 void *recvbuf, int recvcount, MPI_Datatype recvtype,
443 int root, MPI_Comm comm)
445 int system_tag = 666;
446 int rank, size, src, index;
447 MPI_Aint lb = 0, recvext = 0;
448 MPI_Request *requests;
450 rank = smpi_comm_rank(comm);
451 size = smpi_comm_size(comm);
453 // Send buffer to root
454 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
456 // FIXME: check for errors
457 smpi_datatype_extent(recvtype, &lb, &recvext);
458 // Local copy from root
459 smpi_datatype_copy(sendbuf, sendcount, sendtype,
460 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
461 // Receive buffers from senders
462 requests = xbt_new(MPI_Request, size - 1);
464 for(src = 0; src < size; src++) {
466 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
468 src, system_tag, comm);
472 // Wait for completion of irecv's.
473 smpi_mpi_startall(size - 1, requests);
474 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Gatherv: like smpi_mpi_gather but each rank contributes
 * recvcounts[rank] elements placed at displs[rank] (in recvtype extents)
 * in root's recvbuf. NOTE(review): the rank!=root / rank==root branching
 * is elided in this view. */
479 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
480 void *recvbuf, int *recvcounts, int *displs,
481 MPI_Datatype recvtype, int root, MPI_Comm comm)
483 int system_tag = 666;
484 int rank, size, src, index;
485 MPI_Aint lb = 0, recvext = 0;
486 MPI_Request *requests;
488 rank = smpi_comm_rank(comm);
489 size = smpi_comm_size(comm);
491 // Send buffer to root
492 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
494 // FIXME: check for errors
495 smpi_datatype_extent(recvtype, &lb, &recvext);
496 // Local copy from root
497 smpi_datatype_copy(sendbuf, sendcount, sendtype,
498 (char *)recvbuf + displs[root] * recvext,
499 recvcounts[root], recvtype);
500 // Receive buffers from senders
501 requests = xbt_new(MPI_Request, size - 1);
503 for(src = 0; src < size; src++) {
506 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
507 recvcounts[src], recvtype, src, system_tag, comm);
511 // Wait for completion of irecv's.
512 smpi_mpi_startall(size - 1, requests);
513 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Allgather (flat algorithm): every rank copies its own contribution
 * into its slot of recvbuf, then exchanges with each other rank via one
 * isend + one irecv (2*(size-1) requests total) and waits for all. */
518 void smpi_mpi_allgather(void *sendbuf, int sendcount,
519 MPI_Datatype sendtype, void *recvbuf,
520 int recvcount, MPI_Datatype recvtype,
523 int system_tag = 666;
524 int rank, size, other, index;
525 MPI_Aint lb = 0, recvext = 0;
526 MPI_Request *requests;
528 rank = smpi_comm_rank(comm);
529 size = smpi_comm_size(comm);
530 // FIXME: check for errors
531 smpi_datatype_extent(recvtype, &lb, &recvext);
532 // Local copy from self
533 smpi_datatype_copy(sendbuf, sendcount, sendtype,
534 (char *)recvbuf + rank * recvcount * recvext, recvcount,
536 // Send/Recv buffers to/from others;
537 requests = xbt_new(MPI_Request, 2 * (size - 1));
/* One send and one receive per peer (the other != rank guard is elided
 * in this view). */
539 for(other = 0; other < size; other++) {
542 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
545 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
546 recvcount, recvtype, other,
551 // Wait for completion of all comms.
552 smpi_mpi_startall(2 * (size - 1), requests);
553 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* MPI_Allgatherv: like smpi_mpi_allgather but with per-rank counts and
 * displacements (recvcounts/displs, in recvtype extents). */
557 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
558 MPI_Datatype sendtype, void *recvbuf,
559 int *recvcounts, int *displs,
560 MPI_Datatype recvtype, MPI_Comm comm)
562 int system_tag = 666;
563 int rank, size, other, index;
564 MPI_Aint lb = 0, recvext = 0;
565 MPI_Request *requests;
567 rank = smpi_comm_rank(comm);
568 size = smpi_comm_size(comm);
569 // FIXME: check for errors
570 smpi_datatype_extent(recvtype, &lb, &recvext);
571 // Local copy from self
572 smpi_datatype_copy(sendbuf, sendcount, sendtype,
573 (char *)recvbuf + displs[rank] * recvext,
574 recvcounts[rank], recvtype);
575 // Send buffers to others;
576 requests = xbt_new(MPI_Request, 2 * (size - 1));
/* One send and one receive per peer (the other != rank guard is elided
 * in this view). */
578 for(other = 0; other < size; other++) {
581 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
585 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
586 recvtype, other, system_tag, comm);
590 // Wait for completion of all comms.
591 smpi_mpi_startall(2 * (size - 1), requests);
592 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* MPI_Scatter (flat algorithm): non-root ranks receive their slice from
 * root; root copies its own slice locally and posts one isend per other
 * rank from the proper sendbuf offset, then waits for all sends.
 * NOTE(review): the rank!=root / rank==root branching and the dst!=root
 * guard inside the loop are elided in this view. */
596 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
597 void *recvbuf, int recvcount, MPI_Datatype recvtype,
598 int root, MPI_Comm comm)
600 int system_tag = 666;
601 int rank, size, dst, index;
602 MPI_Aint lb = 0, sendext = 0;
603 MPI_Request *requests;
605 rank = smpi_comm_rank(comm);
606 size = smpi_comm_size(comm);
608 // Recv buffer from root
609 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
612 // FIXME: check for errors
613 smpi_datatype_extent(sendtype, &lb, &sendext);
614 // Local copy from root
615 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
616 sendcount, sendtype, recvbuf, recvcount, recvtype);
617 // Send buffers to receivers
618 requests = xbt_new(MPI_Request, size - 1);
620 for(dst = 0; dst < size; dst++) {
622 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
623 sendcount, sendtype, dst,
628 // Wait for completion of isend's.
629 smpi_mpi_startall(size - 1, requests);
630 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Scatterv: like smpi_mpi_scatter but with per-rank send counts and
 * displacements (sendcounts/displs, in sendtype extents).
 * NOTE(review): the rank!=root / rank==root branching is elided in this
 * view. */
635 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
636 MPI_Datatype sendtype, void *recvbuf, int recvcount,
637 MPI_Datatype recvtype, int root, MPI_Comm comm)
639 int system_tag = 666;
640 int rank, size, dst, index;
641 MPI_Aint lb = 0, sendext = 0;
642 MPI_Request *requests;
644 rank = smpi_comm_rank(comm);
645 size = smpi_comm_size(comm);
647 // Recv buffer from root
648 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
651 // FIXME: check for errors
652 smpi_datatype_extent(sendtype, &lb, &sendext);
653 // Local copy from root
654 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
655 sendtype, recvbuf, recvcount, recvtype);
656 // Send buffers to receivers
657 requests = xbt_new(MPI_Request, size - 1);
659 for(dst = 0; dst < size; dst++) {
662 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
663 sendtype, dst, system_tag, comm);
667 // Wait for completion of isend's.
668 smpi_mpi_startall(size - 1, requests);
669 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Reduce (flat algorithm): non-root ranks send their contribution to
 * root; root seeds recvbuf with its own data, receives every other rank's
 * contribution into a temporary buffer, and folds each one into recvbuf
 * with smpi_op_apply as completions arrive (waitany order).
 * WARNING: applying op in completion order is only correct for
 * commutative+associative operations — the MPI standard allows
 * non-commutative ops, which would need rank order.
 * NOTE(review): the rank!=root / rank==root branching and the request
 * array index bookkeeping are elided in this view. */
674 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
675 MPI_Datatype datatype, MPI_Op op, int root,
678 int system_tag = 666;
679 int rank, size, src, index;
680 MPI_Aint lb = 0, dataext = 0;
681 MPI_Request *requests;
684 rank = smpi_comm_rank(comm);
685 size = smpi_comm_size(comm);
687 // Send buffer to root
688 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
690 // FIXME: check for errors
691 smpi_datatype_extent(datatype, &lb, &dataext);
692 // Local copy from root
693 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
694 // Receive buffers from senders
695 //TODO: make a MPI_barrier here ?
696 requests = xbt_new(MPI_Request, size - 1);
697 tmpbufs = xbt_new(void *, size - 1);
699 for(src = 0; src < size; src++) {
701 // FIXME: possibly overkill we we have contiguous/noncontiguous data
/* One scratch buffer per sender so receives can land concurrently. */
703 tmpbufs[index] = xbt_malloc(count * dataext);
705 smpi_irecv_init(tmpbufs[index], count, datatype, src,
710 // Wait for completion of irecv's.
711 smpi_mpi_startall(size - 1, requests);
712 for(src = 0; src < size - 1; src++) {
713 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
714 XBT_VERB("finished waiting any request with index %d", index);
715 if(index == MPI_UNDEFINED) {
/* Fold the newly arrived contribution into the running result. */
718 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
/* Release all scratch buffers. */
720 for(index = 0; index < size - 1; index++) {
721 xbt_free(tmpbufs[index]);
/* MPI_Allreduce: reduce to rank 0, then broadcast the result to all. */
728 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
729 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
731 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
732 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
/* MPI_Scan (inclusive prefix reduction): each rank receives the
 * contribution of every lower rank (into `rank` scratch buffers), sends
 * its own data to every higher rank, and folds the lower-rank
 * contributions into recvbuf as they complete. Requests below `rank` in
 * the array are receives; the rest are sends (comment at original line
 * 779). NOTE(review): the index bookkeeping, the tmpbufs declaration, and
 * the irecv-vs-isend discrimination on `index` are elided in this view. */
735 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
736 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
738 int system_tag = 666;
739 int rank, size, other, index;
740 MPI_Aint lb = 0, dataext = 0;
741 MPI_Request *requests;
744 rank = smpi_comm_rank(comm);
745 size = smpi_comm_size(comm);
747 // FIXME: check for errors
748 smpi_datatype_extent(datatype, &lb, &dataext);
750 // Local copy from self
751 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
753 // Send/Recv buffers to/from others;
754 requests = xbt_new(MPI_Request, size - 1);
/* One scratch buffer per lower-ranked peer. */
755 tmpbufs = xbt_new(void *, rank);
757 for(other = 0; other < rank; other++) {
758 // FIXME: possibly overkill we we have contiguous/noncontiguous data
760 tmpbufs[index] = xbt_malloc(count * dataext);
762 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
/* Higher-ranked peers receive our contribution. */
766 for(other = rank + 1; other < size; other++) {
768 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
771 // Wait for completion of all comms.
772 smpi_mpi_startall(size - 1, requests);
773 for(other = 0; other < size - 1; other++) {
774 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
775 if(index == MPI_UNDEFINED) {
779 // #Request is below rank: it's a irecv
780 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
/* Release the lower-rank scratch buffers. */
783 for(index = 0; index < rank; index++) {
784 xbt_free(tmpbufs[index]);