#include <xbt/config.hpp>
#include <algorithm>
-
#include "private.h"
#include "mc/mc.h"
#include "src/mc/mc_replay.h"
#include "simgrid/sg_config.h"
#include "smpi/smpi_utils.hpp"
#include <simgrid/s4u/host.hpp>
-
#include "src/kernel/activity/SynchroComm.hpp"
// Fix typo in the user-visible log-category description ("reques" -> "request").
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
return current;
}
-
namespace simgrid{
namespace smpi{
-Request::Request(){}
Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags)
{
void *old_buf = nullptr;
-
if((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED)){
// This part handles the problem of non-contiguous memory
old_buf = buf;
op_ = MPI_REPLACE;
}
-
-//Request::destroy(void* request)
-//{
-// MPI_Request req = static_cast<MPI_Request>(request);
-// delete(req);
-//}
-
/** @brief Accessor: the communicator this request was created on. */
MPI_Comm Request::comm()
{
  return this->comm_;
}
return real_size_;
}
-
void Request::unref(MPI_Request* request)
{
if((*request) != MPI_REQUEST_NULL){
(*request)->refcount_--;
if((*request)->refcount_<0) xbt_die("wrong refcount");
-
if((*request)->refcount_==0){
Datatype::unref((*request)->old_type_);
Comm::unref((*request)->comm_);
}
}
-
int Request::match_recv(void* a, void* b, smx_activity_t ignored) {
MPI_Request ref = static_cast<MPI_Request>(a);
MPI_Request req = static_cast<MPI_Request>(b);
/* factories, to hide the internal flags from the caller */
/** @brief Create a persistent standard-mode send request (MPI_Send_init semantics).
 *
 *  The request is built with PERSISTENT | SEND | PREPARED flags; it is not started here.
 *  MPI_BOTTOM is normalized to nullptr before being stored as the buffer.
 *
 *  NOTE(review): this span contained leftover unified-diff markers ('-'/'+' prefixed
 *  lines); it is resolved here to the post-change version (direct return, no temporary).
 */
MPI_Request Request::send_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
  return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
                     comm->group()->index(dst), tag, comm, PERSISTENT | SEND | PREPARED);
}
/** @brief Create a persistent synchronous-mode send request (MPI_Ssend_init semantics).
 *
 *  Same as send_init() but with the additional SSEND flag; the request is not started here.
 *
 *  NOTE(review): leftover unified-diff markers in this span were resolved to the
 *  post-change version (direct return, no temporary).
 */
MPI_Request Request::ssend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
  return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
                     comm->group()->index(dst), tag, comm, PERSISTENT | SSEND | SEND | PREPARED);
}
/** @brief Create a persistent immediate-send request (ISEND variant).
 *
 *  Built with PERSISTENT | ISEND | SEND | PREPARED flags; the request is not started here.
 *
 *  NOTE(review): leftover unified-diff markers in this span were resolved to the
 *  post-change version (direct return, no temporary).
 */
MPI_Request Request::isend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
  return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
                     comm->group()->index(dst), tag, comm, PERSISTENT | ISEND | SEND | PREPARED);
}
/** @brief Create a persistent receive request (MPI_Recv_init semantics).
 *
 *  MPI_ANY_SOURCE is passed through unchanged; any other source rank is translated
 *  to its global index via the communicator's group. The request carries
 *  PERSISTENT | RECV | PREPARED flags and is not started here.
 *
 *  NOTE(review): leftover unified-diff markers in this span were resolved to the
 *  post-change version (direct return, no temporary).
 */
MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
  return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
                     src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src),
                     smpi_process_index(), tag, comm, PERSISTENT | RECV | PREPARED);
}
MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
/** @brief Create a persistent immediate-receive request.
 *
 *  Identical flag set to recv_init() (PERSISTENT | RECV | PREPARED); MPI_ANY_SOURCE
 *  is passed through, other ranks are translated via the communicator's group.
 *  The request is not started here.
 *
 *  NOTE(review): leftover unified-diff markers in this span were resolved to the
 *  post-change version (direct return, no temporary).
 */
MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
  return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
                     src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src),
                     smpi_process_index(), tag, comm, PERSISTENT | RECV | PREPARED);
}
MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
request = nullptr;
}
-
-
void Request::sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
MPI_Comm comm, MPI_Status * status)
}
}
-
-
void Request::start()
{
smx_mailbox_t mailbox;
/* FIXME: detached sends are not traceable (action_ == nullptr) */
if (action_ != nullptr)
simcall_set_category(action_, TRACE_internal_smpi_get_category());
-
if (async_small_thresh != 0 || ((flags_ & RMA)!=0))
xbt_mutex_release(mut);
}
}
-
void Request::startall(int count, MPI_Request * requests)
{
if(requests== nullptr)
int Request::test(MPI_Request * request, MPI_Status * status) {
//assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
-
// to avoid deadlocks if used as a break condition, such as
- // while (MPI_Test(request, flag, status) && flag) {
- // }
+ // while (MPI_Test(request, flag, status) && flag) dostuff...
// because the time will not normally advance when only calls to MPI_Test are made -> deadlock
// multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
static int nsleeps = 1;
return flag;
}
-
int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
{
int i;
else return count;
}
-
int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
{
std::vector<simgrid::kernel::activity::ActivityImpl*> comms;
return flag;
}
-
int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
{
MPI_Status stat;
return flag;
}
-
-
-
// Blocking probe (MPI_Probe-style): intended to wait for a matching message on
// (source, tag) over `comm` and report it through `status`.
// NOTE(review): only the `flag` initialization is visible here — the waiting loop
// that presumably uses iprobe() appears to be elided/lost in this chunk; confirm
// against the full file before relying on this body.
void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
int flag=0;
//FIXME find another way to avoid busy waiting ?
}
void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
- MPI_Request request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
- comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV);
-
// to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
- // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
+ // especially when used as a break condition, such as while (MPI_Iprobe(...)) dostuff...
// nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
- // (This can speed up the execution of certain applications by an order of magnitude, such as HPL)
+ // This can speed up the execution of certain applications by an order of magnitude, such as HPL
static int nsleeps = 1;
double speed = simgrid::s4u::Actor::self()->host()->speed();
double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
+ MPI_Request request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
+ comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV);
if (smpi_iprobe_sleep > 0) {
smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
simcall_execution_wait(iprobe_sleep);
unref(&request);
}
-
// Finalize a completed request: the visible code dereferences *request into a
// local and drops one reference via unref().
// NOTE(review): `req` and `status` are unused in the visible body — the status
// copy-out and buffer-unpacking logic appears to be elided in this chunk; do not
// treat this span as the complete implementation.
void Request::finish_wait(MPI_Request* request, MPI_Status * status)
{
MPI_Request req = *request;
unref(request);
}
-
// Block until the request completes (MPI_Wait-style).
// NOTE(review): the visible body only logs and removes an entry from the F2C
// lookup dict; `key` and `id` are not declared in this span, so the middle of
// the function (the actual wait/finish logic) has clearly been lost in this
// chunk — reconstruct from the full file, do not edit the logic from here.
void Request::wait(MPI_Request * request, MPI_Status * status)
{
(*request)->print_request("Waiting");
xbt_dict_remove(F2C::f2c_lookup(), get_key_id(key, id));
}
-
-
}
}