Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Handle onesided=detached comms and clean up the comm_fault_scenario test
authorFabien Chaix <chaix@ics.forth.gr>
Thu, 19 May 2022 07:28:39 +0000 (10:28 +0300)
committerFabien Chaix <chaix@ics.forth.gr>
Thu, 19 May 2022 07:28:39 +0000 (10:28 +0300)
teshsuite/s4u/comm-fault-scenarios/comm-fault-scenarios.cpp

index e43b1b1..cceb567 100644 (file)
@@ -3,8 +3,18 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-/* This example shows how to simulate a non-linear resource sharing for
- * network links.
+/* This example validates the behaviour in presence of node and link fault.
+ * Each test scenario consists in one host/actor (named sender) sending one message to another host/actor.
+ * The space to cover is quite large, since we consider:
+ * * communication types (eager, rendez-vous, one-sided=detached) 
+ * * use type (synchronous, asynchronous, init)
+ * * fault type (sender node, link, receiver node)
+ * * any legal permutation of the scenario steps
+ *
+ * This program also presents a way to simulate applications that are resilient to links and node faults. 
+ * Essentially, it catches exceptions related to communications and it clears the mailboxes when one of the nodes gets turned off. 
+ * However, this model would suppose that there would be 2 mailboxes for each pair of nodes, which is probably unacceptable.
+ * 
  */
 
 #include <algorithm>
@@ -15,9 +25,6 @@
 #include <time.h>
 #include <vector>
 
-
-//#include "../../src/kernel/activity/ActivityImpl.hpp"
-
 namespace sg4 = simgrid::s4u;
 namespace pr  = simgrid::kernel::profile;
 
