Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add MessageQueue to all ActivitySet examples
authorFred Suter <suterf@ornl.gov>
Mon, 30 Oct 2023 14:04:20 +0000 (10:04 -0400)
committerFred Suter <suterf@ornl.gov>
Mon, 30 Oct 2023 14:04:20 +0000 (10:04 -0400)
examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp
examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp
examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh
examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp
examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh

index efbc3a0..2c9cf1a 100644 (file)
@@ -9,28 +9,30 @@
 #include <string>
 namespace sg4 = simgrid::s4u;
 
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitall, "Messages specific for this s4u example");
 
 static void bob()
 {
   sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("mqueue");
   const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
   std::string* payload;
+  std::string* message;
 
   XBT_INFO("Create my asynchronous activities");
   auto exec = sg4::this_actor::exec_async(5e9);
   auto comm = mbox->get_async(&payload);
   auto io   = disk->read_async(3e8);
+  auto mess = mqueue->get_async(&message);
 
-  sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast<sg4::Activity>(exec),
-                                       boost::dynamic_pointer_cast<sg4::Activity>(comm),
-                                       boost::dynamic_pointer_cast<sg4::Activity>(io)});
+  sg4::ActivitySet pending_activities({exec, comm, io, mess});
 
   XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
   pending_activities.wait_all();
 
   XBT_INFO("All activities are completed.");
   delete payload;
+  delete message;
 }
 
 static void alice()
@@ -40,6 +42,12 @@ static void alice()
   sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
 }
 
+static void carl()
+{
+  auto* payload = new std::string("Control Message");
+  sg4::MessageQueue::by_name("mqueue")->put(payload);
+}
+
 int main(int argc, char* argv[])
 {
   sg4::Engine e(&argc, argv);
@@ -48,6 +56,7 @@ int main(int argc, char* argv[])
 
   sg4::Actor::create("bob", e.host_by_name("bob"), bob);
   sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+  sg4::Actor::create("carl", e.host_by_name("carl"), carl);
 
   e.run();
 
index 971d774..a322cc4 100644 (file)
@@ -9,20 +9,23 @@
 #include <string>
 namespace sg4 = simgrid::s4u;
 
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitallfor, "Messages specific for this s4u example");
 
 static void bob()
 {
   sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("mqueue");
   const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
   std::string* payload;
+  std::string* message;
 
   XBT_INFO("Create my asynchronous activities");
   auto exec = sg4::this_actor::exec_async(5e9);
   auto comm = mbox->get_async(&payload);
   auto io   = disk->read_async(3e8);
+  auto mess = mqueue->get_async(&message);
 
-  sg4::ActivitySet pending_activities({exec, comm, io});
+  sg4::ActivitySet pending_activities({exec, comm, io, mess});
 
   XBT_INFO("Wait for asynchronous activities to complete");
   while (not pending_activities.empty()) {
@@ -34,6 +37,8 @@ static void bob()
     while (auto completed_one = pending_activities.test_any()) {
       if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
         XBT_INFO("Completed a Comm");
+      if (boost::dynamic_pointer_cast<sg4::Mess>(completed_one))
+        XBT_INFO("Completed a Mess");
       if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
         XBT_INFO("Completed an Exec");
       if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
@@ -42,6 +47,7 @@ static void bob()
   }
   XBT_INFO("Last activity is complete");
   delete payload;
+  delete message;
 }
 
 static void alice()
@@ -51,6 +57,14 @@ static void alice()
   sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
 }
 
+static void carl()
+{
+  sg4::this_actor::sleep_for(1.99);
+  auto* payload = new std::string("Control Message");
+  XBT_INFO("Send '%s'", payload->c_str());
+  sg4::MessageQueue::by_name("mqueue")->put(payload);
+}
+
 int main(int argc, char* argv[])
 {
   sg4::Engine e(&argc, argv);
@@ -59,6 +73,7 @@ int main(int argc, char* argv[])
 
   sg4::Actor::create("bob", e.host_by_name("bob"), bob);
   sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+  sg4::Actor::create("carl", e.host_by_name("carl"), carl);
 
   e.run();
 
index 014c4f0..ce8db4a 100644 (file)
@@ -5,7 +5,9 @@ $ ${bindir:=.}/s4u-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--lo
 > [0.000000] [  bob] Create my asynchronous activities
 > [0.000000] [  bob] Wait for asynchronous activities to complete
 > [1.000000] [  bob] Not all activities are terminated yet.
+> [1.990000] [ carl] Send 'Control Message'
 > [2.000000] [  bob] Not all activities are terminated yet.
+> [2.000000] [  bob] Completed a Mess
 > [3.000000] [  bob] Not all activities are terminated yet.
 > [3.000000] [  bob] Completed an I/O
 > [4.000000] [  bob] Not all activities are terminated yet.
index 6f081b3..6c8baef 100644 (file)
@@ -9,22 +9,23 @@
 #include <string>
 namespace sg4 = simgrid::s4u;
 
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitany, "Messages specific for this s4u example");
 
 static void bob()
 {
   sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("mqueue");
   const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
   std::string* payload;
+  std::string* message;
 
   XBT_INFO("Create my asynchronous activities");
   auto exec = sg4::this_actor::exec_async(5e9);
   auto comm = mbox->get_async(&payload);
   auto io   = disk->read_async(3e8);
+  auto mess = mqueue->get_async(&message);
 
-  sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast<sg4::Activity>(exec),
-                                       boost::dynamic_pointer_cast<sg4::Activity>(comm),
-                                       boost::dynamic_pointer_cast<sg4::Activity>(io)});
+  sg4::ActivitySet pending_activities({exec, comm, io, mess});
 
   XBT_INFO("Wait for asynchronous activities to complete");
   while (not pending_activities.empty()) {
@@ -32,6 +33,8 @@ static void bob()
     if (completed_one != nullptr) {
       if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
         XBT_INFO("Completed a Comm");
+      if (boost::dynamic_pointer_cast<sg4::Mess>(completed_one))
+        XBT_INFO("Completed a Mess");
       if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
         XBT_INFO("Completed an Exec");
       if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
@@ -40,6 +43,7 @@ static void bob()
   }
   XBT_INFO("Last activity is complete");
   delete payload;
+  delete message;
 }
 
 static void alice()
@@ -49,6 +53,14 @@ static void alice()
   sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
 }
 
+static void carl()
+{
+  sg4::this_actor::sleep_for(2);
+  auto* payload = new std::string("Control Message");
+  XBT_INFO("Send '%s'", payload->c_str());
+  sg4::MessageQueue::by_name("mqueue")->put(payload);
+}
+
 int main(int argc, char* argv[])
 {
   sg4::Engine e(&argc, argv);
@@ -57,6 +69,7 @@ int main(int argc, char* argv[])
 
   sg4::Actor::create("bob", e.host_by_name("bob"), bob);
   sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+  sg4::Actor::create("carl", e.host_by_name("carl"), carl);
 
   e.run();
 
index b9ecf1e..e665de5 100644 (file)
@@ -4,6 +4,8 @@ $ ${bindir:=.}/s4u-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=r
 > [0.000000] [alice] Send 'Message'
 > [0.000000] [  bob] Create my asynchronous activities
 > [0.000000] [  bob] Wait for asynchronous activities to complete
+> [2.000000] [ carl] Send 'Control Message'
+> [2.000000] [  bob] Completed a Mess
 > [3.000000] [  bob] Completed an I/O
 > [5.000000] [  bob] Completed an Exec
 > [5.197828] [  bob] Completed a Comm