Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 5 Jul 2018 13:08:51 +0000 (15:08 +0200)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 5 Jul 2018 13:08:51 +0000 (15:08 +0200)
13 files changed:
include/simgrid/barrier.h [new file with mode: 0644]
include/simgrid/forward.h
include/simgrid/msg.h
src/msg/msg_legacy.cpp
src/msg/msg_synchro.cpp
src/s4u/s4u_Barrier.cpp
src/smpi/include/private.hpp
src/smpi/include/smpi_process.hpp
src/smpi/include/smpi_win.hpp
src/smpi/internals/smpi_deployment.cpp
src/smpi/internals/smpi_process.cpp
src/smpi/mpi/smpi_win.cpp
tools/cmake/DefinePackages.cmake

diff --git a/include/simgrid/barrier.h b/include/simgrid/barrier.h
new file mode 100644 (file)
index 0000000..72f990e
--- /dev/null
@@ -0,0 +1,22 @@
+/* Public interface to the Link datatype                                    */
+
+/* Copyright (c) 2018. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef INCLUDE_SIMGRID_BARRIER_H_
+#define INCLUDE_SIMGRID_BARRIER_H_
+
+#include <simgrid/forward.h>
+
+/* C interface */
+SG_BEGIN_DECL()
+
+XBT_PUBLIC sg_bar_t sg_barrier_init(unsigned int count);
+XBT_PUBLIC void sg_barrier_destroy(sg_bar_t bar);
+XBT_PUBLIC int sg_barrier_wait(sg_bar_t bar);
+
+SG_END_DECL()
+
+#endif /* INCLUDE_SIMGRID_BARRIER_H_ */
index 4ce7e57..601cb74 100644 (file)
@@ -20,6 +20,7 @@ class Actor;
 using ActorPtr = boost::intrusive_ptr<Actor>;
 XBT_PUBLIC void intrusive_ptr_release(Actor* actor);
 XBT_PUBLIC void intrusive_ptr_add_ref(Actor* actor);
+class Barrier;
 class Comm;
 using CommPtr = boost::intrusive_ptr<Comm>;
 XBT_PUBLIC void intrusive_ptr_release(Comm* c);
@@ -128,6 +129,7 @@ class VirtualMachineImpl;
 } // namespace simgrid
 
 typedef simgrid::s4u::Actor s4u_Actor;
+typedef simgrid::s4u::Barrier s4u_Barrier;
 typedef simgrid::s4u::Host s4u_Host;
 typedef simgrid::s4u::Link s4u_Link;
 typedef simgrid::s4u::File s4u_File;
@@ -147,6 +149,7 @@ typedef simgrid::surf::StorageImpl* surf_storage_t;
 #else
 
 typedef struct s4u_Actor s4u_Actor;
+typedef struct s4u_Barrier s4u_Barrier;
 typedef struct s4u_Host s4u_Host;
 typedef struct s4u_Link s4u_Link;
 typedef struct s4u_File s4u_File;
@@ -164,6 +167,7 @@ typedef struct s_surf_storage* surf_storage_t;
 
 #endif
 
+typedef s4u_Barrier* sg_bar_t;
 typedef s4u_NetZone* sg_netzone_t;
 typedef s4u_Host* sg_host_t;
 typedef s4u_Link* sg_link_t;
index b562b47..ea93d1b 100644 (file)
@@ -7,6 +7,7 @@
 #define SIMGRID_MSG_H
 
 #include <simgrid/actor.h>
+#include <simgrid/barrier.h>
 #include <simgrid/engine.h>
 #include <simgrid/forward.h>
 #include <simgrid/host.h>
@@ -354,13 +355,7 @@ XBT_PUBLIC int MSG_sem_get_capacity(msg_sem_t sem);
 XBT_PUBLIC void MSG_sem_destroy(msg_sem_t sem);
 XBT_PUBLIC int MSG_sem_would_block(msg_sem_t sem);
 
-/** @brief Opaque type representing a barrier identifier
- *  @ingroup msg_synchro
- *  @hideinitializer
- */
-
-#define MSG_BARRIER_SERIAL_PROCESS -1
-typedef struct s_msg_bar_t* msg_bar_t;
+typedef sg_bar_t msg_bar_t;
 XBT_PUBLIC msg_bar_t MSG_barrier_init(unsigned int count);
 XBT_PUBLIC void MSG_barrier_destroy(msg_bar_t bar);
 XBT_PUBLIC int MSG_barrier_wait(msg_bar_t bar);
