#endif
/* FIXME: waitany is going to be a vararg function, and should take a timeout */
-XBT_PUBLIC unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout);
+XBT_PUBLIC unsigned int simcall_comm_waitany(smx_activity_t* comms, size_t count, double timeout);
XBT_PUBLIC void simcall_comm_wait(smx_activity_t comm, double timeout);
XBT_PUBLIC int simcall_comm_test(smx_activity_t comm);
XBT_PUBLIC int simcall_comm_testany(smx_activity_t* comms, size_t count);
static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
{
- unsigned int cursor = 0;
- xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
+ smx_activity_t* synchros = simcall_comm_waitany__get__comms(simcall);
+ size_t count = simcall_comm_waitany__get__count(simcall);
- simgrid::kernel::activity::ActivityImpl* synchro;
- xbt_dynar_foreach (synchros, cursor, synchro) {
+ for (size_t i = 0; i < count; i++) {
// Remove the first occurence of simcall:
- auto i = boost::range::find(synchro->simcalls_, simcall);
- if (i != synchro->simcalls_.end())
- synchro->simcalls_.erase(i);
+ smx_activity_t& synchro = synchros[i];
+ auto j = boost::range::find(synchro->simcalls_, simcall);
+ if (j != synchro->simcalls_.end())
+ synchro->simcalls_.erase(j);
}
}
-void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
+void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, smx_activity_t* synchros, size_t count, double timeout)
{
if (MC_is_active() || MC_record_replay_is_active()) {
if (timeout > 0.0)
xbt_die("Timeout not implemented for waitany in the model-checker");
- int idx = SIMCALL_GET_MC_VALUE(simcall);
- auto* synchro = xbt_dynar_get_as(synchros, idx, simgrid::kernel::activity::ActivityImpl*);
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
+ smx_activity_t& synchro = synchros[idx];
synchro->simcalls_.push_back(simcall);
simcall_comm_waitany__set__result(simcall, idx);
synchro->state_ = SIMIX_DONE;
});
}
- unsigned int cursor;
- simgrid::kernel::activity::ActivityImpl* synchro;
- xbt_dynar_foreach (synchros, cursor, synchro) {
+ for (size_t i = 0; i < count; i++) {
/* associate this simcall to the the synchro */
+ smx_activity_t& synchro = synchros[i];
synchro->simcalls_.push_back(simcall);
/* see if the synchro is already finished */
simcalls_.pop_front();
/* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
- * list. Afterwards, get the position of the actual synchro in the waitany dynar and return it as the result of the
+ * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
* simcall */
if (simcall->call == SIMCALL_NONE) // FIXME: maybe a better way to handle this case
simcall->timer = nullptr;
}
if (not MC_is_active() && not MC_record_replay_is_active()) {
- ActivityImpl* synchro = this;
- simcall_comm_waitany__set__result(simcall,
- xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
+ CommImpl** comms = simcall_comm_waitany__get__comms(simcall);
+ size_t count = simcall_comm_waitany__get__count(simcall);
+ CommImpl** element = std::find(comms, comms + count, this);
+ int rank = (element != comms + count) ? element - comms : -1;
+ simcall_comm_waitany__set__result(simcall, rank);
}
}
xbt_die("Unexpected synchro state in CommImpl::finish: %d", static_cast<int>(state_));
}
}
-
/* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
if (simcall->issuer->exception_ &&
(simcall->call == SIMCALL_COMM_WAITANY || simcall->call == SIMCALL_COMM_TESTANY)) {
// First retrieve the rank of our failing synchro
- int rank = -1;
+ CommImpl** comms;
+ size_t count;
if (simcall->call == SIMCALL_COMM_WAITANY) {
- ActivityImpl* synchro = this;
- rank = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
- } else if (simcall->call == SIMCALL_COMM_TESTANY) {
- rank = -1;
- auto* comms = simcall_comm_testany__get__comms(simcall);
- auto count = simcall_comm_testany__get__count(simcall);
- auto element = std::find(comms, comms + count, this);
- if (element == comms + count)
- rank = -1;
- else
- rank = element - comms;
+ comms = simcall_comm_waitany__get__comms(simcall);
+ count = simcall_comm_waitany__get__count(simcall);
+ } else {
+ /* simcall->call == SIMCALL_COMM_TESTANY */
+ comms = simcall_comm_testany__get__comms(simcall);
+ count = simcall_comm_testany__get__count(simcall);
}
+ CommImpl** element = std::find(comms, comms + count, this);
+ int rank = (element != comms + count) ? element - comms : -1;
// In order to modify the exception we have to rethrow it:
try {
}
case SIMCALL_COMM_WAITANY: {
- xbt_dynar_t comms = simcall_comm_waitany__get__comms(req);
- for (unsigned int index = 0; index < comms->used; ++index) {
- simgrid::kernel::activity::CommImpl* act = xbt_dynar_get_as(comms, index, simgrid::kernel::activity::CommImpl*);
- if (act->src_actor_ && act->dst_actor_)
+ simgrid::kernel::activity::ActivityImpl** comms = simcall_comm_waitany__getraw__comms(req);
+ size_t count = simcall_comm_waitany__get__count(req);
+ for (unsigned int index = 0; index < count; ++index) {
+ simgrid::kernel::activity::CommImpl* comm = static_cast<simgrid::kernel::activity::CommImpl*>(comms[index]);
+ if (comm->src_actor_ && comm->dst_actor_)
return true;
}
return false;
{
int finished_index = -1;
- /* create the equivalent dynar with SIMIX objects */
- xbt_dynar_t s_comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), nullptr);
+ /* Create the equivalent array with SIMIX objects: */
+ std::vector<simgrid::kernel::activity::ActivityImplPtr> s_comms;
+ s_comms.reserve(xbt_dynar_length(comms));
msg_comm_t comm;
unsigned int cursor;
xbt_dynar_foreach(comms, cursor, comm) {
- xbt_dynar_push_as(s_comms, simgrid::kernel::activity::ActivityImpl*, comm->s_comm.get());
+ s_comms.push_back(comm->s_comm);
}
msg_error_t status = MSG_OK;
try {
- finished_index = simcall_comm_waitany(s_comms, -1);
+ finished_index = simcall_comm_waitany(s_comms.data(), s_comms.size(), -1);
} catch (simgrid::TimeoutError& e) {
finished_index = e.value;
status = MSG_TIMEOUT;
}
xbt_assert(finished_index != -1, "WaitAny returned -1");
- xbt_dynar_free(&s_comms);
comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
/* the communication is finished */
}
}
-int Comm::wait_any_for(std::vector<CommPtr>* comms_in, double timeout)
-{
- // Map to dynar<Synchro*>:
- xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), nullptr);
- for (auto const& comm : *comms_in) {
- if (comm->state_ == Activity::State::INITED)
- comm->start();
- xbt_assert(comm->state_ == Activity::State::STARTED);
- simgrid::kernel::activity::ActivityImpl* ptr = comm->pimpl_.get();
- xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, ptr);
+int Comm::wait_any_for(std::vector<CommPtr>* comms, double timeout)
+{
+ smx_activity_t* array = new smx_activity_t[comms->size()];
+ for (unsigned int i = 0; i < comms->size(); i++) {
+ array[i] = comms->at(i)->pimpl_;
}
- // Call the underlying simcall:
- int idx = simcall_comm_waitany(comms, timeout);
- xbt_dynar_free(&comms);
+ int idx = simcall_comm_waitany(array, comms->size(), timeout);
+ delete[] array;
return idx;
}
/**
* @ingroup simix_comm_management
*/
-unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout)
+unsigned int simcall_comm_waitany(smx_activity_t* comms, size_t count, double timeout)
{
- return simcall_BODY_comm_waitany(comms, timeout);
+ return simcall_BODY_comm_waitany(comms, count, timeout);
}
/**
simgrid::simix::marshal<boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>>(simcall->result, result);
}
-static inline xbt_dynar_t simcall_comm_waitany__get__comms(smx_simcall_t simcall)
+static inline boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>*
+simcall_comm_waitany__get__comms(smx_simcall_t simcall)
{
- return simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]);
+ return simgrid::simix::unmarshal<boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>*>(simcall->args[0]);
+}
+static inline simgrid::kernel::activity::ActivityImpl** simcall_comm_waitany__getraw__comms(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal_raw<simgrid::kernel::activity::ActivityImpl**>(simcall->args[0]);
+}
+static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall,
+ boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>* arg)
+{
+ simgrid::simix::marshal<boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>*>(simcall->args[0], arg);
+}
+static inline size_t simcall_comm_waitany__get__count(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<size_t>(simcall->args[1]);
}
-static inline xbt_dynar_t simcall_comm_waitany__getraw__comms(smx_simcall_t simcall)
+static inline size_t simcall_comm_waitany__getraw__count(smx_simcall_t simcall)
{
- return simgrid::simix::unmarshal_raw<xbt_dynar_t>(simcall->args[0]);
+ return simgrid::simix::unmarshal_raw<size_t>(simcall->args[1]);
}
-static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg)
+static inline void simcall_comm_waitany__set__count(smx_simcall_t simcall, size_t arg)
{
- simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
+ simgrid::simix::marshal<size_t>(simcall->args[1], arg);
}
static inline double simcall_comm_waitany__get__timeout(smx_simcall_t simcall)
{
- return simgrid::simix::unmarshal<double>(simcall->args[1]);
+ return simgrid::simix::unmarshal<double>(simcall->args[2]);
}
static inline double simcall_comm_waitany__getraw__timeout(smx_simcall_t simcall)
{
- return simgrid::simix::unmarshal_raw<double>(simcall->args[1]);
+ return simgrid::simix::unmarshal_raw<double>(simcall->args[2]);
}
static inline void simcall_comm_waitany__set__timeout(smx_simcall_t simcall, double arg)
{
- simgrid::simix::marshal<double>(simcall->args[1], arg);
+ simgrid::simix::marshal<double>(simcall->args[2], arg);
}
static inline int simcall_comm_waitany__get__result(smx_simcall_t simcall)
{
XBT_PRIVATE boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate);
XBT_PRIVATE boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
-XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms, double timeout);
+XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall,
+ boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>* comms,
+ size_t count, double timeout);
XBT_PRIVATE void simcall_HANDLER_comm_wait(smx_simcall_t simcall, boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm, double timeout);
XBT_PRIVATE void simcall_HANDLER_comm_test(smx_simcall_t simcall, boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm);
XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>* comms, size_t count);
return simcall<boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>, smx_actor_t, smx_mailbox_t, void*, size_t*, simix_match_func_t, simix_copy_data_func_t, void*, double>(SIMCALL_COMM_IRECV, receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
}
-inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms, double timeout)
+inline static int simcall_BODY_comm_waitany(boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>* comms,
+ size_t count, double timeout)
{
if (0) /* Go to that function to follow the code flow through the simcall barrier */
- simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, timeout);
- return simcall<int, xbt_dynar_t, double>(SIMCALL_COMM_WAITANY, comms, timeout);
+ simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, count, timeout);
+ return simcall<int, boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>*, size_t, double>(
+ SIMCALL_COMM_WAITANY, comms, count, timeout);
}
inline static void simcall_BODY_comm_wait(boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm, double timeout)
break;
case SIMCALL_COMM_WAITANY:
- simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]), simgrid::simix::unmarshal<double>(simcall->args[1]));
+ simcall_HANDLER_comm_waitany(
+ simcall,
+ simgrid::simix::unmarshal<boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>*>(simcall->args[0]),
+ simgrid::simix::unmarshal<size_t>(simcall->args[1]), simgrid::simix::unmarshal<double>(simcall->args[2]));
break;
case SIMCALL_COMM_WAIT:
boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm_isend(smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
void comm_recv(smx_actor_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate) [[block]];
boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm_irecv(smx_actor_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
-int comm_waitany(xbt_dynar_t comms, double timeout) [[block]];
+int comm_waitany(boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>* comms, size_t count, double timeout) [[block]];
void comm_wait(boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm, double timeout) [[block]];
int comm_test(boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl> comm) [[block]];
int comm_testany(boost::intrusive_ptr<simgrid::kernel::activity::ActivityImpl>* comms, size_t count) [[block]];
int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
{
- s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
+ std::vector<simgrid::kernel::activity::ActivityImplPtr> comms;
+ comms.reserve(count);
int index = MPI_UNDEFINED;
if(count > 0) {
- int size = 0;
// Wait for a request to complete
- xbt_dynar_init(&comms, sizeof(simgrid::kernel::activity::ActivityImpl*), nullptr);
- int *map = xbt_new(int, count);
+ std::vector<int> map;
XBT_DEBUG("Wait for one of %d", count);
for(int i = 0; i < count; i++) {
if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED) &&
not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
if (requests[i]->action_ != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
- xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
- map[size] = i;
- size++;
+ comms.push_back(requests[i]->action_);
+ map.push_back(i);
} else {
// This is a finished detached request, let's return this one
- size = 0; // so we free the dynar but don't do the waitany call
+ comms.clear(); // so we free don't do the waitany call
index = i;
finish_wait(&requests[i], status); // cleanup if refcount = 0
if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
}
}
}
- if (size > 0) {
- XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
+ if (not comms.empty()) {
+ XBT_DEBUG("Enter waitany for %zu comms", comms.size());
int i=MPI_UNDEFINED;
try{
// this is not a detached send
- i = simcall_comm_waitany(&comms, -1);
+ i = simcall_comm_waitany(comms.data(), comms.size(), -1);
}catch (xbt_ex& e) {
XBT_INFO("request %d cancelled ",i);
return i;
}
}
}
-
- xbt_dynar_free_data(&comms);
- xbt_free(map);
}
if (index==MPI_UNDEFINED)