}
if (detached) {
- other_comm->detached_ = true;
+ other_comm->detach();
other_comm->clean_fun = clean_fun;
} else {
other_comm->clean_fun = nullptr;
if (other_comm->surf_action_ && other_comm->get_remaining() < 1e-12) {
XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get());
other_comm->state_ = SIMIX_DONE;
- other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::DONE);
- other_comm->mbox = nullptr;
+ other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::DONE).set_mailbox(nullptr);
}
}
} else {
{
XBT_DEBUG("Copy the data over");
memcpy(comm->dst_buff_, buff, buff_size);
- if (comm->detached_) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
- // original buffer available to the application ASAP
+ if (comm->detached()) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
+ // original buffer available to the application ASAP
xbt_free(buff);
comm->src_buff_ = nullptr;
}
rate_ = rate;
return *this;
}
+CommImpl& CommImpl::set_mailbox(MailboxImpl* mbox)
+{
+ mbox_ = mbox;
+ return *this;
+}
CommImpl& CommImpl::set_src_buff(unsigned char* buff, size_t size)
{
return *this;
}
+CommImpl& CommImpl::detach()
+{
+ detached_ = true;
+ return *this;
+}
+
CommImpl::~CommImpl()
{
XBT_DEBUG("Really free communication %p in state %d (detached = %d)", this, static_cast<int>(state_), detached_);
if (clean_fun)
clean_fun(src_buff_);
src_buff_ = nullptr;
- } else if (mbox) {
- mbox->remove(this);
+ } else if (mbox_) {
+ mbox_->remove(this);
}
}
{
size_t buff_size = src_buff_size_;
/* If there is no data to copy then return */
- if (not src_buff_ || not dst_buff_ || copied)
+ if (not src_buff_ || not dst_buff_ || copied_)
return;
XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
/* Set the copied flag so we copy data only once */
/* (this function might be called from both communication ends) */
- copied = true;
+ copied_ = true;
}
void CommImpl::suspend()
/* if the synchro is a waiting state means that it is still in a mbox so remove from it and delete it */
if (state_ == SIMIX_WAITING) {
if (not detached_) {
- mbox->remove(this);
+ mbox_->remove(this);
state_ = SIMIX_CANCELED;
}
} else if (not MC_is_active() /* when running the MC there are no surf actions */
}
/* If the synchro is still in a rendez-vous point then remove from it */
- if (mbox)
- mbox->remove(this);
+ if (mbox_)
+ mbox_->remove(this);
XBT_DEBUG("CommImpl::finish(): synchro state = %d", static_cast<int>(state_));
~CommImpl() override;
void cleanupSurf();
- double rate_ = 0.0;
- double size_ = 0.0;
+ double rate_ = 0.0;
+ double size_ = 0.0;
+ bool detached_ = false; /* If detached or not */
+ bool copied_ = false; /* whether the data were already copied */
+ MailboxImpl* mbox_ = nullptr; /* Rendez-vous where the comm is queued */
public:
enum class Type { SEND = 0, RECEIVE, READY, DONE };
CommImpl& set_src_buff(unsigned char* buff, size_t size);
CommImpl& set_dst_buff(unsigned char* buff, size_t* size);
CommImpl& set_rate(double rate);
- double get_rate() { return rate_; }
+ CommImpl& set_mailbox(MailboxImpl* mbox);
+ CommImpl& detach();
+
+ double get_rate() const { return rate_; }
+ MailboxImpl* get_mailbox() const { return mbox_; }
+ bool detached() const { return detached_; }
void copy_data();
void finish() override;
CommImpl::Type type_; /* Type of the communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE) */
- MailboxImpl* mbox = nullptr; /* Rendez-vous where the comm is queued */
#if SIMGRID_HAVE_MC
MailboxImpl* mbox_cpy = nullptr; /* Copy of the rendez-vous where the comm is queued, MC needs it for DPOR
(comm.mbox set to nullptr when the communication is removed from the mailbox
(used as garbage collector)) */
#endif
- bool detached_ = false; /* If detached or not */
void (*clean_fun)(void*) = nullptr; /* Function to clean the detached src_buf if something goes wrong */
int (*match_fun)(void*, void*, CommImpl*) = nullptr; /* Filter function used by the other side. It is used when
unsigned char* dst_buff_ = nullptr;
size_t src_buff_size_ = 0;
size_t* dst_buff_size_ = nullptr;
- bool copied = false; /* whether the data were already copied */
void* src_data_ = nullptr; /* User data associated to the communication */
void* dst_data_ = nullptr;
*/
void MailboxImpl::push(CommImplPtr comm)
{
- comm->mbox = this;
+ comm->set_mailbox(this);
this->comm_queue_.push_back(std::move(comm));
}
*/
void MailboxImpl::remove(const CommImplPtr& comm)
{
- xbt_assert(comm->mbox == this, "Comm %p is in mailbox %s, not mailbox %s", comm.get(),
- (comm->mbox ? comm->mbox->get_cname() : "(null)"), this->get_cname());
- comm->mbox = nullptr;
+ xbt_assert(comm->get_mailbox() == this, "Comm %p is in mailbox %s, not mailbox %s", comm.get(),
+ (comm->get_mailbox() ? comm->get_mailbox()->get_cname() : "(null)"), this->get_cname());
+
+ comm->set_mailbox(nullptr);
for (auto it = this->comm_queue_.begin(); it != this->comm_queue_.end(); it++)
if (*it == comm) {
this->comm_queue_.erase(it);
(not comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro.get()))) {
XBT_DEBUG("Found a matching communication synchro %p", comm.get());
#if SIMGRID_HAVE_MC
- comm->mbox_cpy = comm->mbox;
+ comm->mbox_cpy = comm->get_mailbox();
#endif
- comm->mbox = nullptr;
+ comm->set_mailbox(nullptr);
CommImplPtr comm_cpy = comm;
if (remove_matching)
comm_queue.erase(it);
class MailboxImpl {
static constexpr size_t MAX_MAILBOX_SIZE = 10000000;
+ s4u::Mailbox piface_;
+ xbt::string name_;
+
friend s4u::Mailbox;
friend s4u::Mailbox* s4u::Mailbox::by_name(const std::string& name);
friend mc::CommunicationDeterminismChecker;
CommImplPtr find_matching_comm(CommImpl::Type type, int (*match_fun)(void*, void*, CommImpl*), void* this_user_data,
const CommImplPtr& my_synchro, bool done, bool remove_matching);
-private:
- s4u::Mailbox piface_;
- xbt::string name_;
-
-public:
actor::ActorImplPtr permanent_receiver_; // actor to which the mailbox is attached
boost::circular_buffer_space_optimized<CommImplPtr> comm_queue_;
boost::circular_buffer_space_optimized<CommImplPtr> done_comm_queue_; // messages already received in the permanent
namespace activity {
class XBT_PUBLIC MutexImpl {
+ std::atomic_int_fast32_t refcount_{1};
+ s4u::Mutex piface_;
+ bool locked_ = false;
+
public:
MutexImpl() : piface_(this) {}
MutexImpl(MutexImpl const&) = delete;
void lock(actor::ActorImpl* issuer);
bool try_lock(actor::ActorImpl* issuer);
void unlock(actor::ActorImpl* issuer);
+ bool is_locked() { return locked_; }
MutexImpl* ref();
void unref();
- bool locked_ = false;
+
actor::ActorImpl* owner_ = nullptr;
// List of sleeping actors:
actor::SynchroList sleeping_;
}
s4u::Mutex& mutex() { return piface_; }
-
-private:
- std::atomic_int_fast32_t refcount_{1};
- s4u::Mutex piface_;
};
}
}
namespace activity {
class XBT_PUBLIC SemaphoreImpl {
+ std::atomic_int_fast32_t refcount_{1};
+ unsigned int value_;
+
public:
+ actor::SynchroList sleeping_; /* list of sleeping actors*/
+
explicit SemaphoreImpl(unsigned int value) : value_(value){};
~SemaphoreImpl() = default;
void acquire(actor::ActorImpl* issuer, double timeout);
void release();
bool would_block() { return (value_ == 0); }
+
unsigned int get_capacity() { return value_; }
friend void intrusive_ptr_add_ref(SemaphoreImpl* sem)
if (sem->refcount_.fetch_sub(1) == 1)
delete sem;
}
-
- unsigned int value_;
- actor::SynchroList sleeping_; /* list of sleeping actors*/
-
-private:
- std::atomic_int_fast32_t refcount_{1};
};
} // namespace activity
} // namespace kernel
simgrid::kernel::activity::CommImpl* synchro =
static_cast<simgrid::kernel::activity::CommImpl*>(temp_synchro.getBuffer());
- char* remote_name = mc_model_checker->process().read<char*>(
- RemotePtr<char*>((uint64_t)(synchro->mbox ? &synchro->mbox->name_ : &synchro->mbox_cpy->name_)));
+ char* remote_name = mc_model_checker->process().read<char*>(RemotePtr<char*>(
+ (uint64_t)(synchro->get_mailbox() ? &synchro->get_mailbox()->name_ : &synchro->mbox_cpy->name_)));
pattern->rdv = mc_model_checker->process().read_string(RemotePtr<char>(remote_name));
pattern->src_proc =
mc_model_checker->process().resolveActor(simgrid::mc::remote(synchro->src_actor_.get()))->get_pid();
simgrid::kernel::activity::CommImpl* comm = temp_comm.getBuffer();
char* remote_name;
- mc_model_checker->process().read(
- &remote_name, remote(comm->mbox ? &simgrid::xbt::string::to_string_data(comm->mbox->name_).data
- : &simgrid::xbt::string::to_string_data(comm->mbox_cpy->name_).data));
+ mc_model_checker->process().read(&remote_name,
+ remote(comm->get_mailbox()
+ ? &simgrid::xbt::string::to_string_data(comm->get_mailbox()->name_).data
+ : &simgrid::xbt::string::to_string_data(comm->mbox_cpy->name_).data));
pattern->rdv = mc_model_checker->process().read_string(RemotePtr<char>(remote_name));
pattern->dst_proc =
mc_model_checker->process().resolveActor(simgrid::mc::remote(comm->dst_actor_.get()))->get_pid();
return true;
}
/* On the other hand if it hasn't a timeout, check if the comm is ready.*/
- else if (act->detached_ && act->src_actor_ == nullptr &&
+ else if (act->detached() && act->src_actor_ == nullptr &&
act->type_ == simgrid::kernel::activity::CommImpl::Type::READY)
return (act->dst_actor_ != nullptr);
return (act->src_actor_ && act->dst_actor_);
: simcall_mutex_trylock__get__mutex(req)
));
args = bprintf(
- "locked = %d, owner = %d, sleeping = n/a", mutex.getBuffer()->locked_,
+ "locked = %d, owner = %d, sleeping = n/a", mutex.getBuffer()->is_locked(),
mutex.getBuffer()->owner_ != nullptr
? (int)mc_model_checker->process().resolveActor(simgrid::mc::remote(mutex.getBuffer()->owner_))->get_pid()
: -1);
if (act->src_actor_.get() && act->dst_actor_.get())
state->transition.argument = 0;
else if (act->src_actor_.get() == nullptr && act->type_ == simgrid::kernel::activity::CommImpl::Type::READY &&
- act->detached_)
+ act->detached())
state->transition.argument = 0;
else
state->transition.argument = -1;
XBT_DEBUG("Copying %zu bytes from %p to %p", buff_size, tmpbuff, comm->dst_buff_);
memcpy_private(comm->dst_buff_, tmpbuff, private_blocks);
- if (comm->detached_) {
+ if (comm->detached()) {
// if this is a detached send, the source buffer was duplicated by SMPI
// sender to make the original buffer available to the application ASAP
xbt_free(buff);