XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
+extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+
+
// SIMIX matching callback: decides whether a posted receive (ref) matches an
// incoming send (req). NOTE(review): this hunk is elided — the lines below
// L7 belong to other, partially shown functions (an irecv poster and an
// isend poster), not to match_recv's body.
static int match_recv(void* a, void* b, smx_activity_t ignored) {
MPI_Request ref = static_cast<MPI_Request>(a);
MPI_Request req = static_cast<MPI_Request>(b);
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
request->real_size=request->size;
// Change: the copy callback is no longer the fixed &smpi_comm_copy_buffer_callback
// but the global function pointer smpi_comm_copy_data_callback (extern-declared at
// the top of this diff) — presumably so the copy behavior can be swapped at
// runtime; confirm against the definition site of the pointer.
request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf, &request->real_size, &match_recv,
- ! smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+ ! smpi_process_get_replaying()? smpi_comm_copy_data_callback
: &smpi_comm_null_copy_buffer_callback, request, -1.0);
XBT_DEBUG("recv simcall posted");
// Same substitution on the send side: replay mode still uses the null copy
// callback; normal mode now goes through the overridable pointer.
request->action = simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0,
buf, request->real_size, &match_send,
&xbt_free_f, // how to free the userdata if a detached send fails
- !smpi_process_get_replaying() ? &smpi_comm_copy_buffer_callback
+ !smpi_process_get_replaying() ? smpi_comm_copy_data_callback
: &smpi_comm_null_copy_buffer_callback, request,
// detach if msg size < eager/rdv switch limit
request->detached);
*request = MPI_REQUEST_NULL;
}
// Orders accumulate-receive requests by tag before their finish_wait pass.
// Old form ('-' lines): a qsort-style comparator for xbt_dynar_sort. Note it
// returned only 0 or 1 (the result of '>'), never a negative value, which is
// strictly invalid for a qsort comparator.
// New form ('+' lines): a bool-convertible strict-weak-ordering predicate for
// std::sort, ascending by tag (see its use in smpi_mpi_waitall below).
-static int sort_accumulates(const void* pa, const void* pb)
+static int sort_accumulates(MPI_Request a, MPI_Request b)
{
- return (*static_cast<MPI_Request const*>(pa))->tag>
- (*static_cast<MPI_Request const*>(pb))->tag;
+ return (a->tag < b->tag);
}
// NOTE(review): elided hunk — only the middle of waitany's request-scanning
// loop is visible. The change below is a pure whitespace/comment reformat of
// the "finished detached request" branch; every token is otherwise identical.
int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
xbt_dynar_push(&comms, &requests[i]->action);
map[size] = i;
size++;
- }else{
- //This is a finished detached request, let's return this one
- size=0;//so we free the dynar but don't do the waitany call
- index=i;
- finish_wait(&requests[i], status);//cleanup if refcount = 0
- if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
- requests[i]=MPI_REQUEST_NULL;//set to null
- break;
- }
+ } else {
+ // This is a finished detached request, let's return this one
+ size = 0; // so we free the dynar but don't do the waitany call
+ index = i;
+ finish_wait(&requests[i], status); // cleanup if refcount = 0
+ if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+ requests[i] = MPI_REQUEST_NULL; // set to null
+ break;
+ }
}
}
if(size > 0) {
// Waits for completion of all 'count' requests. This hunk migrates the
// temporary list of accumulate-receives from an xbt dynar to a
// std::vector<MPI_Request>, drops the now-unneeded init/free calls, and
// scopes the loop counter 'c' into its for statements.
// NOTE(review): the non-model-checking branch of the wait loop (the 'else'
// of the MC_is_active() test) is elided from this view.
int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
{
- s_xbt_dynar_t accumulates;
- int index, c;
+ std::vector<MPI_Request> accumulates;
+ int index;
MPI_Status stat;
// pstat aliases a local scratch status unless the caller ignores statuses.
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
int retvalue = MPI_SUCCESS;
//tag invalid requests in the set
if (status != MPI_STATUSES_IGNORE) {
- for (c = 0; c < count; c++) {
+ for (int c = 0; c < count; c++) {
if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL || (requests[c]->flags & PREPARED)) {
smpi_empty_status(&status[c]);
} else if (requests[c]->src == MPI_PROC_NULL) {
}
}
}
- xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
- for(c = 0; c < count; c++) {
-
+ for (int c = 0; c < count; c++) {
// Under the model checker, wait on each request in order.
if (MC_is_active() || MC_record_replay_is_active()) {
smpi_mpi_wait(&requests[c], pstat);
index = c;
// Completed accumulate-receives are collected and finalized after the
// loop, sorted by tag (see sort_accumulates).
if (requests[index] != MPI_REQUEST_NULL
&& (requests[index]->flags & RECV)
&& (requests[index]->flags & ACCUMULATE))
- xbt_dynar_push(&accumulates, &requests[index]);
+ accumulates.push_back(requests[index]);
if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
- requests[index]=MPI_REQUEST_NULL;
-
+ requests[index] = MPI_REQUEST_NULL;
}
if (status != MPI_STATUSES_IGNORE) {
status[index] = *pstat;
}
}
- if(!xbt_dynar_is_empty(&accumulates)){
- xbt_dynar_sort(&accumulates, sort_accumulates);
- MPI_Request req;
- unsigned int cursor;
- xbt_dynar_foreach(&accumulates, cursor, req) {
+ if (!accumulates.empty()) {
+ std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
// 'req' is a by-value copy of the stored MPI_Request (a pointer), matching
// the old xbt_dynar_foreach semantics; finish_wait(&req, ...) can thus only
// null out the local copy, as before.
+ for (auto req : accumulates) {
finish_wait(&req, status);
}
}
- xbt_dynar_free_data(&accumulates);
return retvalue;
}
// NOTE(review): body elided — only the declarations at the top of scan are
// shown. The change splits the comma-combined MPI_Aint declaration into one
// declaration per line; no behavioral change.
void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
int system_tag = -888;
- MPI_Aint lb = 0, dataext = 0;
+ MPI_Aint lb = 0;
+ MPI_Aint dataext = 0;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
// NOTE(review): heavily elided — the hunks below are the top of exscan plus a
// fragment of its completion loop. Changes are declaration splitting, one
// added blank line, and a whitespace reformat of the copy-vs-apply branch;
// every token is otherwise identical.
void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
int system_tag = -888;
- MPI_Aint lb = 0, dataext = 0;
+ MPI_Aint lb = 0;
+ MPI_Aint dataext = 0;
int recvbuf_is_empty=1;
int rank = smpi_comm_rank(comm);
int size = smpi_comm_size(comm);
}
// Wait for completion of all comms.
smpi_mpi_startall(size - 1, requests);
+
if(smpi_op_is_commute(op)){
for (int other = 0; other < size - 1; other++) {
index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
for (int other = 0; other < size - 1; other++) {
smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
if(index < rank) {
// First contribution initializes recvbuf by copy; later ones are folded
// in with the reduction operator.
- if(recvbuf_is_empty){
- smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
- recvbuf_is_empty=0;
- } else
- smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
+ if (recvbuf_is_empty) {
+ smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
+ recvbuf_is_empty = 0;
+ } else
+ smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
}
}
}