Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
strenghten the behavior of Message queues after some Wrench breaking
[simgrid.git] / src / kernel / activity / MessImpl.cpp
index 9f77c7d..3a3e307 100644 (file)
@@ -47,8 +47,12 @@ MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
 
 MessImpl* MessImpl::start()
 {
-  if (get_state() == State::READY)
-    set_state(State::DONE);
+  if (get_state() == State::READY) {
+    XBT_DEBUG("Starting message exchange %p from '%s' to '%s' (state: %s)", this, src_actor_->get_host()->get_cname(),
+              dst_actor_->get_host()->get_cname(), get_state_str());
+    set_state(State::RUNNING);
+    finish();
+  }
   return this;
 }
 
@@ -87,20 +91,20 @@ ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
 
 ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
 {
-  MessImplPtr this_synchro(new MessImpl());
-  this_synchro->set_type(MessImplType::GET);
+  MessImplPtr this_mess(new MessImpl());
+  this_mess->set_type(MessImplType::GET);
 
   auto* queue = observer->get_queue();
-  XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get());
+  XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_mess.get());
 
   MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
 
   if (other_mess == nullptr) {
     XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
-    other_mess = std::move(this_synchro);
+    other_mess = std::move(this_mess);
     queue->push(other_mess);
   } else {
-    XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get());
+    XBT_DEBUG("Match my %p with the existing %p", this_mess.get(), other_mess.get());
 
     other_mess->set_state(State::READY);
   }
@@ -117,9 +121,27 @@ ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
   return other_mess;
 }
 
+void MessImpl::wait_for(actor::ActorImpl* issuer, double timeout)
+{
+  XBT_DEBUG("MessImpl::wait_for(%g), %p, state %s", timeout, this, get_state_str());
+
+  /* Associate this simcall to the wait synchro */
+  register_simcall(&issuer->simcall_);
+  ActivityImpl::wait_for(issuer, timeout);
+}
+
+void MessImpl::cancel()
+{
+  /* if the synchro is a waiting state means that it is still in a mbox so remove from it and delete it */
+  if (get_state() == State::WAITING) {
+      queue_->remove(this);
+      set_state(State::CANCELED);
+  }
+}
+
 void MessImpl::finish()
 {
-  XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
+  XBT_DEBUG("MessImpl::finish() mess %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
             src_actor_.get(), dst_actor_.get());
 
   if (get_iface()) {