From: Frederic Suter Date: Wed, 13 Feb 2019 14:35:43 +0000 (+0100) Subject: add start and copy_data methods to CommImpl X-Git-Tag: v3_22~336 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/0b709e78b05186146867b7b5a841bdec06507bf8 add start and copy_data methods to CommImpl --- diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index 63e04af5a8..f55d4d445c 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -6,12 +6,34 @@ #include "src/kernel/activity/CommImpl.hpp" #include "simgrid/kernel/resource/Action.hpp" #include "simgrid/modelchecker.h" +#include "simgrid/s4u/Host.hpp" #include "src/kernel/activity/MailboxImpl.hpp" #include "src/mc/mc_replay.hpp" #include "src/simix/smx_network_private.hpp" +#include "src/surf/network_interface.hpp" #include "src/surf/surf_interface.hpp" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_network); + +/******************************************************************************/ +/* SIMIX_comm_copy_data callbacks */ +/******************************************************************************/ +static void (*SIMIX_comm_copy_data_callback)(smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback; + +void SIMIX_comm_set_copy_data_callback(void (*callback)(smx_activity_t, void*, size_t)) +{ + SIMIX_comm_copy_data_callback = callback; +} + +void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size) +{ + simgrid::kernel::activity::CommImplPtr comm = + boost::static_pointer_cast(synchro); + + xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); + *(void**)(comm->dst_buff_) = buff; +} + namespace simgrid { namespace kernel { namespace activity { @@ -42,12 +64,85 @@ CommImpl::~CommImpl() mbox->remove(this); } +/** @brief Starts the simulation of a communication synchro. */ +void CommImpl::start() +{ + /* If both the sender and the receiver are already there, start the communication */ + if (state_ == SIMIX_READY) { + + s4u::Host* sender = src_actor_->host_; + s4u::Host* receiver = dst_actor_->host_; + + surf_action_ = surf_network_model->communicate(sender, receiver, task_size_, rate_); + surf_action_->set_data(this); + state_ = SIMIX_RUNNING; + + XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(), + receiver->get_cname(), surf_action_); + + /* If a link is failed, detect it immediately */ + if (surf_action_->get_state() == resource::Action::State::FAILED) { + XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), + receiver->get_cname()); + state_ = SIMIX_LINK_FAILURE; + cleanupSurf(); + } + + /* If any of the process is suspended, create the synchro but stop its execution, + it will be restarted when the sender process resume */ + if (src_actor_->is_suspended() || dst_actor_->is_suspended()) { + if (src_actor_->is_suspended()) + XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the " + "communication", + src_actor_->get_cname(), src_actor_->host_->get_cname()); + else + XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the " + "communication", + dst_actor_->get_cname(), dst_actor_->host_->get_cname()); + + surf_action_->suspend(); + } + } +} + +/** @brief Copy the communication data from the sender's buffer to the receiver's one */ +void CommImpl::copy_data() +{ + size_t buff_size = src_buff_size_; + /* If there is no data to copy then return */ + if (not src_buff_ || not dst_buff_ || copied) + return; + + XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this, + src_actor_ ? src_actor_->host_->get_cname() : "a finished process", src_buff_, + dst_actor_ ? dst_actor_->host_->get_cname() : "a finished process", dst_buff_, buff_size); + + /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ + if (dst_buff_size_) + buff_size = std::min(buff_size, *(dst_buff_size_)); + + /* Update the receiver's buffer size to the copied amount */ + if (dst_buff_size_) + *dst_buff_size_ = buff_size; + + if (buff_size > 0) { + if (copy_data_fun) + copy_data_fun(this, src_buff_, buff_size); + else + SIMIX_comm_copy_data_callback(this, src_buff_, buff_size); + } + + /* Set the copied flag so we copy data only once */ + /* (this function might be called from both communication ends) */ + copied = true; +} + void CommImpl::suspend() { /* FIXME: shall we suspend also the timeout synchro? */ if (surf_action_) surf_action_->suspend(); - /* if not created yet, the action will be suspended on creation, in SIMIX_comm_start() */ + /* if not created yet, the action will be suspended on creation, in CommImpl::start() */ } void CommImpl::resume() @@ -55,7 +150,7 @@ void CommImpl::resume() /*FIXME: check what happen with the timeouts */ if (surf_action_) surf_action_->resume(); - /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */ + /* in the other case, the synchro were not really suspended yet, see CommImpl::suspend() and CommImpl::start() */ } void CommImpl::cancel() @@ -66,7 +161,6 @@ void CommImpl::cancel() state_ = SIMIX_CANCELED; } else if (not MC_is_active() /* when running the MC there are no surf actions */ && not MC_record_replay_is_active() && (state_ == SIMIX_READY || state_ == SIMIX_RUNNING)) { - surf_action_->cancel(); } } diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index 55828307c9..4c7a715d91 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -21,6 +21,8 @@ class XBT_PUBLIC CommImpl : public ActivityImpl { public: explicit CommImpl(e_smx_comm_type_t type); + void start(); + void copy_data(); void suspend() override; void resume() override; void post() override; diff --git a/src/simix/smx_network.cpp b/src/simix/smx_network.cpp index 06e856a24d..9bf9864557 100644 --- a/src/simix/smx_network.cpp +++ b/src/simix/smx_network.cpp @@ -18,8 +18,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization"); static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall); -static void SIMIX_comm_copy_data(smx_activity_t comm); -static void SIMIX_comm_start(simgrid::kernel::activity::CommImplPtr synchro); /** * @brief Checks if there is a communication activity queued in a deque matching our needs @@ -146,7 +144,8 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend( return (detached ? nullptr : other_comm); } - SIMIX_comm_start(other_comm); + other_comm->start(); + return (detached ? nullptr : other_comm); } @@ -240,8 +239,7 @@ SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void* dst_buff, size_ other_comm->state_ = SIMIX_RUNNING; return other_comm; } - - SIMIX_comm_start(other_comm); + other_comm->start(); return other_comm; } @@ -438,50 +436,6 @@ void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall) } } -/** - * @brief Starts the simulation of a communication synchro. - * @param comm the communication that will be started - */ -static inline void SIMIX_comm_start(simgrid::kernel::activity::CommImplPtr comm) -{ - /* If both the sender and the receiver are already there, start the communication */ - if (comm->state_ == SIMIX_READY) { - - simgrid::s4u::Host* sender = comm->src_actor_->host_; - simgrid::s4u::Host* receiver = comm->dst_actor_->host_; - - comm->surf_action_ = surf_network_model->communicate(sender, receiver, comm->task_size_, comm->rate_); - comm->surf_action_->set_data(comm.get()); - comm->state_ = SIMIX_RUNNING; - - XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", comm.get(), sender->get_cname(), - receiver->get_cname(), comm->surf_action_); - - /* If a link is failed, detect it immediately */ - if (comm->surf_action_->get_state() == simgrid::kernel::resource::Action::State::FAILED) { - XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), - receiver->get_cname()); - comm->state_ = SIMIX_LINK_FAILURE; - comm->cleanupSurf(); - } - - /* If any of the process is suspended, create the synchro but stop its execution, - it will be restarted when the sender process resume */ - if (comm->src_actor_->is_suspended() || comm->dst_actor_->is_suspended()) { - if (comm->src_actor_->is_suspended()) - XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the " - "communication", - comm->src_actor_->get_cname(), comm->src_actor_->host_->get_cname()); - else - XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the " - "communication", - comm->dst_actor_->get_cname(), comm->dst_actor_->host_->get_cname()); - - comm->surf_action_->suspend(); - } - } -} - /** * @brief Answers the SIMIX simcalls associated to a communication synchro. * @param synchro a finished communication synchro @@ -529,7 +483,7 @@ void SIMIX_comm_finish(smx_activity_t synchro) case SIMIX_DONE: XBT_DEBUG("Communication %p complete!", synchro.get()); - SIMIX_comm_copy_data(synchro); + comm->copy_data(); break; case SIMIX_SRC_TIMEOUT: @@ -647,24 +601,6 @@ void SIMIX_comm_finish(smx_activity_t synchro) } } -/******************************************************************************/ -/* SIMIX_comm_copy_data callbacks */ -/******************************************************************************/ -static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback; - -void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t)) -{ - SIMIX_comm_copy_data_callback = callback; -} - -void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size) -{ - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - - xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); - *(void**)(comm->dst_buff_) = buff; -} void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size) { @@ -678,41 +614,3 @@ void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t comm->src_buff_ = nullptr; } } - -/** - * @brief Copy the communication data from the sender's buffer to the receiver's one - * @param synchro The communication - */ -void SIMIX_comm_copy_data(smx_activity_t synchro) -{ - simgrid::kernel::activity::CommImplPtr comm = - boost::static_pointer_cast(synchro); - - size_t buff_size = comm->src_buff_size_; - /* If there is no data to copy then return */ - if (not comm->src_buff_ || not comm->dst_buff_ || comm->copied) - return; - - XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm.get(), - comm->src_actor_ ? comm->src_actor_->host_->get_cname() : "a finished process", comm->src_buff_, - comm->dst_actor_ ? comm->dst_actor_->host_->get_cname() : "a finished process", comm->dst_buff_, buff_size); - - /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ - if (comm->dst_buff_size_) - buff_size = std::min(buff_size, *(comm->dst_buff_size_)); - - /* Update the receiver's buffer size to the copied amount */ - if (comm->dst_buff_size_) - *comm->dst_buff_size_ = buff_size; - - if (buff_size > 0){ - if(comm->copy_data_fun) - comm->copy_data_fun(comm, comm->src_buff_, buff_size); - else - SIMIX_comm_copy_data_callback(comm, comm->src_buff_, buff_size); - } - - /* Set the copied flag so we copy data only once */ - /* (this function might be called from both communication ends) */ - comm->copied = 1; -}