X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/8bc85164acb335cf909052b966b2ee4932e06cd7..29c54250185627bc66380a283f2289dcdd4d6353:/src/kernel/activity/CommImpl.cpp diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index 63cb6a505a..630a2c2488 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -41,9 +41,8 @@ XBT_PRIVATE simgrid::kernel::activity::ActivityImplPtr simcall_HANDLER_comm_isen XBT_DEBUG("send from mailbox %p", mbox); /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */ - simgrid::kernel::activity::CommImplPtr this_comm = - simgrid::kernel::activity::CommImplPtr(new simgrid::kernel::activity::CommImpl()); - this_comm->set_type(simgrid::kernel::activity::CommImpl::Type::SEND); + simgrid::kernel::activity::CommImplPtr this_comm( + new simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::SEND)); /* Look for communication synchro matching our needs. We also provide a description of * ourself so that the other side also gets a chance of choosing if it wants to match with us. @@ -70,7 +69,6 @@ XBT_PRIVATE simgrid::kernel::activity::ActivityImplPtr simcall_HANDLER_comm_isen XBT_DEBUG("Receive already pushed"); other_comm->state_ = simgrid::kernel::activity::State::READY; - other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::READY); } if (detached) { @@ -78,7 +76,7 @@ XBT_PRIVATE simgrid::kernel::activity::ActivityImplPtr simcall_HANDLER_comm_isen other_comm->clean_fun = clean_fun; } else { other_comm->clean_fun = nullptr; - src_proc->activities_.push_back(other_comm); + src_proc->activities_.emplace_back(other_comm); } /* Setup the communication synchro */ @@ -115,9 +113,8 @@ simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, smx_ void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), void* data, double rate) { - simgrid::kernel::activity::CommImplPtr this_synchro = - simgrid::kernel::activity::CommImplPtr(new simgrid::kernel::activity::CommImpl()); - this_synchro->set_type(simgrid::kernel::activity::CommImpl::Type::RECEIVE); + simgrid::kernel::activity::CommImplPtr this_synchro( + new simgrid::kernel::activity::CommImpl(simgrid::kernel::activity::CommImpl::Type::RECEIVE)); XBT_DEBUG("recv from mbox %p. this_synchro=%p", mbox, this_synchro.get()); simgrid::kernel::activity::CommImplPtr other_comm; @@ -138,7 +135,7 @@ simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, smx_ if (other_comm->surf_action_ && other_comm->get_remaining() < 1e-12) { XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get()); other_comm->state_ = simgrid::kernel::activity::State::DONE; - other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::DONE).set_mailbox(nullptr); + other_comm->set_mailbox(nullptr); } } } else { @@ -160,9 +157,8 @@ simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, smx_ XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_comm.get()); other_comm->state_ = simgrid::kernel::activity::State::READY; - other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::READY); } - receiver->activities_.push_back(other_comm); + receiver->activities_.emplace_back(other_comm); } /* Setup communication synchro */ @@ -311,7 +307,7 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activi } if (timeout < 0.0) { - simcall->timeout_cb_ = NULL; + simcall->timeout_cb_ = nullptr; } else { simcall->timeout_cb_ = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() { simcall->timeout_cb_ = nullptr; @@ -367,12 +363,6 @@ namespace simgrid { namespace kernel { namespace activity { -CommImpl& CommImpl::set_type(CommImpl::Type type) -{ - type_ = type; - return *this; -} - CommImpl& CommImpl::set_size(double size) { size_ = size; @@ -410,11 +400,17 @@ CommImpl& CommImpl::detach() return *this; } +CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes) + : size_(bytes), detached_(true), type_(Type::SEND), from_(from), to_(to) +{ + state_ = State::READY; +} + CommImpl::~CommImpl() { XBT_DEBUG("Really free communication %p in state %d (detached = %d)", this, static_cast(state_), detached_); - cleanupSurf(); + cleanup_surf(); if (detached_ && state_ != State::DONE) { /* the communication has failed and was detached: @@ -432,25 +428,26 @@ CommImpl* CommImpl::start() { /* If both the sender and the receiver are already there, start the communication */ if (state_ == State::READY) { - s4u::Host* sender = src_actor_->get_host(); - s4u::Host* receiver = dst_actor_->get_host(); + from_ = from_ != nullptr ? from_ : src_actor_->get_host(); + to_ = to_ != nullptr ? to_ : dst_actor_->get_host(); - surf_action_ = surf_network_model->communicate(sender, receiver, size_, rate_); + surf_action_ = surf_network_model->communicate(from_, to_, size_, rate_); surf_action_->set_activity(this); surf_action_->set_category(get_tracing_category()); state_ = State::RUNNING; - XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(), - receiver->get_cname(), surf_action_); + XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(), + to_->get_cname(), surf_action_, get_state_str()); /* If a link is failed, detect it immediately */ if (surf_action_->get_state() == resource::Action::State::FAILED) { - XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), - receiver->get_cname()); + XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(), + to_->get_cname()); state_ = State::LINK_FAILURE; post(); - } else if (src_actor_->is_suspended() || dst_actor_->is_suspended()) { + } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) || + (dst_actor_ != nullptr && dst_actor_->is_suspended())) { /* If any of the process is suspended, create the synchro but stop its execution, it will be restarted when the sender process resume */ if (src_actor_->is_suspended()) @@ -482,12 +479,12 @@ void CommImpl::copy_data() dst_actor_ ? dst_actor_->get_host()->get_cname() : "a finished process", dst_buff_, buff_size); /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ - if (dst_buff_size_) + if (dst_buff_size_) { buff_size = std::min(buff_size, *(dst_buff_size_)); - /* Update the receiver's buffer size to the copied amount */ - if (dst_buff_size_) + /* Update the receiver's buffer size to the copied amount */ *dst_buff_size_ = buff_size; + } if (buff_size > 0) { if (copy_data_fun) @@ -532,7 +529,7 @@ void CommImpl::cancel() } /** @brief This is part of the cleanup process, probably an internal command */ -void CommImpl::cleanupSurf() +void CommImpl::cleanup_surf() { clean_action(); @@ -563,11 +560,11 @@ void CommImpl::post() } else state_ = State::DONE; - XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_, + XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(), src_actor_.get(), dst_actor_.get(), detached_); /* destroy the surf actions associated with the Simix communication */ - cleanupSurf(); + cleanup_surf(); /* Answer all simcalls associated with the synchro */ finish(); @@ -583,9 +580,9 @@ void CommImpl::finish() * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the * simcall */ - if (simcall->call_ == SIMCALL_NONE) // FIXME: maybe a better way to handle this case - continue; // if actor handling comm is killed - if (simcall->call_ == SIMCALL_COMM_WAITANY) { + if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case + continue; // if actor handling comm is killed + if (simcall->call_ == simix::Simcall::COMM_WAITANY) { SIMIX_waitany_remove_simcall_from_actions(simcall); if (simcall->timeout_cb_) { simcall->timeout_cb_->remove(); @@ -676,15 +673,15 @@ void CommImpl::finish() } /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */ if (simcall->issuer_->exception_ && - (simcall->call_ == SIMCALL_COMM_WAITANY || simcall->call_ == SIMCALL_COMM_TESTANY)) { + (simcall->call_ == simix::Simcall::COMM_WAITANY || simcall->call_ == simix::Simcall::COMM_TESTANY)) { // First retrieve the rank of our failing synchro CommImpl** comms; size_t count; - if (simcall->call_ == SIMCALL_COMM_WAITANY) { + if (simcall->call_ == simix::Simcall::COMM_WAITANY) { comms = simcall_comm_waitany__get__comms(simcall); count = simcall_comm_waitany__get__count(simcall); } else { - /* simcall->call_ == SIMCALL_COMM_TESTANY */ + /* simcall->call_ == simix::Simcall::COMM_TESTANY */ comms = simcall_comm_testany__get__comms(simcall); count = simcall_comm_testany__get__count(simcall); } @@ -702,16 +699,10 @@ void CommImpl::finish() simcall->issuer_->waiting_synchro_ = nullptr; simcall->issuer_->activities_.remove(this); if (detached_) { - if (simcall->issuer_ == src_actor_) { - if (dst_actor_) - dst_actor_->activities_.remove(this); - } else if (simcall->issuer_ == dst_actor_) { - if (src_actor_) - src_actor_->activities_.remove(this); - } else { + if (simcall->issuer_ != dst_actor_ && dst_actor_ != nullptr) dst_actor_->activities_.remove(this); + if (simcall->issuer_ != src_actor_ && src_actor_ != nullptr) src_actor_->activities_.remove(this); - } } } }