sudo apt install simgrid pajeng make gcc g++ gfortran vite
For R analysis of the produced traces, you may want to install R,
-and the `pajengr<https://github.com/schnorr/pajengr#installation/>_ package.
+and the `pajengr <https://github.com/schnorr/pajengr#installation/>`_ package.
.. code-block:: shell
MPI_CALL(XBT_PUBLIC int, MPI_Bcast, (void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm));
MPI_CALL(XBT_PUBLIC int, MPI_Barrier, (MPI_Comm comm));
+MPI_CALL(XBT_PUBLIC int, MPI_Ibarrier, (MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Ibcast, (void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Igather, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
+ MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Igatherv, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iallgather, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iallgatherv, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iscatter, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iscatterv, (void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype,
+ void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Ireduce,
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iallreduce,
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iscan,
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Iexscan,
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Ireduce_scatter,
+ (void* sendbuf, void* recvbuf, int* recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Ireduce_scatter_block,
+ (void* sendbuf, void* recvbuf, int recvcount, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Ialltoall, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request));
+MPI_CALL(XBT_PUBLIC int, MPI_Ialltoallv,
+ (void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf, int* recvcounts,
+ int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request));
+
MPI_CALL(XBT_PUBLIC int, MPI_Gather, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
MPI_Datatype recvtype, int root, MPI_Comm comm));
MPI_CALL(XBT_PUBLIC int, MPI_Gatherv, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
tracing_file << stream_.str() << std::endl;
}
+StateEvent::~StateEvent(){
+ delete extra_;
+}
+
void StateEvent::print()
{
if (trace_format == simgrid::instr::TraceFormat::Paje) {
THROW_IMPOSSIBLE;
}
- delete extra_;
}
}
}
public:
StateEvent(Container* container, Type* type, e_event_type event_type, EntityValue* value, TIData* extra);
+ ~StateEvent();
void print() override;
};
/* internal to the instrumentation module */
void simgrid::instr::PajeEvent::insert_into_buffer()
{
- buffer_debug(&buffer);
-
XBT_DEBUG("%s: insert event_type=%u, timestamp=%f, buffersize=%zu)", __func__, eventType_, timestamp_, buffer.size());
std::vector<simgrid::instr::PajeEvent*>::reverse_iterator i;
for (i = buffer.rbegin(); i != buffer.rend(); ++i) {
else
XBT_DEBUG("%s: inserted at pos= %zd from its end", __func__, std::distance(buffer.rbegin(), i));
buffer.insert(i.base(), this);
-
- buffer_debug(&buffer);
}
WRAPPED_PMPI_CALL(int,MPI_Group_size,(MPI_Group group, int *size),(group, size))
WRAPPED_PMPI_CALL(int,MPI_Group_translate_ranks,(MPI_Group group1, int n, int *ranks1, MPI_Group group2, int *ranks2),(group1, n, ranks1, group2, ranks2))
WRAPPED_PMPI_CALL(int,MPI_Group_union,(MPI_Group group1, MPI_Group group2, MPI_Group * newgroup),(group1, group2, newgroup))
+WRAPPED_PMPI_CALL(int,MPI_Ibarrier,(MPI_Comm comm, MPI_Request *request),(comm,request))
+WRAPPED_PMPI_CALL(int,MPI_Ibcast,(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request *request),(buf, count, datatype, root, comm, request))
+
+WRAPPED_PMPI_CALL(int,MPI_Iallgather,(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Iallgatherv,(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Iallreduce,(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request),(sendbuf, recvbuf, count, datatype, op, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Ialltoall,(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount,MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Ialltoallv,(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Igather,(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Igatherv,(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm, request))
+//WRAPPED_PMPI_CALL(int,MPI_Ireduce_scatter_block,(void *sendbuf, void *recvbuf, int recvcount, MPI_Datatype datatype, MPI_Op op,MPI_Comm comm, MPI_Request *request),(sendbuf, recvbuf, recvcount, datatype, op, comm, request))
+//WRAPPED_PMPI_CALL(int,MPI_Ireduce_scatter,(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request),(sendbuf, recvbuf, recvcounts, datatype, op, comm, request))
+//WRAPPED_PMPI_CALL(int,MPI_Ireduce,(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm, MPI_Request *request),(sendbuf, recvbuf, count, datatype, op, root, comm, request))
+//WRAPPED_PMPI_CALL(int,MPI_Iexscan,(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request),(sendbuf, recvbuf, count, datatype, op, comm, request))
+//WRAPPED_PMPI_CALL(int,MPI_Iscan,(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request),(sendbuf, recvbuf, count, datatype, op, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Iscatter,(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, request))
+WRAPPED_PMPI_CALL(int,MPI_Iscatterv,(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request),(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm, request))
+
WRAPPED_PMPI_CALL(int,MPI_Info_create,( MPI_Info *info),( info))
WRAPPED_PMPI_CALL(int,MPI_Info_delete,(MPI_Info info, char *key),(info, key))
WRAPPED_PMPI_CALL(int,MPI_Info_dup,(MPI_Info info, MPI_Info *newinfo),(info, newinfo))
WRAPPED_PMPI_CALL(int,MPI_Testany,(int count, MPI_Request requests[], int *index, int *flag, MPI_Status * status),(count, requests, index, flag, status))
WRAPPED_PMPI_CALL(int,MPI_Test,(MPI_Request * request, int *flag, MPI_Status * status),(request, flag, status))
WRAPPED_PMPI_CALL(int,MPI_Testsome,(int incount, MPI_Request* requests, int* outcount, int* indices, MPI_Status* statuses) ,(incount, requests, outcount, indices, statuses))
+WRAPPED_PMPI_CALL(int,MPI_Request_get_status,( MPI_Request request, int *flag, MPI_Status *status),( request, flag, status))
+WRAPPED_PMPI_CALL(int,MPI_Grequest_complete,( MPI_Request request),( request))
+WRAPPED_PMPI_CALL(int,MPI_Grequest_start,(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request),( query_fn, free_fn, cancel_fn, extra_state, request))
WRAPPED_PMPI_CALL(int,MPI_Type_commit,(MPI_Datatype* datatype) ,(datatype))
WRAPPED_PMPI_CALL(int,MPI_Type_contiguous,(int count, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, old_type, newtype))
WRAPPED_PMPI_CALL(int,MPI_Type_create_hindexed_block,(int count, int blocklength, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, blocklength, indices, old_type, newtype))
WRAPPED_PMPI_CALL_NORETURN(MPI_Op, MPI_Op_f2c,(MPI_Fint op),(op))
WRAPPED_PMPI_CALL_NORETURN(MPI_Request, MPI_Request_f2c,(MPI_Fint request),(request))
WRAPPED_PMPI_CALL_NORETURN(MPI_Win, MPI_Win_f2c,(MPI_Fint win),(win))
-WRAPPED_PMPI_CALL(int,MPI_Cancel,(MPI_Request* request) ,(request))
+WRAPPED_PMPI_CALL(int, MPI_Cancel,(MPI_Request* request) ,(request))
WRAPPED_PMPI_CALL(int, MPI_Test_cancelled,(MPI_Status* status, int* flag) ,(status, flag))
+WRAPPED_PMPI_CALL(int, MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag))
+WRAPPED_PMPI_CALL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count))
/*
Unimplemented Calls - both PMPI and MPI calls are generated.
When implementing, please move ahead, swap UNIMPLEMENTED_WRAPPED_PMPI_CALL for WRAPPED_PMPI_CALL,
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_map,(MPI_Comm comm_old, int nnodes, int* index, int* edges, int* newrank) ,(comm_old, nnodes, index, edges, newrank))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors_count,(MPI_Comm comm, int rank, int* nneighbors) ,(comm, rank, nneighbors))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors,(MPI_Comm comm, int rank, int maxneighbors, int* neighbors) ,(comm, rank, maxneighbors, neighbors))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Grequest_complete,( MPI_Request request),( request))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Grequest_start,(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request),( query_fn, free_fn, cancel_fn, extra_state, request))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Ibsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request) ,(buf, count, datatype, dest, tag, comm, request))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_create,(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag,MPI_Comm* comm_out) ,(local_comm, local_leader, peer_comm, remote_leader, tag, comm_out))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_merge,(MPI_Comm comm, int high, MPI_Comm* comm_out) ,(comm, high, comm_out))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pack_external_size,(char *datarep, int incount, MPI_Datatype datatype, MPI_Aint *size),(datarep, incount, datatype, size))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pcontrol,(const int level, ... ),(level))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Publish_name,( char *service_name, MPI_Info info, char *port_name),( service_name, info, port_name))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Request_get_status,( MPI_Request request, int *flag, MPI_Status *status),( request, flag, status))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend_init,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request),(buf, count, datatype, dest, tag, comm, request))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) ,(buf, count, datatype, dest, tag, comm))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Topo_test,(MPI_Comm comm, int* top_type) ,(comm, top_type))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_create_darray,(int size, int rank, int ndims, int* array_of_gsizes, int* array_of_distribs, int* array_of_dargs, int* array_of_psizes,int order, MPI_Datatype oldtype, MPI_Datatype *newtype) ,(size, rank, ndims, array_of_gsizes,array_of_distribs, array_of_dargs, array_of_psizes,order,oldtype, newtype))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses, int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, MPI_Datatype *array_of_datatypes),(datatype, max_integers, max_addresses,max_datatypes, array_of_integers, array_of_addresses, array_of_datatypes))
#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
+#include "smpi_request.hpp"
#include "smpi_datatype_derived.hpp"
#include "smpi_op.hpp"
#include "src/smpi/include/smpi_actor.hpp"
int PMPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
{
- int retval = 0;
+ return PMPI_Ibcast(buf, count, datatype, root, comm, MPI_REQUEST_IGNORED);
+}
- smpi_bench_end();
+int PMPI_Barrier(MPI_Comm comm)
+{
+ return PMPI_Ibarrier(comm, MPI_REQUEST_IGNORED);
+}
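+
+/* The blocking wrappers above now delegate to their non-blocking counterparts by
+ * passing MPI_REQUEST_IGNORED. From application code the new entry points follow the
+ * usual MPI pattern; an illustrative sketch (user code, not part of SMPI itself):
+ *
+ *   MPI_Request req;
+ *   MPI_Ibarrier(MPI_COMM_WORLD, &req);   // start the barrier
+ *   // ... overlap local computation here ...
+ *   MPI_Wait(&req, MPI_STATUS_IGNORE);    // complete the barrier
+ */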
+int PMPI_Ibarrier(MPI_Comm comm, MPI_Request *request)
+{
+ int retval = 0;
+ smpi_bench_end();
if (comm == MPI_COMM_NULL) {
retval = MPI_ERR_COMM;
- } else if (not datatype->is_valid()) {
+ } else if(request == nullptr){
retval = MPI_ERR_ARG;
- } else {
+ }else{
int rank = simgrid::s4u::this_actor::get_pid();
- TRACE_smpi_comm_in(rank, __func__,
- new simgrid::instr::CollTIData("bcast", root, -1.0,
- datatype->is_replayable() ? count : count * datatype->size(), -1,
- simgrid::smpi::Datatype::encode(datatype), ""));
- if (comm->size() > 1)
- simgrid::smpi::Colls::bcast(buf, count, datatype, root, comm);
- retval = MPI_SUCCESS;
-
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED? "PMPI_Barrier" : "PMPI_Ibarrier", new simgrid::instr::NoOpTIData(request==MPI_REQUEST_IGNORED? "barrier" : "ibarrier"));
+ if(request==MPI_REQUEST_IGNORED){
+ simgrid::smpi::Colls::barrier(comm);
+ //Barrier can be used to synchronize RMA calls. Finish all requests from comm before.
+ comm->finish_rma_calls();
+ } else
+ simgrid::smpi::Colls::ibarrier(comm, request);
TRACE_smpi_comm_out(rank);
- }
+ }
smpi_bench_begin();
return retval;
}
-int PMPI_Barrier(MPI_Comm comm)
+int PMPI_Ibcast(void *buf, int count, MPI_Datatype datatype,
+ int root, MPI_Comm comm, MPI_Request* request)
{
int retval = 0;
-
smpi_bench_end();
-
if (comm == MPI_COMM_NULL) {
retval = MPI_ERR_COMM;
+ } else if (not datatype->is_valid()) {
+ retval = MPI_ERR_ARG;
+ } else if(request == nullptr){
+ retval = MPI_ERR_ARG;
} else {
int rank = simgrid::s4u::this_actor::get_pid();
- TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("barrier"));
-
- simgrid::smpi::Colls::barrier(comm);
-
- //Barrier can be used to synchronize RMA calls. Finish all requests from comm before.
- comm->finish_rma_calls();
-
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Bcast":"PMPI_Ibcast",
+ new simgrid::instr::CollTIData(request==MPI_REQUEST_IGNORED?"bcast":"ibcast", root, -1.0,
+ datatype->is_replayable() ? count : count * datatype->size(), -1,
+ simgrid::smpi::Datatype::encode(datatype), ""));
+ if (comm->size() > 1){
+ if(request==MPI_REQUEST_IGNORED)
+ simgrid::smpi::Colls::bcast(buf, count, datatype, root, comm);
+ else
+ simgrid::smpi::Colls::ibcast(buf, count, datatype, root, comm, request);
+ }
retval = MPI_SUCCESS;
TRACE_smpi_comm_out(rank);
}
-
smpi_bench_begin();
return retval;
}
int PMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,
- int root, MPI_Comm comm)
+ int root, MPI_Comm comm){
+ return PMPI_Igather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Igather(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,
+ int root, MPI_Comm comm, MPI_Request *request)
{
int retval = 0;
retval = MPI_ERR_TYPE;
} else if ((( sendbuf != MPI_IN_PLACE) && (sendcount <0)) || ((comm->rank() == root) && (recvcount <0))){
retval = MPI_ERR_COUNT;
- } else {
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
+ } else {
char* sendtmpbuf = static_cast<char*>(sendbuf);
int sendtmpcount = sendcount;
int rank = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(
- rank, __func__,
+ rank, request==MPI_REQUEST_IGNORED?"PMPI_Gather":"PMPI_Igather",
new simgrid::instr::CollTIData(
- "gather", root, -1.0, sendtmptype->is_replayable() ? sendtmpcount : sendtmpcount * sendtmptype->size(),
+ request==MPI_REQUEST_IGNORED ? "gather":"igather", root, -1.0, sendtmptype->is_replayable() ? sendtmpcount : sendtmpcount * sendtmptype->size(),
(comm->rank() != root || recvtype->is_replayable()) ? recvcount : recvcount * recvtype->size(),
simgrid::smpi::Datatype::encode(sendtmptype), simgrid::smpi::Datatype::encode(recvtype)));
-
- simgrid::smpi::Colls::gather(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcount, recvtype, root, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ simgrid::smpi::Colls::gather(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcount, recvtype, root, comm);
+ else
+ simgrid::smpi::Colls::igather(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcount, recvtype, root, comm, request);
retval = MPI_SUCCESS;
TRACE_smpi_comm_out(rank);
}
int PMPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
- MPI_Datatype recvtype, int root, MPI_Comm comm)
+ MPI_Datatype recvtype, int root, MPI_Comm comm){
+ return PMPI_Igatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Igatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
+ MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request)
{
int retval = 0;
retval = MPI_ERR_COUNT;
} else if ((comm->rank() == root) && (recvcounts == nullptr || displs == nullptr)) {
retval = MPI_ERR_ARG;
- } else {
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
+ } else {
char* sendtmpbuf = static_cast<char*>(sendbuf);
int sendtmpcount = sendcount;
MPI_Datatype sendtmptype = sendtype;
trace_recvcounts->push_back(recvcounts[i] * dt_size_recv);
}
- TRACE_smpi_comm_in(rank, __func__,
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Gatherv":"PMPI_Igatherv",
new simgrid::instr::VarCollTIData(
- "gatherv", root,
+ request==MPI_REQUEST_IGNORED ? "gatherv":"igatherv", root,
sendtmptype->is_replayable() ? sendtmpcount : sendtmpcount * sendtmptype->size(), nullptr,
dt_size_recv, trace_recvcounts, simgrid::smpi::Datatype::encode(sendtmptype),
simgrid::smpi::Datatype::encode(recvtype)));
-
- retval = simgrid::smpi::Colls::gatherv(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcounts, displs, recvtype, root, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ retval = simgrid::smpi::Colls::gatherv(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcounts, displs, recvtype, root, comm);
+ else
+ retval = simgrid::smpi::Colls::igatherv(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcounts, displs, recvtype, root, comm, request);
TRACE_smpi_comm_out(rank);
}
}
int PMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
- void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm){
+ return PMPI_Iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request)
{
int retval = MPI_SUCCESS;
} else if ((( sendbuf != MPI_IN_PLACE) && (sendcount <0)) ||
(recvcount <0)){
retval = MPI_ERR_COUNT;
- } else {
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
+ } else {
if(sendbuf == MPI_IN_PLACE) {
sendbuf=static_cast<char*>(recvbuf)+recvtype->get_extent()*recvcount*comm->rank();
sendcount=recvcount;
}
int rank = simgrid::s4u::this_actor::get_pid();
- TRACE_smpi_comm_in(rank, __func__,
+    TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Allgather":"PMPI_Iallgather",
new simgrid::instr::CollTIData(
- "allgather", -1, -1.0, sendtype->is_replayable() ? sendcount : sendcount * sendtype->size(),
+ request==MPI_REQUEST_IGNORED ? "allgather" : "iallgather", -1, -1.0, sendtype->is_replayable() ? sendcount : sendcount * sendtype->size(),
recvtype->is_replayable() ? recvcount : recvcount * recvtype->size(),
simgrid::smpi::Datatype::encode(sendtype), simgrid::smpi::Datatype::encode(recvtype)));
-
- simgrid::smpi::Colls::allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ simgrid::smpi::Colls::allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
+ else
+ simgrid::smpi::Colls::iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, request);
TRACE_smpi_comm_out(rank);
}
smpi_bench_begin();
}
int PMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
- void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
+ void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm){
+ return PMPI_Iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Iallgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
+ void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request)
{
int retval = 0;
retval = MPI_ERR_COUNT;
} else if (recvcounts == nullptr || displs == nullptr) {
retval = MPI_ERR_ARG;
- } else {
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
+ } else {
if(sendbuf == MPI_IN_PLACE) {
sendbuf=static_cast<char*>(recvbuf)+recvtype->get_extent()*displs[comm->rank()];
for (int i = 0; i < comm->size(); i++) // copy data to avoid bad free
trace_recvcounts->push_back(recvcounts[i] * dt_size_recv);
- TRACE_smpi_comm_in(rank, __func__,
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Allgatherv":"PMPI_Iallgatherv",
new simgrid::instr::VarCollTIData(
- "allgatherv", -1, sendtype->is_replayable() ? sendcount : sendcount * sendtype->size(),
+ request==MPI_REQUEST_IGNORED ? "allgatherv" : "iallgatherv", -1, sendtype->is_replayable() ? sendcount : sendcount * sendtype->size(),
nullptr, dt_size_recv, trace_recvcounts, simgrid::smpi::Datatype::encode(sendtype),
simgrid::smpi::Datatype::encode(recvtype)));
-
- simgrid::smpi::Colls::allgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ simgrid::smpi::Colls::allgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm);
+ else
+ simgrid::smpi::Colls::iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, request);
retval = MPI_SUCCESS;
TRACE_smpi_comm_out(rank);
}
}
int PMPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
- void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm){
+ return PMPI_Iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Iscatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request)
{
int retval = 0;
int rank = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(
- rank, __func__,
+ rank, request==MPI_REQUEST_IGNORED?"PMPI_Scatter":"PMPI_Iscatter",
new simgrid::instr::CollTIData(
- "scatter", root, -1.0,
+ request==MPI_REQUEST_IGNORED ? "scatter" : "iscatter", root, -1.0,
(comm->rank() != root || sendtype->is_replayable()) ? sendcount : sendcount * sendtype->size(),
recvtype->is_replayable() ? recvcount : recvcount * recvtype->size(),
simgrid::smpi::Datatype::encode(sendtype), simgrid::smpi::Datatype::encode(recvtype)));
-
- simgrid::smpi::Colls::scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ simgrid::smpi::Colls::scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm);
+ else
+ simgrid::smpi::Colls::iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, request);
retval = MPI_SUCCESS;
TRACE_smpi_comm_out(rank);
}
}
int PMPI_Scatterv(void *sendbuf, int *sendcounts, int *displs,
- MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
+ MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm){
+ return PMPI_Iscatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Iscatterv(void *sendbuf, int *sendcounts, int *displs,
+ MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request)
{
int retval = 0;
} else if (((comm->rank() == root) && (sendtype == MPI_DATATYPE_NULL)) ||
((recvbuf != MPI_IN_PLACE) && (recvtype == MPI_DATATYPE_NULL))) {
retval = MPI_ERR_TYPE;
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
} else {
if (recvbuf == MPI_IN_PLACE) {
recvtype = sendtype;
trace_sendcounts->push_back(sendcounts[i] * dt_size_send);
}
- TRACE_smpi_comm_in(rank, __func__,
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Scatterv":"PMPI_Iscatterv",
new simgrid::instr::VarCollTIData(
- "scatterv", root, dt_size_send, trace_sendcounts,
+ request==MPI_REQUEST_IGNORED ? "scatterv":"iscatterv", root, dt_size_send, trace_sendcounts,
recvtype->is_replayable() ? recvcount : recvcount * recvtype->size(), nullptr,
simgrid::smpi::Datatype::encode(sendtype), simgrid::smpi::Datatype::encode(recvtype)));
-
- retval = simgrid::smpi::Colls::scatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ retval = simgrid::smpi::Colls::scatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm);
+ else
+ retval = simgrid::smpi::Colls::iscatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm, request);
TRACE_smpi_comm_out(rank);
}
}
int PMPI_Allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
+{
+ return PMPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Iallreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request)
{
int retval = 0;
retval = MPI_ERR_TYPE;
} else if (op == MPI_OP_NULL) {
retval = MPI_ERR_OP;
+ } else if (request != MPI_REQUEST_IGNORED) {
+ xbt_die("Iallreduce is not yet implemented. WIP");
+ retval = MPI_ERR_ARG;
} else {
char* sendtmpbuf = static_cast<char*>(sendbuf);
int rank = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(rank, __func__,
- new simgrid::instr::CollTIData("allreduce", -1, 0,
+ new simgrid::instr::CollTIData(request==MPI_REQUEST_IGNORED ? "allreduce":"iallreduce", -1, 0,
datatype->is_replayable() ? count : count * datatype->size(), -1,
simgrid::smpi::Datatype::encode(datatype), ""));
- simgrid::smpi::Colls::allreduce(sendtmpbuf, recvbuf, count, datatype, op, comm);
+// if(request == MPI_REQUEST_IGNORED)
+ simgrid::smpi::Colls::allreduce(sendtmpbuf, recvbuf, count, datatype, op, comm);
+// else
+// simgrid::smpi::Colls::iallreduce(sendtmpbuf, recvbuf, count, datatype, op, comm, request);
if( sendbuf == MPI_IN_PLACE )
xbt_free(sendtmpbuf);
smpi_bench_begin();
return retval;
}
-
int PMPI_Alltoall(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
- MPI_Datatype recvtype, MPI_Comm comm)
+ MPI_Datatype recvtype, MPI_Comm comm){
+ return PMPI_Ialltoall(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Ialltoall(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
+ MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request)
{
int retval = 0;
smpi_bench_end();
retval = MPI_ERR_COMM;
} else if ((sendbuf != MPI_IN_PLACE && sendtype == MPI_DATATYPE_NULL) || recvtype == MPI_DATATYPE_NULL) {
retval = MPI_ERR_TYPE;
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
} else {
int rank = simgrid::s4u::this_actor::get_pid();
void* sendtmpbuf = static_cast<char*>(sendbuf);
sendtmptype = recvtype;
}
- TRACE_smpi_comm_in(rank, __func__,
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Alltoall":"PMPI_Ialltoall",
new simgrid::instr::CollTIData(
- "alltoall", -1, -1.0,
+ request==MPI_REQUEST_IGNORED ? "alltoall" : "ialltoall", -1, -1.0,
sendtmptype->is_replayable() ? sendtmpcount : sendtmpcount * sendtmptype->size(),
recvtype->is_replayable() ? recvcount : recvcount * recvtype->size(),
simgrid::smpi::Datatype::encode(sendtmptype), simgrid::smpi::Datatype::encode(recvtype)));
-
- retval = simgrid::smpi::Colls::alltoall(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcount, recvtype, comm);
+ if(request == MPI_REQUEST_IGNORED)
+ retval = simgrid::smpi::Colls::alltoall(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcount, recvtype, comm);
+ else
+ retval = simgrid::smpi::Colls::ialltoall(sendtmpbuf, sendtmpcount, sendtmptype, recvbuf, recvcount, recvtype, comm, request);
TRACE_smpi_comm_out(rank);
int PMPI_Alltoallv(void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf,
int* recvcounts, int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm)
+{
+ return PMPI_Ialltoallv(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm, MPI_REQUEST_IGNORED);
+}
+
+int PMPI_Ialltoallv(void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf,
+ int* recvcounts, int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request)
{
int retval = 0;
if (comm == MPI_COMM_NULL) {
retval = MPI_ERR_COMM;
- } else if (sendtype == MPI_DATATYPE_NULL || recvtype == MPI_DATATYPE_NULL) {
+ } else if ((sendbuf != MPI_IN_PLACE && sendtype == MPI_DATATYPE_NULL) || recvtype == MPI_DATATYPE_NULL) {
retval = MPI_ERR_TYPE;
} else if ((sendbuf != MPI_IN_PLACE && (sendcounts == nullptr || senddisps == nullptr)) || recvcounts == nullptr ||
recvdisps == nullptr) {
retval = MPI_ERR_ARG;
- } else {
+ } else if (request == nullptr){
+ retval = MPI_ERR_ARG;
+ } else {
int rank = simgrid::s4u::this_actor::get_pid();
int size = comm->size();
int send_size = 0;
trace_sendcounts->push_back(sendtmpcounts[i] * dt_size_send);
}
- TRACE_smpi_comm_in(rank, __func__,
- new simgrid::instr::VarCollTIData("alltoallv", -1, send_size, trace_sendcounts, recv_size,
+ TRACE_smpi_comm_in(rank, request==MPI_REQUEST_IGNORED?"PMPI_Alltoallv":"PMPI_Ialltoallv",
+ new simgrid::instr::VarCollTIData(request==MPI_REQUEST_IGNORED ? "alltoallv":"ialltoallv", -1, send_size, trace_sendcounts, recv_size,
trace_recvcounts, simgrid::smpi::Datatype::encode(sendtype),
simgrid::smpi::Datatype::encode(recvtype)));
- retval = simgrid::smpi::Colls::alltoallv(sendtmpbuf, sendtmpcounts, sendtmpdisps, sendtmptype, recvbuf, recvcounts,
+ if(request == MPI_REQUEST_IGNORED)
+ retval = simgrid::smpi::Colls::alltoallv(sendtmpbuf, sendtmpcounts, sendtmpdisps, sendtmptype, recvbuf, recvcounts,
recvdisps, recvtype, comm);
+ else
+ retval = simgrid::smpi::Colls::ialltoallv(sendtmpbuf, sendtmpcounts, sendtmpdisps, sendtmptype, recvbuf, recvcounts,
+ recvdisps, recvtype, comm, request);
TRACE_smpi_comm_out(rank);
if (sendbuf == MPI_IN_PLACE) {
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
- *flag = simgrid::smpi::Request::test(request,status);
+ retval = simgrid::smpi::Request::test(request,status, flag);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
int my_proc_id = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testany"));
- *flag = simgrid::smpi::Request::testany(count, requests, index, status);
+ retval = simgrid::smpi::Request::testany(count, requests, index, flag, status);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
int my_proc_id = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testall"));
- *flag = simgrid::smpi::Request::testall(count, requests, statuses);
+ retval = simgrid::smpi::Request::testall(count, requests, flag, statuses);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
int my_proc_id = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testsome"));
- *outcount = simgrid::smpi::Request::testsome(incount, requests, indices, status);
+ retval = simgrid::smpi::Request::testsome(incount, requests, outcount, indices, status);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
    // for tracing, save the handle which might get overridden before we can use the helper on it
MPI_Request savedreq = *request;
- if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED))
+ if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED)
+ && not(savedreq->flags() & MPI_REQ_GENERALIZED))
      savedreq->ref(); // don't erase the handle in Request::wait, we'll need it later
else
savedreq = MPI_REQUEST_NULL;
TRACE_smpi_comm_in(my_proc_id, __func__,
new simgrid::instr::WaitTIData((*request)->src(), (*request)->dst(), (*request)->tag()));
- simgrid::smpi::Request::wait(request, status);
- retval = MPI_SUCCESS;
+ retval = simgrid::smpi::Request::wait(request, status);
//the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
TRACE_smpi_comm_out(my_proc_id);
return MPI_SUCCESS;
}
+int PMPI_Status_set_cancelled(MPI_Status* status, int flag){
+ if(status==MPI_STATUS_IGNORE){
+ return MPI_ERR_ARG;
+ }
+ simgrid::smpi::Status::set_cancelled(status,flag);
+ return MPI_SUCCESS;
+}
+
+int PMPI_Status_set_elements(MPI_Status* status, MPI_Datatype datatype, int count){
+ if(status==MPI_STATUS_IGNORE){
+ return MPI_ERR_ARG;
+ }
+ simgrid::smpi::Status::set_elements(status,datatype, count);
+ return MPI_SUCCESS;
+}
+
+int PMPI_Grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request){
+ return simgrid::smpi::Request::grequest_start(query_fn, free_fn,cancel_fn, extra_state, request);
+}
+
+int PMPI_Grequest_complete( MPI_Request request){
+ return simgrid::smpi::Request::grequest_complete(request);
+}
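+
+/* Illustrative sketch of the generalized-request API wrapped above (user code only;
+ * query_fn/free_fn/cancel_fn are placeholder names). The MPI standard defines the
+ * callback signatures as:
+ *   int query_fn (void* extra_state, MPI_Status* status);
+ *   int free_fn  (void* extra_state);
+ *   int cancel_fn(void* extra_state, int complete);
+ * A request created with MPI_Grequest_start() only completes once the user calls
+ * MPI_Grequest_complete() on it:
+ *
+ *   MPI_Request req;
+ *   MPI_Grequest_start(query_fn, free_fn, cancel_fn, nullptr, &req);
+ *   MPI_Grequest_complete(req);           // mark the operation as finished
+ *   MPI_Wait(&req, MPI_STATUS_IGNORE);    // now returns
+ */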
+
+
+int PMPI_Request_get_status( MPI_Request request, int *flag, MPI_Status *status){
+ if(request==MPI_REQUEST_NULL){
+ return MPI_ERR_REQUEST;
+ } else if (flag==NULL || status ==NULL){
+ return MPI_ERR_ARG;
+ }
+ return simgrid::smpi::Request::get_status(request,flag,status);
+}
+
MPI_Request PMPI_Request_f2c(MPI_Fint request){
return static_cast<MPI_Request>(simgrid::smpi::Request::f2c(request));
}
int Coll_alltoall_basic_linear::alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
{
- int system_tag = 888;
+ int system_tag = COLL_TAG_ALLTOALL;
int i;
int count;
MPI_Aint lb = 0, sendext = 0, recvext = 0;
int Colls::gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
MPI_Datatype recvtype, int root, MPI_Comm comm)
{
- int system_tag = COLL_TAG_GATHERV;
- MPI_Aint lb = 0;
- MPI_Aint recvext = 0;
-
- int rank = comm->rank();
- int size = comm->size();
- if (rank != root) {
- // Send buffer to root
- Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
- } else {
- recvtype->extent(&lb, &recvext);
- // Local copy from root
- Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
- recvcounts[root], recvtype);
- // Receive buffers from senders
- MPI_Request *requests = xbt_new(MPI_Request, size - 1);
- int index = 0;
- for (int src = 0; src < size; src++) {
- if(src != root) {
- requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
- recvcounts[src], recvtype, src, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of irecv's.
- Request::startall(size - 1, requests);
- Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for (int src = 0; src < size-1; src++) {
- Request::unref(&requests[src]);
- }
- xbt_free(requests);
+ MPI_Request request;
+ Colls::igatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int i = 0; i < count; i++) {
+ if(requests[i]!=MPI_REQUEST_NULL)
+ Request::unref(&requests[i]);
}
+ delete[] requests;
+ Request::unref(&request);
return MPI_SUCCESS;
}
int Colls::scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
MPI_Datatype recvtype, int root, MPI_Comm comm)
{
- int system_tag = COLL_TAG_SCATTERV;
- MPI_Aint lb = 0;
- MPI_Aint sendext = 0;
-
- int rank = comm->rank();
- int size = comm->size();
- if(rank != root) {
- // Recv buffer from root
- Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
- } else {
- sendtype->extent(&lb, &sendext);
- // Local copy from root
- if(recvbuf!=MPI_IN_PLACE){
- Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
- sendtype, recvbuf, recvcount, recvtype);
- }
- // Send buffers to receivers
- MPI_Request *requests = xbt_new(MPI_Request, size - 1);
- int index = 0;
- for (int dst = 0; dst < size; dst++) {
- if (dst != root) {
- requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
- sendtype, dst, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of isend's.
- Request::startall(size - 1, requests);
- Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for (int dst = 0; dst < size-1; dst++) {
+ MPI_Request request;
+ Colls::iscatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int dst = 0; dst < count; dst++) {
+ if(requests[dst]!=MPI_REQUEST_NULL)
Request::unref(&requests[dst]);
- }
- xbt_free(requests);
}
+ delete[] requests;
+ Request::unref(&request);
return MPI_SUCCESS;
}
int Coll_gather_default::gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
{
- const int system_tag = COLL_TAG_GATHER;
- MPI_Aint lb = 0;
- MPI_Aint recvext = 0;
-
- int rank = comm->rank();
- int size = comm->size();
- if(rank != root) {
- // Send buffer to root
- Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
- } else {
- recvtype->extent(&lb, &recvext);
- // Local copy from root
- Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
- recvcount, recvtype);
- // Receive buffers from senders
- MPI_Request *requests = xbt_new(MPI_Request, size - 1);
- int index = 0;
- for (int src = 0; src < size; src++) {
- if(src != root) {
- requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
- src, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of irecv's.
- Request::startall(size - 1, requests);
- Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for (int src = 0; src < size-1; src++) {
- Request::unref(&requests[src]);
- }
- xbt_free(requests);
+ MPI_Request request;
+ Colls::igather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int i = 0; i < count; i++) {
+ if(requests[i]!=MPI_REQUEST_NULL)
+ Request::unref(&requests[i]);
}
+ delete[] requests;
+ Request::unref(&request);
return MPI_SUCCESS;
}
int Coll_allgather_default::allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
{
- const int system_tag = COLL_TAG_ALLGATHER;
- MPI_Aint lb = 0;
- MPI_Aint recvext = 0;
- MPI_Request *requests;
-
- int rank = comm->rank();
- int size = comm->size();
- // FIXME: check for errors
- recvtype->extent(&lb, &recvext);
- // Local copy from self
- Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
- recvtype);
- // Send/Recv buffers to/from others;
- requests = xbt_new(MPI_Request, 2 * (size - 1));
- int index = 0;
- for (int other = 0; other < size; other++) {
- if(other != rank) {
- requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
- index++;
- requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
- other, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of all comms.
- Request::startall(2 * (size - 1), requests);
- Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
- for (int other = 0; other < 2*(size-1); other++) {
+ MPI_Request request;
+ Colls::iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int other = 0; other < count; other++) {
Request::unref(&requests[other]);
}
- xbt_free(requests);
+ delete[] requests;
+ Request::unref(&request);
return MPI_SUCCESS;
}
int Coll_allgatherv_default::allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
{
- const int system_tag = COLL_TAG_ALLGATHERV;
- MPI_Aint lb = 0;
- MPI_Aint recvext = 0;
-
- int rank = comm->rank();
- int size = comm->size();
- recvtype->extent(&lb, &recvext);
- // Local copy from self
- Datatype::copy(sendbuf, sendcount, sendtype,
- static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
- // Send buffers to others;
- MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
- int index = 0;
- for (int other = 0; other < size; other++) {
- if(other != rank) {
- requests[index] =
- Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
- index++;
- requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
- recvtype, other, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of all comms.
- Request::startall(2 * (size - 1), requests);
- Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
- for (int other = 0; other < 2*(size-1); other++) {
+ MPI_Request request;
+ Colls::iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int other = 0; other < count; other++) {
Request::unref(&requests[other]);
}
- xbt_free(requests);
+ delete[] requests;
+ Request::unref(&request);
return MPI_SUCCESS;
}
int Coll_scatter_default::scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
{
- const int system_tag = COLL_TAG_SCATTER;
- MPI_Aint lb = 0;
- MPI_Aint sendext = 0;
- MPI_Request *requests;
-
- int rank = comm->rank();
- int size = comm->size();
- if(rank != root) {
- // Recv buffer from root
- Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
- } else {
- sendtype->extent(&lb, &sendext);
- // Local copy from root
- if(recvbuf!=MPI_IN_PLACE){
- Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
- sendcount, sendtype, recvbuf, recvcount, recvtype);
- }
- // Send buffers to receivers
- requests = xbt_new(MPI_Request, size - 1);
- int index = 0;
- for(int dst = 0; dst < size; dst++) {
- if(dst != root) {
- requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
- dst, system_tag, comm);
- index++;
- }
- }
- // Wait for completion of isend's.
- Request::startall(size - 1, requests);
- Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
- for (int dst = 0; dst < size-1; dst++) {
+ MPI_Request request;
+ Colls::iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int dst = 0; dst < count; dst++) {
+ if(requests[dst]!=MPI_REQUEST_NULL)
Request::unref(&requests[dst]);
- }
- xbt_free(requests);
}
+ delete[] requests;
+ Request::unref(&request);
return MPI_SUCCESS;
}
int Coll_alltoallv_default::alltoallv(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype,
void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm)
{
- const int system_tag = 889;
- MPI_Aint lb = 0;
- MPI_Aint sendext = 0;
- MPI_Aint recvext = 0;
- MPI_Request *requests;
-
- /* Initialize. */
- int rank = comm->rank();
- int size = comm->size();
- XBT_DEBUG("<%d> algorithm basic_alltoallv() called.", rank);
- sendtype->extent(&lb, &sendext);
- recvtype->extent(&lb, &recvext);
- /* Local copy from self */
- int err = Datatype::copy(static_cast<char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
- static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
- if (err == MPI_SUCCESS && size > 1) {
- /* Initiate all send/recv to/from others. */
- requests = xbt_new(MPI_Request, 2 * (size - 1));
- int count = 0;
- /* Create all receives that will be posted first */
- for (int i = 0; i < size; ++i) {
- if (i != rank && recvcounts[i] != 0) {
- requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
- recvcounts[i], recvtype, i, system_tag, comm);
- count++;
- }else{
- XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
- }
- }
- /* Now create all sends */
- for (int i = 0; i < size; ++i) {
- if (i != rank && sendcounts[i] != 0) {
- requests[count] = Request::isend_init(static_cast<char *>(sendbuf) + senddisps[i] * sendext,
- sendcounts[i], sendtype, i, system_tag, comm);
- count++;
- }else{
- XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
- }
- }
- /* Wait for them all. */
- Request::startall(count, requests);
- XBT_DEBUG("<%d> wait for %d requests", rank, count);
- Request::waitall(count, requests, MPI_STATUS_IGNORE);
- for (int i = 0; i < count; i++) {
- if(requests[i]!=MPI_REQUEST_NULL)
- Request::unref(&requests[i]);
- }
- xbt_free(requests);
+ MPI_Request request;
+ int err = Colls::ialltoallv(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm, &request);
+ MPI_Request* requests = request->get_nbc_requests();
+ int count = request->get_nbc_requests_size();
+ XBT_DEBUG("<%d> wait for %d requests", comm->rank(), count);
+ Request::waitall(count, requests, MPI_STATUS_IGNORE);
+ for (int i = 0; i < count; i++) {
+ if(requests[i]!=MPI_REQUEST_NULL)
+ Request::unref(&requests[i]);
}
+ delete[] requests;
+ Request::unref(&request);
return err;
}
--- /dev/null
+/* Asynchronous parts of the basic collective algorithms, meant to be used both by the naive default implementations and by the non-blocking collectives */
+
+/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
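+
+/* All the i-collectives in this file follow the same scheme: allocate a placeholder
+ * persistent Request, post the point-to-point requests implementing the collective,
+ * and attach them to the placeholder with set_nbc_requests(). The blocking defaults
+ * (e.g. Colls::gatherv) retrieve them with get_nbc_requests()/get_nbc_requests_size(),
+ * waitall on them, and finally unref both the sub-requests and the placeholder. */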
+
+#include "colls_private.hpp"
+#include "src/smpi/include/smpi_actor.hpp"
+
+namespace simgrid{
+namespace smpi{
+
+
+int Colls::ibarrier(MPI_Comm comm, MPI_Request* request)
+{
+ int i;
+ int size = comm->size();
+ int rank = comm->rank();
+ MPI_Request* requests;
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ rank,rank, COLL_TAG_BARRIER, comm, MPI_REQ_PERSISTENT);
+ if (rank > 0) {
+ requests = new MPI_Request[2];
+ requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
+ COLL_TAG_BARRIER,
+ comm);
+ requests[1] = Request::irecv (nullptr, 0, MPI_BYTE, 0,
+ COLL_TAG_BARRIER,
+ comm);
+ (*request)->set_nbc_requests(requests, 2);
+ }
+ else {
+ requests = new MPI_Request[(size-1)*2];
+ for (i = 1; i < 2*size-1; i+=2) {
+ requests[i-1] = Request::irecv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE,
+ COLL_TAG_BARRIER, comm
+ );
+ requests[i] = Request::isend(nullptr, 0, MPI_BYTE, (i+1)/2,
+ COLL_TAG_BARRIER,
+ comm
+ );
+ }
+ (*request)->set_nbc_requests(requests, 2*(size-1));
+ }
+ return MPI_SUCCESS;
+}
+
+int Colls::ibcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request* request)
+{
+ int i;
+ int size = comm->size();
+ int rank = comm->rank();
+ MPI_Request* requests;
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+                         rank,rank, COLL_TAG_BCAST, comm, MPI_REQ_PERSISTENT);
+ if (rank != root) {
+ requests = new MPI_Request[1];
+ requests[0] = Request::irecv (buf, count, datatype, root,
+ COLL_TAG_BCAST,
+ comm);
+ (*request)->set_nbc_requests(requests, 1);
+ }
+ else {
+ requests = new MPI_Request[size-1];
+ int n = 0;
+ for (i = 0; i < size; i++) {
+ if(i!=root){
+ requests[n] = Request::isend(buf, count, datatype, i,
+ COLL_TAG_BCAST,
+ comm
+ );
+ n++;
+ }
+ }
+ (*request)->set_nbc_requests(requests, size-1);
+ }
+ return MPI_SUCCESS;
+}
+
+int Colls::iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
+ void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request)
+{
+
+ const int system_tag = COLL_TAG_ALLGATHER;
+ MPI_Aint lb = 0;
+ MPI_Aint recvext = 0;
+ MPI_Request *requests;
+
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+                         rank,rank, COLL_TAG_ALLGATHER, comm, MPI_REQ_PERSISTENT);
+ // FIXME: check for errors
+ recvtype->extent(&lb, &recvext);
+ // Local copy from self
+ Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
+ recvtype);
+ // Send/Recv buffers to/from others;
+ requests = new MPI_Request[2 * (size - 1)];
+ int index = 0;
+ for (int other = 0; other < size; other++) {
+ if(other != rank) {
+ requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
+ index++;
+ requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
+ other, system_tag, comm);
+ index++;
+ }
+ }
+ Request::startall(2 * (size - 1), requests);
+ (*request)->set_nbc_requests(requests, 2 * (size - 1));
+ return MPI_SUCCESS;
+}
+
+int Colls::iscatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request)
+{
+ const int system_tag = COLL_TAG_SCATTER;
+ MPI_Aint lb = 0;
+ MPI_Aint sendext = 0;
+ MPI_Request *requests;
+
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+                         rank,rank, COLL_TAG_SCATTER, comm, MPI_REQ_PERSISTENT);
+ if(rank != root) {
+ requests = new MPI_Request[1];
+ // Recv buffer from root
+ requests[0] = Request::irecv(recvbuf, recvcount, recvtype, root, system_tag, comm);
+ (*request)->set_nbc_requests(requests, 1);
+ } else {
+ sendtype->extent(&lb, &sendext);
+ // Local copy from root
+ if(recvbuf!=MPI_IN_PLACE){
+ Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
+ sendcount, sendtype, recvbuf, recvcount, recvtype);
+ }
+ // Send buffers to receivers
+ requests = new MPI_Request[size - 1];
+ int index = 0;
+ for(int dst = 0; dst < size; dst++) {
+ if(dst != root) {
+ requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
+ dst, system_tag, comm);
+ index++;
+ }
+ }
+ // Wait for completion of isend's.
+ Request::startall(size - 1, requests);
+ (*request)->set_nbc_requests(requests, size - 1);
+ }
+ return MPI_SUCCESS;
+}
+
+int Colls::iallgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
+ int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request)
+{
+ const int system_tag = COLL_TAG_ALLGATHERV;
+ MPI_Aint lb = 0;
+ MPI_Aint recvext = 0;
+
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+                         rank,rank, COLL_TAG_ALLGATHERV, comm, MPI_REQ_PERSISTENT);
+ recvtype->extent(&lb, &recvext);
+ // Local copy from self
+ Datatype::copy(sendbuf, sendcount, sendtype,
+ static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
+ // Send buffers to others;
+ MPI_Request *requests = new MPI_Request[2 * (size - 1)];
+ int index = 0;
+ for (int other = 0; other < size; other++) {
+ if(other != rank) {
+ requests[index] =
+ Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
+ index++;
+ requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
+ recvtype, other, system_tag, comm);
+ index++;
+ }
+ }
+ // Wait for completion of all comms.
+ Request::startall(2 * (size - 1), requests);
+ (*request)->set_nbc_requests(requests, 2 * (size - 1));
+ return MPI_SUCCESS;
+}
+
+int Colls::ialltoall( void *sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request){
+  int system_tag = COLL_TAG_ALLTOALL;
+ int i;
+ int count;
+ MPI_Aint lb = 0, sendext = 0, recvext = 0;
+ MPI_Request *requests;
+
+ /* Initialize. */
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ rank,rank, COLL_TAG_ALLTOALL, comm, MPI_REQ_PERSISTENT);
+ sendtype->extent(&lb, &sendext);
+ recvtype->extent(&lb, &recvext);
+ /* simple optimization */
+ int err = Datatype::copy(static_cast<char *>(sendbuf) + rank * sendcount * sendext, sendcount, sendtype,
+ static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount, recvtype);
+ if (err == MPI_SUCCESS && size > 1) {
+ /* Initiate all send/recv to/from others. */
+ requests = new MPI_Request[2 * (size - 1)];
+ /* Post all receives first -- a simple optimization */
+ count = 0;
+ for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
+ requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + i * recvcount * recvext, recvcount,
+ recvtype, i, system_tag, comm);
+ count++;
+ }
+ /* Now post all sends in reverse order
+ * - We would like to minimize the search time through message queue
+ * when messages actually arrive in the order in which they were posted.
+ * TODO: check the previous assertion
+ */
+ for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
+ requests[count] = Request::isend_init(static_cast<char *>(sendbuf) + i * sendcount * sendext, sendcount,
+ sendtype, i, system_tag, comm);
+ count++;
+ }
+    /* Start them all; completion is tracked by the parent request. */
+ Request::startall(count, requests);
+ (*request)->set_nbc_requests(requests, count);
+ }
+ return MPI_SUCCESS;
+}
+
+int Colls::ialltoallv(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype,
+ void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request){
+ const int system_tag = COLL_TAG_ALLTOALLV;
+ MPI_Aint lb = 0;
+ MPI_Aint sendext = 0;
+ MPI_Aint recvext = 0;
+ MPI_Request *requests;
+
+ /* Initialize. */
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ rank,rank, COLL_TAG_ALLTOALLV, comm, MPI_REQ_PERSISTENT);
+ sendtype->extent(&lb, &sendext);
+ recvtype->extent(&lb, &recvext);
+ /* Local copy from self */
+ int err = Datatype::copy(static_cast<char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
+ static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
+ if (err == MPI_SUCCESS && size > 1) {
+ /* Initiate all send/recv to/from others. */
+ requests = new MPI_Request[2 * (size - 1)];
+ int count = 0;
+ /* Create all receives that will be posted first */
+ for (int i = 0; i < size; ++i) {
+ if (i != rank && recvcounts[i] != 0) {
+ requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
+ recvcounts[i], recvtype, i, system_tag, comm);
+ count++;
+ }else{
+ XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
+ }
+ }
+ /* Now create all sends */
+ for (int i = 0; i < size; ++i) {
+ if (i != rank && sendcounts[i] != 0) {
+ requests[count] = Request::isend_init(static_cast<char *>(sendbuf) + senddisps[i] * sendext,
+ sendcounts[i], sendtype, i, system_tag, comm);
+ count++;
+ }else{
+ XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
+ }
+ }
+    /* Start them all; completion is tracked by the parent request. */
+ Request::startall(count, requests);
+ (*request)->set_nbc_requests(requests, count);
+ }
+ return err;
+}
+
+int Colls::igather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request)
+{
+ const int system_tag = COLL_TAG_GATHER;
+ MPI_Aint lb = 0;
+ MPI_Aint recvext = 0;
+ MPI_Request *requests;
+
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ rank,rank, COLL_TAG_GATHER, comm, MPI_REQ_PERSISTENT);
+ if(rank != root) {
+ // Send buffer to root
+ requests = new MPI_Request[1];
+ requests[0]=Request::isend(sendbuf, sendcount, sendtype, root, system_tag, comm);
+ (*request)->set_nbc_requests(requests, 1);
+ } else {
+ recvtype->extent(&lb, &recvext);
+ // Local copy from root
+ Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
+ recvcount, recvtype);
+ // Receive buffers from senders
+ requests = new MPI_Request[size - 1];
+ int index = 0;
+ for (int src = 0; src < size; src++) {
+ if(src != root) {
+ requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
+ src, system_tag, comm);
+ index++;
+ }
+ }
+    // Start the irecvs; completion is checked later through the parent request.
+ Request::startall(size - 1, requests);
+ (*request)->set_nbc_requests(requests, size - 1);
+ }
+ return MPI_SUCCESS;
+}
+
+int Colls::igatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
+ MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request)
+{
+ int system_tag = COLL_TAG_GATHERV;
+ MPI_Aint lb = 0;
+ MPI_Aint recvext = 0;
+ MPI_Request *requests;
+
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ rank,rank, COLL_TAG_GATHERV, comm, MPI_REQ_PERSISTENT);
+ if (rank != root) {
+ // Send buffer to root
+ requests = new MPI_Request[1];
+ requests[0]=Request::isend(sendbuf, sendcount, sendtype, root, system_tag, comm);
+ (*request)->set_nbc_requests(requests, 1);
+ } else {
+ recvtype->extent(&lb, &recvext);
+ // Local copy from root
+ Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
+ recvcounts[root], recvtype);
+ // Receive buffers from senders
+ requests = new MPI_Request[size - 1];
+ int index = 0;
+ for (int src = 0; src < size; src++) {
+ if(src != root) {
+ requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
+ recvcounts[src], recvtype, src, system_tag, comm);
+ index++;
+ }
+ }
+    // Start the irecvs; completion is checked later through the parent request.
+ Request::startall(size - 1, requests);
+ (*request)->set_nbc_requests(requests, size - 1);
+ }
+ return MPI_SUCCESS;
+}
+
+int Colls::iscatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
+ MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request)
+{
+ int system_tag = COLL_TAG_SCATTERV;
+ MPI_Aint lb = 0;
+ MPI_Aint sendext = 0;
+ MPI_Request* requests;
+
+ int rank = comm->rank();
+ int size = comm->size();
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ rank,rank, COLL_TAG_SCATTERV, comm, MPI_REQ_PERSISTENT);
+ if(rank != root) {
+ // Recv buffer from root
+ requests = new MPI_Request[1];
+ requests[0]=Request::irecv(recvbuf, recvcount, recvtype, root, system_tag, comm);
+ (*request)->set_nbc_requests(requests, 1);
+ } else {
+ sendtype->extent(&lb, &sendext);
+ // Local copy from root
+ if(recvbuf!=MPI_IN_PLACE){
+ Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
+ sendtype, recvbuf, recvcount, recvtype);
+ }
+ // Send buffers to receivers
+    requests = new MPI_Request[size - 1];
+ int index = 0;
+ for (int dst = 0; dst < size; dst++) {
+ if (dst != root) {
+ requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
+ sendtype, dst, system_tag, comm);
+ index++;
+ }
+ }
+    // Start the isends; completion is checked later through the parent request.
+ Request::startall(size - 1, requests);
+ (*request)->set_nbc_requests(requests, size - 1);
+ }
+ return MPI_SUCCESS;
+}
+}
+}
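
All of the implementations above follow the same pattern: a dummy persistent request is created as the
user-visible handle, the per-peer isend/irecv sub-requests are started, and set_nbc_requests() attaches
them to that handle so that MPI_Test/MPI_Wait can later drive their completion. As an illustration only
(not part of the patch), a minimal MPI-3 program exercising this path could look like the sketch below;
buffer and variable names are placeholders.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char* argv[])
{
  int rank, size, flag = 0;
  int value;
  int* gathered = NULL;
  MPI_Request req;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  value = rank;
  if (rank == 0)
    gathered = malloc(size * sizeof(int)); /* recvbuf is only significant at the root */

  /* Start the nonblocking gather: under SMPI this builds the dummy parent
   * request and attaches one sub-request per peer via set_nbc_requests(). */
  MPI_Igather(&value, 1, MPI_INT, gathered, 1, MPI_INT, 0, MPI_COMM_WORLD, &req);

  /* Overlap other work while polling: MPI_Test on the parent request
   * testall()s the attached sub-requests until they all complete. */
  while (!flag)
    MPI_Test(&req, &flag, MPI_STATUS_IGNORE);

  if (rank == 0) {
    for (int i = 0; i < size; i++)
      printf("gathered[%d] = %d\n", i, gathered[i]);
    free(gathered);
  }
  MPI_Finalize();
  return 0;
}
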
constexpr unsigned MPI_REQ_FINISHED = 0x100;
constexpr unsigned MPI_REQ_RMA = 0x200;
constexpr unsigned MPI_REQ_ACCUMULATE = 0x400;
+constexpr unsigned MPI_REQ_GENERALIZED = 0x800;
+constexpr unsigned MPI_REQ_COMPLETE = 0x1000;
enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED, FINALIZED };
// SMPI_RMA_TAG has to be the smallest one, as it will be decremented for accumulate ordering.
constexpr int SMPI_RMA_TAG = -6666;
+#define MPI_REQUEST_IGNORED ((MPI_Request*)-100)
+
/* Convert between Fortran and C */
#define FORT_BOTTOM(addr) ((*(int*)addr) == -200 ? MPI_BOTTOM : (void*)addr)
MPI_Datatype recvtype, int root, MPI_Comm comm);
static int scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
static int exscan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
+
+  // Nonblocking collectives
+ static int ibarrier(MPI_Comm comm, MPI_Request* request);
+ static int ibcast(void *buf, int count, MPI_Datatype datatype,
+ int root, MPI_Comm comm, MPI_Request* request);
+ static int igather (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
+ MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request);
+ static int igatherv (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request);
+ static int iallgather (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request);
+ static int iallgatherv (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request);
+ static int iscatter (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request);
+ static int iscatterv (void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype,
+ void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request);
+ static int ireduce
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm, MPI_Request *request);
+ static int iallreduce
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request);
+ static int iscan
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request);
+ static int iexscan
+ (void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request);
+ static int ireduce_scatter
+ (void* sendbuf, void* recvbuf, int* recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request);
+ static int ireduce_scatter_block
+ (void* sendbuf, void* recvbuf, int recvcount, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request *request);
+ static int ialltoall (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request);
+ static int ialltoallv
+ (void* sendbuf, int* sendcounts, int* senddisps, MPI_Datatype sendtype, void* recvbuf, int* recvcounts,
+ int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request);
+
static void (*smpi_coll_cleanup_callback)();
};
namespace simgrid{
namespace smpi{
+typedef struct s_smpi_mpi_generalized_request_funcs {
+ MPI_Grequest_query_function *query_fn;
+ MPI_Grequest_free_function *free_fn;
+ MPI_Grequest_cancel_function *cancel_fn;
+ void* extra_state;
+ s4u::ConditionVariablePtr cond;
+ s4u::MutexPtr mutex;
+} s_smpi_mpi_generalized_request_funcs_t;
+typedef struct s_smpi_mpi_generalized_request_funcs *smpi_mpi_generalized_request_funcs;
+
class Request : public F2C {
void* buf_;
/* in the case of non-contiguous memory the user address should be keep
int refcount_;
MPI_Op op_;
int cancelled_;
+ smpi_mpi_generalized_request_funcs generalized_funcs;
+ MPI_Request* nbc_requests_;
+ int nbc_requests_size_;
public:
Request() = default;
void start();
void cancel();
void ref();
+ void set_nbc_requests(MPI_Request* reqs, int size);
+ int get_nbc_requests_size();
+ MPI_Request* get_nbc_requests();
static void finish_wait(MPI_Request* request, MPI_Status* status);
static void unref(MPI_Request* request);
- static void wait(MPI_Request* req, MPI_Status* status);
+ static int wait(MPI_Request* req, MPI_Status* status);
static MPI_Request send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
static MPI_Request isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
static MPI_Request ssend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
static void startall(int count, MPI_Request* requests);
- static int test(MPI_Request* request, MPI_Status* status);
- static int testsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]);
- static int testany(int count, MPI_Request requests[], int* index, MPI_Status* status);
- static int testall(int count, MPI_Request requests[], MPI_Status status[]);
+ static int test(MPI_Request* request, MPI_Status* status, int* flag);
+ static int testsome(int incount, MPI_Request requests[], int* outcounts, int* indices, MPI_Status status[]);
+ static int testany(int count, MPI_Request requests[], int* index, int* flag, MPI_Status* status);
+ static int testall(int count, MPI_Request requests[], int* flag, MPI_Status status[]);
static void probe(int source, int tag, MPI_Comm comm, MPI_Status* status);
static void iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status);
static int match_send(void* a, void* b, kernel::activity::CommImpl* ignored);
static int match_recv(void* a, void* b, kernel::activity::CommImpl* ignored);
+  static int grequest_start(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,
+                            MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request);
+ static int grequest_complete( MPI_Request request);
+ static int get_status(MPI_Request req, int* flag, MPI_Status * status);
+
int add_f() override;
static void free_f(int id);
static Request* f2c(int);
public:
static void empty(MPI_Status * status);
static int cancelled (MPI_Status * status);
+static void set_cancelled (MPI_Status * status, int flag);
+static void set_elements (MPI_Status * status, MPI_Datatype , int count);
static int get_count(MPI_Status * status, MPI_Datatype datatype);
};
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
MPI_Status status;
- int flag = Request::test(&request, &status);
+ int flag = 0;
+ Request::test(&request, &status, &flag);
XBT_DEBUG("MPI_Test result: %d", flag);
/* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-
+#include "simgrid/s4u/Mutex.hpp"
+#include "simgrid/s4u/ConditionVariable.hpp"
#include "smpi_request.hpp"
#include "mc/mc.h"
#include "src/smpi/include/smpi_actor.hpp"
#include "xbt/config.hpp"
+
#include <algorithm>
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
refcount_ = 0;
op_ = MPI_REPLACE;
cancelled_ = 0;
+ generalized_funcs=nullptr;
+ nbc_requests_=nullptr;
+ nbc_requests_size_=0;
}
void Request::ref(){
xbt_die("Whoops, wrong refcount");
}
if((*request)->refcount_==0){
- Datatype::unref((*request)->old_type_);
+ if ((*request)->flags_ & MPI_REQ_GENERALIZED){
+ ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
+ }else{
Comm::unref((*request)->comm_);
- (*request)->print_request("Destroying");
- delete *request;
- *request = MPI_REQUEST_NULL;
+ Datatype::unref((*request)->old_type_);
+ }
+ (*request)->print_request("Destroying");
+ delete *request;
+ *request = MPI_REQUEST_NULL;
}else{
(*request)->print_request("Decrementing");
}
(boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(this->action_))->cancel();
}
-int Request::test(MPI_Request * request, MPI_Status * status) {
+int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
//assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
// to avoid deadlocks if used as a break condition, such as
// while (MPI_Test(request, flag, status) && flag) dostuff...
// because the time will not normally advance when only calls to MPI_Test are made -> deadlock
// multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
static int nsleeps = 1;
+ int ret = MPI_SUCCESS;
+
+ // Are we testing a request meant for non blocking collectives ?
+ // If so, test all the subrequests.
+ if ((*request)->nbc_requests_size_>0){
+ ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
+ if(*flag){
+ delete[] (*request)->nbc_requests_;
+ (*request)->nbc_requests_size_=0;
+ unref(request);
+ }
+ return ret;
+ }
+
if(smpi_test_sleep > 0)
simcall_process_sleep(nsleeps*smpi_test_sleep);
+ MPI_Status* mystatus;
Status::empty(status);
- int flag = 1;
+ *flag = 1;
if (((*request)->flags_ & MPI_REQ_PREPARED) == 0) {
if ((*request)->action_ != nullptr){
try{
- flag = simcall_comm_test((*request)->action_);
+ *flag = simcall_comm_test((*request)->action_);
}catch (xbt_ex& e) {
- return 0;
+ *flag = 0;
+ return ret;
}
}
- if (flag) {
+ if (*request != MPI_REQUEST_NULL &&
+ ((*request)->flags_ & MPI_REQ_GENERALIZED)
+ && !((*request)->flags_ & MPI_REQ_COMPLETE))
+ *flag=0;
+ if (*flag) {
finish_wait(request,status);
+ if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+ if(status==MPI_STATUS_IGNORE){
+ mystatus=new MPI_Status();
+ Status::empty(mystatus);
+ }else{
+ mystatus=status;
+ }
+ ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
+ if(status==MPI_STATUS_IGNORE)
+ delete mystatus;
+ }
nsleeps=1;//reset the number of sleeps we will do next time
if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
*request = MPI_REQUEST_NULL;
nsleeps++;
}
}
- return flag;
+ return ret;
}
-int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
+int Request::testsome(int incount, MPI_Request requests[], int *count, int *indices, MPI_Status status[])
{
- int count = 0;
+ int ret = MPI_SUCCESS;
+ int error=0;
int count_dead = 0;
+ int flag = 0;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+ *count = 0;
for (int i = 0; i < incount; i++) {
if (requests[i] != MPI_REQUEST_NULL) {
- if (test(&requests[i], pstat)) {
+ ret = test(&requests[i], pstat, &flag);
+ if(ret!=MPI_SUCCESS)
+ error = 1;
+ if(flag) {
indices[i] = 1;
- count++;
+ (*count)++;
if (status != MPI_STATUSES_IGNORE)
status[i] = *pstat;
if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
count_dead++;
}
}
- if(count_dead==incount)
- return MPI_UNDEFINED;
- else return count;
+ if(count_dead==incount)*count=MPI_UNDEFINED;
+ if(error!=0)
+ return MPI_ERR_IN_STATUS;
+ else
+ return MPI_SUCCESS;
}
-int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
+int Request::testany(int count, MPI_Request requests[], int *index, int* flag, MPI_Status * status)
{
std::vector<simgrid::kernel::activity::CommImpl*> comms;
comms.reserve(count);
int i;
- int flag = 0;
-
+ *flag = 0;
+ int ret = MPI_SUCCESS;
+ MPI_Status* mystatus;
*index = MPI_UNDEFINED;
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
*index = map[i];
- finish_wait(&requests[*index],status);
- flag = 1;
- nsleeps = 1;
- if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT)) {
- requests[*index] = MPI_REQUEST_NULL;
+ if (requests[*index] != MPI_REQUEST_NULL &&
+ (requests[*index]->flags_ & MPI_REQ_GENERALIZED)
+ && !(requests[*index]->flags_ & MPI_REQ_COMPLETE)) {
+ *flag=0;
+ } else {
+ finish_wait(&requests[*index],status);
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_GENERALIZED)){
+ if(status==MPI_STATUS_IGNORE){
+ mystatus=new MPI_Status();
+ Status::empty(mystatus);
+ }else{
+ mystatus=status;
+ }
+ ret=(requests[*index]->generalized_funcs)->query_fn((requests[*index]->generalized_funcs)->extra_state, mystatus);
+ if(status==MPI_STATUS_IGNORE)
+ delete mystatus;
+ }
+
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT))
+ requests[*index] = MPI_REQUEST_NULL;
+ *flag=1;
}
+ nsleeps = 1;
} else {
nsleeps++;
}
} else {
//all requests are null or inactive, return true
- flag = 1;
+ *flag = 1;
Status::empty(status);
}
- return flag;
+ return ret;
}
-int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
+int Request::testall(int count, MPI_Request requests[], int* outflag, MPI_Status status[])
{
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
- int flag=1;
+ int flag, error=0;
+ int ret=MPI_SUCCESS;
+ *outflag = 1;
for(int i=0; i<count; i++){
if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
- if (test(&requests[i], pstat)!=1){
+ ret = test(&requests[i], pstat, &flag);
+ if (flag){
flag=0;
+ requests[i]=MPI_REQUEST_NULL;
}else{
- requests[i]=MPI_REQUEST_NULL;
+ *outflag=0;
}
+ if (ret != MPI_SUCCESS)
+ error = 1;
}else{
Status::empty(pstat);
}
status[i] = *pstat;
}
}
- return flag;
+ if(error==1)
+ return MPI_ERR_IN_STATUS;
+ else
+ return MPI_SUCCESS;
}
void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
return;
}
- if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0)) && ((req->flags_ & MPI_REQ_PREPARED) == 0)) {
+ if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0))
+ && ((req->flags_ & MPI_REQ_PREPARED) == 0)
+ && ((req->flags_ & MPI_REQ_GENERALIZED) == 0)) {
if(status != MPI_STATUS_IGNORE) {
int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
status->MPI_SOURCE = req->comm_->group()->rank(src);
status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
// this handles the case were size in receive differs from size in send
status->count = req->real_size_;
+// int flag;
+// Request::get_status(req,&flag,status);
}
req->print_request("Finishing");
unref(request);
}
-void Request::wait(MPI_Request * request, MPI_Status * status)
+int Request::wait(MPI_Request * request, MPI_Status * status)
{
+ int ret=MPI_SUCCESS;
+ // Are we waiting on a request meant for non blocking collectives ?
+ // If so, wait for all the subrequests.
+ if ((*request)->nbc_requests_size_>0){
+ ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
+ for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
+ if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
+ Request::unref(&((*request)->nbc_requests_[i]));
+ }
+ delete[] (*request)->nbc_requests_;
+ (*request)->nbc_requests_size_=0;
+ unref(request);
+ return ret;
+ }
+
(*request)->print_request("Waiting");
if ((*request)->flags_ & MPI_REQ_PREPARED) {
Status::empty(status);
- return;
+ return ret;
}
if ((*request)->action_ != nullptr){
}
}
+ if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+ MPI_Status* mystatus;
+ if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
+ ((*request)->generalized_funcs)->mutex->lock();
+ ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
+ ((*request)->generalized_funcs)->mutex->unlock();
+ }
+ if(status==MPI_STATUS_IGNORE){
+ mystatus=new MPI_Status();
+ Status::empty(mystatus);
+ }else{
+ mystatus=status;
+ }
+ ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
+ if(status==MPI_STATUS_IGNORE)
+ delete mystatus;
+ }
finish_wait(request,status);
if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
*request = MPI_REQUEST_NULL;
+ return ret;
}
int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
{
int count = 0;
+ int flag = 0;
+ int index = 0;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+ index = waitany(incount, (MPI_Request*)requests, pstat);
+ if(index==MPI_UNDEFINED) return MPI_UNDEFINED;
+ if(status != MPI_STATUSES_IGNORE) {
+ status[count] = *pstat;
+ }
+ indices[count] = index;
+ count++;
for (int i = 0; i < incount; i++) {
- int index = waitany(incount, requests, pstat);
- if(index!=MPI_UNDEFINED){
- indices[count] = index;
- count++;
- if(status != MPI_STATUSES_IGNORE) {
- status[index] = *pstat;
+ if((requests[i] != MPI_REQUEST_NULL)) {
+ test(&requests[i], pstat,&flag);
+ if (flag==1){
+ indices[count] = i;
+ if(status != MPI_STATUSES_IGNORE) {
+ status[count] = *pstat;
+ }
+ if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
+ requests[i]=MPI_REQUEST_NULL;
+ count++;
}
- if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
- requests[index] = MPI_REQUEST_NULL;
- }else{
- return MPI_UNDEFINED;
}
}
return count;
}
}
+
+int Request::get_status(MPI_Request req, int* flag, MPI_Status * status){
+ *flag=0;
+
+ if(req != MPI_REQUEST_NULL && req->action_ != nullptr) {
+ req->iprobe(req->src_, req->tag_, req->comm_, flag, status);
+    if(*flag)
+ return MPI_SUCCESS;
+ }
+ if (req != MPI_REQUEST_NULL &&
+ (req->flags_ & MPI_REQ_GENERALIZED)
+ && !(req->flags_ & MPI_REQ_COMPLETE)) {
+ *flag=0;
+ return MPI_SUCCESS;
+ }
+
+ *flag=1;
+ if(status != MPI_STATUS_IGNORE) {
+ int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
+ status->MPI_SOURCE = req->comm_->group()->rank(src);
+ status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
+ status->MPI_ERROR = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
+ status->count = req->real_size_;
+ }
+ return MPI_SUCCESS;
+}
+
+int Request::grequest_start(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,
+                            MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request)
+{
+ *request = new Request();
+ (*request)->flags_ |= MPI_REQ_GENERALIZED;
+ (*request)->flags_ |= MPI_REQ_PERSISTENT;
+ (*request)->refcount_ = 1;
+ ((*request)->generalized_funcs)=xbt_new0(s_smpi_mpi_generalized_request_funcs_t ,1);
+ ((*request)->generalized_funcs)->query_fn=query_fn;
+ ((*request)->generalized_funcs)->free_fn=free_fn;
+ ((*request)->generalized_funcs)->cancel_fn=cancel_fn;
+ ((*request)->generalized_funcs)->extra_state=extra_state;
+ ((*request)->generalized_funcs)->cond = simgrid::s4u::ConditionVariable::create();
+ ((*request)->generalized_funcs)->mutex = simgrid::s4u::Mutex::create();
+ return MPI_SUCCESS;
+}
+
+int Request::grequest_complete( MPI_Request request){
+ if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex==NULL)
+ return MPI_ERR_REQUEST;
+ request->generalized_funcs->mutex->lock();
+ request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
+ request->generalized_funcs->cond->notify_one();
+ request->generalized_funcs->mutex->unlock();
+ return MPI_SUCCESS;
+}
+
+void Request::set_nbc_requests(MPI_Request* reqs, int size){
+ nbc_requests_=reqs;
+ nbc_requests_size_=size;
+}
+
+int Request::get_nbc_requests_size(){
+ return nbc_requests_size_;
+}
+
+MPI_Request* Request::get_nbc_requests(){
+ return nbc_requests_;
+}
+
}
}
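
The generalized-request machinery above (grequest_start, grequest_complete, the MPI_REQ_GENERALIZED and
MPI_REQ_COMPLETE flags, and the Status::set_elements / set_cancelled helpers added just below) is what
the re-enabled greq1 test exercises. For illustration only, a minimal user-level generalized request
might look like the following sketch, assuming standard MPI-3 semantics; the callback names and their
trivial bodies are placeholders.

#include <mpi.h>
#include <stdio.h>

/* Placeholder callbacks for a trivial generalized request. */
static int query_fn(void* extra_state, MPI_Status* status)
{
  /* Fill in the status fields that the library cannot know about. */
  MPI_Status_set_elements(status, MPI_INT, 0);
  MPI_Status_set_cancelled(status, 0);
  status->MPI_SOURCE = MPI_UNDEFINED;
  status->MPI_TAG    = MPI_UNDEFINED;
  return MPI_SUCCESS;
}

static int free_fn(void* extra_state) { return MPI_SUCCESS; }

static int cancel_fn(void* extra_state, int complete) { return MPI_SUCCESS; }

int main(int argc, char* argv[])
{
  MPI_Request req;

  MPI_Init(&argc, &argv);

  /* Create the request, then mark it complete; a real use would call
   * MPI_Grequest_complete from another thread or a progress callback
   * once the associated operation has finished. */
  MPI_Grequest_start(query_fn, free_fn, cancel_fn, NULL, &req);
  MPI_Grequest_complete(req);

  /* MPI_Wait reaches Request::wait above: it blocks on the request's
   * condition variable until MPI_REQ_COMPLETE is set, then calls query_fn. */
  MPI_Wait(&req, MPI_STATUS_IGNORE);

  printf("generalized request completed\n");
  MPI_Finalize();
  return 0;
}
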
return status->cancelled!=0;
}
+void Status::set_cancelled(MPI_Status * status, int flag)
+{
+ status->cancelled=flag;
+}
+
+void Status::set_elements (MPI_Status * status, MPI_Datatype , int count){
+ status->count=count;
+}
+
int Status::get_count(MPI_Status * status, MPI_Datatype datatype)
{
return status->count / datatype->size();
Options:
-keep-temps # don't remove the generated files after execution
-wrapper <command> # use command to run the program (e.g. "valgrind" or "gdb --args")
+ -gdb # run within GDB (-wrapper "gdb --args" -keep-temps)
+ -lldb # run within LLDB (-wrapper "lldb --" -keep-temps)
-map # display the machine on which each process rank is mapped
-np <numprocs> # use that amount of processes from the hostfile.
# By default, all processes of the hostfile are used.
shift 1
;;
"-keep-temps")
- KEEP="true"
- SIMOPTS="$SIMOPTS --cfg=smpi/keep-temps:yes"
+ KEEP="true"
+ SIMOPTS="$SIMOPTS --cfg=smpi/keep-temps:yes"
shift 1
;;
"-wrapper")
WRAPPER="$2"
shift 2
;;
+ "-gdb")
+ WRAPPER="gdb --args"
+ KEEP="true"
+ SIMOPTS="$SIMOPTS --cfg=smpi/keep-temps:yes"
+ shift 1
+ ;;
+ "-lldb")
+ WRAPPER="lldb --"
+ KEEP="true"
+ SIMOPTS="$SIMOPTS --cfg=smpi/keep-temps:yes"
+ shift 1
+ ;;
"-help" | "--help" | "-h")
usage
exit 0
coll2 coll3 coll4 coll5 coll6 coll7 coll8 coll9 coll10 coll11 coll12 coll13
exscan exscan2
gather gather2 gather_big
- # iallred ibarrier icallgather icallgatherv icallreduce
+ ibarrier nonblocking
+ # iallred icallgather icallgatherv icallreduce
# icalltoall icalltoallv icalltoallw icbarrier icbcast
# icgather icgatherv icreduce icscatter icscatterv
longuser
- # nonblocking2 nonblocking3 nonblocking
+ # nonblocking2 nonblocking3
# opband opbor opbxor opland oplor oplxor opmax opmaxloc
# opmin opminloc opprod opsum
op_commutative
comm, &req);
MPI_Wait(&req, MPI_STATUS_IGNORE);
- MPI_Ialltoallw(sbuf, scounts, sdispls, types, rbuf, rcounts, rdispls, types, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ialltoallw(sbuf, scounts, sdispls, types, rbuf, rcounts, rdispls, types, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Ialltoallw(MPI_IN_PLACE, NULL, NULL, NULL, rbuf, rcounts, rdispls, types, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ialltoallw(MPI_IN_PLACE, NULL, NULL, NULL, rbuf, rcounts, rdispls, types, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Ireduce(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, 0, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ireduce(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, 0, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- if (0 == rank)
- MPI_Ireduce(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, 0, comm, &req);
- else
- MPI_Ireduce(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, 0, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* if (0 == rank)*/
+/* MPI_Ireduce(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, 0, comm, &req);*/
+/* else*/
+/* MPI_Ireduce(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, 0, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Iallreduce(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Iallreduce(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Iallreduce(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Iallreduce(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Ireduce_scatter(sbuf, rbuf, rcounts, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ireduce_scatter(sbuf, rbuf, rcounts, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Ireduce_scatter(MPI_IN_PLACE, rbuf, rcounts, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ireduce_scatter(MPI_IN_PLACE, rbuf, rcounts, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Ireduce_scatter_block(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ireduce_scatter_block(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Ireduce_scatter_block(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Ireduce_scatter_block(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Iscan(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Iscan(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Iscan(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Iscan(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Iexscan(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Iexscan(sbuf, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
- MPI_Iexscan(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);
- MPI_Wait(&req, MPI_STATUS_IGNORE);
+/* MPI_Iexscan(MPI_IN_PLACE, rbuf, NUM_INTS, MPI_INT, MPI_SUM, comm, &req);*/
+/* MPI_Wait(&req, MPI_STATUS_IGNORE);*/
if (sbuf)
free(sbuf);
#uoplong 4
#uoplong 11
#uoplong 16
-nonblocking 4 mpiversion=3.0
-nonblocking 5 mpiversion=3.0
-nonblocking 10 mpiversion=3.0
+nonblocking 4
+nonblocking 5
+nonblocking 10
nonblocking2 1 mpiversion=3.0
nonblocking2 4 mpiversion=3.0
nonblocking2 5 mpiversion=3.0
iallred 2 mpiversion=3.0
# ibarrier will hang forever if it fails, but will complete quickly if it
# succeeds
-ibarrier 2 mpiversion=3.0 timeLimit=30
+ibarrier 2 timeLimit=30
# run some of the tests, relinked with the nbc_pmpi_adaptor.o file
nballtoall1 8 mpiversion=3.0
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../include/")
foreach(file anyall bottom eagerdt huge_anysrc huge_underflow inactivereq isendself isendirecv isendselfprobe issendselfcancel cancelanysrc pingping probenull
- dtype_send probe-unexp sendall sendflood sendrecv1 sendrecv2 sendrecv3 waitany-null waittestnull many_isend manylmt recv_any scancel scancel2 rcancel)
+ dtype_send greq1 probe-unexp sendall sendflood sendrecv1 sendrecv2 sendrecv3 waitany-null waittestnull many_isend manylmt recv_any scancel scancel2 rcancel)
# not compiled files: big_count_status bsend1 bsend2 bsend3 bsend4 bsend5 bsendalign bsendfrag bsendpending mprobe
- # cancelrecv greq1 icsend large_message pscancel rqfreeb rqstatus sendself scancel_unmatch
+ # cancelrecv icsend large_message pscancel rqfreeb rqstatus sendself scancel_unmatch
add_executable(${file} EXCLUDE_FROM_ALL ${file}.c)
add_dependencies(tests ${file})
target_link_libraries(${file} simgrid mtest_c)
#needs MPI_Pack, MPI_Buffer_attach, MPI_Buffer_detach, MPI_Irsend, MPI_Ibsend
#rqfreeb 4
#needs MPI_Grequest_start MPI_Grequest_complete
-#greq1 1
+greq1 1
probe-unexp 4
probenull 1
# For testing, scancel will run with 1 process as well
src/smpi/colls/reduce/reduce-rab.cpp
src/smpi/colls/scatter/scatter-ompi.cpp
src/smpi/colls/scatter/scatter-mvapich-two-level.cpp
+ src/smpi/colls/smpi_nbc_impl.cpp
src/smpi/colls/smpi_automatic_selector.cpp
src/smpi/colls/smpi_default_selector.cpp
src/smpi/colls/smpi_mpich_selector.cpp