/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include "src/smpi/smpi_request.hpp"
+
#include "mc/mc.h"
+#include "src/kernel/activity/CommImpl.hpp"
#include "src/mc/mc_replay.h"
#include "src/smpi/SmpiHost.hpp"
-#include "src/kernel/activity/SynchroComm.hpp"
#include "src/smpi/private.h"
#include "src/smpi/smpi_comm.hpp"
#include "src/smpi/smpi_datatype.hpp"
#include "src/smpi/smpi_op.hpp"
#include "src/smpi/smpi_process.hpp"
-#include "src/smpi/smpi_request.hpp"
#include <algorithm>
namespace simgrid{
namespace smpi{
-Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags)
+Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags)
{
void *old_buf = nullptr;
// FIXME Handle the case of a partial shared malloc.
ref->real_src_ = req->src_;
if(ref->tag_ == MPI_ANY_TAG)
ref->real_tag_ = req->tag_;
- if(ref->real_size_ < req->real_size_)
+ if(ref->real_size_ < req->real_size_)
ref->truncated_ = 1;
if(req->detached_==1)
ref->detached_sender_=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) {
mailbox = process->mailbox();
- }
+ }
else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) {
//We have to check both mailboxes (because SSEND messages are sent to the large mbox).
//begin with the more appropriate one : the small one.
void Request::startall(int count, MPI_Request * requests)
{
- if(requests== nullptr)
+ if(requests== nullptr)
return;
for(int i = 0; i < count; i++) {
// because the time will not normally advance when only calls to MPI_Test are made -> deadlock
// multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
static int nsleeps = 1;
- if(smpi_test_sleep > 0)
+ if(smpi_test_sleep > 0)
simcall_process_sleep(nsleeps*smpi_test_sleep);
Status::empty(status);
count++;
if (status != MPI_STATUSES_IGNORE)
status[i] = *pstat;
- if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags_ & NON_PERSISTENT)
+ if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & NON_PERSISTENT))
requests[i] = MPI_REQUEST_NULL;
}
} else {
int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
{
- std::vector<simgrid::kernel::activity::ActivityImpl*> comms;
+ std::vector<simgrid::kernel::activity::ActivityImplPtr> comms;
comms.reserve(count);
int i;
if (not map.empty()) {
//multiplier to the sleeptime, to increase speed of execution, each failed testany will increase it
static int nsleeps = 1;
- if(smpi_test_sleep > 0)
+ if(smpi_test_sleep > 0)
simcall_process_sleep(nsleeps*smpi_test_sleep);
i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
- *index = map[i];
+ *index = map[i];
finish_wait(&requests[*index],status);
flag = 1;
nsleeps = 1;
}
if (request->action_ != nullptr){
- simgrid::kernel::activity::Comm *sync_comm = static_cast<simgrid::kernel::activity::Comm*>(request->action_);
+ simgrid::kernel::activity::CommImplPtr sync_comm =
+ boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(request->action_);
MPI_Request req = static_cast<MPI_Request>(sync_comm->src_data);
*flag = 1;
if(status != MPI_STATUS_IGNORE && (req->flags_ & PREPARED) == 0) {
int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
{
s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
- int i;
int size = 0;
int index = MPI_UNDEFINED;
- int *map;
if(count > 0) {
// Wait for a request to complete
- xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
- map = xbt_new(int, count);
+ xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){
+ intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
+ });
+ int *map = xbt_new(int, count);
XBT_DEBUG("Wait for one of %d", count);
- for(i = 0; i < count; i++) {
+ for(int i = 0; i < count; i++) {
if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED) &&
not(requests[i]->flags_ & FINISHED)) {
if (requests[i]->action_ != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
- xbt_dynar_push(&comms, &requests[i]->action_);
+ intrusive_ptr_add_ref(requests[i]->action_.get());
+ xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
map[size] = i;
size++;
} else {
}
}
}
- if(size > 0) {
- i = simcall_comm_waitany(&comms, -1);
+ if (size > 0) {
+ XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
+ int i = simcall_comm_waitany(&comms, -1);
// not MPI_UNDEFINED, as this is a simix return code
if (i != -1) {
std::vector<MPI_Request> accumulates;
int index;
MPI_Status stat;
- MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+ MPI_Status *pstat = (status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat);
int retvalue = MPI_SUCCESS;
//tag invalid requests in the set
if (status != MPI_STATUSES_IGNORE) {
wait(&requests[c],pstat);
index = c;
} else {
- index = waitany(count, requests, pstat);
+ index = waitany(count, (MPI_Request*)requests, pstat);
if (index == MPI_UNDEFINED)
break;