1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
14 "Logging specific to SMPI (base)");
/* SIMIX matching callback for receives: 'a' is the posted receive request
 * (the reference), 'b' is a candidate incoming request. The receive side's
 * MPI_ANY_SOURCE / MPI_ANY_TAG act as wildcards; otherwise source and tag
 * must match exactly. Returns non-zero on a match. 'ignored' is unused. */
16 static int match_recv(void* a, void* b, smx_action_t ignored) {
17 MPI_Request ref = (MPI_Request)a;
18 MPI_Request req = (MPI_Request)b;
20 xbt_assert(ref, "Cannot match recv against null reference");
21 xbt_assert(req, "Cannot match recv against null request");
/* Wildcards are read from 'ref' (the receive) here — the mirror of
 * match_send below, where they are read from 'req'. */
22 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
23 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* SIMIX matching callback for sends: 'a' is the send request (the
 * reference), 'b' is presumably the receive request it is tested against
 * (the wildcards MPI_ANY_SOURCE / MPI_ANY_TAG are read from 'req', i.e. the
 * receive side) — TODO(review): confirm the a/b roles against the SIMIX
 * comm-matching contract. Returns non-zero on a match. */
26 static int match_send(void* a, void* b,smx_action_t ignored) {
27 MPI_Request ref = (MPI_Request)a;
28 MPI_Request req = (MPI_Request)b;
30 xbt_assert(ref, "Cannot match send against null reference");
31 xbt_assert(req, "Cannot match send against null request");
32 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
33 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocate and initialize an SMPI request structure. 'flags' combines the
 * PERSISTENT/NON_PERSISTENT and SEND/RECV bits used elsewhere in this file.
 * The request is built but NOT started; smpi_mpi_start() does that.
 * NOTE(review): the declaration of 'request' and the assignments of
 * buf/src/dst/tag/comm are on lines elided from this view — confirm they
 * mirror the parameters. */
36 static MPI_Request build_request(void *buf, int count,
37 MPI_Datatype datatype, int src, int dst,
38 int tag, MPI_Comm comm, unsigned flags)
42 request = xbt_new(s_smpi_mpi_request_t, 1);
/* Size in bytes; assumes a contiguous datatype (see FIXME below). */
44 // FIXME: this will have to be changed to support non-contiguous datatypes
45 request->size = smpi_datatype_size(datatype) * count;
/* No SIMIX communication is attached until the request is started. */
50 request->action = NULL;
51 request->flags = flags;
/* Replay a time-independent action trace from file 'path'. After the
 * replay, warns about any per-process action queues that were not fully
 * consumed (a sign the deployment file lacks processes), then resets the
 * global 'action_queues' dictionary.
 * NOTE(review): the replay loop itself and the declarations of action_fp /
 * name / todo are on lines elided from this view. */
59 void smpi_action_trace_run(char *path)
63 xbt_dict_cursor_t cursor;
67 action_fp = fopen(path, "r");
68 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
72 if (!xbt_dict_is_empty(action_queues)) {
74 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
/* Report how many actions remain queued for each process name. */
77 xbt_dict_foreach(action_queues, cursor, name, todo) {
78 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
/* Free the old dictionary and start afresh for a potential next replay. */
84 xbt_dict_free(&action_queues);
85 action_queues = xbt_dict_new_homogeneous(NULL);
/* void* adapter around smpi_mpi_request_free(), suitable as a cleanup
 * callback (used below as the free function for detached sends). */
88 static void smpi_mpi_request_free_voidp(void* request)
90 MPI_Request req = request;
91 smpi_mpi_request_free(&req);
94 /* MPI Low level calls */
/* Create a persistent send request (MPI_Send_init semantics): the request
 * is built with PERSISTENT|SEND but not started. Source is this process's
 * rank in 'comm'. NOTE(review): the declaration/return of the request
 * variable is on lines elided from this view. */
95 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
96 int dst, int tag, MPI_Comm comm)
99 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
100 comm, PERSISTENT | SEND);
/* Create a persistent receive request (MPI_Recv_init semantics): built
 * with PERSISTENT|RECV but not started. Destination is this process's rank
 * in 'comm'; 'src' may be MPI_ANY_SOURCE (see match_recv). */
105 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
106 int src, int tag, MPI_Comm comm)
108 MPI_Request request =
109 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
110 comm, PERSISTENT | RECV);
/* Start (or restart, for persistent requests) the communication described
 * by 'request' by issuing the corresponding SIMIX simcall. Messages smaller
 * than the "smpi/async_small_thres" config value go through the dedicated
 * "small" mailboxes (eager protocol); sends below 64 KiB are detached,
 * which requires copying the user buffer so the sender can return
 * immediately. NOTE(review): several branch keywords (else) and the
 * detach-flag argument of simcall_comm_isend are on lines elided from this
 * view — the comments below describe only what is visible. */
