X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/343a69a0fd4edaa7143e2cf15256bfa65dbd9e5d..9f62e12607e183452816941da3e99bb5efda40d0:/src/simix/smx_network.cpp diff --git a/src/simix/smx_network.cpp b/src/simix/smx_network.cpp index ea0d502246..ed7189c6d9 100644 --- a/src/simix/smx_network.cpp +++ b/src/simix/smx_network.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -7,6 +7,7 @@ #include "simgrid/Exception.hpp" #include "src/kernel/activity/MailboxImpl.hpp" #include "src/mc/mc_replay.hpp" +#include "src/simix/smx_network_private.hpp" #include "src/simix/smx_private.hpp" #include "src/surf/cpu_interface.hpp" #include "src/surf/network_interface.hpp" @@ -42,9 +43,9 @@ _find_matching_comm(boost::circular_buffer_space_optimized* dequ boost::dynamic_pointer_cast(std::move(*it)); if (comm->type == SIMIX_COMM_SEND) { - other_user_data = comm->src_data; + other_user_data = comm->src_data_; } else if (comm->type == SIMIX_COMM_RECEIVE) { - other_user_data = comm->dst_data; + other_user_data = comm->dst_data_; } if (comm->type == type && (match_fun == nullptr || match_fun(this_user_data, other_user_data, comm.get())) && (not comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro.get()))) { @@ -106,7 +107,7 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend( if (mbox->permanent_receiver_ != nullptr) { //this mailbox is for small messages, which have to be sent right now other_comm->state_ = SIMIX_READY; - other_comm->dst_proc = mbox->permanent_receiver_.get(); + other_comm->dst_actor_ = mbox->permanent_receiver_.get(); mbox->done_comm_queue_.push_back(other_comm); XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, other_comm.get()); @@ -129,12 +130,12 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend( } /* Setup the communication synchro */ - other_comm->src_proc = src_proc; - other_comm->task_size = task_size; - other_comm->rate = rate; - other_comm->src_buff = src_buff; - other_comm->src_buff_size = src_buff_size; - other_comm->src_data = data; + other_comm->src_actor_ = src_proc; + other_comm->task_size_ = task_size; + other_comm->rate_ = rate; + other_comm->src_buff_ = src_buff; + other_comm->src_buff_size_ = src_buff_size; + other_comm->src_data_ = data; other_comm->match_fun = match_fun; other_comm->copy_data_fun = copy_data_fun; @@ -193,7 +194,7 @@ SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void* dst_buff, size_ other_comm = std::move(this_synchro); mbox->push(other_comm); } else { - if (other_comm->surfAction_ && other_comm->remains() < 1e-12) { + if (other_comm->surf_action_ && other_comm->remains() < 1e-12) { XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get()); other_comm->state_ = SIMIX_DONE; other_comm->type = SIMIX_COMM_DONE; @@ -224,13 +225,13 @@ SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void* dst_buff, size_ } /* Setup communication synchro */ - other_comm->dst_proc = dst_proc; - other_comm->dst_buff = dst_buff; - other_comm->dst_buff_size = dst_buff_size; - other_comm->dst_data = data; + other_comm->dst_actor_ = dst_proc; + other_comm->dst_buff_ = dst_buff; + other_comm->dst_buff_size_ = dst_buff_size; + other_comm->dst_data_ = data; - if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate)) - other_comm->rate = rate; + if (rate > -1.0 && (other_comm->rate_ < 0.0 || rate < other_comm->rate_)) + other_comm->rate_ = rate; other_comm->match_fun = match_fun; other_comm->copy_data_fun = copy_data_fun; @@ -298,7 +299,7 @@ void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, do simgrid::kernel::activity::CommImplPtr comm = boost::static_pointer_cast(synchro); - if (comm->src_proc == simcall->issuer) + if (comm->src_actor_ == simcall->issuer) comm->state_ = SIMIX_SRC_TIMEOUT; else comm->state_ = SIMIX_DST_TIMEOUT; @@ -318,10 +319,10 @@ void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, do simgrid::kernel::activity::CommImplPtr comm = boost::static_pointer_cast(synchro); - if (simcall->issuer == comm->src_proc) - comm->src_timeout = sleep; + if (simcall->issuer == comm->src_actor_) + comm->src_timeout_ = sleep; else - comm->dst_timeout = sleep; + comm->dst_timeout_ = sleep; } } @@ -333,7 +334,7 @@ void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro) int res; if (MC_is_active() || MC_record_replay_is_active()){ - res = comm->src_proc && comm->dst_proc; + res = comm->src_actor_ && comm->dst_actor_; if (res) synchro->state_ = SIMIX_DONE; } else { @@ -446,18 +447,18 @@ static inline void SIMIX_comm_start(simgrid::kernel::activity::CommImplPtr comm) /* If both the sender and the receiver are already there, start the communication */ if (comm->state_ == SIMIX_READY) { - simgrid::s4u::Host* sender = comm->src_proc->host_; - simgrid::s4u::Host* receiver = comm->dst_proc->host_; + simgrid::s4u::Host* sender = comm->src_actor_->host_; + simgrid::s4u::Host* receiver = comm->dst_actor_->host_; - comm->surfAction_ = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate); - comm->surfAction_->set_data(comm.get()); + comm->surf_action_ = surf_network_model->communicate(sender, receiver, comm->task_size_, comm->rate_); + comm->surf_action_->set_data(comm.get()); comm->state_ = SIMIX_RUNNING; XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", comm.get(), sender->get_cname(), - receiver->get_cname(), comm->surfAction_); + receiver->get_cname(), comm->surf_action_); /* If a link is failed, detect it immediately */ - if (comm->surfAction_->get_state() == simgrid::kernel::resource::Action::State::FAILED) { + if (comm->surf_action_->get_state() == simgrid::kernel::resource::Action::State::FAILED) { XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), receiver->get_cname()); comm->state_ = SIMIX_LINK_FAILURE; @@ -466,17 +467,17 @@ static inline void SIMIX_comm_start(simgrid::kernel::activity::CommImplPtr comm) /* If any of the process is suspended, create the synchro but stop its execution, it will be restarted when the sender process resume */ - if (comm->src_proc->is_suspended() || comm->dst_proc->is_suspended()) { - if (comm->src_proc->is_suspended()) + if (comm->src_actor_->is_suspended() || comm->dst_actor_->is_suspended()) { + if (comm->src_actor_->is_suspended()) XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the " "communication", - comm->src_proc->get_cname(), comm->src_proc->host_->get_cname()); + comm->src_actor_->get_cname(), comm->src_actor_->host_->get_cname()); else XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the " "communication", - comm->dst_proc->get_cname(), comm->dst_proc->host_->get_cname()); + comm->dst_actor_->get_cname(), comm->dst_actor_->host_->get_cname()); - comm->surfAction_->suspend(); + comm->surf_action_->suspend(); } } } @@ -520,7 +521,7 @@ void SIMIX_comm_finish(smx_activity_t synchro) /* Check out for errors */ if (simcall->issuer->host_->is_off()) { - simcall->issuer->context_->iwannadie = 1; + simcall->issuer->context_->iwannadie = true; simcall->issuer->exception = std::make_exception_ptr(simgrid::HostFailureException(XBT_THROW_POINT, "Host failed")); } else { @@ -532,45 +533,50 @@ void SIMIX_comm_finish(smx_activity_t synchro) break; case SIMIX_SRC_TIMEOUT: - SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender"); + simcall->issuer->exception = std::make_exception_ptr( + simgrid::TimeoutError(XBT_THROW_POINT, "Communication timeouted because of the sender")); break; case SIMIX_DST_TIMEOUT: - SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver"); + simcall->issuer->exception = std::make_exception_ptr( + simgrid::TimeoutError(XBT_THROW_POINT, "Communication timeouted because of the receiver")); break; case SIMIX_SRC_HOST_FAILURE: - if (simcall->issuer == comm->src_proc) - simcall->issuer->context_->iwannadie = 1; + if (simcall->issuer == comm->src_actor_) + simcall->issuer->context_->iwannadie = true; else - SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); + simcall->issuer->exception = + std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); break; case SIMIX_DST_HOST_FAILURE: - if (simcall->issuer == comm->dst_proc) - simcall->issuer->context_->iwannadie = 1; + if (simcall->issuer == comm->dst_actor_) + simcall->issuer->context_->iwannadie = true; else - SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); + simcall->issuer->exception = + std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); break; case SIMIX_LINK_FAILURE: XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) " "detached:%d", - synchro.get(), comm->src_proc ? comm->src_proc->host_->get_cname() : nullptr, - comm->dst_proc ? comm->dst_proc->host_->get_cname() : nullptr, simcall->issuer->get_cname(), + synchro.get(), comm->src_actor_ ? comm->src_actor_->host_->get_cname() : nullptr, + comm->dst_actor_ ? comm->dst_actor_->host_->get_cname() : nullptr, simcall->issuer->get_cname(), simcall->issuer, comm->detached); - if (comm->src_proc == simcall->issuer) { + if (comm->src_actor_ == simcall->issuer) { XBT_DEBUG("I'm source"); - } else if (comm->dst_proc == simcall->issuer) { + } else if (comm->dst_actor_ == simcall->issuer) { XBT_DEBUG("I'm dest"); } else { XBT_DEBUG("I'm neither source nor dest"); } - SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure"); + simcall->issuer->throw_exception( + std::make_exception_ptr(simgrid::NetworkFailureException(XBT_THROW_POINT, "Link failure"))); break; case SIMIX_CANCELED: - if (simcall->issuer == comm->dst_proc) + if (simcall->issuer == comm->dst_actor_) SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender"); else SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver"); @@ -582,54 +588,62 @@ void SIMIX_comm_finish(smx_activity_t synchro) } /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */ - if (simcall->issuer->exception) { + if (simcall->issuer->exception && + (simcall->call == SIMCALL_COMM_WAITANY || simcall->call == SIMCALL_COMM_TESTANY)) { + // First retrieve the rank of our failing synchro + int rank = -1; + if (simcall->call == SIMCALL_COMM_WAITANY) { + rank = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro); + } else if (simcall->call == SIMCALL_COMM_TESTANY) { + rank = -1; + auto* comms = simcall_comm_testany__get__comms(simcall); + auto count = simcall_comm_testany__get__count(simcall); + auto element = std::find(comms, comms + count, synchro); + if (element == comms + count) + rank = -1; + else + rank = element - comms; + } + // In order to modify the exception we have to rethrow it: try { std::rethrow_exception(simcall->issuer->exception); - } - catch(xbt_ex& e) { - if (simcall->call == SIMCALL_COMM_WAITANY) { - e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro); - } - else if (simcall->call == SIMCALL_COMM_TESTANY) { - e.value = -1; - auto* comms = simcall_comm_testany__get__comms(simcall); - auto count = simcall_comm_testany__get__count(simcall); - auto element = std::find(comms, comms + count, synchro); - if (element == comms + count) - e.value = -1; - else - e.value = element - comms; - } + } catch (simgrid::TimeoutError& e) { + e.value = rank; simcall->issuer->exception = std::make_exception_ptr(e); + } catch (simgrid::NetworkFailureException& e) { + e.value = rank; + simcall->issuer->exception = std::make_exception_ptr(e); + } catch (xbt_ex& e) { + if (e.category == cancel_error) { + e.value = rank; + simcall->issuer->exception = std::make_exception_ptr(e); + } else { + xbt_die("Unexpected xbt_ex(%s). Please enhance this code", xbt_ex_catname(e.category)); + } } - catch(...) { - // Nothing to do - } - } - - if (simcall->issuer->host_->is_off()) { - simcall->issuer->context_->iwannadie = 1; } simcall->issuer->waiting_synchro = nullptr; simcall->issuer->comms.remove(synchro); if(comm->detached){ - if(simcall->issuer == comm->src_proc){ - if(comm->dst_proc) - comm->dst_proc->comms.remove(synchro); - } - else if(simcall->issuer == comm->dst_proc){ - if(comm->src_proc) - comm->src_proc->comms.remove(synchro); + if (simcall->issuer == comm->src_actor_) { + if (comm->dst_actor_) + comm->dst_actor_->comms.remove(synchro); + } else if (simcall->issuer == comm->dst_actor_) { + if (comm->src_actor_) + comm->src_actor_->comms.remove(synchro); } else{ - comm->dst_proc->comms.remove(synchro); - comm->src_proc->comms.remove(synchro); + comm->dst_actor_->comms.remove(synchro); + comm->src_actor_->comms.remove(synchro); } } - SIMIX_simcall_answer(simcall); + if (simcall->issuer->host_->is_off()) + simcall->issuer->context_->iwannadie = true; + else + SIMIX_simcall_answer(simcall); } } @@ -649,7 +663,7 @@ void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t boost::static_pointer_cast(synchro); xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); - *(void **) (comm->dst_buff) = buff; + *(void**)(comm->dst_buff_) = buff; } void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size) @@ -658,10 +672,10 @@ void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t boost::static_pointer_cast(synchro); XBT_DEBUG("Copy the data over"); - memcpy(comm->dst_buff, buff, buff_size); + memcpy(comm->dst_buff_, buff, buff_size); if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP xbt_free(buff); - comm->src_buff = nullptr; + comm->src_buff_ = nullptr; } } @@ -674,28 +688,28 @@ void SIMIX_comm_copy_data(smx_activity_t synchro) simgrid::kernel::activity::CommImplPtr comm = boost::static_pointer_cast(synchro); - size_t buff_size = comm->src_buff_size; + size_t buff_size = comm->src_buff_size_; /* If there is no data to copy then return */ - if (not comm->src_buff || not comm->dst_buff || comm->copied) + if (not comm->src_buff_ || not comm->dst_buff_ || comm->copied) return; XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm.get(), - comm->src_proc ? comm->src_proc->host_->get_cname() : "a finished process", comm->src_buff, - comm->dst_proc ? comm->dst_proc->host_->get_cname() : "a finished process", comm->dst_buff, buff_size); + comm->src_actor_ ? comm->src_actor_->host_->get_cname() : "a finished process", comm->src_buff_, + comm->dst_actor_ ? comm->dst_actor_->host_->get_cname() : "a finished process", comm->dst_buff_, buff_size); /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ - if (comm->dst_buff_size) - buff_size = std::min(buff_size, *(comm->dst_buff_size)); + if (comm->dst_buff_size_) + buff_size = std::min(buff_size, *(comm->dst_buff_size_)); /* Update the receiver's buffer size to the copied amount */ - if (comm->dst_buff_size) - *comm->dst_buff_size = buff_size; + if (comm->dst_buff_size_) + *comm->dst_buff_size_ = buff_size; if (buff_size > 0){ if(comm->copy_data_fun) - comm->copy_data_fun (comm, comm->src_buff, buff_size); + comm->copy_data_fun(comm, comm->src_buff_, buff_size); else - SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size); + SIMIX_comm_copy_data_callback(comm, comm->src_buff_, buff_size); } /* Set the copied flag so we copy data only once */