#include "simgrid/sg_config.h"
#include "smpi/smpi_utils.hpp"
#include "colls/colls.h"
+#include <simgrid/s4u/host.hpp>
#include "src/kernel/activity/SynchroComm.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
+extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+
+
static int match_recv(void* a, void* b, smx_activity_t ignored) {
MPI_Request ref = static_cast<MPI_Request>(a);
MPI_Request req = static_cast<MPI_Request>(b);
request->dst = dst;
request->tag = tag;
request->comm = comm;
- smpi_comm_use(request->comm);
+ request->comm->use();
request->action = nullptr;
request->flags = flags;
request->detached = 0;
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag, comm, PERSISTENT | SEND | PREPARED);
+ comm->group()->index(dst), tag, comm, PERSISTENT | SEND | PREPARED);
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag, comm, PERSISTENT | SSEND | SEND | PREPARED);
+ comm->group()->index(dst), tag, comm, PERSISTENT | SSEND | SEND | PREPARED);
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype,
- src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src),
+ src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src),
smpi_process_index(), tag, comm, PERSISTENT | RECV | PREPARED);
return request;
}
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
request->real_size=request->size;
request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf, &request->real_size, &match_recv,
- ! smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+ ! smpi_process_get_replaying()? smpi_comm_copy_data_callback
: &smpi_comm_null_copy_buffer_callback, request, -1.0);
XBT_DEBUG("recv simcall posted");
request->action = simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0,
buf, request->real_size, &match_send,
&xbt_free_f, // how to free the userdata if a detached send fails
- !smpi_process_get_replaying() ? &smpi_comm_copy_buffer_callback
+ !smpi_process_get_replaying() ? smpi_comm_copy_data_callback
: &smpi_comm_null_copy_buffer_callback, request,
// detach if msg size < eager/rdv switch limit
request->detached);
if((*request)->refcount==0){
smpi_datatype_unuse((*request)->old_type);
- smpi_comm_unuse((*request)->comm);
+ (*request)->comm->unuse();
print_request("Destroying", (*request));
xbt_free(*request);
*request = MPI_REQUEST_NULL;
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag,comm, PERSISTENT | ISEND | SEND | PREPARED);
+ comm->group()->index(dst), tag,comm, PERSISTENT | ISEND | SEND | PREPARED);
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag, comm, NON_PERSISTENT | ISEND | SEND);
+ comm->group()->index(dst), tag, comm, NON_PERSISTENT | ISEND | SEND);
smpi_mpi_start(request);
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag,comm, NON_PERSISTENT | ISEND | SSEND | SEND);
+ comm->group()->index(dst), tag,comm, NON_PERSISTENT | ISEND | SSEND | SEND);
smpi_mpi_start(request);
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
- smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
+ comm->group()->index(src), smpi_process_index(), tag,
comm, PERSISTENT | RECV | PREPARED);
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
- smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag, comm,
+ comm->group()->index(src), smpi_process_index(), tag, comm,
NON_PERSISTENT | RECV);
smpi_mpi_start(request);
return request;
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag, comm, NON_PERSISTENT | SEND);
+ comm->group()->index(dst), tag, comm, NON_PERSISTENT | SEND);
smpi_mpi_start(request);
smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = build_request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
- smpi_group_index(smpi_comm_group(comm), dst), tag, comm, NON_PERSISTENT | SSEND | SEND);
+ comm->group()->index(dst), tag, comm, NON_PERSISTENT | SSEND | SEND);
smpi_mpi_start(request);
smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
MPI_Request requests[2];
MPI_Status stats[2];
int myid=smpi_process_index();
- if ((smpi_group_index(smpi_comm_group(comm), dst) == myid) && (smpi_group_index(smpi_comm_group(comm), src) == myid)){
+ if ((comm->group()->index(dst) == myid) && (comm->group()->index(src) == myid)){
smpi_datatype_copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
return;
}
if(!((req->detached != 0) && ((req->flags & SEND) != 0)) && ((req->flags & PREPARED) == 0)){
if(status != MPI_STATUS_IGNORE) {
int src = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
- status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(req->comm), src);
+ status->MPI_SOURCE = req->comm->group()->rank(src);
status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
status->MPI_ERROR = req->truncated != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
// this handles the case where size in receive differs from size in send
void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
MPI_Request request = build_request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
- smpi_group_index(smpi_comm_group(comm), source), smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
+ comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV);
// to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
// (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
- // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+ // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+ // (This can speed up the execution of certain applications by an order of magnitude, such as HPL)
static int nsleeps = 1;
- if(smpi_iprobe_sleep > 0)
- simcall_process_sleep(nsleeps*smpi_iprobe_sleep);
+ double speed = simgrid::s4u::Actor::self()->host()->speed();
+ double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
+ if (smpi_iprobe_sleep > 0) {
+    smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to execute */nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
+ simcall_execution_wait(iprobe_sleep);
+ }
// behave like a receive, but don't do it
smx_mailbox_t mailbox;
MPI_Request req = static_cast<MPI_Request>(sync_comm->src_data);
*flag = 1;
if(status != MPI_STATUS_IGNORE && (req->flags & PREPARED) == 0) {
- status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(comm), req->src);
+ status->MPI_SOURCE = comm->group()->rank(req->src);
status->MPI_TAG = req->tag;
status->MPI_ERROR = MPI_SUCCESS;
status->count = req->real_size;
*request = MPI_REQUEST_NULL;
}
-static int sort_accumulates(const void* pa, const void* pb)
+// Comparator over accumulate requests: orders by ascending tag.
+// Replaces the old qsort-style (const void*) comparator with a typed
+// strict-weak-ordering predicate — presumably for std::sort; confirm at the
+// call site, which is outside this hunk.
+static int sort_accumulates(MPI_Request a, MPI_Request b)
{
- return (*static_cast<MPI_Request const*>(pa))->tag>
- (*static_cast<MPI_Request const*>(pb))->tag;
+ return (a->tag < b->tag);
}
int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
if(rank != root) {
// Send buffer to root
smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
MPI_Comm comm)
{
- int rank = smpi_comm_rank(comm);
+ int rank = comm->rank();
/* arbitrarily choose root as rank 0 */
- int size = smpi_comm_size(comm);
+ int size = comm->size();
int count = 0;
int *displs = xbt_new(int, size);
for (int i = 0; i < size; i++) {
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
if (rank != root) {
// Send buffer to root
smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
MPI_Aint recvext = 0;
MPI_Request *requests;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
// FIXME: check for errors
smpi_datatype_extent(recvtype, &lb, &recvext);
// Local copy from self
MPI_Aint lb = 0;
MPI_Aint recvext = 0;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
smpi_datatype_extent(recvtype, &lb, &recvext);
// Local copy from self
smpi_datatype_copy(sendbuf, sendcount, sendtype,
MPI_Aint sendext = 0;
MPI_Request *requests;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
if(rank != root) {
// Recv buffer from root
smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
MPI_Aint lb = 0;
MPI_Aint sendext = 0;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
if(rank != root) {
// Recv buffer from root
smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
char* sendtmpbuf = static_cast<char *>(sendbuf);
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
//non commutative case, use a working algo from openmpi
if(!smpi_op_is_commute(op)){
smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
int system_tag = -888;
- MPI_Aint lb = 0, dataext = 0;
+ MPI_Aint lb = 0;
+ MPI_Aint dataext = 0;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
smpi_datatype_extent(datatype, &lb, &dataext);
void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
int system_tag = -888;
- MPI_Aint lb = 0, dataext = 0;
+ MPI_Aint lb = 0;
+ MPI_Aint dataext = 0;
int recvbuf_is_empty=1;
- int rank = smpi_comm_rank(comm);
- int size = smpi_comm_size(comm);
+ int rank = comm->rank();
+ int size = comm->size();
smpi_datatype_extent(datatype, &lb, &dataext);
}
// Wait for completion of all comms.
smpi_mpi_startall(size - 1, requests);
+
if(smpi_op_is_commute(op)){
for (int other = 0; other < size - 1; other++) {
index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
for (int other = 0; other < size - 1; other++) {
smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
if(index < rank) {
- if(recvbuf_is_empty){
- smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
- recvbuf_is_empty=0;
- } else
- smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
+ if (recvbuf_is_empty) {
+ smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
+ recvbuf_is_empty = 0;
+ } else
+ smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
}
}
}