115 void smpi_mpi_start(MPI_Request request)
/* A request can only be (re)started once its previous communication,
 * if any, has completed and cleared 'action'. */
120 xbt_assert(!request->action,
121 "Cannot (re)start a non-finished communication");
122 if(request->flags & RECV) {
123 print_request("New recv", request);
/* Small receives are posted on the small-message mailbox; larger ones
 * on the regular one (the 'else' line is elided here). */
124 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
125 mailbox = smpi_process_mailbox_small();
127 mailbox = smpi_process_mailbox();
129 // FIXME: SIMIX does not yet support non-contiguous datatypes
130 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
132 print_request("New send", request);
134 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
/* Eager path: target the receiver's small-message mailbox directly. */
135 mailbox = smpi_process_remote_mailbox_small(
136 smpi_group_index(smpi_comm_group(request->comm), request->dst));
138 XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
139 mailbox = smpi_process_remote_mailbox(
140 smpi_group_index(smpi_comm_group(request->comm), request->dst));
142 if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
/* Detached send: copy the user buffer so the caller may reuse it
 * immediately; the copy is freed via smpi_mpi_request_free_voidp.
 * NOTE(review): malloc's result is not checked here. */
143 void *oldbuf = request->buf;
145 request->buf = malloc(request->size);
146 memcpy(request->buf,oldbuf,request->size);
147 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
149 XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
150 mailbox = smpi_process_remote_mailbox(
151 smpi_group_index(smpi_comm_group(request->comm), request->dst));
/* Hand the send to SIMIX; -1.0 presumably means "no rate limit" —
 * TODO(review): confirm against the simcall_comm_isend API. */
155 simcall_comm_isend(mailbox, request->size, -1.0,
156 request->buf, request->size,
158 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
160 // detach if msg size < eager/rdv switch limit
164 /* FIXME: detached sends are not traceable (request->action == NULL) */
166 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
/* Start 'count' requests in order (MPI_Startall semantics); each one goes
 * through smpi_mpi_start() above. */
173 void smpi_mpi_startall(int count, MPI_Request * requests)
177 for(i = 0; i < count; i++) {
178 smpi_mpi_start(requests[i]);
/* Release a request and reset the caller's handle to MPI_REQUEST_NULL.
 * NOTE(review): the actual deallocation of *request is on lines elided
 * from this view; only the handle reset is visible here. */
182 void smpi_mpi_request_free(MPI_Request * request)
185 *request = MPI_REQUEST_NULL;
/* Build a non-persistent send request (for MPI_Isend) without starting it;
 * source is this process's rank in 'comm'. */
188 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
189 int dst, int tag, MPI_Comm comm)
191 MPI_Request request =
192 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
193 comm, NON_PERSISTENT | SEND);
/* MPI_Isend: build a non-persistent send request and start it at once. */
198 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
199 int dst, int tag, MPI_Comm comm)
201 MPI_Request request =
202 smpi_isend_init(buf, count, datatype, dst, tag, comm);
204 smpi_mpi_start(request);
/* Build a non-persistent receive request (for MPI_Irecv) without starting
 * it; destination is this process's rank in 'comm'. */
