-/* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include "mc/mc.h"
#include "private.hpp"
+#include "simgrid/Exception.hpp"
#include "simgrid/s4u/Exec.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_op.hpp"
#include "src/kernel/activity/CommImpl.hpp"
#include "src/mc/mc_replay.hpp"
-#include "src/simix/ActorImpl.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "xbt/config.hpp"
-#include <xbt/ex.hpp>
#include <algorithm>
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (reques)");
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
// Configuration flag registered under the key "smpi/iprobe": the minimum time
// (in seconds) injected inside each call to MPI_Iprobe. The value passed here
// (1e-4) is the flag's initial value. The iprobe code only injects the delay
// when this value is > 0.
static simgrid::config::Flag<double> smpi_iprobe_sleep(
"smpi/iprobe", "Minimum time to inject inside a call to MPI_Iprobe", 1e-4);
std::vector<s_smpi_factor_t> smpi_ois_values;
-extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl*, void*, size_t);
namespace simgrid{
namespace smpi{
}
}
-int Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl* ignored)
+int Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
{
MPI_Request ref = static_cast<MPI_Request>(a);
MPI_Request req = static_cast<MPI_Request>(b);
}else return 0;
}
-int Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl* ignored)
+int Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
{
MPI_Request ref = static_cast<MPI_Request>(a);
MPI_Request req = static_cast<MPI_Request>(b);
void Request::start()
{
- smx_mailbox_t mailbox;
+ s4u::MailboxPtr mailbox;
xbt_assert(action_ == nullptr, "Cannot (re-)start unfinished communication");
flags_ &= ~MPI_REQ_PREPARED;
int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
- xbt_mutex_t mut = process->mailboxes_mutex();
+ simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
- xbt_mutex_acquire(mut);
+ mut->lock();
if (async_small_thresh == 0 && (flags_ & MPI_REQ_RMA) == 0) {
mailbox = process->mailbox();
//We have to check both mailboxes (because SSEND messages are sent to the large mbox).
//begin with the more appropriate one : the small one.
mailbox = process->mailbox_small();
- XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
- smx_activity_t action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(this));
+ XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %s (in case of SSEND)?",
+ mailbox->get_cname());
+ smx_activity_t action = mailbox->iprobe(0, &match_recv, static_cast<void*>(this));
if (action == nullptr) {
mailbox = process->mailbox();
- XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
- action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(this));
+ XBT_DEBUG("No, nothing in the small mailbox test the other one : %s", mailbox->get_cname());
+ action = mailbox->iprobe(0, &match_recv, static_cast<void*>(this));
if (action == nullptr) {
- XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
+ XBT_DEBUG("Still nothing, switch back to the small mailbox : %s", mailbox->get_cname());
mailbox = process->mailbox_small();
}
} else {
} else {
mailbox = process->mailbox_small();
XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
- smx_activity_t action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(this));
+ smx_activity_t action = mailbox->iprobe(0, &match_recv, static_cast<void*>(this));
if (action == nullptr) {
XBT_DEBUG("No, nothing in the permanent receive mailbox");
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
real_size_=size_;
action_ = simcall_comm_irecv(
- process->get_actor()->get_impl(), mailbox, buf_, &real_size_, &match_recv,
+ process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
XBT_DEBUG("recv simcall posted");
if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
- xbt_mutex_release(mut);
+ mut->unlock();
} else { /* the RECV flag was not set, so this is a send */
simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
int rank = src_;
int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
- xbt_mutex_t mut=process->mailboxes_mutex();
+ simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
- xbt_mutex_acquire(mut);
+ mut->lock();
if (not(async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
mailbox = process->mailbox();
} else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < async_small_thresh) { // eager mode
mailbox = process->mailbox();
- XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
- smx_activity_t action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast<void*>(this));
+ XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %s?", mailbox->get_cname());
+ smx_activity_t action = mailbox->iprobe(1, &match_send, static_cast<void*>(this));
if (action == nullptr) {
if ((flags_ & MPI_REQ_SSEND) == 0) {
mailbox = process->mailbox_small();
- XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
+ XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %s",
+ mailbox->get_cname());
} else {
mailbox = process->mailbox_small();
- XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
- action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast<void*>(this));
+ XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %s?",
+ mailbox->get_cname());
+ action = mailbox->iprobe(1, &match_send, static_cast<void*>(this));
if (action == nullptr) {
XBT_DEBUG("No, we are first, send to large mailbox");
mailbox = process->mailbox();
}
} else {
mailbox = process->mailbox();
- XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, this,buf_);
+ XBT_DEBUG("Send request %p is in the large mailbox %s (buf: %p)", this, mailbox->get_cname(), buf_);
}
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
real_size_=size_;
action_ = simcall_comm_isend(
- simgrid::s4u::Actor::by_pid(src_)->get_impl(), mailbox, size_, -1.0, buf, real_size_, &match_send,
+ simgrid::s4u::Actor::by_pid(src_)->get_impl(), mailbox->get_impl(), size_, -1.0, buf, real_size_, &match_send,
&xbt_free_f, // how to free the userdata if a detached send fails
not process->replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this,
// detach if msg size < eager/rdv switch limit
XBT_DEBUG("send simcall posted");
/* FIXME: detached sends are not traceable (action_ == nullptr) */
- if (action_ != nullptr)
- simcall_set_category(action_, TRACE_internal_smpi_get_category());
+ if (action_ != nullptr) {
+ std::string category = smpi_process()->get_tracing_category();
+ simgrid::simix::simcall([this, category] { this->action_->set_category(category); });
+ }
+
if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0))
- xbt_mutex_release(mut);
+ mut->unlock();
}
}
int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
{
- std::vector<simgrid::kernel::activity::ActivityImplPtr> comms;
+ std::vector<simgrid::kernel::activity::CommImpl*> comms;
comms.reserve(count);
int i;
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
for(i = 0; i < count; i++) {
if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
- comms.push_back(requests[i]->action_);
+ comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
map.push_back(i);
}
}
source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source)->get_pid(),
simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV);
if (smpi_iprobe_sleep > 0) {
- s4u::this_actor::exec_init(/* flops to execute */ nsleeps * smpi_iprobe_sleep * speed * maxrate)
+ /** Compute the number of flops we will sleep **/
+ s4u::this_actor::exec_init(/*nsleeps: See comment above */ nsleeps *
+ /*(seconds * flop/s -> total flops)*/ smpi_iprobe_sleep * speed * maxrate)
->set_name("iprobe")
+ /* Not the entire CPU can be used when iprobing: This is important for
+ * the energy consumption caused by polling with iprobes.
+ * Note also that the number of flops that was
+ * computed above contains a maxrate factor and is hence reduced (maxrate < 1)
+ */
+ ->set_bound(maxrate*speed)
->start()
->wait();
}
// behave like a receive, but don't do it
- smx_mailbox_t mailbox;
+ s4u::MailboxPtr mailbox;
request->print_request("New iprobe");
// We have to test both mailboxes as we don't know if we will receive on one or the other
if (simgrid::config::get_value<int>("smpi/async-small-thresh") > 0) {
mailbox = smpi_process()->mailbox_small();
XBT_DEBUG("Trying to probe the perm recv mailbox");
- request->action_ = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(request));
+ request->action_ = mailbox->iprobe(0, &match_recv, static_cast<void*>(request));
}
if (request->action_ == nullptr){
mailbox = smpi_process()->mailbox();
XBT_DEBUG("trying to probe the other mailbox");
- request->action_ = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(request));
+ request->action_ = mailbox->iprobe(0, &match_recv, static_cast<void*>(request));
}
if (request->action_ != nullptr){
- simgrid::kernel::activity::CommImplPtr sync_comm =
- boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(request->action_);
- MPI_Request req = static_cast<MPI_Request>(sync_comm->src_data);
+ kernel::activity::CommImplPtr sync_comm = boost::static_pointer_cast<kernel::activity::CommImpl>(request->action_);
+ MPI_Request req = static_cast<MPI_Request>(sync_comm->src_data_);
*flag = 1;
if (status != MPI_STATUS_IGNORE && (req->flags_ & MPI_REQ_PREPARED) == 0) {
status->MPI_SOURCE = comm->group()->rank(req->src_);
int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
{
- s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
+ std::vector<simgrid::kernel::activity::CommImpl*> comms;
+ comms.reserve(count);
int index = MPI_UNDEFINED;
if(count > 0) {
- int size = 0;
// Wait for a request to complete
- xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){
- intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
- });
- int *map = xbt_new(int, count);
+ std::vector<int> map;
XBT_DEBUG("Wait for one of %d", count);
for(int i = 0; i < count; i++) {
if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED) &&
not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
if (requests[i]->action_ != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
- intrusive_ptr_add_ref(requests[i]->action_.get());
- xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
- map[size] = i;
- size++;
+ comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+ map.push_back(i);
} else {
// This is a finished detached request, let's return this one
- size = 0; // so we free the dynar but don't do the waitany call
+ comms.clear(); // so we don't do the waitany call
index = i;
finish_wait(&requests[i], status); // cleanup if refcount = 0
if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
}
}
}
- if (size > 0) {
- XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
+ if (not comms.empty()) {
+ XBT_DEBUG("Enter waitany for %zu comms", comms.size());
int i=MPI_UNDEFINED;
try{
// this is not a detached send
- i = simcall_comm_waitany(&comms, -1);
+ i = simcall_comm_waitany(comms.data(), comms.size(), -1);
}catch (xbt_ex& e) {
XBT_INFO("request %d cancelled ",i);
return i;
}
}
}
-
- xbt_dynar_free_data(&comms);
- xbt_free(map);
}
if (index==MPI_UNDEFINED)