X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/ea93e7fc942eea632cd399a55083863c9260d0f8..0b709e78b05186146867b7b5a841bdec06507bf8:/src/kernel/activity/CommImpl.cpp

diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp
index 63e04af5a8..f55d4d445c 100644
--- a/src/kernel/activity/CommImpl.cpp
+++ b/src/kernel/activity/CommImpl.cpp
@@ -6,12 +6,37 @@
 #include "src/kernel/activity/CommImpl.hpp"
 #include "simgrid/kernel/resource/Action.hpp"
 #include "simgrid/modelchecker.h"
+#include "simgrid/s4u/Host.hpp"
 #include "src/kernel/activity/MailboxImpl.hpp"
 #include "src/mc/mc_replay.hpp"
 #include "src/simix/smx_network_private.hpp"
+#include "src/surf/network_interface.hpp"
 #include "src/surf/surf_interface.hpp"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_network);
+
+/******************************************************************************/
+/*                    SIMIX_comm_copy_data callbacks                          */
+/******************************************************************************/
+/* Fallback used by CommImpl::copy_data() for comms without a copy_data_fun; defaults to the pointer copy */
+static void (*SIMIX_comm_copy_data_callback)(smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
+
+/** @brief Replace the fallback copy callback (no null check is done before it is invoked) */
+void SIMIX_comm_set_copy_data_callback(void (*callback)(smx_activity_t, void*, size_t))
+{
+  SIMIX_comm_copy_data_callback = callback;
+}
+
+/** @brief Copy callback: stores the pointer @c buff in the receiver's buffer (buff_size must be sizeof(void*)) */
+void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
+{
+  simgrid::kernel::activity::CommImplPtr comm =
+      boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(synchro);
+
+  xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
+  *(void**)(comm->dst_buff_) = buff;
+}
+
 namespace simgrid {
 namespace kernel {
 namespace activity {
@@ -42,12 +67,93 @@ CommImpl::~CommImpl()
     mbox->remove(this);
 }
 
+/** @brief Starts the simulation of a communication synchro.
+ *
+ * Does nothing unless state_ is SIMIX_READY. Otherwise it creates the network action between the hosts of the
+ * sender and of the receiver, and moves to SIMIX_RUNNING, or to SIMIX_LINK_FAILURE if that action already failed.
+ */
+void CommImpl::start()
+{
+  /* If both the sender and the receiver are already there, start the communication */
+  if (state_ == SIMIX_READY) {
+
+    s4u::Host* sender = src_actor_->host_;
+    s4u::Host* receiver = dst_actor_->host_;
+
+    surf_action_ = surf_network_model->communicate(sender, receiver, task_size_, rate_);
+    surf_action_->set_data(this);
+    state_ = SIMIX_RUNNING;
+
+    XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(),
+              receiver->get_cname(), surf_action_);
+
+    /* If a link is failed, detect it immediately */
+    if (surf_action_->get_state() == resource::Action::State::FAILED) {
+      XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(),
+                receiver->get_cname());
+      state_ = SIMIX_LINK_FAILURE;
+      cleanupSurf();
+    }
+
+    /* If any of the processes is suspended, suspend the action too; it is restarted when that process resumes.
+       Skipped when no action is left (NOTE(review): cleanupSurf() above presumably releases it -- confirm) */
+    if (surf_action_ != nullptr && (src_actor_->is_suspended() || dst_actor_->is_suspended())) {
+      if (src_actor_->is_suspended())
+        XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
+                  "communication",
+                  src_actor_->get_cname(), src_actor_->host_->get_cname());
+      else
+        XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
+                  "communication",
+                  dst_actor_->get_cname(), dst_actor_->host_->get_cname());
+
+      surf_action_->suspend();
+    }
+  }
+}
+
+/** @brief Copy the communication data from the sender's buffer to the receiver's one
+ *
+ * The size handed to the copy callback is capped to *dst_buff_size_ when that pointer is set, and *dst_buff_size_
+ * is then updated to this amount. Nothing is done if a buffer is missing or if the data was already copied.
+ */
+void CommImpl::copy_data()
+{
+  size_t buff_size = src_buff_size_;
+  /* If there is no data to copy then return */
+  if (not src_buff_ || not dst_buff_ || copied)
+    return;
+
+  XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
+            src_actor_ ? src_actor_->host_->get_cname() : "a finished process", src_buff_,
+            dst_actor_ ? dst_actor_->host_->get_cname() : "a finished process", dst_buff_, buff_size);
+
+  /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
+  if (dst_buff_size_)
+    buff_size = std::min(buff_size, *(dst_buff_size_));
+
+  /* Update the receiver's buffer size to the copied amount */
+  if (dst_buff_size_)
+    *dst_buff_size_ = buff_size;
+
+  if (buff_size > 0) {
+    if (copy_data_fun)
+      copy_data_fun(this, src_buff_, buff_size);
+    else
+      SIMIX_comm_copy_data_callback(this, src_buff_, buff_size);
+  }
+
+  /* Set the copied flag so we copy data only once */
+  /* (this function might be called from both communication ends) */
+  copied = true;
+}
+
 void CommImpl::suspend()
 {
   /* FIXME: shall we suspend also the timeout synchro? */
   if (surf_action_)
     surf_action_->suspend();
-  /* if not created yet, the action will be suspended on creation, in SIMIX_comm_start() */
+  /* if not created yet, the action will be suspended on creation, in CommImpl::start() */
 }
 
 void CommImpl::resume()
@@ -55,7 +161,7 @@ void CommImpl::resume()
   /*FIXME: check what happen with the timeouts */
   if (surf_action_)
     surf_action_->resume();
-  /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
+  /* in the other case, the synchro were not really suspended yet, see CommImpl::suspend() and CommImpl::start() */
 }
 
 void CommImpl::cancel()
@@ -66,7 +172,6 @@ void CommImpl::cancel()
     state_ = SIMIX_CANCELED;
   } else if (not MC_is_active() /* when running the MC there are no surf actions */
              && not MC_record_replay_is_active() && (state_ == SIMIX_READY || state_ == SIMIX_RUNNING)) {
-
     surf_action_->cancel();
   }
 }