208 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
209 int src, int tag, MPI_Comm comm)
211 MPI_Request request =
212 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
213 comm, NON_PERSISTENT | RECV);
/* MPI_Irecv: build a non-persistent receive request and start it at once. */
218 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
219 int src, int tag, MPI_Comm comm)
221 MPI_Request request =
222 smpi_irecv_init(buf, count, datatype, src, tag, comm);
224 smpi_mpi_start(request);
/* Blocking MPI_Recv implemented as irecv + wait. 'status' may be
 * MPI_STATUS_IGNORE. */
228 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
229 int tag, MPI_Comm comm, MPI_Status * status)
233 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
234 smpi_mpi_wait(&request, status);
/* Blocking MPI_Send implemented as isend + wait (status discarded). */
239 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
240 int tag, MPI_Comm comm)
244 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
245 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* MPI_Sendrecv: post the send and the receive as two requests, start both,
 * wait for both, then report the RECEIVE's status (stats[1]) to the caller.
 * NOTE(review): the declaration of 'stats' and the assignments of
 * requests[0]/requests[1] are on lines elided from this view — presumably
 * requests[0] is the send and requests[1] the receive, consistent with
 * stats[1] being copied out below; confirm. */
248 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
249 int dst, int sendtag, void *recvbuf, int recvcount,
250 MPI_Datatype recvtype, int src, int recvtag,
251 MPI_Comm comm, MPI_Status * status)
253 MPI_Request requests[2];
257 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
259 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
260 smpi_mpi_startall(2, requests);
261 smpi_mpi_waitall(2, requests, stats);
262 if(status != MPI_STATUS_IGNORE) {
263 // Copy receive status
264 memcpy(status, &stats[1], sizeof(MPI_Status));
/* MPI_Get_count: convert the byte count stored in the status (see
 * finish_wait) into a number of datatype elements.
 * NOTE(review): no guard against smpi_datatype_size() == 0, and a size
 * that does not divide evenly is silently truncated rather than reported
 * as MPI_UNDEFINED. */
268 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
270 return status->count / smpi_datatype_size(datatype);
/* Common completion path for wait/test: fill in the caller's status from
 * the finished request, then free the request if it is non-persistent
 * (which also resets *request to MPI_REQUEST_NULL). */
273 static void finish_wait(MPI_Request * request, MPI_Status * status)
275 MPI_Request req = *request;
277 if(status != MPI_STATUS_IGNORE) {
278 status->MPI_SOURCE = req->src;
279 status->MPI_TAG = req->tag;
280 status->MPI_ERROR = MPI_SUCCESS;
/* 'count' here is the transfer size in BYTES, not elements — see the
 * division in smpi_mpi_get_count() and the FIXME below. */
281 // FIXME: really this should just contain the count of receive-type blocks,
283 status->count = req->size;
285 print_request("Finishing", req);
/* Persistent requests survive completion so they can be restarted. */
286 if(req->flags & NON_PERSISTENT) {
287 smpi_mpi_request_free(request);
/* MPI_Test: non-blocking completion check. Asks SIMIX whether the
 * communication finished; if so, completes it via smpi_mpi_wait (which
 * cannot block at that point since the comm is done).
 * NOTE(review): the behavior when action == NULL (detached send) and the
 * declaration/return of 'flag' are on lines elided from this view. */
293 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
296 if ((*request)->action == NULL)
299 flag = simcall_comm_test((*request)->action);
301 smpi_mpi_wait(request, status);
/* MPI_Testany: collect the SIMIX actions of all active requests into a
 * dynar (with 'map' recording which original index each entry came from),
 * ask SIMIX which — if any — has completed, and complete that one.
 * *index is MPI_UNDEFINED when nothing completed.
 * NOTE(review): the population of 'map', the translation of SIMIX result
 * 'i' into *index, and the return value are on lines elided from this
 * view. */
306 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
313 *index = MPI_UNDEFINED;
316 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
317 map = xbt_new(int, count);
/* Only requests that currently have a SIMIX action are testable. */
319 for(i = 0; i < count; i++) {
320 if(requests[i]->action) {
321 xbt_dynar_push(comms, &requests[i]->action);
327 i = simcall_comm_testany(comms);
328 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
329 if(i != MPI_UNDEFINED) {
/* The matched communication is already finished, so this wait only
 * performs completion bookkeeping. */
331 smpi_mpi_wait(&requests[*index], status);
336 xbt_dynar_free(&comms);
/* MPI_Wait: block until the request's SIMIX communication completes, then
 * run the common completion path. A NULL action means a detached send, for
 * which there is nothing to wait on (and, per the FIXME, finish_wait is
 * never called — the request is freed by the detach callback instead). */
341 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
343 print_request("Waiting", *request);
344 if ((*request)->action != NULL) { // this is not a detached send
345 simcall_comm_wait((*request)->action, -1.0);
346 finish_wait(request, status);
348 // FIXME for a detached send, finish_wait is not called:
/* MPI_Waitany: wait until one of the active requests completes, finish it,
 * and report its index (MPI_UNDEFINED if there was nothing to wait on).
 * 'map' records which original request each dynar entry corresponds to.
 * NOTE(review): the population of 'map', the i -> index translation after
 * simcall_comm_waitany, and the return statement are on lines elided from
 * this view. */
351 int smpi_mpi_waitany(int count, MPI_Request requests[],
358 index = MPI_UNDEFINED;
360 // Wait for a request to complete
361 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
362 map = xbt_new(int, count);
364 XBT_DEBUG("Wait for one of");
/* Skip null handles and detached sends (action == NULL). */
365 for(i = 0; i < count; i++) {
366 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
367 print_request("Waiting any ", requests[i]);
368 xbt_dynar_push(comms, &requests[i]->action);
374 i = simcall_comm_waitany(comms);
376 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
377 if (i != MPI_UNDEFINED) {
379 finish_wait(&requests[index], status);
384 xbt_dynar_free(&comms);
/* MPI_Waitall: complete all requests. Uses a scratch status 'pstat' so a
 * caller passing MPI_STATUS_IGNORE costs nothing; otherwise each completed
 * request's status is copied into the caller's array slot.
 * NOTE(review): two completion paths are visible (a direct per-request
 * smpi_mpi_wait loop and a waitany loop) — the branch selecting between
 * them is on lines elided from this view. */
389 void smpi_mpi_waitall(int count, MPI_Request requests[],
394 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
396 for(c = 0; c < count; c++) {
398 smpi_mpi_wait(&requests[c], pstat);
401 index = smpi_mpi_waitany(count, requests, pstat);
402 if(index == MPI_UNDEFINED) {
406 if(status != MPI_STATUS_IGNORE) {
407 memcpy(&status[index], pstat, sizeof(*pstat));
/* MPI_Waitsome (implemented via testany): record the indices of requests
 * that have completed into 'indices'. NOTE(review): the increment of
 * 'count' and the return value are on lines elided from this view. */
412 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
418 for(i = 0; i < incount; i++) {
419 if(smpi_mpi_testany(incount, requests, &index, status)) {
420 indices[count] = index;
/* MPI_Bcast: delegate to an n-ary tree broadcast with arity 4. */
427 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
430 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
431 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* MPI_Barrier: delegate to an n-ary tree barrier with arity 4. */
434 void smpi_mpi_barrier(MPI_Comm comm)
436 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
437 nary_tree_barrier(comm, 4);
/* MPI_Gather: non-root ranks send their buffer to root; root copies its
 * own contribution locally and posts one irecv per other rank, offset by
 * src * recvcount * extent into recvbuf.
 * NOTE(review): the send-to-root and the root-side receive loop presumably
 * sit in opposite branches of a rank == root test whose lines are elided
 * from this view, as are the 'index' bookkeeping, the skip of src == root,
 * and the final free of 'requests' — confirm against the full file. */
440 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
441 void *recvbuf, int recvcount, MPI_Datatype recvtype,
442 int root, MPI_Comm comm)
/* Fixed internal tag shared by all collectives in this file. */
444 int system_tag = 666;
445 int rank, size, src, index;
446 MPI_Aint lb = 0, recvext = 0;
447 MPI_Request *requests;
449 rank = smpi_comm_rank(comm);
450 size = smpi_comm_size(comm);
452 // Send buffer to root
453 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
455 // FIXME: check for errors
456 smpi_datatype_extent(recvtype, &lb, &recvext);
457 // Local copy from root
458 smpi_datatype_copy(sendbuf, sendcount, sendtype,
459 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
460 // Receive buffers from senders
461 requests = xbt_new(MPI_Request, size - 1);
463 for(src = 0; src < size; src++) {
465 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
467 src, system_tag, comm);
471 // Wait for completion of irecv's.
472 smpi_mpi_startall(size - 1, requests);
473 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Gatherv: like gather, but each rank's contribution lands at
 * displs[src] * extent with recvcounts[src] elements.
 * NOTE(review): as with smpi_mpi_gather above, the rank == root branching,
 * 'index' bookkeeping, src == root skip, and cleanup are on lines elided
 * from this view. */
478 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
479 void *recvbuf, int *recvcounts, int *displs,
480 MPI_Datatype recvtype, int root, MPI_Comm comm)
482 int system_tag = 666;
483 int rank, size, src, index;
484 MPI_Aint lb = 0, recvext = 0;
485 MPI_Request *requests;
487 rank = smpi_comm_rank(comm);
488 size = smpi_comm_size(comm);
490 // Send buffer to root
491 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
493 // FIXME: check for errors
494 smpi_datatype_extent(recvtype, &lb, &recvext);
495 // Local copy from root
496 smpi_datatype_copy(sendbuf, sendcount, sendtype,
497 (char *)recvbuf + displs[root] * recvext,
498 recvcounts[root], recvtype);
499 // Receive buffers from senders
500 requests = xbt_new(MPI_Request, size - 1);
502 for(src = 0; src < size; src++) {
505 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
506 recvcounts[src], recvtype, src, system_tag, comm);
510 // Wait for completion of irecv's.
511 smpi_mpi_startall(size - 1, requests);
512 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Allgather: every rank copies its own contribution into its slot of
 * recvbuf, then exchanges with every other rank via paired isend/irecv
 * (2*(size-1) requests total).
 * NOTE(review): the skip of other == rank, the 'index' bookkeeping, and
 * the final free of 'requests' are on lines elided from this view. */
517 void smpi_mpi_allgather(void *sendbuf, int sendcount,
518 MPI_Datatype sendtype, void *recvbuf,
519 int recvcount, MPI_Datatype recvtype,
522 int system_tag = 666;
523 int rank, size, other, index;
524 MPI_Aint lb = 0, recvext = 0;
525 MPI_Request *requests;
527 rank = smpi_comm_rank(comm);
528 size = smpi_comm_size(comm);
529 // FIXME: check for errors
530 smpi_datatype_extent(recvtype, &lb, &recvext);
531 // Local copy from self
532 smpi_datatype_copy(sendbuf, sendcount, sendtype,
533 (char *)recvbuf + rank * recvcount * recvext, recvcount,
535 // Send/Recv buffers to/from others;
536 requests = xbt_new(MPI_Request, 2 * (size - 1));
538 for(other = 0; other < size; other++) {
541 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
544 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
545 recvcount, recvtype, other,
550 // Wait for completion of all comms.
551 smpi_mpi_startall(2 * (size - 1), requests);
552 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* MPI_Allgatherv: as allgather, but each rank's slot is displs[other] with
 * recvcounts[other] elements.
 * NOTE(review): the skip of other == rank, the 'index' bookkeeping, and
 * the cleanup are on lines elided from this view. */
556 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
557 MPI_Datatype sendtype, void *recvbuf,
558 int *recvcounts, int *displs,
559 MPI_Datatype recvtype, MPI_Comm comm)
561 int system_tag = 666;
562 int rank, size, other, index;
563 MPI_Aint lb = 0, recvext = 0;
564 MPI_Request *requests;
566 rank = smpi_comm_rank(comm);
567 size = smpi_comm_size(comm);
568 // FIXME: check for errors
569 smpi_datatype_extent(recvtype, &lb, &recvext);
570 // Local copy from self
571 smpi_datatype_copy(sendbuf, sendcount, sendtype,
572 (char *)recvbuf + displs[rank] * recvext,
573 recvcounts[rank], recvtype);
574 // Send buffers to others;
575 requests = xbt_new(MPI_Request, 2 * (size - 1));
577 for(other = 0; other < size; other++) {
580 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
584 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
585 recvtype, other, system_tag, comm);
589 // Wait for completion of all comms.
590 smpi_mpi_startall(2 * (size - 1), requests);
591 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* MPI_Scatter: non-root ranks receive their slice from root; root copies
 * its own slice locally and posts one isend per other rank, offset by
 * dst * sendcount * extent into sendbuf.
 * NOTE(review): the rank == root branching, the MPI_STATUS_IGNORE argument
 * of the recv, the dst == root skip, 'index' bookkeeping, and cleanup are
 * on lines elided from this view. */
595 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
596 void *recvbuf, int recvcount, MPI_Datatype recvtype,
597 int root, MPI_Comm comm)
599 int system_tag = 666;
600 int rank, size, dst, index;
601 MPI_Aint lb = 0, sendext = 0;
602 MPI_Request *requests;
604 rank = smpi_comm_rank(comm);
605 size = smpi_comm_size(comm);
607 // Recv buffer from root
608 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
611 // FIXME: check for errors
612 smpi_datatype_extent(sendtype, &lb, &sendext);
613 // Local copy from root
614 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
615 sendcount, sendtype, recvbuf, recvcount, recvtype);
616 // Send buffers to receivers
617 requests = xbt_new(MPI_Request, size - 1);
619 for(dst = 0; dst < size; dst++) {
621 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
622 sendcount, sendtype, dst,
627 // Wait for completion of isend's.
628 smpi_mpi_startall(size - 1, requests);
629 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Scatterv: like scatter, but rank dst's slice starts at
 * displs[dst] * extent with sendcounts[dst] elements.
 * NOTE(review): the rank == root branching, dst == root skip, 'index'
 * bookkeeping, and cleanup are on lines elided from this view. */
634 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
635 MPI_Datatype sendtype, void *recvbuf, int recvcount,
636 MPI_Datatype recvtype, int root, MPI_Comm comm)
638 int system_tag = 666;
639 int rank, size, dst, index;
640 MPI_Aint lb = 0, sendext = 0;
641 MPI_Request *requests;
643 rank = smpi_comm_rank(comm);
644 size = smpi_comm_size(comm);
646 // Recv buffer from root
647 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
650 // FIXME: check for errors
651 smpi_datatype_extent(sendtype, &lb, &sendext);
652 // Local copy from root
653 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
654 sendtype, recvbuf, recvcount, recvtype);
655 // Send buffers to receivers
656 requests = xbt_new(MPI_Request, size - 1);
658 for(dst = 0; dst < size; dst++) {
661 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
662 sendtype, dst, system_tag, comm);
666 // Wait for completion of isend's.
667 smpi_mpi_startall(size - 1, requests);
668 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* MPI_Reduce: non-root ranks send their data to root; root seeds recvbuf
 * with its own data, receives every other contribution into a temporary
 * buffer, and folds contributions into recvbuf with smpi_op_apply as each
 * receive completes (waitany order, so the reduction order is arrival
 * order — only safe for commutative/associative ops).
 * NOTE(review): the rank == root branching, declaration of 'tmpbufs',
 * 'index' bookkeeping, src == root skip, and the frees of requests/tmpbufs
 * are on lines elided from this view. */
673 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
674 MPI_Datatype datatype, MPI_Op op, int root,
677 int system_tag = 666;
678 int rank, size, src, index;
679 MPI_Aint lb = 0, dataext = 0;
680 MPI_Request *requests;
683 rank = smpi_comm_rank(comm);
684 size = smpi_comm_size(comm);
686 // Send buffer to root
687 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
689 // FIXME: check for errors
690 smpi_datatype_extent(datatype, &lb, &dataext);
691 // Local copy from root
692 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
693 // Receive buffers from senders
694 //TODO: make a MPI_barrier here ?
695 requests = xbt_new(MPI_Request, size - 1);
696 tmpbufs = xbt_new(void *, size - 1);
698 for(src = 0; src < size; src++) {
700 // FIXME: possibly overkill we we have contiguous/noncontiguous data
702 tmpbufs[index] = xbt_malloc(count * dataext);
704 smpi_irecv_init(tmpbufs[index], count, datatype, src,
709 // Wait for completion of irecv's.
710 smpi_mpi_startall(size - 1, requests);
711 for(src = 0; src < size - 1; src++) {
712 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
713 XBT_VERB("finished waiting any request with index %d", index);
714 if(index == MPI_UNDEFINED) {
/* Fold the newly-arrived contribution into the running result. */
717 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
719 for(index = 0; index < size - 1; index++) {
720 xbt_free(tmpbufs[index]);
/* MPI_Allreduce implemented as reduce-to-rank-0 followed by a broadcast
 * of the result from rank 0. */
727 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
728 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
730 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
731 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
/* MPI_Scan (inclusive prefix reduction): each rank seeds recvbuf with its
 * own data, posts one irecv per lower rank (into per-rank temp buffers)
 * and one isend to each higher rank, then folds lower-rank contributions
 * into recvbuf as they arrive (waitany order — only safe for
 * commutative/associative ops; hence the "#Request is below rank" check,
 * whose branch lines are elided from this view).
 * NOTE(review): the declaration of 'tmpbufs', 'index' bookkeeping, the
 * test distinguishing irecv indices (< rank) from isend indices, and the
 * frees of requests/tmpbufs are on lines elided from this view. */
734 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
735 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
737 int system_tag = 666;
738 int rank, size, other, index;
739 MPI_Aint lb = 0, dataext = 0;
740 MPI_Request *requests;
743 rank = smpi_comm_rank(comm);
744 size = smpi_comm_size(comm);
746 // FIXME: check for errors
747 smpi_datatype_extent(datatype, &lb, &dataext);
749 // Local copy from self
750 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
752 // Send/Recv buffers to/from others;
753 requests = xbt_new(MPI_Request, size - 1);
754 tmpbufs = xbt_new(void *, rank);
756 for(other = 0; other < rank; other++) {
757 // FIXME: possibly overkill we we have contiguous/noncontiguous data
759 tmpbufs[index] = xbt_malloc(count * dataext);
761 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
/* Higher ranks need this rank's contribution for their own prefix. */
765 for(other = rank + 1; other < size; other++) {
767 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
770 // Wait for completion of all comms.
771 smpi_mpi_startall(size - 1, requests);
772 for(other = 0; other < size - 1; other++) {
773 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
774 if(index == MPI_UNDEFINED) {
778 // #Request is below rank: it's a irecv
779 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
782 for(index = 0; index < rank; index++) {
783 xbt_free(tmpbufs[index]);