*request = MPI_REQUEST_NULL;
}
+static int sort_accumulates(const void* pa, const void* pb)
+{
+  // qsort-style three-way comparator: order pending accumulates by tag
+  int ta = (*static_cast<MPI_Request const*>(pa))->tag;
+  int tb = (*static_cast<MPI_Request const*>(pb))->tag;
+  return (ta > tb) - (ta < tb);
+}
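
For reference, xbt_dynar_sort hands this comparator to a qsort-style sort, which expects a negative, zero, or positive return value; a bare `>` only yields 0 or 1 and breaks that contract. A minimal self-contained sketch of the three-way idiom, using plain int keys as an illustrative stand-in for request tags:

    #include <cstdio>
    #include <cstdlib>

    // Three-way comparator over int keys (stand-ins for request tags).
    static int compare_tags(const void* pa, const void* pb)
    {
      int ta = *static_cast<const int*>(pa);
      int tb = *static_cast<const int*>(pb);
      return (ta > tb) - (ta < tb); // negative, zero, or positive
    }

    int main()
    {
      int tags[] = {7, 3, 5, 3, 1};
      std::qsort(tags, sizeof(tags) / sizeof(tags[0]), sizeof(int), compare_tags);
      for (int t : tags)
        std::printf("%d ", t); // prints: 1 3 3 5 7
      std::printf("\n");
      return 0;
    }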
+
int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
{
s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
// not MPI_UNDEFINED, as this is a simix return code
if (i != -1) {
index = map[i];
- finish_wait(&requests[index], status);
- if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
- requests[index] = MPI_REQUEST_NULL;
+      // For accumulates we must defer completion until every request is done,
+      // so the operations can be applied to the target in the correct order.
+      if ((requests[index] == MPI_REQUEST_NULL)
+          || (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))) {
+        finish_wait(&requests[index], status);
+        if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
+          requests[index] = MPI_REQUEST_NULL;
+      } else {
+        XBT_WARN("Matched an accumulate receive in smpi_mpi_waitany; deferring its completion");
+      }
}
}
+
xbt_dynar_free_data(&comms);
xbt_free(map);
}
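
Why the ordering matters: the result of an accumulate such as MPI_REPLACE depends on the order in which operations reach the target, so applying accumulate receives in network arrival order can leave the wrong final value. A toy illustration of that order sensitivity (PendingAcc is purely illustrative, not an SMPI type):

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    // Toy stand-in for a pending accumulate: the tag encodes issue order,
    // and the value is what a REPLACE-style operation writes to the target.
    struct PendingAcc {
      int tag;
      int value;
    };

    int main()
    {
      // The network completed tag 2 before tag 1.
      std::vector<PendingAcc> arrived = {{2, 20}, {1, 10}};

      int naive = 0;
      for (const PendingAcc& a : arrived)
        naive = a.value; // arrival order: target ends at 10 (wrong)

      std::sort(arrived.begin(), arrived.end(),
                [](const PendingAcc& x, const PendingAcc& y) { return x.tag < y.tag; });
      int ordered = 0;
      for (const PendingAcc& a : arrived)
        ordered = a.value; // tag order: target ends at 20 (correct)

      std::printf("naive=%d ordered=%d\n", naive, ordered);
      return 0;
    }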
int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
{
+  s_xbt_dynar_t accumulates; // deferred accumulate receives, finished in tag order after the loop
int index, c;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
}
}
}
+ xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
for(c = 0; c < count; c++) {
if (MC_is_active() || MC_record_replay_is_active()) {
index = smpi_mpi_waitany(count, requests, pstat);
if (index == MPI_UNDEFINED)
break;
+
+      // Set aside accumulate receives; they are finished after the loop, in tag order.
+      if (requests[index] != MPI_REQUEST_NULL
+          && (requests[index]->flags & RECV)
+          && (requests[index]->flags & ACCUMULATE))
+        xbt_dynar_push(&accumulates, &requests[index]);
if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
requests[index]=MPI_REQUEST_NULL;
+
}
if (status != MPI_STATUSES_IGNORE) {
status[index] = *pstat;
}
}
+  // Now finish the deferred accumulate receives, sorted by tag.
+  if (!xbt_dynar_is_empty(&accumulates)) {
+    xbt_dynar_sort(&accumulates, sort_accumulates);
+    MPI_Request req;
+    unsigned int cursor;
+    xbt_dynar_foreach(&accumulates, cursor, req) {
+      finish_wait(&req, status);
+    }
+  }
+ xbt_dynar_free_data(&accumulates);
+
return retvalue;
}
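
The essential control flow of the smpi_mpi_waitall change, as a standalone sketch: accumulate receives are set aside while everything else completes, then finished afterwards in tag order. Req, the flag values, and finish() are illustrative stand-ins, not the real SMPI API:

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    enum Flags { RECV = 1 << 0, ACCUMULATE = 1 << 1 };

    struct Req {
      int tag;
      int flags;
    };

    static void finish(const Req& r) { std::printf("finishing tag %d\n", r.tag); }

    static void wait_all(std::vector<Req>& requests)
    {
      std::vector<Req> accumulates;
      for (const Req& r : requests) {
        if ((r.flags & RECV) && (r.flags & ACCUMULATE))
          accumulates.push_back(r); // defer: must be applied in tag order
        else
          finish(r); // everything else completes as it is waited on
      }
      std::sort(accumulates.begin(), accumulates.end(),
                [](const Req& a, const Req& b) { return a.tag < b.tag; });
      for (const Req& r : accumulates)
        finish(r);
    }

    int main()
    {
      std::vector<Req> reqs = {{5, RECV | ACCUMULATE}, {2, RECV}, {3, RECV | ACCUMULATE}};
      wait_all(reqs); // finishes tag 2 first, then tags 3 and 5 in order
      return 0;
    }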
char* name;
int opened;
MPI_Group group;
+  int count; // ordering counter for accumulate operations on this window
} s_smpi_mpi_win_t;
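
The new count field is what makes the accumulate tags sortable: each accumulate issued on a window is tagged SMPI_RMA_TAG+3+count with count incremented per operation, and the counter is reset once the pending requests are flushed. A sketch of that scheme, where BASE_TAG is an assumed placeholder for SMPI_RMA_TAG+3:

    #include <cstdio>

    static const int BASE_TAG = 100; // placeholder for SMPI_RMA_TAG + 3

    struct Win {
      int count = 0; // per-window ordering counter for accumulates

      int next_accumulate_tag()
      {
        return BASE_TAG + count++; // strictly increasing within an epoch
      }

      void flushed() { count = 0; } // reset after pending requests complete
    };

    int main()
    {
      Win w;
      for (int i = 0; i < 3; i++)
        std::printf("%d ", w.next_accumulate_tag()); // prints: 100 101 102
      std::printf("\n");
      w.flushed();
      std::printf("%d\n", w.next_accumulate_tag()); // 100 again after the flush
      return 0;
    }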
win->requests = new std::vector<MPI_Request>();
win->connected_wins = xbt_new0(MPI_Win, comm_size);
win->connected_wins[rank] = win;
-
+ win->count = 0;
if(rank==0){
win->bar = MSG_barrier_init(comm_size);
}
MPI_Request* treqs = &(*reqs)[0];
smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+  win->count = 0; // all pending accumulates are flushed; reset the ordering counter
}
win->assert = assert;
void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
-
+  // The tag orders the accumulate operations on the target window, so offset it by win->count.
//prepare send_request
MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
- smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, win->comm, op);
+ smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
//prepare receiver request
MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
- smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, recv_win->comm, op);
+ smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
+
+ win->count++;
//push request to receiver's win
recv_win->requests->push_back(rreq);
//start send