@@ -26,13 +33,11 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(comm_fault_scenarios, "Messages specific for this s
 /*************************************************************************************************/
 
 // Constants for platform configuration
-
 constexpr double HostComputePower = 1e9;  // FLOPs
 constexpr double LinkBandwidth    = 1e9;  // Bytes/second
 constexpr double LinkLatency      = 1e-6; // Seconds
 
 // Constants for application behaviour
-
 constexpr uint64_t MsgSize = LinkBandwidth / 2;
 
 /*************************************************************************************************/
@@ -45,15 +50,14 @@ enum class CommType {
   RDV_ASYNC,
   RDV_INIT,
   ONESIDE_SYNC,
-  ONESIDE_ASYNC,
-  ONESIDE_INIT
+  ONESIDE_ASYNC
+  //ONESIDE_INIT is equivalent to ONESIDE_ASYNC
 };
 
 enum class Action { SLEEP, PUT, GET, START, WAIT, DIE, END };
 
 static const char* to_string(const Action x)
 {
-
   switch (x) {
     case Action::END:
       return "Success";
@@ -93,7 +97,6 @@ struct Scenario {
 
 static std::string to_string(const Scenario& s)
 {
-
   std::stringstream ss;
   ss <<"#"<< s.index << "[" << s.start_time << "s," << s.start_time + s.duration << "s[: (";
   switch (s.type) {
@@ -121,10 +124,8 @@ static std::string to_string(const Scenario& s)
     case CommType::ONESIDE_ASYNC:
       ss << "ONESIDE_ASYNC";
       break;
-    case CommType::ONESIDE_INIT:
-      ss << "ONESIDE_INIT";
-      break;
   }
+
   ss << ") Expected: S:" << to_string(s.snd_expected) << " R:" << to_string(s.rcv_expected) << " Steps: ";
   for (const Step& step : s.steps) {
     ss << "+" << step.rel_time << "s:";
@@ -151,12 +152,10 @@ static std::string to_string(const Scenario& s)
     }
     ss << " ";
   }
-
   return ss.str().c_str();
 }
 
 std::vector<Scenario> scenarios;
-
 sg4::Mailbox* mbox_eager = nullptr;
 sg4::Mailbox* mbox_rdv   = nullptr;
 
@@ -164,7 +163,6 @@ class SendAgent {
 
   static int run;
   static size_t scenario;
-
   int id;
   sg4::Host* other_host;
 
@@ -190,9 +188,6 @@ class SendAgent {
         return nullptr;
       case CommType::ONESIDE_ASYNC:
         return sg4::Comm::sendto_async(sg4::this_actor::get_host(), other_host, MsgSize);
-      case CommType::ONESIDE_INIT:
-        return sg4::Comm::sendto_init()->set_payload_size(MsgSize);
-        // FIXME: how to set hosts? sg4::this_actor::get_host(),other_host
     }
     return nullptr;
   }
@@ -203,14 +198,17 @@ class SendAgent {
      XBT_DEBUG("Will try: %s", scenario_string.c_str());
     double send_value;
     sg4::CommPtr comm = nullptr;
-    // CommType type=s.type;
     Action expected = s.snd_expected;
     double end_time = s.start_time + s.duration;
-    // double curr_rel_time=0;
     send_value        = end_time;
     size_t step_index = 0;
 
     sg4::this_actor::sleep_until(s.start_time);
+
+    //Make sure we have a clean slate
+    xbt_assert(not mbox_eager->listen(),"Eager mailbox should be empty when starting a test");
+    xbt_assert(not mbox_rdv->listen(),"RDV mailbox should be empty when starting a test");
+
     for (; step_index < s.steps.size(); step_index++) {
       const Step& step = s.steps[step_index];
       if (step.entity != Step::SND || step.type != Step::ACTION)
@@ -247,8 +245,6 @@ class SendAgent {
                   typeid(e).name(), e.what());
         break;
       }
-
-      //xbt_assert(comm->get_impl()->get_refcount()==1);
     }
 
     try {
@@ -257,7 +253,7 @@ class SendAgent {
       XBT_DEBUG("During Sleep, failed to send message because of a %s exception (%s)", typeid(e).name(), e.what());
     }
 
-    Action outcome              = Action::END;
+    Action outcome = Action::END;
     if (step_index < s.steps.size()) {
       const Step& step = s.steps[step_index];
       assert(step.entity == Step::SND && step.type == Step::ACTION);
@@ -300,7 +296,6 @@ class ReceiveAgent {
 
   static int run;
   static size_t scenario;
-
   int id;
   sg4::Host* other_host;
 
@@ -314,7 +309,6 @@ class ReceiveAgent {
         return mbox_eager->get_async(&receive_ptr);
       case CommType::EAGER_INIT:
         return mbox_eager->get_init()->set_dst_data((void**)(&receive_ptr));
-
       case CommType::RDV_SYNC:
         receive_ptr = mbox_rdv->get<double>();
         return nullptr;
@@ -322,10 +316,8 @@ class ReceiveAgent {
         return mbox_rdv->get_async(&receive_ptr);
       case CommType::RDV_INIT:
         return mbox_rdv->get_init()->set_dst_data((void**)(&receive_ptr));
-
       case CommType::ONESIDE_SYNC:
       case CommType::ONESIDE_ASYNC:
-      case CommType::ONESIDE_INIT:
         xbt_die("No get in One Sided comunications!");
     }
     return nullptr;
@@ -337,10 +329,14 @@ class ReceiveAgent {
     CommType type     = s.type;
     Action expected   = s.rcv_expected;
     double end_time   = s.start_time + s.duration;
-    // double curr_rel_time=0;
     double* receive_ptr = nullptr;
     size_t step_index   = 0;
     sg4::this_actor::sleep_until(s.start_time);
+
+    //Make sure we have a clean slate
+    xbt_assert(not mbox_eager->listen(),"Eager mailbox should be empty when starting a test");
+    xbt_assert(not mbox_rdv->listen(),"RDV mailbox should be empty when starting a test");
+
     for (; step_index < s.steps.size(); step_index++) {
       const Step& step = s.steps[step_index];
       if (step.entity != Step::RCV || step.type != Step::ACTION)
@@ -391,11 +387,16 @@ class ReceiveAgent {
       const Step& step = s.steps[step_index];
       assert(step.entity == Step::RCV && step.type == Step::ACTION);
       outcome = step.action_type;
-    } else if (receive_ptr == nullptr) {
-      XBT_ERROR("Received address is NULL in %s", scenario_string.c_str());
-    } else if (*receive_ptr != end_time) {
-      XBT_ERROR("Received value invalid: expected %f but got %f in %s", end_time, *receive_ptr,
+    } else if (s.type!=CommType::ONESIDE_SYNC && 
+          s.type!=CommType::ONESIDE_ASYNC
+          ) {
+            //One sided / detached operations do not actually transfer anything
+            if(receive_ptr == nullptr ) {
+              XBT_ERROR("Received address is NULL in %s", scenario_string.c_str());
+            } else if (*receive_ptr != end_time) {
+              XBT_ERROR("Received value invalid: expected %f but got %f in %s", end_time, *receive_ptr,
                 scenario_string.c_str());
+          }
     }
 
     if (outcome != expected) {
@@ -403,11 +404,10 @@ class ReceiveAgent {
     } else {
       XBT_DEBUG("OK: %s", scenario_string.c_str());
     }
-    sg4::this_actor::sleep_until(end_time);
 
+    sg4::this_actor::sleep_until(end_time);
     xbt_assert(not mbox_eager->listen(), "Mailbox should not have ongoing communication!");
     xbt_assert(not mbox_rdv->listen(), "Mailbox should not have ongoing communication!");
-
   }
 
 public:
@@ -434,7 +434,7 @@ size_t ReceiveAgent::scenario = 0;
 static void on_host_state_change(sg4::Host const& host)
 {
   XBT_DEBUG("Host %s is now %s", host.get_cname(), host.is_on() ? "ON " : "OFF");
-  if(host.is_on()) {
+  if(not host.is_on()) {
     mbox_eager->clear();
     mbox_rdv->clear();
   }
@@ -456,7 +456,6 @@ static void addStateEvent(std::ostream& out, double date, bool isOn)
 static int prepareScenario(CommType type, int& index, double& start_time, double duration, std::ostream& sndP, std::ostream& rcvP,
                             std::ostream& lnkP, Action snd_expected, Action rcv_expected, std::vector<Step> steps, std::vector<int> active_indices)
 {
-
   int ret=0;
   if(std::find(active_indices.begin(),active_indices.end(),index)!=active_indices.end()) {
     // Update fault profiles
@@ -529,33 +528,29 @@ int main(int argc, char* argv[])
 
   mbox_eager = e.mailbox_by_name_or_create("eager");
   mbox_rdv   = e.mailbox_by_name_or_create("rdv");
-
   sg4::NetZone* zone = sg4::create_full_zone("Top");
-
   pr::Profile* profile_sender = pr::ProfileBuilder::from_string("sender_profile", SndStateProfile, 0);
   sg4::Host* sender_host = zone->create_host("senderHost", HostComputePower)->set_state_profile(profile_sender)->seal();
   pr::Profile* profile_receiver = pr::ProfileBuilder::from_string("receiver_profile", RcvStateProfile, 0);
   sg4::Host* receiver_host = zone->create_host("receiverHost", HostComputePower)->set_state_profile(profile_receiver)->seal();
-
   sg4::ActorPtr sender = sg4::Actor::create("sender", sender_host, SendAgent(0, receiver_host));
   sender->set_auto_restart(true);
-
   sg4::ActorPtr receiver = sg4::Actor::create("receiver", receiver_host, ReceiveAgent(1, sender_host));
   receiver->set_auto_restart(true);
-
   pr::Profile* profile_link = pr::ProfileBuilder::from_string("link_profile", LnkStateProfile, 0);
   sg4::Link* link =
       zone->create_link("link", LinkBandwidth)->set_latency(LinkLatency)->set_state_profile(profile_link)->seal();
-
   zone->add_route(sender_host->get_netpoint(), receiver_host->get_netpoint(), nullptr, nullptr,
                   {sg4::LinkInRoute{link}}, false);
   zone->seal();
-
   sg4::Host::on_state_change.connect(on_host_state_change);
   sg4::Link::on_state_change_cb(on_link_state_change);
 
   e.run_until(end_time);
 
+  //Make sure we have a clean slate
+  xbt_assert(not mbox_eager->listen(),"Eager mailbox should be empty in the end");
+  xbt_assert(not mbox_rdv->listen(),"RDV mailbox should be empty in the end");
   return 0;
 }
 
@@ -709,7 +704,7 @@ double build_scenarios(std::vector<int>& active_indices )
   
   // Receiver off
   _MK(RDV_SYNC, 2, PUT, DIE, ROFF(.1), SPUT(.2), RON(1));
-  _MK(RDV_SYNC, 2, PUT, DIE, SPUT(.2), ROFF(.3), RON(1));
+  _MK(RDV_SYNC, 2, PUT, DIE, SPUT(.2), ROFF(.3), RON(1)); //Fails because put comm cancellation does not trigger sender exception
   _MK(RDV_SYNC, 2, PUT, DIE, SPUT(.2), RGET(.4), ROFF(.5), RON(1));
   _MK(RDV_SYNC, 2, PUT, DIE, RGET(.2), SPUT(.4), ROFF(.5), RON(1));
   // Sender off
@@ -734,7 +729,7 @@ double build_scenarios(std::vector<int>& active_indices )
   _MK(RDV_ASYNC, 2, PUT, DIE, ROFF(.1), SPUT(.2), SWAT(.4), RON(1));
   _MK(RDV_ASYNC, 2, WAIT, DIE, SPUT(.2), ROFF(.3), SWAT(.4), RON(1));
   _MK(RDV_ASYNC, 2, PUT, DIE, RGET(.2), ROFF(.3), SPUT(.4), SWAT(.6), RON(1));
-  _MK(RDV_ASYNC, 2, WAIT, DIE, SPUT(.2), SWAT(.4), ROFF(.5), RON(1));
+  _MK(RDV_ASYNC, 2, WAIT, DIE, SPUT(.2), SWAT(.4), ROFF(.5), RON(1)); //Fails because put comm cancellation does not trigger sender exception
   _MK(RDV_ASYNC, 2, WAIT, DIE, SPUT(.2), RGET(.4), ROFF(.5), SWAT(.6), RON(1));
   _MK(RDV_ASYNC, 2, WAIT, DIE, RGET(.2), SPUT(.4), ROFF(.5), SWAT(.6), RON(1));
   _MK(RDV_ASYNC, 2, PUT , DIE, RGET(.2), RWAT(.4), ROFF(.5), SPUT(.6), SWAT(.8), RON(1));
@@ -788,9 +783,11 @@ double build_scenarios(std::vector<int>& active_indices )
   // Receiver off
   _MK(ONESIDE_SYNC, 2, PUT, DIE, ROFF(.1), SPUT(.2), RON(1));
   _MK(ONESIDE_SYNC, 2, PUT, DIE, SPUT(.2), ROFF(.3), RON(1));
+  // Sender off
+  _MK(ONESIDE_SYNC, 2, DIE, END, SPUT(.2), SOFF(.3), SON(1));
   // Link off
-  _MK(ONESIDE_SYNC, 2, PUT, GET, LOFF(.1), SPUT(.2), LON(1));
-  _MK(ONESIDE_SYNC, 2, PUT, GET, SPUT(.2), LOFF(.3), LON(1));
+  _MK(ONESIDE_SYNC, 2, PUT, END, LOFF(.1), SPUT(.2), LON(1));
+  _MK(ONESIDE_SYNC, 2, PUT, END, SPUT(.2), LOFF(.3), LON(1));
 
   // ONESIDE ASYNC use cases
   // All good
@@ -799,10 +796,12 @@ double build_scenarios(std::vector<int>& active_indices )
   _MK(ONESIDE_ASYNC, 2, PUT, DIE, ROFF(.1), SPUT(.2), SWAT(.4), RON(1));
   _MK(ONESIDE_ASYNC, 2, WAIT, DIE, SPUT(.2), ROFF(.3), SWAT(.4), RON(1));
   _MK(ONESIDE_ASYNC, 2, WAIT, DIE, SPUT(.2), SWAT(.4), ROFF(.5), RON(1));
+  // Sender off
+  _MK(ONESIDE_ASYNC, 2, DIE, END, SPUT(.2), SOFF(.3), SON(1));
   // Link off
-  _MK(ONESIDE_ASYNC, 2, WAIT, WAIT, LOFF(.1), SPUT(.2), SWAT(.4), LON(1));
-  _MK(ONESIDE_ASYNC, 2, WAIT, WAIT, SPUT(.2), LOFF(.3), SWAT(.4), LON(1));
-  _MK(ONESIDE_ASYNC, 2, WAIT, WAIT, SPUT(.2), SWAT(.4), LOFF(.5), LON(1));
+  _MK(ONESIDE_ASYNC, 2, WAIT, END, LOFF(.1), SPUT(.2), SWAT(.4), LON(1));
+  _MK(ONESIDE_ASYNC, 2, WAIT, END, SPUT(.2), LOFF(.3), SWAT(.4), LON(1));
+  _MK(ONESIDE_ASYNC, 2, WAIT, END, SPUT(.2), SWAT(.4), LOFF(.5), LON(1));
 
 
   SndStateProfile = sndP.str();