#include "smpi_op.hpp"
#include "src/kernel/activity/CommImpl.hpp"
#include "src/mc/mc_replay.hpp"
-#include "src/simix/ActorImpl.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "xbt/config.hpp"
std::vector<s_smpi_factor_t> smpi_ois_values;
-extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl*, void*, size_t);
namespace simgrid{
namespace smpi{
int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
- xbt_mutex_t mut = process->mailboxes_mutex();
+ simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
- xbt_mutex_acquire(mut);
+ mut->lock();
if (async_small_thresh == 0 && (flags_ & MPI_REQ_RMA) == 0) {
mailbox = process->mailbox();
XBT_DEBUG("recv simcall posted");
if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
- xbt_mutex_release(mut);
+ mut->unlock();
} else { /* the RECV flag was not set, so this is a send */
simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
int rank = src_;
int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
- xbt_mutex_t mut=process->mailboxes_mutex();
+ simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
- xbt_mutex_acquire(mut);
+ mut->lock();
if (not(async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
mailbox = process->mailbox();
/* FIXME: detached sends are not traceable (action_ == nullptr) */
if (action_ != nullptr) {
- std::string category = TRACE_internal_smpi_get_category();
+ std::string category = smpi_process()->get_tracing_category();
simgrid::simix::simcall([this, category] { this->action_->set_category(category); });
}
if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0))
- xbt_mutex_release(mut);
+ mut->unlock();
}
}
int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
{
- std::vector<simgrid::kernel::activity::ActivityImplPtr> comms;
+ std::vector<simgrid::kernel::activity::CommImpl*> comms;
comms.reserve(count);
int i;
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
for(i = 0; i < count; i++) {
if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
- comms.push_back(requests[i]->action_);
+ comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
map.push_back(i);
}
}
int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
{
- s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
+ std::vector<simgrid::kernel::activity::CommImpl*> comms;
+ comms.reserve(count);
int index = MPI_UNDEFINED;
if(count > 0) {
- int size = 0;
// Wait for a request to complete
- xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){
- intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
- });
- int *map = xbt_new(int, count);
+ std::vector<int> map;
XBT_DEBUG("Wait for one of %d", count);
for(int i = 0; i < count; i++) {
if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED) &&
not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
if (requests[i]->action_ != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
- intrusive_ptr_add_ref(requests[i]->action_.get());
- xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
- map[size] = i;
- size++;
+ comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+ map.push_back(i);
} else {
// This is a finished detached request, let's return this one
- size = 0; // so we free the dynar but don't do the waitany call
+ comms.clear(); // empty the list so the waitany call below is skipped
index = i;
finish_wait(&requests[i], status); // cleanup if refcount = 0
if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
}
}
}
- if (size > 0) {
- XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
+ if (not comms.empty()) {
+ XBT_DEBUG("Enter waitany for %zu comms", comms.size());
int i=MPI_UNDEFINED;
try{
// this is not a detached send
- i = simcall_comm_waitany(&comms, -1);
+ i = simcall_comm_waitany(comms.data(), comms.size(), -1);
}catch (xbt_ex& e) {
XBT_INFO("request %d cancelled ",i);
return i;
}
}
}
-
- xbt_dynar_free_data(&comms);
- xbt_free(map);
}
if (index==MPI_UNDEFINED)