int rank = request->src;
if (TRACE_smpi_view_internals()) {
- TRACE_smpi_send(rank, rank, receiver,request->size);
+ TRACE_smpi_send(rank, rank, receiver, request->tag, request->size);
}
print_request("New send", request);
if (TRACE_smpi_view_internals() && ((req->flags & RECV) != 0)){
int rank = smpi_process_index();
int src_traced = (req->src == MPI_ANY_SOURCE ? req->real_src : req->src);
- TRACE_smpi_recv(rank, src_traced, rank);
+ TRACE_smpi_recv(rank, src_traced, rank,req->tag);
}
if(req->detached_sender != nullptr){
nsleeps=1;//reset the number of sleeps we will do next time
if (*request != MPI_REQUEST_NULL && ((*request)->flags & PERSISTENT)==0)
*request = MPI_REQUEST_NULL;
- }else{
+ } else if (xbt_cfg_get_boolean("smpi/grow-injected-times")){
nsleeps++;
}
}
}
else {
*flag = 0;
- nsleeps++;
+ if (xbt_cfg_get_boolean("smpi/grow-injected-times"))
+ nsleeps++;
}
smpi_mpi_request_free(&request);
-
- return;
}
void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
*request = MPI_REQUEST_NULL;
}
+/* Comparator for xbt_dynar_sort() over a dynar of MPI_Request:
+ * orders requests by ascending tag.
+ * It follows the qsort contract: negative, zero or positive when the first
+ * request's tag is less than, equal to or greater than the second one's.
+ * Returning a bare "a > b" (0/1) is inconsistent (a < b would compare as
+ * equal) and leaves the sort order unspecified. */
+static int sort_accumulates(const void* pa, const void* pb)
+{
+  int tag_a = (*static_cast<MPI_Request const*>(pa))->tag;
+  int tag_b = (*static_cast<MPI_Request const*>(pb))->tag;
+  /* Three-way compare without subtraction, so extreme tags cannot overflow. */
+  return (tag_a > tag_b) - (tag_a < tag_b);
+}
+
int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
{
- xbt_dynar_t comms;
+ s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
int i;
int size = 0;
int index = MPI_UNDEFINED;
if(count > 0) {
// Wait for a request to complete
- comms = xbt_dynar_new(sizeof(smx_activity_t), nullptr);
+ xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
map = xbt_new(int, count);
XBT_DEBUG("Wait for one of %d", count);
for(i = 0; i < count; i++) {
if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED) && !(requests[i]->flags & FINISHED)) {
if (requests[i]->action != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
- xbt_dynar_push(comms, &requests[i]->action);
+ xbt_dynar_push(&comms, &requests[i]->action);
map[size] = i;
size++;
}else{
}
}
if(size > 0) {
- i = simcall_comm_waitany(comms, -1);
+ i = simcall_comm_waitany(&comms, -1);
// not MPI_UNDEFINED, as this is a simix return code
if (i != -1) {
index = map[i];
- finish_wait(&requests[index], status);
- if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
- requests[index] = MPI_REQUEST_NULL;
+ //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
+ if ((requests[index] == MPI_REQUEST_NULL)
+ || (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){
+ finish_wait(&requests[index], status);
+ if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+ requests[index] = MPI_REQUEST_NULL;
+ }else{
+ XBT_WARN("huu?");
+ }
}
}
+
+ xbt_dynar_free_data(&comms);
xbt_free(map);
- xbt_dynar_free(&comms);
}
if (index==MPI_UNDEFINED)
int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
{
+ s_xbt_dynar_t accumulates;
int index, c;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
}
}
}
+ xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
for(c = 0; c < count; c++) {
if (MC_is_active() || MC_record_replay_is_active()) {
index = smpi_mpi_waitany(count, requests, pstat);
if (index == MPI_UNDEFINED)
break;
+
+ if (requests[index] != MPI_REQUEST_NULL
+ && (requests[index]->flags & RECV)
+ && (requests[index]->flags & ACCUMULATE))
+ xbt_dynar_push(&accumulates, &requests[index]);
if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
requests[index]=MPI_REQUEST_NULL;
+
}
if (status != MPI_STATUSES_IGNORE) {
status[index] = *pstat;
}
}
+ if(!xbt_dynar_is_empty(&accumulates)){
+ xbt_dynar_sort(&accumulates, sort_accumulates);
+ MPI_Request req;
+ unsigned int cursor;
+ xbt_dynar_foreach(&accumulates, cursor, req) {
+ finish_wait(&req, status);
+ }
+ }
+ xbt_dynar_free_data(&accumulates);
+
return retvalue;
}
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
{
int system_tag = COLL_TAG_GATHER;
- int src, index;
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
- MPI_Request *requests;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
recvcount, recvtype);
// Receive buffers from senders
- requests = xbt_new(MPI_Request, size - 1);
- index = 0;
- for(src = 0; src < size; src++) {
+ MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+ int index = 0;
+ for (int src = 0; src < size; src++) {
if(src != root) {
requests[index] = smpi_irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
src, system_tag, comm);
// Wait for completion of irecv's.
smpi_mpi_startall(size - 1, requests);
smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for(src = 0; src < size-1; src++) {
+ for (int src = 0; src < size-1; src++) {
smpi_mpi_request_free(&requests[src]);
}
xbt_free(requests);
void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
MPI_Comm comm)
{
- int i;
- int *displs;
int rank = smpi_comm_rank(comm);
- void *tmpbuf;
/* arbitrarily choose root as rank 0 */
int size = smpi_comm_size(comm);
int count = 0;
- displs = xbt_new(int, size);
- for (i = 0; i < size; i++) {
+ int *displs = xbt_new(int, size);
+ for (int i = 0; i < size; i++) {
displs[i] = count;
count += recvcounts[i];
}
- tmpbuf=static_cast<void*>(smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype)));
+ void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype)));
mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
MPI_Datatype recvtype, int root, MPI_Comm comm)
{
int system_tag = COLL_TAG_GATHERV;
- int src, index;
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
- MPI_Request *requests;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
- if(rank != root) {
+ if (rank != root) {
// Send buffer to root
smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
} else {
smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
recvcounts[root], recvtype);
// Receive buffers from senders
- requests = xbt_new(MPI_Request, size - 1);
- index = 0;
- for(src = 0; src < size; src++) {
+ MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+ int index = 0;
+ for (int src = 0; src < size; src++) {
if(src != root) {
requests[index] = smpi_irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
recvcounts[src], recvtype, src, system_tag, comm);
// Wait for completion of irecv's.
smpi_mpi_startall(size - 1, requests);
smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for(src = 0; src < size-1; src++) {
+ for (int src = 0; src < size-1; src++) {
smpi_mpi_request_free(&requests[src]);
}
xbt_free(requests);
void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
{
int system_tag = COLL_TAG_ALLGATHER;
- int other, index;
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
MPI_Request *requests;
recvtype);
// Send/Recv buffers to/from others;
requests = xbt_new(MPI_Request, 2 * (size - 1));
- index = 0;
- for(other = 0; other < size; other++) {
+ int index = 0;
+ for (int other = 0; other < size; other++) {
if(other != rank) {
requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
index++;
// Wait for completion of all comms.
smpi_mpi_startall(2 * (size - 1), requests);
smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
- for(other = 0; other < 2*(size-1); other++) {
+ for (int other = 0; other < 2*(size-1); other++) {
smpi_mpi_request_free(&requests[other]);
}
xbt_free(requests);
int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
{
int system_tag = COLL_TAG_ALLGATHERV;
- int other, index;
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
- MPI_Request *requests;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
smpi_datatype_copy(sendbuf, sendcount, sendtype,
static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
// Send buffers to others;
- requests = xbt_new(MPI_Request, 2 * (size - 1));
- index = 0;
- for(other = 0; other < size; other++) {
+ MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
+ int index = 0;
+ for (int other = 0; other < size; other++) {
if(other != rank) {
requests[index] =
smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
// Wait for completion of all comms.
smpi_mpi_startall(2 * (size - 1), requests);
smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
- for(other = 0; other < 2*(size-1); other++) {
+ for (int other = 0; other < 2*(size-1); other++) {
smpi_mpi_request_free(&requests[other]);
}
xbt_free(requests);
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
{
int system_tag = COLL_TAG_SCATTER;
- int dst;
MPI_Aint lb = 0;
MPI_Aint sendext = 0;
MPI_Request *requests;
// Send buffers to receivers
requests = xbt_new(MPI_Request, size - 1);
int index = 0;
- for(dst = 0; dst < size; dst++) {
+ for(int dst = 0; dst < size; dst++) {
if(dst != root) {
requests[index] = smpi_isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
dst, system_tag, comm);
// Wait for completion of isend's.
smpi_mpi_startall(size - 1, requests);
smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for(dst = 0; dst < size-1; dst++) {
+ for (int dst = 0; dst < size-1; dst++) {
smpi_mpi_request_free(&requests[dst]);
}
xbt_free(requests);
MPI_Datatype recvtype, int root, MPI_Comm comm)
{
int system_tag = COLL_TAG_SCATTERV;
- int dst;
MPI_Aint lb = 0;
MPI_Aint sendext = 0;
- MPI_Request *requests;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
sendtype, recvbuf, recvcount, recvtype);
}
// Send buffers to receivers
- requests = xbt_new(MPI_Request, size - 1);
+ MPI_Request *requests = xbt_new(MPI_Request, size - 1);
int index = 0;
- for(dst = 0; dst < size; dst++) {
- if(dst != root) {
+ for (int dst = 0; dst < size; dst++) {
+ if (dst != root) {
requests[index] = smpi_isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
sendtype, dst, system_tag, comm);
index++;
// Wait for completion of isend's.
smpi_mpi_startall(size - 1, requests);
smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for(dst = 0; dst < size-1; dst++) {
+ for (int dst = 0; dst < size-1; dst++) {
smpi_mpi_request_free(&requests[dst]);
}
xbt_free(requests);
MPI_Comm comm)
{
int system_tag = COLL_TAG_REDUCE;
- int src, index;
MPI_Aint lb = 0;
MPI_Aint dataext = 0;
- MPI_Request *requests;
- void **tmpbufs;
char* sendtmpbuf = static_cast<char *>(sendbuf);
if (sendtmpbuf != nullptr && recvbuf != nullptr)
smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
// Receive buffers from senders
- requests = xbt_new(MPI_Request, size - 1);
- tmpbufs = xbt_new(void *, size - 1);
- index = 0;
- for(src = 0; src < size; src++) {
- if(src != root) {
+ MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+ void **tmpbufs = xbt_new(void *, size - 1);
+ int index = 0;
+ for (int src = 0; src < size; src++) {
+ if (src != root) {
if (!smpi_process_get_replaying())
tmpbufs[index] = xbt_malloc(count * dataext);
else
}
// Wait for completion of irecv's.
smpi_mpi_startall(size - 1, requests);
- for(src = 0; src < size - 1; src++) {
+ for (int src = 0; src < size - 1; src++) {
index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
XBT_DEBUG("finished waiting any request with index %d", index);
if(index == MPI_UNDEFINED) {
void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
int system_tag = -888;
- int other, index;
MPI_Aint lb = 0, dataext = 0;
- MPI_Request *requests;
- void **tmpbufs;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
// Send/Recv buffers to/from others;
- requests = xbt_new(MPI_Request, size - 1);
- tmpbufs = xbt_new(void *, rank);
- index = 0;
- for(other = 0; other < rank; other++) {
+ MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+ void **tmpbufs = xbt_new(void *, rank);
+ int index = 0;
+ for (int other = 0; other < rank; other++) {
tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
index++;
}
- for(other = rank + 1; other < size; other++) {
+ for (int other = rank + 1; other < size; other++) {
requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
index++;
}
smpi_mpi_startall(size - 1, requests);
if(smpi_op_is_commute(op)){
- for(other = 0; other < size - 1; other++) {
+ for (int other = 0; other < size - 1; other++) {
index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
if(index == MPI_UNDEFINED) {
break;
}
}else{
//non commutative case, wait in order
- for(other = 0; other < size - 1; other++) {
+ for (int other = 0; other < size - 1; other++) {
smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
if(index < rank) {
smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
int system_tag = -888;
- int other, index;
MPI_Aint lb = 0, dataext = 0;
- MPI_Request *requests;
- void **tmpbufs;
int recvbuf_is_empty=1;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
smpi_datatype_extent(datatype, &lb, &dataext);
// Send/Recv buffers to/from others;
- requests = xbt_new(MPI_Request, size - 1);
- tmpbufs = xbt_new(void *, rank);
- index = 0;
- for(other = 0; other < rank; other++) {
+ MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+ void **tmpbufs = xbt_new(void *, rank);
+ int index = 0;
+ for (int other = 0; other < rank; other++) {
tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
index++;
}
- for(other = rank + 1; other < size; other++) {
+ for (int other = rank + 1; other < size; other++) {
requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
index++;
}
// Wait for completion of all comms.
smpi_mpi_startall(size - 1, requests);
if(smpi_op_is_commute(op)){
- for(other = 0; other < size - 1; other++) {
+ for (int other = 0; other < size - 1; other++) {
index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
if(index == MPI_UNDEFINED) {
break;
}
}else{
//non commutative case, wait in order
- for(other = 0; other < size - 1; other++) {
+ for (int other = 0; other < size - 1; other++) {
smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
if(index < rank) {
if(recvbuf_is_empty){