index bb1148f..fc75415 100644 (file)
@@ -358,3 +358,18 @@ void MSG_vm_destroy(sg_vm_t vm)
 {
   sg_vm_destroy(vm);
 }
+/********* barriers ************/
+sg_bar_t MSG_barrier_init(unsigned int count)
+{
+  return sg_barrier_init(count);
+}
+
+void MSG_barrier_destroy(sg_bar_t bar)
+{
+  sg_barrier_destroy(bar);
+}
+
+int MSG_barrier_wait(sg_bar_t bar)
+{
+  return sg_barrier_wait(bar);
+}
index bce88e5..99290b1 100644 (file)
@@ -54,45 +54,4 @@ int MSG_sem_would_block(msg_sem_t sem) {
   return simgrid::simix::simcall([sem] { return SIMIX_sem_would_block(sem); });
 }
 
-/*-**** barrier related functions ****-*/
-struct s_msg_bar_t {
-  xbt_mutex_t mutex;
-  xbt_cond_t cond;
-  unsigned int arrived_processes;
-  unsigned int expected_processes;
-};
-
-/** @brief Initializes a barrier, with count elements */
-msg_bar_t MSG_barrier_init(unsigned int count) {
-  msg_bar_t bar           = new s_msg_bar_t;
-  bar->expected_processes = count;
-  bar->arrived_processes  = 0;
-  bar->mutex              = xbt_mutex_init();
-  bar->cond               = xbt_cond_init();
-  return bar;
-}
-
-/** @brief Initializes a barrier, with count elements */
-void MSG_barrier_destroy(msg_bar_t bar) {
-  xbt_mutex_destroy(bar->mutex);
-  xbt_cond_destroy(bar->cond);
-  delete bar;
-}
-
-/** @brief Performs a barrier already initialized */
-int MSG_barrier_wait(msg_bar_t bar) {
-  xbt_mutex_acquire(bar->mutex);
-  bar->arrived_processes++;
-  XBT_DEBUG("waiting %p %u/%u", bar, bar->arrived_processes, bar->expected_processes);
-  if (bar->arrived_processes == bar->expected_processes) {
-    xbt_cond_broadcast(bar->cond);
-    xbt_mutex_release(bar->mutex);
-    bar->arrived_processes = 0;
-    return MSG_BARRIER_SERIAL_PROCESS;
-  }
-
-  xbt_cond_wait(bar->cond, bar->mutex);
-  xbt_mutex_release(bar->mutex);
-  return 0;
-}
 /**@}*/
index 482a164..713245c 100644 (file)
@@ -9,6 +9,7 @@
 #include <xbt/ex.hpp>
 #include <xbt/log.hpp>
 
+#include "simgrid/barrier.h"
 #include "simgrid/s4u/Barrier.hpp"
 #include "simgrid/simix.h"
 
@@ -44,3 +45,22 @@ int Barrier::wait()
 }
 } // namespace s4u
 } // namespace simgrid
+
+/* **************************** Public C interface *************************** */
+
+sg_bar_t sg_barrier_init(unsigned int count)
+{
+  return new simgrid::s4u::Barrier(count);
+}
+
+/** @brief Initializes a barrier, with count elements */
+void sg_barrier_destroy(sg_bar_t bar)
+{
+  delete bar;
+}
+
+/** @brief Performs a barrier already initialized */
+int sg_barrier_wait(sg_bar_t bar)
+{
+  return bar->wait();
+}
index bac1653..502cee5 100644 (file)
@@ -6,7 +6,7 @@
 #ifndef SMPI_PRIVATE_HPP
 #define SMPI_PRIVATE_HPP
 
-#include "simgrid/msg.h" // msg_bar_t
+#include "simgrid/s4u/Barrier.hpp"
 #include "smpi/smpi.h"
 #include "smpi/smpi_helpers_internal.h"
 #include "src/instr/instr_smpi.hpp"
@@ -66,7 +66,7 @@ XBT_PRIVATE int smpi_process_count();
 XBT_PRIVATE void smpi_deployment_register_process(const std::string instance_id, int rank,
                                                   simgrid::s4u::ActorPtr actor);
 XBT_PRIVATE MPI_Comm* smpi_deployment_comm_world(const std::string instance_id);
-XBT_PRIVATE msg_bar_t smpi_deployment_finalization_barrier(const std::string instance_id);
+XBT_PRIVATE simgrid::s4u::Barrier* smpi_deployment_finalization_barrier(const std::string instance_id);
 XBT_PRIVATE void smpi_deployment_cleanup_instances();
 
 XBT_PRIVATE void smpi_comm_copy_buffer_callback(smx_activity_t comm, void* buff, size_t buff_size);
