Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rework MessImpl
authorFred Suter <suterf@ornl.gov>
Thu, 26 Oct 2023 20:29:23 +0000 (16:29 -0400)
committerFred Suter <suterf@ornl.gov>
Thu, 26 Oct 2023 20:29:23 +0000 (16:29 -0400)
src/kernel/activity/MessImpl.cpp
src/kernel/activity/MessImpl.hpp

index 9f77c7d..28679c2 100644 (file)
@@ -47,8 +47,12 @@ MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
 
 MessImpl* MessImpl::start()
 {
-  if (get_state() == State::READY)
-    set_state(State::DONE);
+  if (get_state() == State::READY) {
+    XBT_DEBUG("Starting message exchange %p from '%s' to '%s' (state: %s)", this, src_actor_->get_host()->get_cname(),
+              dst_actor_->get_host()->get_cname(), get_state_str());
+    set_state(State::RUNNING);
+    finish();
+  }
   return this;
 }
 
@@ -87,20 +91,20 @@ ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
 
 ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
 {
-  MessImplPtr this_synchro(new MessImpl());
-  this_synchro->set_type(MessImplType::GET);
+  MessImplPtr this_mess(new MessImpl());
+  this_mess->set_type(MessImplType::GET);
 
   auto* queue = observer->get_queue();
-  XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get());
+  XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_mess.get());
 
   MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
 
   if (other_mess == nullptr) {
     XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
-    other_mess = std::move(this_synchro);
+    other_mess = std::move(this_mess);
     queue->push(other_mess);
   } else {
-    XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get());
+    XBT_DEBUG("Match my %p with the existing %p", this_mess.get(), other_mess.get());
 
     other_mess->set_state(State::READY);
   }
@@ -117,6 +121,15 @@ ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
   return other_mess;
 }
 
+void MessImpl::wait_for(actor::ActorImpl* issuer, double timeout)
+{
+  XBT_DEBUG("MessImpl::wait_for(%g), %p, state %s", timeout, this, get_state_str());
+
+  /* Associate this simcall to the wait synchro */
+  register_simcall(&issuer->simcall_);
+  ActivityImpl::wait_for(issuer, timeout);
+}
+
 void MessImpl::finish()
 {
   XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
index 1232c61..fad011c 100644 (file)
@@ -36,6 +36,8 @@ public:
   static ActivityImplPtr iput(actor::MessIputSimcall* observer);
   static ActivityImplPtr iget(actor::MessIgetSimcall* observer);
 
+  void wait_for(actor::ActorImpl* issuer, double timeout) override;
+
   MessImpl* start();
   void set_exception(actor::ActorImpl* issuer) override {};
   void finish() override;