p first generate the deployment file
$ ${srcdir:=.}/generate_multiple_deployment.sh -platform ${srcdir:=.}/../../platforms/small_platform_with_routers.xml -hostfile ${srcdir:=.}/../hostfile ${srcdir:=.}/description_file ${srcdir:=.}/deployment.xml
-p This test needs maxmin/concurrency-limit=100 because it stats 64 hosts on 5 machines.
+p This test needs maxmin/concurrency-limit=100 because it starts 64 hosts on 5 machines.
! timeout 120
$ ./replay_multiple description_file ${srcdir:=.}/../../platforms/small_platform_with_routers.xml ${srcdir:=.}/deployment.xml --log=smpi.:info --cfg=maxmin/concurrency-limit:100
> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'maxmin/concurrency-limit' to '100'
XBT_PUBLIC(int) SIMIX_comm_has_send_match(smx_mailbox_t mbox, int (*match_fun)(void*, void*), void* data);
XBT_PUBLIC(int) SIMIX_comm_has_recv_match(smx_mailbox_t mbox, int (*match_fun)(void*, void*), void* data);
XBT_PUBLIC(void) SIMIX_comm_finish(smx_activity_t synchro);
+XBT_PUBLIC(smx_activity_t) SIMIX_comm_ref(smx_activity_t comm);
+XBT_PUBLIC(void) SIMIX_comm_unref(smx_activity_t comm);
/******************************************************************************/
/* SIMIX simcalls */
#include <xbt/base.h>
#include "simgrid/forward.h"
+#include <atomic>
#include <simgrid/simix.hpp>
namespace simgrid {
virtual void resume()=0;
virtual void post() =0; // What to do when a simcall terminates
+ // boost::intrusive_ptr<Activity> support:
+ friend void intrusive_ptr_add_ref(ActivityImpl * activity)
+ {
+ // Atomic operation! Do not split in two instructions!
+ auto previous = (activity->refcount_)++;
+ xbt_assert(previous != 0);
+ (void)previous;
+ }
+
+ friend void intrusive_ptr_release(ActivityImpl * activity)
+ {
+ // Atomic operation! Do not split in two instructions!
+ auto count = --(activity->refcount_);
+ if (count == 0)
+ delete activity;
+ }
+
void ref();
void unref();
private:
+ std::atomic_int_fast32_t refcount_{1};
int refcount = 1;
};
}}} // namespace simgrid::kernel::activity
--- /dev/null
+/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_
+#define SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_
+
+#include "simgrid/forward.h"
+
+#include <atomic>
+#include <simgrid/simix.hpp>
+
+namespace simgrid {
+namespace kernel {
+namespace activity {
+
+XBT_PUBLIC_CLASS CommImpl : ActivityImpl
+{
+public:
+ CommImpl() : piface_(this){};
+ ~CommImpl() = default;
+
+ using Ptr = boost::intrusive_ptr<ActivityImpl>;
+ simgrid::s4u::Comm piface_; // Our interface
+};
+}
+}
+}
+
+#endif /* SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_ */
state = SIMIX_WAITING;
src_data=nullptr;
dst_data=nullptr;
-
+ intrusive_ptr_add_ref(this);
XBT_DEBUG("Create communicate synchro %p", this);
}
if(mbox)
mbox->remove(this);
}
+
void simgrid::kernel::activity::Comm::suspend()
{
/* FIXME: shall we suspend also the timeout synchro? */
if (state == SIMIX_WAITING) {
mbox->remove(this);
state = SIMIX_CANCELED;
+ SIMIX_comm_unref(this);
} else if (not MC_is_active() /* when running the MC there are no surf actions */
&& not MC_record_replay_is_active() && (state == SIMIX_READY || state == SIMIX_RUNNING)) {
cleanupSurf();
/* if there are simcalls associated with the synchro, then answer them */
- if (not simcalls.empty())
+ if (not simcalls.empty()) {
SIMIX_comm_finish(this);
+ SIMIX_comm_unref(this);
+ }
}
simcall_comm_recv(MSG_process_self()->getImpl(), mailbox->getImpl(), task, nullptr, nullptr, nullptr, nullptr, timeout, rate);
XBT_DEBUG("Got task %s from %s",(*task)->name,mailbox->name());
(*task)->simdata->setNotUsed();
+ SIMIX_comm_unref((*task)->simdata->comm);
}
catch (xbt_ex& e) {
switch (e.category) {
if (finished && comm->task_received != nullptr) {
/* I am the receiver */
(*comm->task_received)->simdata->setNotUsed();
+ SIMIX_comm_unref(comm->s_comm);
}
}
catch (xbt_ex& e) {
if (status == MSG_OK && comm->task_received != nullptr) {
/* I am the receiver */
(*comm->task_received)->simdata->setNotUsed();
+ SIMIX_comm_unref(comm->s_comm);
}
}
{
try {
simcall_comm_wait(comm->s_comm, timeout);
+ SIMIX_comm_unref(comm->s_comm);
if (comm->task_received != nullptr) {
/* I am the receiver */
if (comm->task_received != nullptr) {
/* I am the receiver */
(*comm->task_received)->simdata->setNotUsed();
+ SIMIX_comm_unref(comm->s_comm);
}
return finished_index;
simcall_set_category(comm, task->category);
t_simdata->comm = static_cast<simgrid::kernel::activity::Comm*>(comm);
simcall_comm_wait(comm, timeout);
+ SIMIX_comm_unref(comm);
}
catch (xbt_ex& e) {
switch (e.category) {
if (comm->detached)
XBT_DEBUG("Don't destroy it since it's a detached comm and I'm the sender");
else
- comm->unref();
-
- }
- else if (comm->dst_proc == process){
+ SIMIX_comm_unref(comm);
+ } else if (comm->dst_proc == process) {
XBT_DEBUG("Found an unfinished recv comm %p, state %d, src = %p, dst = %p",
comm, (int)comm->state, comm->src_proc, comm->dst_proc);
comm->dst_proc = nullptr;
/* the comm will be freed right now, remove it from the sender */
comm->src_proc->comms.remove(comm);
}
-
- comm->unref();
+ SIMIX_comm_unref(comm);
} else {
xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro);
}
} else if (comm != nullptr) {
process->comms.remove(process->waiting_synchro);
comm->cancel();
-
// Remove first occurrence of &process->simcall:
- auto i = boost::range::find(
- process->waiting_synchro->simcalls,
- &process->simcall);
+ auto i = boost::range::find(process->waiting_synchro->simcalls, &process->simcall);
if (i != process->waiting_synchro->simcalls.end())
process->waiting_synchro->simcalls.remove(&process->simcall);
-
- comm->unref();
-
+ SIMIX_comm_unref(comm);
} else if (sleep != nullptr) {
SIMIX_process_sleep_destroy(process->waiting_synchro);
for(auto it = deque->begin(); it != deque->end(); it++){
smx_activity_t synchro = *it;
- simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
+ simgrid::kernel::activity::Comm* comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
if (comm->type == SIMIX_COMM_SEND) {
other_user_data = comm->src_data;
XBT_DEBUG("Found a matching communication synchro %p", comm);
if (remove_matching)
deque->erase(it);
- comm->ref();
+ comm = static_cast<simgrid::kernel::activity::Comm*>(SIMIX_comm_ref(comm));
#if SIMGRID_HAVE_MC
comm->mbox_cpy = comm->mbox;
#endif
//this mailbox is for small messages, which have to be sent right now
other_comm->state = SIMIX_READY;
other_comm->dst_proc=mbox->permanent_receiver.get();
- other_comm->ref();
+ other_comm = static_cast<simgrid::kernel::activity::Comm*>(SIMIX_comm_ref(other_comm));
mbox->done_comm_queue.push_back(other_comm);
XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
}
} else {
XBT_DEBUG("Receive already pushed");
- this_comm->unref();
+ SIMIX_comm_unref(this_comm);
other_comm->state = SIMIX_READY;
other_comm->type = SIMIX_COMM_READY;
other_comm->type = SIMIX_COMM_DONE;
other_comm->mbox = nullptr;
}
- other_comm->unref();
- static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
+ SIMIX_comm_unref(other_comm);
+ SIMIX_comm_unref(this_synchro);
}
} else {
/* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
other_comm = this_synchro;
mbox->push(this_synchro);
} else {
- this_synchro->unref();
+ SIMIX_comm_unref(this_synchro);
+ other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_comm);
other_comm->state = SIMIX_READY;
other_comm->type = SIMIX_COMM_READY;
}
if(other_synchro)
- other_synchro->unref();
+ SIMIX_comm_unref(other_synchro);
- this_comm->unref();
+ SIMIX_comm_unref(this_comm);
return other_synchro;
}
void SIMIX_comm_finish(smx_activity_t synchro)
{
simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
- unsigned int destroy_count = 0;
while (not synchro->simcalls.empty()) {
smx_simcall_t simcall = synchro->simcalls.front();
synchro->simcalls.pop_front();
- /* If a waitany simcall is waiting for this synchro to finish, then remove
- it from the other synchros in the waitany list. Afterwards, get the
- position of the actual synchro in the waitany dynar and
- return it as the result of the simcall */
+ /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+ * list. Afterwards, get the position of the actual synchro in the waitany dynar and return it as the result of the
+ * simcall */
if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
continue; // if process handling comm is killed
simcall->timer = nullptr;
}
if (not MC_is_active() && not MC_record_replay_is_active())
- simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
+ simcall_comm_waitany__set__result(simcall,
+ xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
}
/* If the synchro is still in a rendez-vous point then remove from it */
comm->dst_proc->comms.remove(synchro);
comm->src_proc->comms.remove(synchro);
}
- //in case of a detached comm we have an extra ref to remove, as the sender won't do it
- destroy_count++;
}
SIMIX_simcall_answer(simcall);
- destroy_count++;
}
-
- while (destroy_count-- > 0)
- static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
}
/******************************************************************************/
}
}
-
/**
* @brief Copy the communication data from the sender's buffer to the receiver's one
* @param synchro The communication
SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
}
-
/* Set the copied flag so we copy data only once */
/* (this function might be called from both communication ends) */
comm->copied = 1;
}
+
+/** Increase the refcount for this comm */
+smx_activity_t SIMIX_comm_ref(smx_activity_t comm)
+{
+ if (comm != nullptr)
+ intrusive_ptr_add_ref(comm);
+ return comm;
+}
+
+/** Decrease the refcount for this comm */
+void SIMIX_comm_unref(smx_activity_t comm)
+{
+ if (comm != nullptr)
+ intrusive_ptr_release(comm);
+}
src/simix/smx_private.h
src/simix/smx_synchro_private.h
src/kernel/activity/ActivityImpl.hpp
+ src/kernel/activity/CommImpl.hpp
src/kernel/activity/SynchroComm.hpp
src/kernel/activity/SynchroExec.hpp
src/kernel/activity/SynchroIo.hpp