index e7c6772..eaa3ed1 100644 (file)
@@ -32,7 +32,7 @@ class Process {
     int sampling_                   = 0; /* inside an SMPI_SAMPLE_ block? */
     std::string instance_id_;
     bool replaying_                 = false; /* is the process replaying a trace */
-    msg_bar_t finalization_barrier_;
+    simgrid::s4u::Barrier* finalization_barrier_;
     smpi_trace_call_location_t trace_call_loc_;
     simgrid::s4u::ActorPtr actor_ = nullptr;
     smpi_privatization_region_t privatized_region_;
@@ -43,7 +43,7 @@ class Process {
     papi_counter_t papi_counter_data_;
 #endif
   public:
-    explicit Process(simgrid::s4u::ActorPtr actor, msg_bar_t barrier);
+    explicit Process(simgrid::s4u::ActorPtr actor, simgrid::s4u::Barrier* barrier);
     ~Process();
     void set_data(int* argc, char*** argv);
     void finalize();
index d29f96a..ec89c1d 100644 (file)
@@ -7,10 +7,10 @@
 #ifndef SMPI_WIN_HPP_INCLUDED
 #define SMPI_WIN_HPP_INCLUDED
 
+#include "simgrid/s4u/Barrier.hpp"
 #include "smpi_f2c.hpp"
 #include "smpi_keyvals.hpp"
 #include "xbt/synchro.h"
-#include <simgrid/msg.h>
 
 #include <vector>
 #include <list>
@@ -28,7 +28,7 @@ class Win : public F2C, public Keyval {
   MPI_Comm comm_;
   std::vector<MPI_Request> *requests_;
   xbt_mutex_t mut_;
-  msg_bar_t bar_;
+  simgrid::s4u::Barrier* bar_;
   MPI_Win* connected_wins_;
   char* name_;
   int opened_;
index 6c1c75c..ccf7148 100644 (file)
@@ -6,7 +6,6 @@
 
 #include "smpi_host.hpp"
 #include "private.hpp"
-#include "simgrid/msg.h" /* barrier */
 #include "simgrid/s4u/Engine.hpp"
 #include "smpi_comm.hpp"
 #include <map>
@@ -18,7 +17,7 @@ namespace app {
 class Instance {
 public:
   Instance(const std::string name, int max_no_processes, int process_count, MPI_Comm comm,
-           msg_bar_t finalization_barrier)
+           simgrid::s4u::Barrier* finalization_barrier)
       : name(name)
       , size(max_no_processes)
       , present_processes(0)
@@ -30,7 +29,7 @@ public:
   int size;
   int present_processes;
   MPI_Comm comm_world;
-  msg_bar_t finalization_barrier;
+  simgrid::s4u::Barrier* finalization_barrier;
 };
 }
 }
@@ -63,7 +62,8 @@ void SMPI_app_instance_register(const char *name, xbt_main_func_t code, int num_
     }
   }
 
-  Instance instance(std::string(name), num_processes, process_count, MPI_COMM_NULL, MSG_barrier_init(num_processes));
+  Instance instance(std::string(name), num_processes, process_count, MPI_COMM_NULL,
+                    new simgrid::s4u::Barrier(num_processes));
   MPI_Group group     = new simgrid::smpi::Group(instance.size);
   instance.comm_world = new simgrid::smpi::Comm(group, nullptr);
   MPI_Attr_put(instance.comm_world, MPI_UNIVERSE_SIZE, reinterpret_cast<void*>(instance.size));
@@ -90,7 +90,7 @@ MPI_Comm* smpi_deployment_comm_world(const std::string instance_id)
   return &instance.comm_world;
 }
 
