Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
enhance shared tests to validate comms are skipped
[simgrid.git] / src / smpi / internals / smpi_global.cpp
index 8144ad9..5325b9d 100644 (file)
@@ -82,7 +82,6 @@ std::map</* computation unit name */ std::string, papi_process_data> units2papi_
 
 std::unordered_map<std::string, double> location2speedup;
 
-static std::map</*process_id*/ simgrid::s4u::Actor const*, simgrid::smpi::ActorExt*> process_data;
 static int smpi_exit_status = 0;
 extern double smpi_total_benched_time;
 xbt_os_timer_t global_timer;
@@ -114,14 +113,14 @@ simgrid::smpi::ActorExt* smpi_process()
   if (me == nullptr) // This happens sometimes (eg, when linking against NS3 because it pulls openMPI...)
     return nullptr;
 
-  return process_data.at(me.get());
+  return me->extension<simgrid::smpi::ActorExt>();
 }
 
 simgrid::smpi::ActorExt* smpi_process_remote(simgrid::s4u::ActorPtr actor)
 {
   if (actor.get() == nullptr)
     return nullptr;
-  return process_data.at(actor.get());
+  return actor->extension<simgrid::smpi::ActorExt>();
 }
 
 MPI_Comm smpi_process_comm_self(){
@@ -133,11 +132,11 @@ MPI_Info smpi_process_info_env(){
 }
 
 void * smpi_process_get_user_data(){
-  return simgrid::s4u::Actor::self()->get_impl()->get_user_data();
+  return simgrid::s4u::Actor::self()->get_data();
 }
 
 void smpi_process_set_user_data(void *data){
-  simgrid::s4u::Actor::self()->get_impl()->set_user_data(data);
+  simgrid::s4u::Actor::self()->set_data(data);
 }
 
 void smpi_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
@@ -160,6 +159,17 @@ static void check_blocks(std::vector<std::pair<size_t, size_t>> &private_blocks,
     xbt_assert(block.first <= block.second && block.second <= buff_size, "Oops, bug in shared malloc.");
 }
 
+static void smpi_cleanup_comm_after_copy(simgrid::kernel::activity::CommImpl* comm, void* buff){
+  if (comm->detached()) {
+    // if this is a detached send, the source buffer was duplicated by SMPI
+    // sender to make the original buffer available to the application ASAP
+    xbt_free(buff);
+    // NOTE(review): the request still seems to be used after this call, so it has to be freed somewhere else -- but where?
+    //xbt_free(comm->comm.src_data);// inside SMPI the request is kept inside the user data and should be freed
+    comm->src_buff_ = nullptr;
+  }
+}
+
 void smpi_comm_copy_buffer_callback(simgrid::kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
 {
   size_t src_offset                     = 0;
@@ -168,16 +178,24 @@ void smpi_comm_copy_buffer_callback(simgrid::kernel::activity::CommImpl* comm, v
   std::vector<std::pair<size_t, size_t>> dst_private_blocks;
   XBT_DEBUG("Copy the data over");
   if(smpi_is_shared(buff, src_private_blocks, &src_offset)) {
-    XBT_DEBUG("Sender %p is shared. Let's ignore it.", buff);
     src_private_blocks = shift_and_frame_private_blocks(src_private_blocks, src_offset, buff_size);
+    if (src_private_blocks.size()==1 && (src_private_blocks[0].second - src_private_blocks[0].first)==buff_size){//simple shared malloc ... return.
+      XBT_VERB("Sender is shared. Let's ignore it.");
+      smpi_cleanup_comm_after_copy(comm, buff);
+      return;
+    }
   }
   else {
     src_private_blocks.clear();
     src_private_blocks.push_back(std::make_pair(0, buff_size));
   }
   if (smpi_is_shared((char*)comm->dst_buff_, dst_private_blocks, &dst_offset)) {
-    XBT_DEBUG("Receiver %p is shared. Let's ignore it.", (char*)comm->dst_buff_);
     dst_private_blocks = shift_and_frame_private_blocks(dst_private_blocks, dst_offset, buff_size);
+    if (dst_private_blocks.size()==1 && (dst_private_blocks[0].second - dst_private_blocks[0].first)==buff_size){//receiver side: test the dst blocks (not src); simple shared malloc ... return.
+      XBT_VERB("Receiver is shared. Let's ignore it.");
+      smpi_cleanup_comm_after_copy(comm, buff);
+      return;
+    }
   }
   else {
     dst_private_blocks.clear();
@@ -206,14 +224,7 @@ void smpi_comm_copy_buffer_callback(simgrid::kernel::activity::CommImpl* comm, v
   XBT_DEBUG("Copying %zu bytes from %p to %p", buff_size, tmpbuff, comm->dst_buff_);
   memcpy_private(comm->dst_buff_, tmpbuff, private_blocks);
 
-  if (comm->detached()) {
-    // if this is a detached send, the source buffer was duplicated by SMPI
-    // sender to make the original buffer available to the application ASAP
-    xbt_free(buff);
-    //It seems that the request is used after the call there this should be free somewhere else but where???
-    //xbt_free(comm->comm.src_data);// inside SMPI the request is kept inside the user data and should be free
-    comm->src_buff_ = nullptr;
-  }
+  smpi_cleanup_comm_after_copy(comm,buff);
   if (tmpbuff != buff)
     xbt_free(tmpbuff);
 }
@@ -331,8 +342,8 @@ static void smpi_init_options(){
   // return if already called
   if (smpi_cpu_threshold > -1)
     return;
-  simgrid::smpi::Colls::set_collectives();
-  simgrid::smpi::Colls::smpi_coll_cleanup_callback = nullptr;
+  simgrid::smpi::colls::set_collectives();
+  simgrid::smpi::colls::smpi_coll_cleanup_callback = nullptr;
   smpi_cpu_threshold                               = simgrid::config::get_value<double>("smpi/cpu-threshold");
   if (smpi_cpu_threshold < 0)
     smpi_cpu_threshold = DBL_MAX;
@@ -620,10 +631,11 @@ int smpi_main(const char* executable, int argc, char* argv[])
   TRACE_global_init();
   SIMIX_global_init(&argc, argv);
 
+  auto engine              = simgrid::s4u::Engine::get_instance();
   SMPI_switch_data_segment = &smpi_switch_data_segment;
   sg_storage_file_system_init();
   // parse the platform file: get the host list
-  simgrid::s4u::Engine::get_instance()->load_platform(argv[1]);
+  engine->load_platform(argv[1]);
   SIMIX_comm_set_copy_data_callback(smpi_comm_copy_buffer_callback);
 
   smpi_init_options();
@@ -633,9 +645,16 @@ int smpi_main(const char* executable, int argc, char* argv[])
     smpi_init_privatization_no_dlopen(executable);
 
   SMPI_init();
-  simgrid::s4u::Engine::get_instance()->load_deployment(argv[2]);
-  SMPI_app_instance_register(smpi_default_instance_name.c_str(), nullptr,
-                             process_data.size()); // This call has a side effect on process_count...
+
+  /* This is a ... heavy way to count the MPI ranks */
+  int rank_counts = 0;
+  simgrid::s4u::Actor::on_creation.connect([&rank_counts](simgrid::s4u::Actor& actor) {
+    if (not actor.is_daemon())
+      rank_counts++;
+  });
+  engine->load_deployment(argv[2]);
+
+  SMPI_app_instance_register(smpi_default_instance_name.c_str(), nullptr, rank_counts);
   MPI_COMM_WORLD = *smpi_deployment_comm_world(smpi_default_instance_name);
 
   /* Clean IO before the run */
@@ -669,20 +688,13 @@ int smpi_main(const char* executable, int argc, char* argv[])
 // Called either directly from the user code, or from the code called by smpirun
 void SMPI_init(){
   simgrid::s4u::Actor::on_creation.connect([](simgrid::s4u::Actor& actor) {
-    if (not actor.is_daemon()) {
-      process_data.insert({&actor, new simgrid::smpi::ActorExt(&actor)});
-    }
-  });
-  simgrid::s4u::Actor::on_destruction.connect([](simgrid::s4u::Actor const& actor) {
-    XBT_DEBUG("Delete the extension of actor %s", actor.get_cname());
-    auto it = process_data.find(&actor);
-    if (it != process_data.end()) {
-      delete it->second;
-      process_data.erase(it);
-    }
+    if (not actor.is_daemon())
+      actor.extension_set<simgrid::smpi::ActorExt>(new simgrid::smpi::ActorExt(&actor));
   });
   simgrid::s4u::Host::on_creation.connect(
       [](simgrid::s4u::Host& host) { host.extension_set(new simgrid::smpi::Host(&host)); });
+  for (auto const& host : simgrid::s4u::Engine::get_instance()->get_all_hosts())
+    host->extension_set(new simgrid::smpi::Host(host));
 
   smpi_init_options();
   if (not MC_is_active()) {
@@ -718,8 +730,8 @@ void SMPI_finalize()
   smpi_shared_destroy();
   smpi_deployment_cleanup_instances();
 
-  if (simgrid::smpi::Colls::smpi_coll_cleanup_callback != nullptr)
-    simgrid::smpi::Colls::smpi_coll_cleanup_callback();
+  if (simgrid::smpi::colls::smpi_coll_cleanup_callback != nullptr)
+    simgrid::smpi::colls::smpi_coll_cleanup_callback();
 
   MPI_COMM_WORLD = MPI_COMM_NULL;