X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/5e08a76b1a43e12302ebd25abbe8830fcefca72b..efe776a7f971fa0f6641baa5c835364604b600c8:/src/kernel/activity/CommImpl.cpp diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index c4f8b52cbb..e6042d46dd 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -1,122 +1,221 @@ -/* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include "src/kernel/activity/CommImpl.hpp" - +#include "simgrid/kernel/resource/Action.hpp" #include "simgrid/modelchecker.h" +#include "simgrid/s4u/Host.hpp" +#include "src/kernel/activity/MailboxImpl.hpp" #include "src/mc/mc_replay.hpp" -#include "src/simix/smx_network_private.hpp" +#include "src/surf/network_interface.hpp" #include "src/surf/surf_interface.hpp" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_network); -simgrid::kernel::activity::CommImpl::CommImpl(e_smx_comm_type_t _type) : type(_type) +/******************************************************************************/ +/* SIMIX_comm_copy_data callbacks */ +/******************************************************************************/ +static void (*SIMIX_comm_copy_data_callback)(smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback; + +void SIMIX_comm_set_copy_data_callback(void (*callback)(smx_activity_t, void*, size_t)) +{ + SIMIX_comm_copy_data_callback = callback; +} + +void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size) { - state = SIMIX_WAITING; - src_data = nullptr; - dst_data = nullptr; + simgrid::kernel::activity::CommImplPtr comm = + boost::static_pointer_cast(synchro); + + xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); + *(void**)(comm->dst_buff_) = buff; +} + +namespace simgrid { +namespace kernel { +namespace activity { + +CommImpl::CommImpl(CommImpl::Type type) : type(type) +{ + state_ = SIMIX_WAITING; + src_data_ = nullptr; + dst_data_ = nullptr; XBT_DEBUG("Create comm activity %p", this); } -simgrid::kernel::activity::CommImpl::~CommImpl() +CommImpl::~CommImpl() { XBT_DEBUG("Really free communication %p", this); cleanupSurf(); - if (detached && state != SIMIX_DONE) { + if (detached && state_ != SIMIX_DONE) { /* the communication has failed and was detached: * we have to free the buffer */ if (clean_fun) - clean_fun(src_buff); - src_buff = nullptr; + clean_fun(src_buff_); + src_buff_ = nullptr; } if (mbox) mbox->remove(this); } -void simgrid::kernel::activity::CommImpl::suspend() +/** @brief Starts the simulation of a communication synchro. */ +void CommImpl::start() +{ + /* If both the sender and the receiver are already there, start the communication */ + if (state_ == SIMIX_READY) { + + s4u::Host* sender = src_actor_->get_host(); + s4u::Host* receiver = dst_actor_->get_host(); + + surf_action_ = surf_network_model->communicate(sender, receiver, task_size_, rate_); + surf_action_->set_data(this); + state_ = SIMIX_RUNNING; + + XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(), + receiver->get_cname(), surf_action_); + + /* If a link is failed, detect it immediately */ + if (surf_action_->get_state() == resource::Action::State::FAILED) { + XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), + receiver->get_cname()); + state_ = SIMIX_LINK_FAILURE; + cleanupSurf(); + + } else if (src_actor_->is_suspended() || dst_actor_->is_suspended()) { + /* If any of the process is suspended, create the synchro but stop its execution, + it will be restarted when the sender process resume */ + if (src_actor_->is_suspended()) + XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the " + "communication", + src_actor_->get_cname(), src_actor_->get_host()->get_cname()); + else + XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the " + "communication", + dst_actor_->get_cname(), dst_actor_->get_host()->get_cname()); + + surf_action_->suspend(); + } + } +} + +/** @brief Copy the communication data from the sender's buffer to the receiver's one */ +void CommImpl::copy_data() +{ + size_t buff_size = src_buff_size_; + /* If there is no data to copy then return */ + if (not src_buff_ || not dst_buff_ || copied) + return; + + XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this, + src_actor_ ? src_actor_->get_host()->get_cname() : "a finished process", src_buff_, + dst_actor_ ? dst_actor_->get_host()->get_cname() : "a finished process", dst_buff_, buff_size); + + /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ + if (dst_buff_size_) + buff_size = std::min(buff_size, *(dst_buff_size_)); + + /* Update the receiver's buffer size to the copied amount */ + if (dst_buff_size_) + *dst_buff_size_ = buff_size; + + if (buff_size > 0) { + if (copy_data_fun) + copy_data_fun(this, src_buff_, buff_size); + else + SIMIX_comm_copy_data_callback(this, src_buff_, buff_size); + } + + /* Set the copied flag so we copy data only once */ + /* (this function might be called from both communication ends) */ + copied = true; +} + +void CommImpl::suspend() { /* FIXME: shall we suspend also the timeout synchro? */ - if (surfAction_) - surfAction_->suspend(); - /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */ + if (surf_action_) + surf_action_->suspend(); + /* if not created yet, the action will be suspended on creation, in CommImpl::start() */ } -void simgrid::kernel::activity::CommImpl::resume() +void CommImpl::resume() { /*FIXME: check what happen with the timeouts */ - if (surfAction_) - surfAction_->resume(); - /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */ + if (surf_action_) + surf_action_->resume(); + /* in the other case, the synchro were not really suspended yet, see CommImpl::suspend() and CommImpl::start() */ } -void simgrid::kernel::activity::CommImpl::cancel() +void CommImpl::cancel() { - /* if the synchro is a waiting state means that it is still in a mbox */ - /* so remove from it and delete it */ - if (state == SIMIX_WAITING) { + /* if the synchro is a waiting state means that it is still in a mbox so remove from it and delete it */ + if (state_ == SIMIX_WAITING) { mbox->remove(this); - state = SIMIX_CANCELED; + state_ = SIMIX_CANCELED; } else if (not MC_is_active() /* when running the MC there are no surf actions */ - && not MC_record_replay_is_active() && (state == SIMIX_READY || state == SIMIX_RUNNING)) { - - surfAction_->cancel(); + && not MC_record_replay_is_active() && (state_ == SIMIX_READY || state_ == SIMIX_RUNNING)) { + surf_action_->cancel(); } } /** @brief get the amount remaining from the communication */ -double simgrid::kernel::activity::CommImpl::remains() +double CommImpl::remains() { - return surfAction_->getRemains(); + return surf_action_->get_remains(); } /** @brief This is part of the cleanup process, probably an internal command */ -void simgrid::kernel::activity::CommImpl::cleanupSurf() +void CommImpl::cleanupSurf() { - if (surfAction_) { - surfAction_->unref(); - surfAction_ = nullptr; + if (surf_action_) { + surf_action_->unref(); + surf_action_ = nullptr; } - if (src_timeout) { - src_timeout->unref(); - src_timeout = nullptr; + if (src_timeout_) { + src_timeout_->unref(); + src_timeout_ = nullptr; } - if (dst_timeout) { - dst_timeout->unref(); - dst_timeout = nullptr; + if (dst_timeout_) { + dst_timeout_->unref(); + dst_timeout_ = nullptr; } } -void simgrid::kernel::activity::CommImpl::post() +void CommImpl::post() { /* Update synchro state */ - if (src_timeout && src_timeout->getState() == simgrid::surf::Action::State::done) - state = SIMIX_SRC_TIMEOUT; - else if (dst_timeout && dst_timeout->getState() == simgrid::surf::Action::State::done) - state = SIMIX_DST_TIMEOUT; - else if (src_timeout && src_timeout->getState() == simgrid::surf::Action::State::failed) - state = SIMIX_SRC_HOST_FAILURE; - else if (dst_timeout && dst_timeout->getState() == simgrid::surf::Action::State::failed) - state = SIMIX_DST_HOST_FAILURE; - else if (surfAction_ && surfAction_->getState() == simgrid::surf::Action::State::failed) { - state = SIMIX_LINK_FAILURE; + if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED) + state_ = SIMIX_SRC_TIMEOUT; + else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED) + state_ = SIMIX_DST_TIMEOUT; + else if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED) + state_ = SIMIX_SRC_HOST_FAILURE; + else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED) + state_ = SIMIX_DST_HOST_FAILURE; + else if (surf_action_ && surf_action_->get_state() == resource::Action::State::FAILED) { + state_ = SIMIX_LINK_FAILURE; } else - state = SIMIX_DONE; + state_ = SIMIX_DONE; - XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state, src_proc, - dst_proc, detached); + XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_, + src_actor_.get(), dst_actor_.get(), detached); /* destroy the surf actions associated with the Simix communication */ cleanupSurf(); /* if there are simcalls associated with the synchro, then answer them */ - if (not simcalls.empty()) { + if (not simcalls_.empty()) { SIMIX_comm_finish(this); } } + +} // namespace activity +} // namespace kernel +} // namespace simgrid