-msg_bar_t smpi_deployment_finalization_barrier(const std::string instance_id)
+simgrid::s4u::Barrier* smpi_deployment_finalization_barrier(const std::string instance_id)
 {
   if (smpi_instances.empty()) { // no instance registered, we probably used smpirun.
     return nullptr;
@@ -102,7 +102,7 @@ msg_bar_t smpi_deployment_finalization_barrier(const std::string instance_id)
 void smpi_deployment_cleanup_instances(){
   for (auto const& item : smpi_instances) {
     Instance instance = item.second;
-    MSG_barrier_destroy(instance.finalization_barrier);
+    delete instance.finalization_barrier;
     simgrid::smpi::Comm::destroy(instance.comm_world);
   }
   smpi_instances.clear();
index 661968f..996fc71 100644 (file)
@@ -23,7 +23,7 @@ namespace smpi{
 using simgrid::s4u::Actor;
 using simgrid::s4u::ActorPtr;
 
-Process::Process(ActorPtr actor, msg_bar_t finalization_barrier)
+Process::Process(ActorPtr actor, simgrid::s4u::Barrier* finalization_barrier)
     : finalization_barrier_(finalization_barrier), actor_(actor)
 {
   mailbox_         = simgrid::s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
@@ -66,7 +66,7 @@ void Process::set_data(int* argc, char*** argv)
 {
   instance_id_      = std::string((*argv)[1]);
   comm_world_       = smpi_deployment_comm_world(instance_id_);
-  msg_bar_t barrier = smpi_deployment_finalization_barrier(instance_id_);
+  simgrid::s4u::Barrier* barrier = smpi_deployment_finalization_barrier(instance_id_);
   if (barrier != nullptr) // don't overwrite the current one if the instance has none
     finalization_barrier_ = barrier;
 
@@ -96,7 +96,7 @@ void Process::finalize()
   if(MC_is_active() || MC_record_replay_is_active())
     return;
   // wait for all pending asynchronous comms to finish
-  MSG_barrier_wait(finalization_barrier_);
+  finalization_barrier_->wait();
 }
 
 /** @brief Check if a process is finalized */
index c082e48..5e00ce5 100644 (file)
@@ -39,7 +39,7 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm,
   connected_wins_[rank_] = this;
   count_                 = 0;
   if(rank_==0){
-    bar_ = MSG_barrier_init(comm_size);
+    bar_ = new simgrid::s4u::Barrier(comm_size);
   }
   mode_=0;
 
@@ -49,14 +49,14 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm,
   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                          MPI_BYTE, comm);
 
-  Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
+  Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
 
   Colls::barrier(comm);
 }
 
 Win::~Win(){
   //As per the standard, perform a barrier to ensure every async comm is finished
-  MSG_barrier_wait(bar_);
+  bar_->wait();
 
   int finished = finish_comms();
   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
@@ -76,7 +76,7 @@ Win::~Win(){
   Comm::unref(comm_);
   
   if (rank_ == 0)
-    MSG_barrier_destroy(bar_);
+    delete bar_;
   xbt_mutex_destroy(mut_);
   xbt_mutex_destroy(lock_mut_);
   xbt_mutex_destroy(atomic_mut_);
@@ -163,7 +163,7 @@ int Win::fence(int assert)
     opened_=1;
   if (assert != MPI_MODE_NOPRECEDE) {
     // This is not the first fence => finalize what came before
-    MSG_barrier_wait(bar_);
+    bar_->wait();
     xbt_mutex_acquire(mut_);
     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
     // Without this, the vector could get redimensionned when another process pushes.
@@ -184,7 +184,7 @@ int Win::fence(int assert)
     opened_=0;
   assert_ = assert;
 
-  MSG_barrier_wait(bar_);
+  bar_->wait();
   XBT_DEBUG("Leaving fence");
 
   return MPI_SUCCESS;
@@ -629,9 +629,9 @@ int Win::unlock_all(){
   int i=0;
   int retval = MPI_SUCCESS;
   for (i=0; i<comm_->size();i++){
-      int ret = this->unlock(i);
-      if(ret != MPI_SUCCESS)
-        retval = ret;
+    int ret = this->unlock(i);
+    if (ret != MPI_SUCCESS)
+      retval = ret;
   }
   return retval;
 }
@@ -652,11 +652,9 @@ int Win::flush_local(int rank){
 }
 
 int Win::flush_all(){
-  int i=0;
-  int finished = 0;
-  finished = finish_comms();
+  int finished = finish_comms();
   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
-  for (i=0; i<comm_->size();i++){
+  for (int i = 0; i < comm_->size(); i++) {
     finished = connected_wins_[i]->finish_comms(rank_);
     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
   }
@@ -673,7 +671,6 @@ Win* Win::f2c(int id){
   return static_cast<Win*>(F2C::f2c(id));
 }
 
-
 int Win::finish_comms(){
   xbt_mutex_acquire(mut_);
   //Finish own requests
index 588be4a..493b62f 100644 (file)
@@ -662,6 +662,7 @@ set(headers_to_install
 
   include/simgrid_config.h
   include/simgrid/actor.h
+  include/simgrid/barrier.h
   include/simgrid/engine.h
   include/simgrid/chrono.hpp
   include/simgrid/plugins/dvfs.h