From 0f9c16dc1c877e21cf0a8c538e62f94dc66bce49 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Wed, 24 May 2017 11:35:17 +0200 Subject: [PATCH] first step towards a CommPtr --- .../smpi/replay_multiple/replay_multiple.tesh | 2 +- include/simgrid/simix.h | 2 + src/kernel/activity/ActivityImpl.hpp | 19 +++++++ src/kernel/activity/CommImpl.hpp | 31 +++++++++++ src/kernel/activity/SynchroComm.cpp | 8 ++- src/msg/msg_gos.cpp | 6 +++ src/simix/ActorImpl.cpp | 18 ++----- src/simix/smx_network.cpp | 53 +++++++++++-------- tools/cmake/DefinePackages.cmake | 1 + 9 files changed, 101 insertions(+), 39 deletions(-) create mode 100644 src/kernel/activity/CommImpl.hpp diff --git a/examples/smpi/replay_multiple/replay_multiple.tesh b/examples/smpi/replay_multiple/replay_multiple.tesh index 72f83bb1ce..a8816fd11e 100644 --- a/examples/smpi/replay_multiple/replay_multiple.tesh +++ b/examples/smpi/replay_multiple/replay_multiple.tesh @@ -3,7 +3,7 @@ p Test the replay with multiple instances p first generate the deployment file $ ${srcdir:=.}/generate_multiple_deployment.sh -platform ${srcdir:=.}/../../platforms/small_platform_with_routers.xml -hostfile ${srcdir:=.}/../hostfile ${srcdir:=.}/description_file ${srcdir:=.}/deployment.xml -p This test needs maxmin/concurrency-limit=100 because it stats 64 hosts on 5 machines. +p This test needs maxmin/concurrency-limit=100 because it starts 64 hosts on 5 machines. ! timeout 120 $ ./replay_multiple description_file ${srcdir:=.}/../../platforms/small_platform_with_routers.xml ${srcdir:=.}/deployment.xml --log=smpi.:info --cfg=maxmin/concurrency-limit:100 > [0.000000] [xbt_cfg/INFO] Configuration change: Set 'maxmin/concurrency-limit' to '100' diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 02fdab7357..3f0e836533 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -226,6 +226,8 @@ XBT_PUBLIC(smx_activity_t) SIMIX_comm_get_send_match(smx_mailbox_t mbox, int (*m XBT_PUBLIC(int) SIMIX_comm_has_send_match(smx_mailbox_t mbox, int (*match_fun)(void*, void*), void* data); XBT_PUBLIC(int) SIMIX_comm_has_recv_match(smx_mailbox_t mbox, int (*match_fun)(void*, void*), void* data); XBT_PUBLIC(void) SIMIX_comm_finish(smx_activity_t synchro); +XBT_PUBLIC(smx_activity_t) SIMIX_comm_ref(smx_activity_t comm); +XBT_PUBLIC(void) SIMIX_comm_unref(smx_activity_t comm); /******************************************************************************/ /* SIMIX simcalls */ diff --git a/src/kernel/activity/ActivityImpl.hpp b/src/kernel/activity/ActivityImpl.hpp index 94b85900e8..4791484309 100644 --- a/src/kernel/activity/ActivityImpl.hpp +++ b/src/kernel/activity/ActivityImpl.hpp @@ -12,6 +12,7 @@ #include #include "simgrid/forward.h" +#include #include namespace simgrid { @@ -30,9 +31,27 @@ namespace activity { virtual void resume()=0; virtual void post() =0; // What to do when a simcall terminates + // boost::intrusive_ptr support: + friend void intrusive_ptr_add_ref(ActivityImpl * activity) + { + // Atomic operation! Do not split in two instructions! + auto previous = (activity->refcount_)++; + xbt_assert(previous != 0); + (void)previous; + } + + friend void intrusive_ptr_release(ActivityImpl * activity) + { + // Atomic operation! Do not split in two instructions! + auto count = --(activity->refcount_); + if (count == 0) + delete activity; + } + void ref(); void unref(); private: + std::atomic_int_fast32_t refcount_{1}; int refcount = 1; }; }}} // namespace simgrid::kernel::activity diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp new file mode 100644 index 0000000000..d1e19492a5 --- /dev/null +++ b/src/kernel/activity/CommImpl.hpp @@ -0,0 +1,31 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_ +#define SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_ + +#include "simgrid/forward.h" + +#include +#include + +namespace simgrid { +namespace kernel { +namespace activity { + +XBT_PUBLIC_CLASS CommImpl : ActivityImpl +{ +public: + CommImpl() : piface_(this){}; + ~CommImpl() = default; + + using Ptr = boost::intrusive_ptr; + simgrid::s4u::Comm piface_; // Our interface +}; +} +} +} + +#endif /* SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_ */ diff --git a/src/kernel/activity/SynchroComm.cpp b/src/kernel/activity/SynchroComm.cpp index 1949b21a05..9cb4c3701d 100644 --- a/src/kernel/activity/SynchroComm.cpp +++ b/src/kernel/activity/SynchroComm.cpp @@ -16,7 +16,7 @@ simgrid::kernel::activity::Comm::Comm(e_smx_comm_type_t _type) : type(_type) state = SIMIX_WAITING; src_data=nullptr; dst_data=nullptr; - + intrusive_ptr_add_ref(this); XBT_DEBUG("Create communicate synchro %p", this); } @@ -37,6 +37,7 @@ simgrid::kernel::activity::Comm::~Comm() if(mbox) mbox->remove(this); } + void simgrid::kernel::activity::Comm::suspend() { /* FIXME: shall we suspend also the timeout synchro? */ @@ -61,6 +62,7 @@ void simgrid::kernel::activity::Comm::cancel() if (state == SIMIX_WAITING) { mbox->remove(this); state = SIMIX_CANCELED; + SIMIX_comm_unref(this); } else if (not MC_is_active() /* when running the MC there are no surf actions */ && not MC_record_replay_is_active() && (state == SIMIX_READY || state == SIMIX_RUNNING)) { @@ -120,6 +122,8 @@ void simgrid::kernel::activity::Comm::post() cleanupSurf(); /* if there are simcalls associated with the synchro, then answer them */ - if (not simcalls.empty()) + if (not simcalls.empty()) { SIMIX_comm_finish(this); + SIMIX_comm_unref(this); + } } diff --git a/src/msg/msg_gos.cpp b/src/msg/msg_gos.cpp index 7bbb3033ed..8e921e334c 100644 --- a/src/msg/msg_gos.cpp +++ b/src/msg/msg_gos.cpp @@ -270,6 +270,7 @@ msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, d simcall_comm_recv(MSG_process_self()->getImpl(), mailbox->getImpl(), task, nullptr, nullptr, nullptr, nullptr, timeout, rate); XBT_DEBUG("Got task %s from %s",(*task)->name,mailbox->name()); (*task)->simdata->setNotUsed(); + SIMIX_comm_unref((*task)->simdata->comm); } catch (xbt_ex& e) { switch (e.category) { @@ -489,6 +490,7 @@ int MSG_comm_test(msg_comm_t comm) if (finished && comm->task_received != nullptr) { /* I am the receiver */ (*comm->task_received)->simdata->setNotUsed(); + SIMIX_comm_unref(comm->s_comm); } } catch (xbt_ex& e) { @@ -556,6 +558,7 @@ int MSG_comm_testany(xbt_dynar_t comms) if (status == MSG_OK && comm->task_received != nullptr) { /* I am the receiver */ (*comm->task_received)->simdata->setNotUsed(); + SIMIX_comm_unref(comm->s_comm); } } @@ -584,6 +587,7 @@ msg_error_t MSG_comm_wait(msg_comm_t comm, double timeout) { try { simcall_comm_wait(comm->s_comm, timeout); + SIMIX_comm_unref(comm->s_comm); if (comm->task_received != nullptr) { /* I am the receiver */ @@ -668,6 +672,7 @@ int MSG_comm_waitany(xbt_dynar_t comms) if (comm->task_received != nullptr) { /* I am the receiver */ (*comm->task_received)->simdata->setNotUsed(); + SIMIX_comm_unref(comm->s_comm); } return finished_index; @@ -795,6 +800,7 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl simcall_set_category(comm, task->category); t_simdata->comm = static_cast(comm); simcall_comm_wait(comm, timeout); + SIMIX_comm_unref(comm); } catch (xbt_ex& e) { switch (e.category) { diff --git a/src/simix/ActorImpl.cpp b/src/simix/ActorImpl.cpp index 758d26d432..dca4a3b11c 100644 --- a/src/simix/ActorImpl.cpp +++ b/src/simix/ActorImpl.cpp @@ -111,10 +111,8 @@ void SIMIX_process_cleanup(smx_actor_t process) if (comm->detached) XBT_DEBUG("Don't destroy it since it's a detached comm and I'm the sender"); else - comm->unref(); - - } - else if (comm->dst_proc == process){ + SIMIX_comm_unref(comm); + } else if (comm->dst_proc == process) { XBT_DEBUG("Found an unfinished recv comm %p, state %d, src = %p, dst = %p", comm, (int)comm->state, comm->src_proc, comm->dst_proc); comm->dst_proc = nullptr; @@ -123,8 +121,7 @@ void SIMIX_process_cleanup(smx_actor_t process) /* the comm will be freed right now, remove it from the sender */ comm->src_proc->comms.remove(comm); } - - comm->unref(); + SIMIX_comm_unref(comm); } else { xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro); } @@ -440,16 +437,11 @@ void SIMIX_process_kill(smx_actor_t process, smx_actor_t issuer) { } else if (comm != nullptr) { process->comms.remove(process->waiting_synchro); comm->cancel(); - // Remove first occurrence of &process->simcall: - auto i = boost::range::find( - process->waiting_synchro->simcalls, - &process->simcall); + auto i = boost::range::find(process->waiting_synchro->simcalls, &process->simcall); if (i != process->waiting_synchro->simcalls.end()) process->waiting_synchro->simcalls.remove(&process->simcall); - - comm->unref(); - + SIMIX_comm_unref(comm); } else if (sleep != nullptr) { SIMIX_process_sleep_destroy(process->waiting_synchro); diff --git a/src/simix/smx_network.cpp b/src/simix/smx_network.cpp index ef153ce17a..f542074033 100644 --- a/src/simix/smx_network.cpp +++ b/src/simix/smx_network.cpp @@ -45,7 +45,7 @@ _find_matching_comm(boost::circular_buffer_space_optimized* dequ for(auto it = deque->begin(); it != deque->end(); it++){ smx_activity_t synchro = *it; - simgrid::kernel::activity::Comm *comm = static_cast(synchro); + simgrid::kernel::activity::Comm* comm = static_cast(synchro); if (comm->type == SIMIX_COMM_SEND) { other_user_data = comm->src_data; @@ -57,7 +57,7 @@ _find_matching_comm(boost::circular_buffer_space_optimized* dequ XBT_DEBUG("Found a matching communication synchro %p", comm); if (remove_matching) deque->erase(it); - comm->ref(); + comm = static_cast(SIMIX_comm_ref(comm)); #if SIMGRID_HAVE_MC comm->mbox_cpy = comm->mbox; #endif @@ -114,7 +114,7 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx //this mailbox is for small messages, which have to be sent right now other_comm->state = SIMIX_READY; other_comm->dst_proc=mbox->permanent_receiver.get(); - other_comm->ref(); + other_comm = static_cast(SIMIX_comm_ref(other_comm)); mbox->done_comm_queue.push_back(other_comm); XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm)); @@ -123,7 +123,7 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx } } else { XBT_DEBUG("Receive already pushed"); - this_comm->unref(); + SIMIX_comm_unref(this_comm); other_comm->state = SIMIX_READY; other_comm->type = SIMIX_COMM_READY; @@ -207,8 +207,8 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void * other_comm->type = SIMIX_COMM_DONE; other_comm->mbox = nullptr; } - other_comm->unref(); - static_cast(this_synchro)->unref(); + SIMIX_comm_unref(other_comm); + SIMIX_comm_unref(this_synchro); } } else { /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */ @@ -225,7 +225,8 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void * other_comm = this_synchro; mbox->push(this_synchro); } else { - this_synchro->unref(); + SIMIX_comm_unref(this_synchro); + other_comm = static_cast(other_comm); other_comm->state = SIMIX_READY; other_comm->type = SIMIX_COMM_READY; @@ -287,9 +288,9 @@ smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int t } if(other_synchro) - other_synchro->unref(); + SIMIX_comm_unref(other_synchro); - this_comm->unref(); + SIMIX_comm_unref(this_comm); return other_synchro; } @@ -501,16 +502,14 @@ static inline void SIMIX_comm_start(smx_activity_t synchro) void SIMIX_comm_finish(smx_activity_t synchro) { simgrid::kernel::activity::Comm *comm = static_cast(synchro); - unsigned int destroy_count = 0; while (not synchro->simcalls.empty()) { smx_simcall_t simcall = synchro->simcalls.front(); synchro->simcalls.pop_front(); - /* If a waitany simcall is waiting for this synchro to finish, then remove - it from the other synchros in the waitany list. Afterwards, get the - position of the actual synchro in the waitany dynar and - return it as the result of the simcall */ + /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany + * list. Afterwards, get the position of the actual synchro in the waitany dynar and return it as the result of the + * simcall */ if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case continue; // if process handling comm is killed @@ -521,7 +520,8 @@ void SIMIX_comm_finish(smx_activity_t synchro) simcall->timer = nullptr; } if (not MC_is_active() && not MC_record_replay_is_active()) - simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro)); + simcall_comm_waitany__set__result(simcall, + xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro)); } /* If the synchro is still in a rendez-vous point then remove from it */ @@ -642,16 +642,10 @@ void SIMIX_comm_finish(smx_activity_t synchro) comm->dst_proc->comms.remove(synchro); comm->src_proc->comms.remove(synchro); } - //in case of a detached comm we have an extra ref to remove, as the sender won't do it - destroy_count++; } SIMIX_simcall_answer(simcall); - destroy_count++; } - - while (destroy_count-- > 0) - static_cast(synchro)->unref(); } /******************************************************************************/ @@ -684,7 +678,6 @@ void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t } } - /** * @brief Copy the communication data from the sender's buffer to the receiver's one * @param synchro The communication @@ -717,8 +710,22 @@ void SIMIX_comm_copy_data(smx_activity_t synchro) SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size); } - /* Set the copied flag so we copy data only once */ /* (this function might be called from both communication ends) */ comm->copied = 1; } + +/** Increase the refcount for this comm */ +smx_activity_t SIMIX_comm_ref(smx_activity_t comm) +{ + if (comm != nullptr) + intrusive_ptr_add_ref(comm); + return comm; +} + +/** Decrease the refcount for this comm */ +void SIMIX_comm_unref(smx_activity_t comm) +{ + if (comm != nullptr) + intrusive_ptr_release(comm); +} diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 71961df702..17f25b1271 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -31,6 +31,7 @@ set(EXTRA_DIST src/simix/smx_private.h src/simix/smx_synchro_private.h src/kernel/activity/ActivityImpl.hpp + src/kernel/activity/CommImpl.hpp src/kernel/activity/SynchroComm.hpp src/kernel/activity/SynchroExec.hpp src/kernel/activity/SynchroIo.hpp -- 2.20.1