Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[s4u] Allocate ConditionVariable on the heap and return ConditionVariablePtr
[simgrid.git] / examples / s4u / actions-comm / s4u_actions-comm.cpp
index cc77351..51c89d3 100644 (file)
@@ -3,6 +3,8 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <mutex>
+
 #include "simgrid/msg.h"
 #include "simgrid/simix.h"      /* semaphores for the barrier */
 #include <xbt/replay.h>
@@ -50,15 +52,17 @@ static void log_action(const char *const *action, double date)
   }
 }
 
-static void asynchronous_cleanup(void)
+static void asynchronous_cleanup()
 {
-  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+  process_globals_t globals = static_cast<process_globals_t>( MSG_process_get_data(MSG_process_self()) );
 
   /* Destroy any isend which correspond to completed communications */
-  int found;
   msg_comm_t comm;
-  while ((found = MSG_comm_testany(globals->isends)) != -1) {
-    xbt_dynar_remove_at(globals->isends, found, &comm);
+  while (true) {
+    int pos_found = MSG_comm_testany(globals->isends);
+    if (pos_found == -1) /* none remaining */
+      break;
+    xbt_dynar_remove_at(globals->isends, pos_found, &comm);
     MSG_comm_destroy(comm);
   }
 }
@@ -89,7 +93,7 @@ static void action_Isend(const char *const *action)
   char to[250];
   const char *size = action[3];
   double clock = MSG_get_clock();
-  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+  process_globals_t globals = static_cast<process_globals_t>( MSG_process_get_data(MSG_process_self()) );
 
   snprintf(to,249, "%s_%s", MSG_process_get_name(MSG_process_self()), action[2]);
   msg_comm_t comm = MSG_task_isend(MSG_task_create(to, 0, parse_double(size), NULL), to);
@@ -122,7 +126,7 @@ static void action_Irecv(const char *const *action)
 {
   char mailbox[250];
   double clock = MSG_get_clock();
-  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+  process_globals_t globals = static_cast<process_globals_t>( MSG_process_get_data(MSG_process_self()) );
 
   XBT_DEBUG("Irecv on %s", MSG_process_get_name(MSG_process_self()));
 
@@ -141,7 +145,7 @@ static void action_wait(const char *const *action)
   msg_task_t task = NULL;
   msg_comm_t comm;
   double clock = MSG_get_clock();
-  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+  process_globals_t globals = static_cast<process_globals_t>( MSG_process_get_data(MSG_process_self()) );
 
   xbt_assert(xbt_dynar_length(globals->irecvs), "action wait not preceded by any irecv: %s",
              xbt_str_join_array(action, " "));
@@ -159,33 +163,33 @@ static void action_wait(const char *const *action)
 /* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
 static void action_barrier(const char *const *action)
 {
-  // static smx_mutex_t mutex = NULL;
-  // static smx_cond_t cond = NULL;
-  static simgrid::s4u::Mutex *mutex = NULL;
-  static simgrid::s4u::ConditionVariable *cond = NULL;
+  static simgrid::s4u::MutexPtr mutex = nullptr;
+  static simgrid::s4u::ConditionVariablePtr cond = nullptr;
   static int processes_arrived_sofar = 0;
-  if (mutex == NULL) {          // first arriving on the barrier
-    mutex = new simgrid::s4u::Mutex();
-    cond = new simgrid::s4u::ConditionVariable();
+  if (mutex == nullptr) {          // first arriving on the barrier
+    mutex = simgrid::s4u::Mutex::createMutex();
+    cond = simgrid::s4u::ConditionVariable::createConditionVariable();
     processes_arrived_sofar = 0;
   }
   ACT_DEBUG("Entering barrier: %s (%d already there)", NAME, processes_arrived_sofar);
-  mutex->lock();
-  if (++processes_arrived_sofar == communicator_size) {
-    cond->notify_all();
-    mutex->unlock();
-  } else {
-    cond->wait(mutex);
-    mutex->unlock();
+
+  {
+    std::unique_lock<simgrid::s4u::Mutex> lock(*mutex);
+    if (++processes_arrived_sofar == communicator_size) {
+      // We can notify without the lock:
+      lock.unlock();
+      cond->notify_all();
+    } else {
+      cond->wait(lock);
+    }
   }
 
   ACT_DEBUG("Exiting barrier: %s", NAME);
 
   processes_arrived_sofar--;
   if (processes_arrived_sofar<=0) {
-    delete cond;
-    delete mutex;
-    mutex = NULL;
+    cond = nullptr;
+    mutex = nullptr;
   }
 }
 
@@ -196,7 +200,7 @@ static void action_bcast(const char *const *action)
   msg_task_t task = NULL;
   double clock = MSG_get_clock();
 
-  process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
+  process_globals_t counters = static_cast<process_globals_t>( MSG_process_get_data(MSG_process_self()) );
 
   xbt_assert(communicator_size, "Size of Communicator is not defined, can't use collective operations");
 
@@ -254,7 +258,7 @@ static void action_compute(const char *const *action)
 static void action_init(const char *const *action)
 {
   XBT_DEBUG("Initialize the counters");
-  process_globals_t globals = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
+  process_globals_t globals = static_cast<process_globals_t>( calloc(1, sizeof(s_process_globals_t)) );
   globals->isends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
   globals->irecvs = xbt_dynar_new(sizeof(msg_comm_t), NULL);
   globals->tasks = xbt_dynar_new(sizeof(msg_task_t), NULL);
@@ -263,7 +267,7 @@ static void action_init(const char *const *action)
 
 static void action_finalize(const char *const *action)
 {
-  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+  process_globals_t globals = static_cast<process_globals_t>( MSG_process_get_data(MSG_process_self()) );
   if (globals) {
     asynchronous_cleanup();
     xbt_dynar_free_container(&(globals->isends));