From: Frederic Suter Date: Tue, 19 Feb 2019 09:02:03 +0000 (+0100) Subject: All activities have their own finish method \o/ X-Git-Tag: v3_22~307 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/121e8462d6b912a252a9016683d2e3e021cde9a0 All activities have their own finish method \o/ The only sparks were of joy ... --- diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 172615effd..1dac63be64 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -157,7 +157,7 @@ XBT_PUBLIC void SIMIX_comm_set_copy_data_callback(void (*callback)(smx_activity_ XBT_PUBLIC void SIMIX_comm_copy_pointer_callback(smx_activity_t comm, void* buff, size_t buff_size); XBT_PUBLIC void SIMIX_comm_copy_buffer_callback(smx_activity_t comm, void* buff, size_t buff_size); -XBT_PUBLIC void SIMIX_comm_finish(smx_activity_t synchro); +XBT_ATTRIB_DEPRECATED_v325("Please use CommImpl::finish") XBT_PUBLIC void SIMIX_comm_finish(smx_activity_t synchro); /******************************************************************************/ /* SIMIX simcalls */ diff --git a/src/kernel/activity/ActivityImpl.hpp b/src/kernel/activity/ActivityImpl.hpp index 6d14c86de7..70e4b5943b 100644 --- a/src/kernel/activity/ActivityImpl.hpp +++ b/src/kernel/activity/ActivityImpl.hpp @@ -34,8 +34,8 @@ public: virtual void suspend(); virtual void resume(); - virtual void post() = 0; // What to do when a simcall terminates - + virtual void post() = 0; // What to do when a simcall terminates + virtual void finish() = 0; void set_category(std::string category); // boost::intrusive_ptr support: diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index e6042d46dd..ff80213f39 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -4,21 +4,368 @@ * under the terms of the license (GNU LGPL) which comes with this package. 
*/ #include "src/kernel/activity/CommImpl.hpp" +#include "simgrid/Exception.hpp" #include "simgrid/kernel/resource/Action.hpp" #include "simgrid/modelchecker.h" #include "simgrid/s4u/Host.hpp" #include "src/kernel/activity/MailboxImpl.hpp" +#include "src/kernel/context/Context.hpp" #include "src/mc/mc_replay.hpp" +#include "src/surf/cpu_interface.hpp" #include "src/surf/network_interface.hpp" #include "src/surf/surf_interface.hpp" -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_network); +#include +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization"); + +XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox, double task_size, + double rate, void* src_buff, size_t src_buff_size, + int (*match_fun)(void*, void*, simgrid::kernel::activity::CommImpl*), + void (*copy_data_fun)(smx_activity_t, void*, size_t), void* data, + double timeout) +{ + smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate, src_buff, src_buff_size, + match_fun, nullptr, copy_data_fun, data, 0); + SIMCALL_SET_MC_VALUE(simcall, 0); + simcall_HANDLER_comm_wait(simcall, comm, timeout); +} + +XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend( + smx_simcall_t /*simcall*/, smx_actor_t src_proc, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, + size_t src_buff_size, int (*match_fun)(void*, void*, simgrid::kernel::activity::CommImpl*), + void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send + void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one + void* data, int detached) +{ + XBT_DEBUG("send from mailbox %p", mbox); + + /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */ + simgrid::kernel::activity::CommImplPtr this_comm = simgrid::kernel::activity::CommImplPtr( + new 
simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::SEND)); + + /* Look for communication synchro matching our needs. We also provide a description of + * ourself so that the other side also gets a chance of choosing if it wants to match with us. + * + * If it is not found then push our communication into the rendez-vous point */ + simgrid::kernel::activity::CommImplPtr other_comm = + mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::RECEIVE, match_fun, data, this_comm, + /*done*/ false, /*remove_matching*/ true); + + if (not other_comm) { + other_comm = std::move(this_comm); + + if (mbox->permanent_receiver_ != nullptr) { + // this mailbox is for small messages, which have to be sent right now + other_comm->state_ = SIMIX_READY; + other_comm->dst_actor_ = mbox->permanent_receiver_.get(); + mbox->done_comm_queue_.push_back(other_comm); + XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, other_comm.get()); + + } else { + mbox->push(other_comm); + } + } else { + XBT_DEBUG("Receive already pushed"); + + other_comm->state_ = SIMIX_READY; + other_comm->type = simgrid::kernel::activity::CommImpl::Type::READY; + } + src_proc->comms.push_back(other_comm); + + if (detached) { + other_comm->detached = true; + other_comm->clean_fun = clean_fun; + } else { + other_comm->clean_fun = nullptr; + } + + /* Setup the communication synchro */ + other_comm->src_actor_ = src_proc; + other_comm->task_size_ = task_size; + other_comm->rate_ = rate; + other_comm->src_buff_ = src_buff; + other_comm->src_buff_size_ = src_buff_size; + other_comm->src_data_ = data; + + other_comm->match_fun = match_fun; + other_comm->copy_data_fun = copy_data_fun; + + if (MC_is_active() || MC_record_replay_is_active()) { + other_comm->state_ = SIMIX_RUNNING; + return (detached ? nullptr : other_comm); + } + + other_comm->start(); + + return (detached ? 
nullptr : other_comm); +} + +XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox, + void* dst_buff, size_t* dst_buff_size, + int (*match_fun)(void*, void*, simgrid::kernel::activity::CommImpl*), + void (*copy_data_fun)(smx_activity_t, void*, size_t), void* data, + double timeout, double rate) +{ + smx_activity_t comm = simcall_HANDLER_comm_irecv(simcall, receiver, mbox, dst_buff, dst_buff_size, match_fun, + copy_data_fun, data, rate); + SIMCALL_SET_MC_VALUE(simcall, 0); + simcall_HANDLER_comm_wait(simcall, comm, timeout); +} + +XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, + smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, + simix_match_func_t match_fun, + void (*copy_data_fun)(smx_activity_t, void*, size_t), void* data, + double rate) +{ + simgrid::kernel::activity::CommImplPtr this_synchro = simgrid::kernel::activity::CommImplPtr( + new simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::RECEIVE)); + XBT_DEBUG("recv from mbox %p. 
this_synchro=%p", mbox, this_synchro.get()); + + simgrid::kernel::activity::CommImplPtr other_comm; + // communication already done, get it inside the list of completed comms + if (mbox->permanent_receiver_ != nullptr && not mbox->done_comm_queue_.empty()) { + + XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication"); + // find a match in the list of already received comms + other_comm = mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::SEND, match_fun, data, + this_synchro, /*done*/ true, + /*remove_matching*/ true); + // if not found, assume the receiver came first, register it to the mailbox in the classical way + if (not other_comm) { + XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request " + "into list"); + other_comm = std::move(this_synchro); + mbox->push(other_comm); + } else { + if (other_comm->surf_action_ && other_comm->remains() < 1e-12) { + XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get()); + other_comm->state_ = SIMIX_DONE; + other_comm->type = simgrid::kernel::activity::CommImpl::Type::DONE; + other_comm->mbox = nullptr; + } + } + } else { + /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */ + + /* Look for communication activity matching our needs. We also provide a description of + * ourself so that the other side also gets a chance of choosing if it wants to match with us. 
+ * + * If it is not found then push our communication into the rendez-vous point */ + other_comm = mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::SEND, match_fun, data, + this_synchro, /*done*/ false, + /*remove_matching*/ true); + + if (other_comm == nullptr) { + XBT_DEBUG("Receive pushed first (%zu comm enqueued so far)", mbox->comm_queue_.size()); + other_comm = std::move(this_synchro); + mbox->push(other_comm); + } else { + XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_comm.get()); + + other_comm->state_ = SIMIX_READY; + other_comm->type = simgrid::kernel::activity::CommImpl::Type::READY; + } + receiver->comms.push_back(other_comm); + } + + /* Setup communication synchro */ + other_comm->dst_actor_ = receiver; + other_comm->dst_buff_ = dst_buff; + other_comm->dst_buff_size_ = dst_buff_size; + other_comm->dst_data_ = data; + + if (rate > -1.0 && (other_comm->rate_ < 0.0 || rate < other_comm->rate_)) + other_comm->rate_ = rate; + + other_comm->match_fun = match_fun; + other_comm->copy_data_fun = copy_data_fun; + + if (MC_is_active() || MC_record_replay_is_active()) { + other_comm->state_ = SIMIX_RUNNING; + return other_comm; + } + other_comm->start(); + return other_comm; +} + +void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout) +{ + /* Associate this simcall to the wait synchro */ + XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro.get()); + + synchro->simcalls_.push_back(simcall); + simcall->issuer->waiting_synchro = synchro; + + if (MC_is_active() || MC_record_replay_is_active()) { + int idx = SIMCALL_GET_MC_VALUE(simcall); + if (idx == 0) { + synchro->state_ = SIMIX_DONE; + } else { + /* If we reached this point, the wait simcall must have a timeout */ + /* Otherwise it shouldn't be enabled and executed by the MC */ + if (timeout < 0.0) + THROW_IMPOSSIBLE; + + simgrid::kernel::activity::CommImplPtr comm = + boost::static_pointer_cast(synchro); + if (comm->src_actor_ == 
simcall->issuer) + comm->state_ = SIMIX_SRC_TIMEOUT; + else + comm->state_ = SIMIX_DST_TIMEOUT; + } + + boost::static_pointer_cast(synchro)->finish(); + return; + } + + /* If the synchro has already finish perform the error handling, */ + /* otherwise set up a waiting timeout on the right side */ + if (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING) { + boost::static_pointer_cast(synchro)->finish(); + } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */ + simgrid::kernel::resource::Action* sleep = simcall->issuer->get_host()->pimpl_cpu->sleep(timeout); + sleep->set_data(synchro.get()); + + simgrid::kernel::activity::CommImplPtr comm = + boost::static_pointer_cast(synchro); + if (simcall->issuer == comm->src_actor_) + comm->src_timeout_ = sleep; + else + comm->dst_timeout_ = sleep; + } +} + +void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro) +{ + simgrid::kernel::activity::CommImplPtr comm = + boost::static_pointer_cast(synchro); + + int res; + + if (MC_is_active() || MC_record_replay_is_active()) { + res = comm->src_actor_ && comm->dst_actor_; + if (res) + synchro->state_ = SIMIX_DONE; + } else { + res = synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING; + } + + simcall_comm_test__set__result(simcall, res); + if (simcall_comm_test__get__result(simcall)) { + synchro->simcalls_.push_back(simcall); + boost::static_pointer_cast(synchro)->finish(); + } else { + SIMIX_simcall_answer(simcall); + } +} + +void simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::ActivityImplPtr comms[], + size_t count) +{ + // The default result is -1 -- this means, "nothing is ready". + // It can be changed below, but only if something matches. 
+ simcall_comm_testany__set__result(simcall, -1); + + if (MC_is_active() || MC_record_replay_is_active()) { + int idx = SIMCALL_GET_MC_VALUE(simcall); + if (idx == -1) { + SIMIX_simcall_answer(simcall); + } else { + simgrid::kernel::activity::ActivityImplPtr synchro = comms[idx]; + simcall_comm_testany__set__result(simcall, idx); + synchro->simcalls_.push_back(simcall); + synchro->state_ = SIMIX_DONE; + boost::static_pointer_cast(synchro)->finish(); + } + return; + } + + for (std::size_t i = 0; i != count; ++i) { + simgrid::kernel::activity::ActivityImplPtr synchro = comms[i]; + if (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING) { + simcall_comm_testany__set__result(simcall, i); + synchro->simcalls_.push_back(simcall); + boost::static_pointer_cast(synchro)->finish(); + return; + } + } + SIMIX_simcall_answer(simcall); +} + +static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall) +{ + unsigned int cursor = 0; + xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall); + + simgrid::kernel::activity::ActivityImpl* ptr; + xbt_dynar_foreach (synchros, cursor, ptr) { + smx_activity_t synchro = simgrid::kernel::activity::ActivityImplPtr(ptr); + + // Remove the first occurence of simcall: + auto i = boost::range::find(synchro->simcalls_, simcall); + if (i != synchro->simcalls_.end()) + synchro->simcalls_.erase(i); + } +} +void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout) +{ + if (MC_is_active() || MC_record_replay_is_active()) { + if (timeout > 0.0) + xbt_die("Timeout not implemented for waitany in the model-checker"); + int idx = SIMCALL_GET_MC_VALUE(simcall); + smx_activity_t synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t); + synchro->simcalls_.push_back(simcall); + simcall_comm_waitany__set__result(simcall, idx); + synchro->state_ = SIMIX_DONE; + boost::static_pointer_cast(synchro)->finish(); + return; + } + + if (timeout < 0.0) { + simcall->timer = NULL; + } else { 
+ simcall->timer = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() { + SIMIX_waitany_remove_simcall_from_actions(simcall); + simcall_comm_waitany__set__result(simcall, -1); + SIMIX_simcall_answer(simcall); + }); + } + + unsigned int cursor; + simgrid::kernel::activity::ActivityImpl* ptr; + xbt_dynar_foreach (synchros, cursor, ptr) { + smx_activity_t synchro = simgrid::kernel::activity::ActivityImplPtr(ptr); + /* associate this simcall to the the synchro */ + synchro->simcalls_.push_back(simcall); + + /* see if the synchro is already finished */ + if (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING) { + boost::static_pointer_cast(synchro)->finish(); + break; + } + } +} /******************************************************************************/ /* SIMIX_comm_copy_data callbacks */ /******************************************************************************/ static void (*SIMIX_comm_copy_data_callback)(smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback; +void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size) +{ + simgrid::kernel::activity::CommImplPtr comm = + boost::static_pointer_cast(synchro); + + XBT_DEBUG("Copy the data over"); + memcpy(comm->dst_buff_, buff, buff_size); + if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the + // original buffer available to the application ASAP + xbt_free(buff); + comm->src_buff_ = nullptr; + } +} + void SIMIX_comm_set_copy_data_callback(void (*callback)(smx_activity_t, void*, size_t)) { SIMIX_comm_copy_data_callback = callback; @@ -212,7 +559,163 @@ void CommImpl::post() /* if there are simcalls associated with the synchro, then answer them */ if (not simcalls_.empty()) { - SIMIX_comm_finish(this); + finish(); + } +} + +void CommImpl::finish() +{ + smx_activity_t synchro = this; + while (not simcalls_.empty()) { + smx_simcall_t simcall = simcalls_.front(); + 
simcalls_.pop_front(); + + /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany + * list. Afterwards, get the position of the actual synchro in the waitany dynar and return it as the result of the + * simcall */ + + if (simcall->call == SIMCALL_NONE) // FIXME: maybe a better way to handle this case + continue; // if process handling comm is killed + if (simcall->call == SIMCALL_COMM_WAITANY) { + SIMIX_waitany_remove_simcall_from_actions(simcall); + if (simcall->timer) { + simcall->timer->remove(); + simcall->timer = nullptr; + } + if (not MC_is_active() && not MC_record_replay_is_active()) + simcall_comm_waitany__set__result(simcall, + xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro)); + } + + /* If the synchro is still in a rendez-vous point then remove from it */ + if (mbox) + mbox->remove(this); + + XBT_DEBUG("CommImpl::finish(): synchro state = %d", static_cast(state_)); + + /* Check out for errors */ + + if (not simcall->issuer->get_host()->is_on()) { + simcall->issuer->context_->iwannadie = true; + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::HostFailureException(XBT_THROW_POINT, "Host failed")); + } else { + switch (state_) { + + case SIMIX_DONE: + XBT_DEBUG("Communication %p complete!", this); + copy_data(); + break; + + case SIMIX_SRC_TIMEOUT: + simcall->issuer->exception_ = std::make_exception_ptr( + simgrid::TimeoutError(XBT_THROW_POINT, "Communication timeouted because of the sender")); + break; + + case SIMIX_DST_TIMEOUT: + simcall->issuer->exception_ = std::make_exception_ptr( + simgrid::TimeoutError(XBT_THROW_POINT, "Communication timeouted because of the receiver")); + break; + + case SIMIX_SRC_HOST_FAILURE: + if (simcall->issuer == src_actor_) + simcall->issuer->context_->iwannadie = true; + else + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); + break; + + case 
SIMIX_DST_HOST_FAILURE: + if (simcall->issuer == dst_actor_) + simcall->issuer->context_->iwannadie = true; + else + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); + break; + + case SIMIX_LINK_FAILURE: + XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) " + "detached:%d", + this, src_actor_ ? src_actor_->get_host()->get_cname() : nullptr, + dst_actor_ ? dst_actor_->get_host()->get_cname() : nullptr, simcall->issuer->get_cname(), + simcall->issuer, detached); + if (src_actor_ == simcall->issuer) { + XBT_DEBUG("I'm source"); + } else if (dst_actor_ == simcall->issuer) { + XBT_DEBUG("I'm dest"); + } else { + XBT_DEBUG("I'm neither source nor dest"); + } + simcall->issuer->throw_exception( + std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Link failure"))); + break; + + case SIMIX_CANCELED: + if (simcall->issuer == dst_actor_) + simcall->issuer->exception_ = std::make_exception_ptr( + simgrid::CancelException(XBT_THROW_POINT, "Communication canceled by the sender")); + else + simcall->issuer->exception_ = std::make_exception_ptr( + simgrid::CancelException(XBT_THROW_POINT, "Communication canceled by the receiver")); + break; + + default: + xbt_die("Unexpected synchro state in CommImpl::finish: %d", static_cast(state_)); + } + } + + /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */ + if (simcall->issuer->exception_ && + (simcall->call == SIMCALL_COMM_WAITANY || simcall->call == SIMCALL_COMM_TESTANY)) { + // First retrieve the rank of our failing synchro + int rank = -1; + if (simcall->call == SIMCALL_COMM_WAITANY) { + rank = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro); + } else if (simcall->call == SIMCALL_COMM_TESTANY) { + rank = -1; + auto* comms = simcall_comm_testany__get__comms(simcall); + auto count = 
simcall_comm_testany__get__count(simcall); + auto element = std::find(comms, comms + count, this); + if (element == comms + count) + rank = -1; + else + rank = element - comms; + } + + // In order to modify the exception we have to rethrow it: + try { + std::rethrow_exception(simcall->issuer->exception_); + } catch (simgrid::TimeoutError& e) { + e.value = rank; + simcall->issuer->exception_ = std::make_exception_ptr(e); + } catch (simgrid::NetworkFailureException& e) { + e.value = rank; + simcall->issuer->exception_ = std::make_exception_ptr(e); + } catch (simgrid::CancelException& e) { + e.value = rank; + simcall->issuer->exception_ = std::make_exception_ptr(e); + } + } + + simcall->issuer->waiting_synchro = nullptr; + simcall->issuer->comms.remove(this); + if (detached) { + if (simcall->issuer == src_actor_) { + if (dst_actor_) + dst_actor_->comms.remove(this); + } else if (simcall->issuer == dst_actor_) { + if (src_actor_) + src_actor_->comms.remove(this); + } else { + dst_actor_->comms.remove(this); + src_actor_->comms.remove(this); + } + } + + if (simcall->issuer->get_host()->is_on()) + SIMIX_simcall_answer(simcall); + else + simcall->issuer->context_->iwannadie = true; } } diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index acebe1d6eb..715483a39a 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -27,6 +27,7 @@ public: void suspend() override; void resume() override; void post() override; + void finish() override; void cancel(); double remains(); void cleanupSurf(); // FIXME: make me protected diff --git a/src/kernel/activity/ConditionVariableImpl.cpp b/src/kernel/activity/ConditionVariableImpl.cpp index 0186959676..6370b6248d 100644 --- a/src/kernel/activity/ConditionVariableImpl.cpp +++ b/src/kernel/activity/ConditionVariableImpl.cpp @@ -7,7 +7,6 @@ #include "simgrid/Exception.hpp" #include "src/kernel/activity/MutexImpl.hpp" #include "src/kernel/activity/SynchroRaw.hpp" -#include 
"src/simix/smx_synchro_private.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ConditionVariable, simix_synchro, "Condition variables"); diff --git a/src/kernel/activity/ExecImpl.cpp b/src/kernel/activity/ExecImpl.cpp index 83e7e88da5..ba2413b0b9 100644 --- a/src/kernel/activity/ExecImpl.cpp +++ b/src/kernel/activity/ExecImpl.cpp @@ -3,17 +3,50 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include "src/kernel/activity/ExecImpl.hpp" +#include "simgrid/Exception.hpp" #include "simgrid/modelchecker.h" #include "src/mc/mc_replay.hpp" - -#include "src/kernel/activity/ExecImpl.hpp" #include "src/simix/smx_host_private.hpp" -#include "src/surf/surf_interface.hpp" #include "src/surf/cpu_interface.hpp" +#include "src/surf/surf_interface.hpp" #include "simgrid/s4u/Host.hpp" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process); + +void simcall_HANDLER_execution_wait(smx_simcall_t simcall, smx_activity_t synchro) +{ + XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro.get(), (int)synchro->state_); + + /* Associate this simcall to the synchro */ + synchro->simcalls_.push_back(simcall); + simcall->issuer->waiting_synchro = synchro; + + /* set surf's synchro */ + if (MC_is_active() || MC_record_replay_is_active()) { + synchro->state_ = SIMIX_DONE; + boost::static_pointer_cast(synchro)->finish(); + return; + } + + /* If the synchro is already finished then perform the error handling */ + if (synchro->state_ != SIMIX_RUNNING) + boost::static_pointer_cast(synchro)->finish(); +} + +void simcall_HANDLER_execution_test(smx_simcall_t simcall, smx_activity_t synchro) +{ + int res = (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING); + if (res) { + synchro->simcalls_.push_back(simcall); + boost::static_pointer_cast(synchro)->finish(); + } else { + SIMIX_simcall_answer(simcall); + } + simcall_execution_test__set__result(simcall, res); +} + namespace simgrid { 
namespace kernel { namespace activity { @@ -117,7 +150,52 @@ void ExecImpl::post() /* If there are simcalls associated with the synchro, then answer them */ if (not simcalls_.empty()) - SIMIX_execution_finish(this); + finish(); +} + +void ExecImpl::finish() +{ + while (not simcalls_.empty()) { + smx_simcall_t simcall = simcalls_.front(); + simcalls_.pop_front(); + switch (state_) { + + case SIMIX_DONE: + /* do nothing, synchro done */ + XBT_DEBUG("SIMIX_execution_finished: execution successful"); + break; + + case SIMIX_FAILED: + XBT_DEBUG("SIMIX_execution_finished: host '%s' failed", simcall->issuer->get_host()->get_cname()); + simcall->issuer->context_->iwannadie = true; + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::HostFailureException(XBT_THROW_POINT, "Host failed")); + break; + + case SIMIX_CANCELED: + XBT_DEBUG("SIMIX_execution_finished: execution canceled"); + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::CancelException(XBT_THROW_POINT, "Execution Canceled")); + break; + + case SIMIX_TIMEOUT: + XBT_DEBUG("SIMIX_execution_finished: execution timeouted"); + simcall->issuer->exception_ = std::make_exception_ptr(simgrid::TimeoutError(XBT_THROW_POINT, "Timeouted")); + break; + + default: + xbt_die("Internal error in SIMIX_execution_finish: unexpected synchro state %d", static_cast(state_)); + } + + simcall->issuer->waiting_synchro = nullptr; + simcall_execution_wait__set__result(simcall, state_); + + /* Fail the process if the host is down */ + if (simcall->issuer->get_host()->is_on()) + SIMIX_simcall_answer(simcall); + else + simcall->issuer->context_->iwannadie = true; + } } ActivityImpl* ExecImpl::migrate(simgrid::s4u::Host* to) diff --git a/src/kernel/activity/ExecImpl.hpp b/src/kernel/activity/ExecImpl.hpp index accc1c5d7e..117831ad92 100644 --- a/src/kernel/activity/ExecImpl.hpp +++ b/src/kernel/activity/ExecImpl.hpp @@ -7,6 +7,7 @@ #define SIMIX_SYNCHRO_EXEC_HPP #include "src/kernel/activity/ActivityImpl.hpp" 
+#include "src/kernel/context/Context.hpp" #include "surf/surf.hpp" namespace simgrid { @@ -22,6 +23,7 @@ public: ExecImpl* start(double flops_amount, double priority, double bound); void cancel(); void post() override; + void finish() override; double get_remaining(); double get_remaining_ratio(); void set_bound(double bound); diff --git a/src/kernel/activity/IoImpl.hpp b/src/kernel/activity/IoImpl.hpp index f7f2217ffc..a4288e0743 100644 --- a/src/kernel/activity/IoImpl.hpp +++ b/src/kernel/activity/IoImpl.hpp @@ -21,7 +21,7 @@ public: IoImpl* start(sg_size_t size, simgrid::s4u::Io::OpType type); void post() override; - void finish(); + void finish() override; void cancel(); double get_remaining(); sg_size_t get_performed_ioops() { return performed_ioops_; } diff --git a/src/kernel/activity/MutexImpl.cpp b/src/kernel/activity/MutexImpl.cpp index 67b5dc8e04..80fa3d529d 100644 --- a/src/kernel/activity/MutexImpl.cpp +++ b/src/kernel/activity/MutexImpl.cpp @@ -5,7 +5,6 @@ #include "src/kernel/activity/MutexImpl.hpp" #include "src/kernel/activity/SynchroRaw.hpp" -#include "src/simix/smx_synchro_private.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_mutex, simix_synchro, "Mutex kernel-space implementation"); diff --git a/src/kernel/activity/SemaphoreImpl.cpp b/src/kernel/activity/SemaphoreImpl.cpp index cedb307903..86d1e9ef24 100644 --- a/src/kernel/activity/SemaphoreImpl.cpp +++ b/src/kernel/activity/SemaphoreImpl.cpp @@ -5,7 +5,6 @@ #include "src/kernel/activity/SemaphoreImpl.hpp" #include "src/kernel/activity/SynchroRaw.hpp" -#include "src/simix/smx_synchro_private.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_semaphore, simix_synchro, "Semaphore kernel-space implementation"); diff --git a/src/kernel/activity/SleepImpl.cpp b/src/kernel/activity/SleepImpl.cpp index 18bdbef0d7..edabd248fa 100644 --- a/src/kernel/activity/SleepImpl.cpp +++ b/src/kernel/activity/SleepImpl.cpp @@ -8,8 +8,6 @@ #include "simgrid/kernel/resource/Action.hpp" #include "simgrid/s4u/Host.hpp" #include 
"src/kernel/context/Context.hpp" - -#include "simgrid/Exception.hpp" #include "src/simix/ActorImpl.hpp" #include "src/simix/popping_private.hpp" #include "src/simix/smx_private.hpp" @@ -80,7 +78,10 @@ void SleepImpl::post() SIMIX_process_sleep_destroy(this); } - +void SleepImpl::finish() +{ + /* FIXME some part of post should move to finish */ +} } // namespace activity } // namespace kernel } // namespace simgrid diff --git a/src/kernel/activity/SleepImpl.hpp b/src/kernel/activity/SleepImpl.hpp index a98198fd36..5ab94f8460 100644 --- a/src/kernel/activity/SleepImpl.hpp +++ b/src/kernel/activity/SleepImpl.hpp @@ -19,6 +19,7 @@ class XBT_PUBLIC SleepImpl : public ActivityImpl { public: explicit SleepImpl(std::string name, s4u::Host* host) : ActivityImpl(std::move(name)), host_(host) {} void post() override; + void finish() override; SleepImpl* start(double duration); sg_host_t host_ = nullptr; diff --git a/src/kernel/activity/SynchroRaw.cpp b/src/kernel/activity/SynchroRaw.cpp index 5b3efd5c5a..42133c2427 100644 --- a/src/kernel/activity/SynchroRaw.cpp +++ b/src/kernel/activity/SynchroRaw.cpp @@ -4,14 +4,17 @@ * under the terms of the license (GNU LGPL) which comes with this package. 
*/ #include "src/kernel/activity/SynchroRaw.hpp" +#include "simgrid/Exception.hpp" #include "simgrid/kernel/resource/Action.hpp" +#include "src/kernel/activity/ConditionVariableImpl.hpp" +#include "src/kernel/activity/MutexImpl.hpp" +#include "src/kernel/activity/SemaphoreImpl.hpp" #include "src/kernel/context/Context.hpp" -#include "src/simix/smx_synchro_private.hpp" #include "src/surf/cpu_interface.hpp" #include "src/surf/surf_interface.hpp" #include -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_synchro); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_synchro, simix, "SIMIX Synchronization (mutex, semaphores and conditions)"); namespace simgrid { namespace kernel { @@ -41,22 +44,70 @@ void RawImpl::resume() } void RawImpl::post() { - XBT_IN("(%p)",this); - smx_simcall_t simcall = simcalls_.front(); - simcalls_.pop_front(); - - SIMIX_synchro_stop_waiting(simcall->issuer, simcall); - simcall->issuer->waiting_synchro = nullptr; - if (surf_action_->get_state() == resource::Action::State::FAILED) { state_ = SIMIX_FAILED; - simcall->issuer->context_->iwannadie = true; } else if (surf_action_->get_state() == resource::Action::State::FINISHED) { state_ = SIMIX_SRC_TIMEOUT; - SIMIX_simcall_answer(simcall); } - XBT_OUT(); + finish(); } + +void RawImpl::finish() +{ + smx_simcall_t simcall = simcalls_.front(); + simcalls_.pop_front(); + + switch (state_) { + case SIMIX_DONE: + /* do nothing, synchro done */ + XBT_DEBUG("SIMIX_execution_finished: execution successful"); + break; + + case SIMIX_FAILED: + XBT_DEBUG("SIMIX_execution_finished: host '%s' failed", simcall->issuer->get_host()->get_cname()); + simcall->issuer->context_->iwannadie = true; + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::HostFailureException(XBT_THROW_POINT, "Host failed")); + break; + case SIMIX_SRC_TIMEOUT: + simcall->issuer->exception_ = + std::make_exception_ptr(simgrid::TimeoutError(XBT_THROW_POINT, "Synchronization timeout")); + break; + default: + xbt_die("Internal error in 
RawImpl::finish() unexpected synchro state %d", static_cast(state_)); + } + + switch (simcall->call) { + + case SIMCALL_MUTEX_LOCK: + simgrid::xbt::intrusive_erase(simcall_mutex_lock__get__mutex(simcall)->sleeping, *simcall->issuer); + break; + + case SIMCALL_COND_WAIT: + simgrid::xbt::intrusive_erase(simcall_cond_wait__get__cond(simcall)->sleeping_, *simcall->issuer); + break; + + case SIMCALL_COND_WAIT_TIMEOUT: + simgrid::xbt::intrusive_erase(simcall_cond_wait_timeout__get__cond(simcall)->sleeping_, *simcall->issuer); + simcall_cond_wait_timeout__set__result(simcall, 1); // signal a timeout + break; + + case SIMCALL_SEM_ACQUIRE: + simgrid::xbt::intrusive_erase(simcall_sem_acquire__get__sem(simcall)->sleeping_, *simcall->issuer); + break; + + case SIMCALL_SEM_ACQUIRE_TIMEOUT: + simgrid::xbt::intrusive_erase(simcall_sem_acquire_timeout__get__sem(simcall)->sleeping_, *simcall->issuer); + simcall_sem_acquire_timeout__set__result(simcall, 1); // signal a timeout + break; + + default: + THROW_IMPOSSIBLE; + } + simcall->issuer->waiting_synchro = nullptr; + SIMIX_simcall_answer(simcall); +} + } // namespace activity } // namespace kernel } // namespace simgrid diff --git a/src/kernel/activity/SynchroRaw.hpp b/src/kernel/activity/SynchroRaw.hpp index a8289758d1..91158611f0 100644 --- a/src/kernel/activity/SynchroRaw.hpp +++ b/src/kernel/activity/SynchroRaw.hpp @@ -21,6 +21,7 @@ public: void suspend() override; void resume() override; void post() override; + void finish() override; }; }}} // namespace simgrid::kernel::activity diff --git a/src/simix/ActorImpl.cpp b/src/simix/ActorImpl.cpp index 3b42e4c566..0590033d47 100644 --- a/src/simix/ActorImpl.cpp +++ b/src/simix/ActorImpl.cpp @@ -16,7 +16,6 @@ #include "src/mc/mc_replay.hpp" #include "src/mc/remote/Client.hpp" #include "src/simix/smx_host_private.hpp" -#include "src/simix/smx_synchro_private.hpp" #include "src/surf/HostImpl.hpp" #include "src/surf/cpu_interface.hpp" @@ -123,7 +122,7 @@ void ActorImpl::exit() 
sleep->surf_action_->cancel(); sleep->post(); } else if (raw != nullptr) { - SIMIX_synchro_stop_waiting(this, &simcall); + raw->finish(); } else if (io != nullptr) { io->cancel(); } else { @@ -357,7 +356,7 @@ void ActorImpl::throw_exception(std::exception_ptr e) activity::RawImplPtr raw = boost::dynamic_pointer_cast<activity::RawImpl>(waiting_synchro); if (raw != nullptr) { - SIMIX_synchro_stop_waiting(this, &simcall); + raw->finish(); } activity::IoImplPtr io = boost::dynamic_pointer_cast<activity::IoImpl>(waiting_synchro); @@ -541,7 +540,7 @@ void SIMIX_process_throw(smx_actor_t actor, xbt_errcat_t cat, int value, const c simgrid::kernel::activity::RawImplPtr raw = boost::dynamic_pointer_cast<simgrid::kernel::activity::RawImpl>(actor->waiting_synchro); if (raw != nullptr) { - SIMIX_synchro_stop_waiting(actor, &actor->simcall); + raw->finish(); } simgrid::kernel::activity::IoImplPtr io = diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index 247dd903c5..94d37fc962 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -404,3 +404,9 @@ smx_activity_t simcall_execution_start(const std::string& name, const std::strin ->start(flops_amount, priority, bound); }); } + +// deprecated +void SIMIX_comm_finish(smx_activity_t synchro) +{ + boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(synchro)->finish(); +} diff --git a/src/simix/popping_bodies.cpp b/src/simix/popping_bodies.cpp index 2525848ee2..eb015a08f2 100644 --- a/src/simix/popping_bodies.cpp +++ b/src/simix/popping_bodies.cpp @@ -16,7 +16,6 @@ #include "smx_private.hpp" #include "src/mc/mc_forward.hpp" -#include "src/simix/smx_synchro_private.hpp" #include "xbt/ex.h" #include #include diff --git a/src/simix/popping_generated.cpp b/src/simix/popping_generated.cpp index c5cd0daebd..612aeed3c1 100644 --- a/src/simix/popping_generated.cpp +++ b/src/simix/popping_generated.cpp @@ -21,7 +21,6 @@ #endif #include "src/kernel/activity/ConditionVariableImpl.hpp" #include "src/simix/smx_host_private.hpp" -#include "src/simix/smx_synchro_private.hpp" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_popping); diff
--git a/src/simix/simcalls.py b/src/simix/simcalls.py index 44871e4d42..2f8b00cc40 100755 --- a/src/simix/simcalls.py +++ b/src/simix/simcalls.py @@ -314,7 +314,6 @@ if __name__ == '__main__': fd.write('#endif\n') fd.write('#include "src/kernel/activity/ConditionVariableImpl.hpp"\n') fd.write('#include "src/simix/smx_host_private.hpp"\n') - fd.write('#include "src/simix/smx_synchro_private.hpp"\n') fd.write('\n') fd.write('XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_popping);\n\n') @@ -365,7 +364,6 @@ if __name__ == '__main__': fd = header('popping_bodies.cpp') fd.write('#include "smx_private.hpp"\n') fd.write('#include "src/mc/mc_forward.hpp"\n') - fd.write('#include "src/simix/smx_synchro_private.hpp"\n') fd.write('#include "xbt/ex.h"\n') fd.write('#include \n') fd.write('#include \n') diff --git a/src/simix/smx_host.cpp b/src/simix/smx_host.cpp index 33d574972f..c30ee976c9 100644 --- a/src/simix/smx_host.cpp +++ b/src/simix/smx_host.cpp @@ -4,7 +4,6 @@ * under the terms of the license (GNU LGPL) which comes with this package. 
*/ #include "mc/mc.h" -#include "simgrid/Exception.hpp" #include "smx_private.hpp" #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/activity/ExecImpl.hpp" @@ -57,82 +56,3 @@ SIMIX_execution_parallel_start(std::string name, int host_nb, const sg_host_t* h return exec; } -void simcall_HANDLER_execution_wait(smx_simcall_t simcall, smx_activity_t synchro) -{ - XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro.get(), (int)synchro->state_); - - /* Associate this simcall to the synchro */ - synchro->simcalls_.push_back(simcall); - simcall->issuer->waiting_synchro = synchro; - - /* set surf's synchro */ - if (MC_is_active() || MC_record_replay_is_active()) { - synchro->state_ = SIMIX_DONE; - SIMIX_execution_finish(synchro); - return; - } - - /* If the synchro is already finished then perform the error handling */ - if (synchro->state_ != SIMIX_RUNNING) - SIMIX_execution_finish(synchro); -} - -void simcall_HANDLER_execution_test(smx_simcall_t simcall, smx_activity_t synchro) -{ - int res = (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING); - if (res) { - synchro->simcalls_.push_back(simcall); - SIMIX_execution_finish(synchro); - } else { - SIMIX_simcall_answer(simcall); - } - simcall_execution_test__set__result(simcall, res); -} - -void SIMIX_execution_finish(smx_activity_t synchro) -{ - simgrid::kernel::activity::ExecImplPtr exec = - boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(synchro); - - while (not synchro->simcalls_.empty()) { - smx_simcall_t simcall = synchro->simcalls_.front(); - synchro->simcalls_.pop_front(); - switch (exec->state_) { - - case SIMIX_DONE: - /* do nothing, synchro done */ - XBT_DEBUG("SIMIX_execution_finished: execution successful"); - break; - - case SIMIX_FAILED: - XBT_DEBUG("SIMIX_execution_finished: host '%s' failed", simcall->issuer->get_host()->get_cname()); - simcall->issuer->context_->iwannadie = true; - simcall->issuer->exception_ = - std::make_exception_ptr(simgrid::HostFailureException(XBT_THROW_POINT, 
"Host failed")); - break; - - case SIMIX_CANCELED: - XBT_DEBUG("SIMIX_execution_finished: execution canceled"); - simcall->issuer->exception_ = - std::make_exception_ptr(simgrid::CancelException(XBT_THROW_POINT, "Execution Canceled")); - break; - - case SIMIX_TIMEOUT: - XBT_DEBUG("SIMIX_execution_finished: execution timeouted"); - simcall->issuer->exception_ = std::make_exception_ptr(simgrid::TimeoutError(XBT_THROW_POINT, "Timeouted")); - break; - - default: - xbt_die("Internal error in SIMIX_execution_finish: unexpected synchro state %d", (int)exec->state_); - } - - simcall->issuer->waiting_synchro = nullptr; - simcall_execution_wait__set__result(simcall, exec->state_); - - /* Fail the process if the host is down */ - if (simcall->issuer->get_host()->is_on()) - SIMIX_simcall_answer(simcall); - else - simcall->issuer->context_->iwannadie = true; - } -} diff --git a/src/simix/smx_host_private.hpp b/src/simix/smx_host_private.hpp index 8c13cc2cb6..84c484062a 100644 --- a/src/simix/smx_host_private.hpp +++ b/src/simix/smx_host_private.hpp @@ -10,8 +10,6 @@ #include -XBT_PRIVATE void SIMIX_execution_finish(smx_activity_t synchro); - XBT_PRIVATE simgrid::kernel::activity::ExecImplPtr SIMIX_execution_parallel_start(std::string name, int host_nb, const sg_host_t* host_list, const double* flops_amount, const double* bytes_amount, double rate, double timeout); diff --git a/src/simix/smx_network.cpp b/src/simix/smx_network.cpp deleted file mode 100644 index 54219f1999..0000000000 --- a/src/simix/smx_network.cpp +++ /dev/null @@ -1,528 +0,0 @@ -/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. 
*/ - -#include "mc/mc.h" -#include "simgrid/Exception.hpp" -#include "src/kernel/activity/MailboxImpl.hpp" -#include "src/mc/mc_replay.hpp" -#include "src/simix/smx_private.hpp" -#include "src/surf/cpu_interface.hpp" -#include "src/surf/network_interface.hpp" - -#include - -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization"); - -static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall); - -/******************************************************************************/ -/* Communication synchros */ -/******************************************************************************/ -XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox, double task_size, - double rate, void* src_buff, size_t src_buff_size, - int (*match_fun)(void*, void*, simgrid::kernel::activity::CommImpl*), - void (*copy_data_fun)(smx_activity_t, void*, size_t), void* data, - double timeout) -{ - smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate, - src_buff, src_buff_size, match_fun, nullptr, copy_data_fun, - data, 0); - SIMCALL_SET_MC_VALUE(simcall, 0); - simcall_HANDLER_comm_wait(simcall, comm, timeout); -} - -XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend( - smx_simcall_t /*simcall*/, smx_actor_t src_proc, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, - size_t src_buff_size, int (*match_fun)(void*, void*, simgrid::kernel::activity::CommImpl*), - void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send - void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one - void* data, int detached) -{ - XBT_DEBUG("send from mailbox %p", mbox); - - /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */ - simgrid::kernel::activity::CommImplPtr this_comm = simgrid::kernel::activity::CommImplPtr( - new 
simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::SEND)); - - /* Look for communication synchro matching our needs. We also provide a description of - * ourself so that the other side also gets a chance of choosing if it wants to match with us. - * - * If it is not found then push our communication into the rendez-vous point */ - simgrid::kernel::activity::CommImplPtr other_comm = - mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::RECEIVE, match_fun, data, this_comm, - /*done*/ false, /*remove_matching*/ true); - - if (not other_comm) { - other_comm = std::move(this_comm); - - if (mbox->permanent_receiver_ != nullptr) { - //this mailbox is for small messages, which have to be sent right now - other_comm->state_ = SIMIX_READY; - other_comm->dst_actor_ = mbox->permanent_receiver_.get(); - mbox->done_comm_queue_.push_back(other_comm); - XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, other_comm.get()); - - }else{ - mbox->push(other_comm); - } - } else { - XBT_DEBUG("Receive already pushed"); - - other_comm->state_ = SIMIX_READY; - other_comm->type = simgrid::kernel::activity::CommImpl::Type::READY; - } - src_proc->comms.push_back(other_comm); - - if (detached) { - other_comm->detached = true; - other_comm->clean_fun = clean_fun; - } else { - other_comm->clean_fun = nullptr; - } - - /* Setup the communication synchro */ - other_comm->src_actor_ = src_proc; - other_comm->task_size_ = task_size; - other_comm->rate_ = rate; - other_comm->src_buff_ = src_buff; - other_comm->src_buff_size_ = src_buff_size; - other_comm->src_data_ = data; - - other_comm->match_fun = match_fun; - other_comm->copy_data_fun = copy_data_fun; - - - if (MC_is_active() || MC_record_replay_is_active()) { - other_comm->state_ = SIMIX_RUNNING; - return (detached ? nullptr : other_comm); - } - - other_comm->start(); - - return (detached ? 
nullptr : other_comm); -} - -XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox, - void* dst_buff, size_t* dst_buff_size, - int (*match_fun)(void*, void*, simgrid::kernel::activity::CommImpl*), - void (*copy_data_fun)(smx_activity_t, void*, size_t), void* data, - double timeout, double rate) -{ - smx_activity_t comm = simcall_HANDLER_comm_irecv(simcall, receiver, mbox, dst_buff, dst_buff_size, match_fun, - copy_data_fun, data, rate); - SIMCALL_SET_MC_VALUE(simcall, 0); - simcall_HANDLER_comm_wait(simcall, comm, timeout); -} - -XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, - smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, - simix_match_func_t match_fun, - void (*copy_data_fun)(smx_activity_t, void*, size_t), void* data, - double rate) -{ - simgrid::kernel::activity::CommImplPtr this_synchro = simgrid::kernel::activity::CommImplPtr( - new simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::RECEIVE)); - XBT_DEBUG("recv from mbox %p. 
this_synchro=%p", mbox, this_synchro.get()); - - simgrid::kernel::activity::CommImplPtr other_comm; - //communication already done, get it inside the list of completed comms - if (mbox->permanent_receiver_ != nullptr && not mbox->done_comm_queue_.empty()) { - - XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication"); - //find a match in the list of already received comms - other_comm = mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::SEND, match_fun, data, - this_synchro, /*done*/ true, - /*remove_matching*/ true); - //if not found, assume the receiver came first, register it to the mailbox in the classical way - if (not other_comm) { - XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into list"); - other_comm = std::move(this_synchro); - mbox->push(other_comm); - } else { - if (other_comm->surf_action_ && other_comm->remains() < 1e-12) { - XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get()); - other_comm->state_ = SIMIX_DONE; - other_comm->type = simgrid::kernel::activity::CommImpl::Type::DONE; - other_comm->mbox = nullptr; - } - } - } else { - /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */ - - /* Look for communication activity matching our needs. We also provide a description of - * ourself so that the other side also gets a chance of choosing if it wants to match with us. 
- * - * If it is not found then push our communication into the rendez-vous point */ - other_comm = mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::SEND, match_fun, data, - this_synchro, /*done*/ false, - /*remove_matching*/ true); - - if (other_comm == nullptr) { - XBT_DEBUG("Receive pushed first (%zu comm enqueued so far)", mbox->comm_queue_.size()); - other_comm = std::move(this_synchro); - mbox->push(other_comm); - } else { - XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_comm.get()); - - other_comm->state_ = SIMIX_READY; - other_comm->type = simgrid::kernel::activity::CommImpl::Type::READY; - } - receiver->comms.push_back(other_comm); - } - - /* Setup communication synchro */ - other_comm->dst_actor_ = receiver; - other_comm->dst_buff_ = dst_buff; - other_comm->dst_buff_size_ = dst_buff_size; - other_comm->dst_data_ = data; - - if (rate > -1.0 && (other_comm->rate_ < 0.0 || rate < other_comm->rate_)) - other_comm->rate_ = rate; - - other_comm->match_fun = match_fun; - other_comm->copy_data_fun = copy_data_fun; - - if (MC_is_active() || MC_record_replay_is_active()) { - other_comm->state_ = SIMIX_RUNNING; - return other_comm; - } - other_comm->start(); - return other_comm; -} - -void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout) -{ - /* Associate this simcall to the wait synchro */ - XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro.get()); - - synchro->simcalls_.push_back(simcall); - simcall->issuer->waiting_synchro = synchro; - - if (MC_is_active() || MC_record_replay_is_active()) { - int idx = SIMCALL_GET_MC_VALUE(simcall); - if (idx == 0) { - synchro->state_ = SIMIX_DONE; - } else { - /* If we reached this point, the wait simcall must have a timeout */ - /* Otherwise it shouldn't be enabled and executed by the MC */ - if (timeout < 0.0) - THROW_IMPOSSIBLE; - - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - if (comm->src_actor_ == 
simcall->issuer) - comm->state_ = SIMIX_SRC_TIMEOUT; - else - comm->state_ = SIMIX_DST_TIMEOUT; - } - - SIMIX_comm_finish(synchro); - return; - } - - /* If the synchro has already finish perform the error handling, */ - /* otherwise set up a waiting timeout on the right side */ - if (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING) { - SIMIX_comm_finish(synchro); - } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */ - simgrid::kernel::resource::Action* sleep = simcall->issuer->get_host()->pimpl_cpu->sleep(timeout); - sleep->set_data(synchro.get()); - - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - if (simcall->issuer == comm->src_actor_) - comm->src_timeout_ = sleep; - else - comm->dst_timeout_ = sleep; - } -} - -void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro) -{ - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - - int res; - - if (MC_is_active() || MC_record_replay_is_active()){ - res = comm->src_actor_ && comm->dst_actor_; - if (res) - synchro->state_ = SIMIX_DONE; - } else { - res = synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING; - } - - simcall_comm_test__set__result(simcall, res); - if (simcall_comm_test__get__result(simcall)) { - synchro->simcalls_.push_back(simcall); - SIMIX_comm_finish(synchro); - } else { - SIMIX_simcall_answer(simcall); - } -} - -void simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::ActivityImplPtr comms[], - size_t count) -{ - // The default result is -1 -- this means, "nothing is ready". - // It can be changed below, but only if something matches. 
- simcall_comm_testany__set__result(simcall, -1); - - if (MC_is_active() || MC_record_replay_is_active()){ - int idx = SIMCALL_GET_MC_VALUE(simcall); - if(idx == -1){ - SIMIX_simcall_answer(simcall); - }else{ - simgrid::kernel::activity::ActivityImplPtr synchro = comms[idx]; - simcall_comm_testany__set__result(simcall, idx); - synchro->simcalls_.push_back(simcall); - synchro->state_ = SIMIX_DONE; - SIMIX_comm_finish(synchro); - } - return; - } - - for (std::size_t i = 0; i != count; ++i) { - simgrid::kernel::activity::ActivityImplPtr synchro = comms[i]; - if (synchro->state_ != SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING) { - simcall_comm_testany__set__result(simcall, i); - synchro->simcalls_.push_back(simcall); - SIMIX_comm_finish(synchro); - return; - } - } - SIMIX_simcall_answer(simcall); -} - -void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout) -{ - if (MC_is_active() || MC_record_replay_is_active()){ - if (timeout > 0.0) - xbt_die("Timeout not implemented for waitany in the model-checker"); - int idx = SIMCALL_GET_MC_VALUE(simcall); - smx_activity_t synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t); - synchro->simcalls_.push_back(simcall); - simcall_comm_waitany__set__result(simcall, idx); - synchro->state_ = SIMIX_DONE; - SIMIX_comm_finish(synchro); - return; - } - - if (timeout < 0.0){ - simcall->timer = NULL; - } else { - simcall->timer = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() { - SIMIX_waitany_remove_simcall_from_actions(simcall); - simcall_comm_waitany__set__result(simcall, -1); - SIMIX_simcall_answer(simcall); - }); - } - - unsigned int cursor; - simgrid::kernel::activity::ActivityImpl* ptr; - xbt_dynar_foreach(synchros, cursor, ptr){ - smx_activity_t synchro = simgrid::kernel::activity::ActivityImplPtr(ptr); - /* associate this simcall to the the synchro */ - synchro->simcalls_.push_back(simcall); - - /* see if the synchro is already finished */ - if (synchro->state_ 
!= SIMIX_WAITING && synchro->state_ != SIMIX_RUNNING) { - SIMIX_comm_finish(synchro); - break; - } - } -} - -void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall) -{ - unsigned int cursor = 0; - xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall); - - simgrid::kernel::activity::ActivityImpl* ptr; - xbt_dynar_foreach(synchros, cursor, ptr){ - smx_activity_t synchro = simgrid::kernel::activity::ActivityImplPtr(ptr); - - // Remove the first occurence of simcall: - auto i = boost::range::find(synchro->simcalls_, simcall); - if (i != synchro->simcalls_.end()) - synchro->simcalls_.erase(i); - } -} - -/** - * @brief Answers the SIMIX simcalls associated to a communication synchro. - * @param synchro a finished communication synchro - */ -void SIMIX_comm_finish(smx_activity_t synchro) -{ - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - - while (not synchro->simcalls_.empty()) { - smx_simcall_t simcall = synchro->simcalls_.front(); - synchro->simcalls_.pop_front(); - - /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany - * list. 
Afterwards, get the position of the actual synchro in the waitany dynar and return it as the result of the - * simcall */ - - if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case - continue; // if process handling comm is killed - if (simcall->call == SIMCALL_COMM_WAITANY) { - SIMIX_waitany_remove_simcall_from_actions(simcall); - if (simcall->timer) { - simcall->timer->remove(); - simcall->timer = nullptr; - } - if (not MC_is_active() && not MC_record_replay_is_active()) - simcall_comm_waitany__set__result(simcall, - xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro)); - } - - /* If the synchro is still in a rendez-vous point then remove from it */ - if (comm->mbox) - comm->mbox->remove(comm); - - XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state_); - - /* Check out for errors */ - - if (not simcall->issuer->get_host()->is_on()) { - simcall->issuer->context_->iwannadie = true; - simcall->issuer->exception_ = - std::make_exception_ptr(simgrid::HostFailureException(XBT_THROW_POINT, "Host failed")); - } else { - switch (comm->state_) { - - case SIMIX_DONE: - XBT_DEBUG("Communication %p complete!", synchro.get()); - comm->copy_data(); - break; - - case SIMIX_SRC_TIMEOUT: - simcall->issuer->exception_ = std::make_exception_ptr( - simgrid::TimeoutError(XBT_THROW_POINT, "Communication timeouted because of the sender")); - break; - - case SIMIX_DST_TIMEOUT: - simcall->issuer->exception_ = std::make_exception_ptr( - simgrid::TimeoutError(XBT_THROW_POINT, "Communication timeouted because of the receiver")); - break; - - case SIMIX_SRC_HOST_FAILURE: - if (simcall->issuer == comm->src_actor_) - simcall->issuer->context_->iwannadie = true; - else - simcall->issuer->exception_ = - std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); - break; - - case SIMIX_DST_HOST_FAILURE: - if (simcall->issuer == comm->dst_actor_) - simcall->issuer->context_->iwannadie = true; - 
else - simcall->issuer->exception_ = - std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); - break; - - case SIMIX_LINK_FAILURE: - XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) " - "detached:%d", - synchro.get(), comm->src_actor_ ? comm->src_actor_->get_host()->get_cname() : nullptr, - comm->dst_actor_ ? comm->dst_actor_->get_host()->get_cname() : nullptr, - simcall->issuer->get_cname(), simcall->issuer, comm->detached); - if (comm->src_actor_ == simcall->issuer) { - XBT_DEBUG("I'm source"); - } else if (comm->dst_actor_ == simcall->issuer) { - XBT_DEBUG("I'm dest"); - } else { - XBT_DEBUG("I'm neither source nor dest"); - } - simcall->issuer->throw_exception( - std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Link failure"))); - break; - - case SIMIX_CANCELED: - if (simcall->issuer == comm->dst_actor_) - simcall->issuer->exception_ = std::make_exception_ptr( - simgrid::CancelException(XBT_THROW_POINT, "Communication canceled by the sender")); - else - simcall->issuer->exception_ = std::make_exception_ptr( - simgrid::CancelException(XBT_THROW_POINT, "Communication canceled by the receiver")); - break; - - default: - xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state_); - } - } - - /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */ - if (simcall->issuer->exception_ && - (simcall->call == SIMCALL_COMM_WAITANY || simcall->call == SIMCALL_COMM_TESTANY)) { - // First retrieve the rank of our failing synchro - int rank = -1; - if (simcall->call == SIMCALL_COMM_WAITANY) { - rank = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro); - } else if (simcall->call == SIMCALL_COMM_TESTANY) { - rank = -1; - auto* comms = simcall_comm_testany__get__comms(simcall); - auto count = simcall_comm_testany__get__count(simcall); - auto element = 
std::find(comms, comms + count, synchro); - if (element == comms + count) - rank = -1; - else - rank = element - comms; - } - - // In order to modify the exception we have to rethrow it: - try { - std::rethrow_exception(simcall->issuer->exception_); - } catch (simgrid::TimeoutError& e) { - e.value = rank; - simcall->issuer->exception_ = std::make_exception_ptr(e); - } catch (simgrid::NetworkFailureException& e) { - e.value = rank; - simcall->issuer->exception_ = std::make_exception_ptr(e); - } catch (simgrid::CancelException& e) { - e.value = rank; - simcall->issuer->exception_ = std::make_exception_ptr(e); - } - } - - simcall->issuer->waiting_synchro = nullptr; - simcall->issuer->comms.remove(synchro); - if(comm->detached){ - if (simcall->issuer == comm->src_actor_) { - if (comm->dst_actor_) - comm->dst_actor_->comms.remove(synchro); - } else if (simcall->issuer == comm->dst_actor_) { - if (comm->src_actor_) - comm->src_actor_->comms.remove(synchro); - } - else{ - comm->dst_actor_->comms.remove(synchro); - comm->src_actor_->comms.remove(synchro); - } - } - - if (simcall->issuer->get_host()->is_on()) - SIMIX_simcall_answer(simcall); - else - simcall->issuer->context_->iwannadie = true; - } -} - -void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size) -{ - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - - XBT_DEBUG("Copy the data over"); - memcpy(comm->dst_buff_, buff, buff_size); - if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP - xbt_free(buff); - comm->src_buff_ = nullptr; - } -} diff --git a/src/simix/smx_synchro.cpp b/src/simix/smx_synchro.cpp deleted file mode 100644 index 153d7a7d6e..0000000000 --- a/src/simix/smx_synchro.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. 
*/ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#include "src/kernel/activity/ConditionVariableImpl.hpp" -#include "src/kernel/activity/MutexImpl.hpp" -#include "src/kernel/activity/SemaphoreImpl.hpp" -#include "src/kernel/context/Context.hpp" -#include "src/simix/smx_synchro_private.hpp" - -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_synchro, simix, "SIMIX Synchronization (mutex, semaphores and conditions)"); - -/***************************** Raw synchronization *********************************/ - -void SIMIX_synchro_stop_waiting(smx_actor_t process, smx_simcall_t simcall) -{ - XBT_IN("(%p, %p)",process,simcall); - switch (simcall->call) { - - case SIMCALL_MUTEX_LOCK: - simgrid::xbt::intrusive_erase(simcall_mutex_lock__get__mutex(simcall)->sleeping, *process); - break; - - case SIMCALL_COND_WAIT: - simgrid::xbt::intrusive_erase(simcall_cond_wait__get__cond(simcall)->sleeping_, *process); - break; - - case SIMCALL_COND_WAIT_TIMEOUT: - simgrid::xbt::intrusive_erase(simcall_cond_wait_timeout__get__cond(simcall)->sleeping_, *process); - simcall_cond_wait_timeout__set__result(simcall, 1); // signal a timeout - break; - - case SIMCALL_SEM_ACQUIRE: - simgrid::xbt::intrusive_erase(simcall_sem_acquire__get__sem(simcall)->sleeping_, *process); - break; - - case SIMCALL_SEM_ACQUIRE_TIMEOUT: - simgrid::xbt::intrusive_erase(simcall_sem_acquire_timeout__get__sem(simcall)->sleeping_, *process); - simcall_sem_acquire_timeout__set__result(simcall, 1); // signal a timeout - break; - - default: - THROW_IMPOSSIBLE; - } - XBT_OUT(); -} diff --git a/src/simix/smx_synchro_private.hpp b/src/simix/smx_synchro_private.hpp deleted file mode 100644 index 97fc9b8484..0000000000 --- a/src/simix/smx_synchro_private.hpp +++ /dev/null @@ -1,13 +0,0 @@ -/* Copyright (c) 2012-2019. The SimGrid Team. All rights reserved. 
*/ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef SIMIX_SYNCHRO_PRIVATE_H -#define SIMIX_SYNCHRO_PRIVATE_H - -#include "src/simix/ActorImpl.hpp" - -XBT_PRIVATE void SIMIX_synchro_stop_waiting(smx_actor_t process, smx_simcall_t simcall); - -#endif diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index dd534b7a2c..489e92a710 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -27,7 +27,6 @@ set(EXTRA_DIST src/simix/popping_accessors.hpp src/simix/smx_host_private.hpp src/simix/smx_private.hpp - src/simix/smx_synchro_private.hpp src/smpi/colls/coll_tuned_topo.hpp src/smpi/colls/colls_private.hpp src/smpi/colls/smpi_mvapich2_selector_stampede.hpp @@ -386,10 +385,8 @@ set(SIMIX_SRC src/simix/smx_environment.cpp src/simix/smx_global.cpp src/simix/smx_host.cpp - src/simix/smx_network.cpp src/simix/ActorImpl.cpp src/simix/ActorImpl.hpp - src/simix/smx_synchro.cpp src/simix/popping.cpp src/kernel/activity/ActivityImpl.cpp src/kernel/activity/ActivityImpl.hpp