X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/2c5bdaef3a408b3abd47c3afa587fd968af4b37c..c5a48995c0e24c9ae38c3d14203388523c565a5b:/src/smpi/smpi_comm.cpp

diff --git a/src/smpi/smpi_comm.cpp b/src/smpi/smpi_comm.cpp
index 19e7a8e78d..97997d46fb 100644
--- a/src/smpi/smpi_comm.cpp
+++ b/src/smpi/smpi_comm.cpp
@@ -14,14 +14,12 @@
 #include
 
 #include "private.h"
-#include "smpi_mpi_dt_private.h"
 #include "src/simix/smx_private.h"
-#include "colls/colls.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)");
 
-xbt_dict_t smpi_comm_keyvals = nullptr;
-int comm_keyval_id = 0;//avoid collisions
+
 Comm mpi_MPI_COMM_UNINITIALIZED;
+MPI_Comm MPI_COMM_UNINITIALIZED=&mpi_MPI_COMM_UNINITIALIZED;
 
 /* Support for cartesian topology was added, but there are 2 other types of topology, graph et dist graph. In order to
  * support them, we have to add a field MPIR_Topo_type, and replace the MPI_Topology field by an union. */
@@ -47,59 +45,57 @@ static int smpi_compare_rankmap(const void *a, const void *b)
 }
 
 namespace simgrid{
-namespace SMPI{
+namespace smpi{
 
-Comm::Comm(MPI_Group group, MPI_Topology topo)
+std::unordered_map<int, smpi_key_elem> Comm::keyvals_;
+int Comm::keyval_id_=0;
+
+Comm::Comm(MPI_Group group, MPI_Topology topo) : group_(group), topo_(topo)
 {
-  m_group = group;
-  m_refcount=1;
-  m_topoType = MPI_INVALID_TOPO;
-  m_topo = topo;
-  m_intra_comm = MPI_COMM_NULL;
-  m_leaders_comm = MPI_COMM_NULL;
-  m_is_uniform=1;
-  m_non_uniform_map = nullptr;
-  m_leaders_map = nullptr;
-  m_is_blocked=0;
-  m_attributes=nullptr;
+  refcount_=1;
+  topoType_ = MPI_INVALID_TOPO;
+  intra_comm_ = MPI_COMM_NULL;
+  leaders_comm_ = MPI_COMM_NULL;
+  is_uniform_=1;
+  non_uniform_map_ = nullptr;
+  leaders_map_ = nullptr;
+  is_blocked_=0;
 }
 
-void Comm::destroy()
+void Comm::destroy(Comm* comm)
 {
-  if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->destroy();
-  delete m_topo; // there's no use count on topos
-  this->unuse();
+  if (comm == MPI_COMM_UNINITIALIZED){
+    Comm::destroy(smpi_process()->comm_world());
+    return;
+  }
+  delete comm->topo_; // there's no use count on topos
+  Comm::unref(comm);
 }
 
 int Comm::dup(MPI_Comm* newcomm){
   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
-     smpi_switch_data_segment(smpi_process_index());
+     smpi_switch_data_segment(smpi_process()->index());
   }
-  MPI_Group cp = new simgrid::SMPI::Group(this->group());
-  (*newcomm) = new simgrid::SMPI::Comm(cp, this->topo());
+  MPI_Group cp = new Group(this->group());
+  (*newcomm) = new Comm(cp, this->topo());
   int ret = MPI_SUCCESS;
 
-  if(m_attributes !=nullptr){
-    (*newcomm)->m_attributes = xbt_dict_new_homogeneous(nullptr);
-    xbt_dict_cursor_t cursor = nullptr;
-    char* key;
+  if(!attributes()->empty()){
     int flag;
-    void* value_in;
     void* value_out;
-    xbt_dict_foreach (m_attributes, cursor, key, value_in) {
-      smpi_comm_key_elem elem =
-          static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, key, sizeof(int)));
-      if (elem != nullptr && elem->copy_fn != MPI_NULL_COPY_FN) {
-        ret = elem->copy_fn(this, atoi(key), nullptr, value_in, &value_out, &flag);
+    for(auto it : *attributes()){
+      smpi_key_elem elem = keyvals_.at(it.first);
+      if (elem != nullptr && elem->copy_fn.comm_copy_fn != MPI_NULL_COPY_FN) {
+        ret = elem->copy_fn.comm_copy_fn(this, it.first, nullptr, it.second, &value_out, &flag);
        if (ret != MPI_SUCCESS) {
-          (*newcomm)->destroy();
+          Comm::destroy(*newcomm);
           *newcomm = MPI_COMM_NULL;
-          xbt_dict_cursor_free(&cursor);
           return ret;
         }
-        if (flag)
-          xbt_dict_set_ext((*newcomm)->m_attributes, key, sizeof(int), value_out, nullptr);
+        if (flag){
+          elem->refcount++;
+          (*newcomm)->attributes()->insert({it.first, value_out});
+        }
       }
     }
   }
@@ -109,32 +105,34 @@ int Comm::dup(MPI_Comm* newcomm){
 MPI_Group Comm::group()
 {
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->group();
-  return m_group;
+    return smpi_process()->comm_world()->group();
+  return group_;
 }
 
 MPI_Topology Comm::topo() {
-  return m_topo;
+  return topo_;
 }
 
 int Comm::size()
 {
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->size();
-  return m_group->size();
+    return smpi_process()->comm_world()->size();
+  return group_->size();
 }
 
 int Comm::rank()
 {
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->rank();
-  return m_group->rank(smpi_process_index());
+    return smpi_process()->comm_world()->rank();
+  return group_->rank(smpi_process()->index());
 }
 
 void Comm::get_name (char* name, int* len)
 {
-  if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->get_name(name, len);
+  if (this == MPI_COMM_UNINITIALIZED){
+    smpi_process()->comm_world()->get_name(name, len);
+    return;
+  }
   if(this == MPI_COMM_WORLD) {
     strncpy(name, "WORLD",5);
     *len = 5;
@@ -144,55 +142,57 @@ void Comm::get_name (char* name, int* len)
 }
 
 void Comm::set_leaders_comm(MPI_Comm leaders){
-  if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->set_leaders_comm(leaders);
-  m_leaders_comm=leaders;
+  if (this == MPI_COMM_UNINITIALIZED){
+    smpi_process()->comm_world()->set_leaders_comm(leaders);
+    return;
+  }
+  leaders_comm_=leaders;
 }
 
 void Comm::set_intra_comm(MPI_Comm leaders){
-  m_intra_comm=leaders;
+  intra_comm_=leaders;
 }
 
 int* Comm::get_non_uniform_map(){
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->get_non_uniform_map();
-  return m_non_uniform_map;
+    return smpi_process()->comm_world()->get_non_uniform_map();
+  return non_uniform_map_;
 }
 
 int* Comm::get_leaders_map(){
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->get_leaders_map();
-  return m_leaders_map;
+    return smpi_process()->comm_world()->get_leaders_map();
+  return leaders_map_;
 }
 
 MPI_Comm Comm::get_leaders_comm(){
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->get_leaders_comm();
-  return m_leaders_comm;
+    return smpi_process()->comm_world()->get_leaders_comm();
+  return leaders_comm_;
 }
 
 MPI_Comm Comm::get_intra_comm(){
   if (this == MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD)
-    return smpi_process_get_comm_intra();
-  else return m_intra_comm;
+    return smpi_process()->comm_intra();
+  else return intra_comm_;
 }
 
 int Comm::is_uniform(){
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->is_uniform();
-  return m_is_uniform;
+    return smpi_process()->comm_world()->is_uniform();
+  return is_uniform_;
 }
 
 int Comm::is_blocked(){
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->is_blocked();
-  return m_is_blocked;
+    return smpi_process()->comm_world()->is_blocked();
+  return is_blocked_;
 }
 
 MPI_Comm Comm::split(int color, int key)
 {
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->split(color, key);
+    return smpi_process()->comm_world()->split(color, key);
   int system_tag = 123;
   int* recvbuf;
 
@@ -210,7 +210,7 @@ MPI_Comm Comm::split(int color, int key)
   } else {
     recvbuf = nullptr;
   }
-  smpi_mpi_gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, this);
+  Coll_gather_default::gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, this);
   xbt_free(sendbuf);
   /* Do the actual job */
   if(rank == 0) {
@@ -233,7 +233,7 @@ MPI_Comm Comm::split(int color, int key)
         rankmap[2 * count + 1] = recvbuf[2 * i + 1];
         count++;
       qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
-      group_out = new simgrid::SMPI::Group(count);
+      group_out = new Group(count);
       if (i == 0) {
         group_root = group_out; /* Save root's group */
       }
@@ -245,15 +245,15 @@ MPI_Comm Comm::split(int color, int key)
       int reqs = 0;
       for (int j = 0; j < count; j++) {
         if(rankmap[2 * j] != 0) {
-          group_snd[reqs]=new simgrid::SMPI::Group(group_out);
-          requests[reqs] = smpi_mpi_isend(&(group_snd[reqs]), 1, MPI_PTR, rankmap[2 * j], system_tag, this);
+          group_snd[reqs]=new Group(group_out);
+          requests[reqs] = Request::isend(&(group_snd[reqs]), 1, MPI_PTR, rankmap[2 * j], system_tag, this);
           reqs++;
         }
       }
-      if(i != 0) {
-        group_out->destroy();
-      }
-      smpi_mpi_waitall(reqs, requests, MPI_STATUS_IGNORE);
+      if(i != 0 && group_out != MPI_COMM_WORLD->group() && group_out != MPI_GROUP_EMPTY)
+        Group::unref(group_out);
+
+      Request::waitall(reqs, requests, MPI_STATUS_IGNORE);
       xbt_free(requests);
     }
   }
@@ -263,55 +263,44 @@ MPI_Comm Comm::split(int color, int key)
     group_out = group_root; /* exit with root's group */
   } else {
     if(color != MPI_UNDEFINED) {
-      smpi_mpi_recv(&group_out, 1, MPI_PTR, 0, system_tag, this, MPI_STATUS_IGNORE);
+      Request::recv(&group_out, 1, MPI_PTR, 0, system_tag, this, MPI_STATUS_IGNORE);
     } /* otherwise, exit with group_out == nullptr */
   }
-  return group_out!=nullptr ? new simgrid::SMPI::Comm(group_out, nullptr) : MPI_COMM_NULL;
+  return group_out!=nullptr ? new Comm(group_out, nullptr) : MPI_COMM_NULL;
 }
 
-void Comm::use(){
-  if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->use();
-  m_group->use();
-  m_refcount++;
-}
-
-void Comm::cleanup_attributes(){
-  if(m_attributes !=nullptr){
-    xbt_dict_cursor_t cursor = nullptr;
-    char* key;
-    void* value;
-    int flag;
-    xbt_dict_foreach (m_attributes, cursor, key, value) {
-      smpi_comm_key_elem elem = static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null(smpi_comm_keyvals, key));
-      if (elem != nullptr && elem->delete_fn != nullptr)
-        elem->delete_fn(this, atoi(key), value, &flag);
-    }
-    xbt_dict_free(&m_attributes);
+void Comm::ref(){
+  if (this == MPI_COMM_UNINITIALIZED){
+    smpi_process()->comm_world()->ref();
+    return;
   }
+  group_->ref();
+  refcount_++;
 }
 
 void Comm::cleanup_smp(){
-  if (m_intra_comm != MPI_COMM_NULL)
-    m_intra_comm->unuse();
-  if (m_leaders_comm != MPI_COMM_NULL)
-    m_leaders_comm->unuse();
-  if (m_non_uniform_map != nullptr)
-    xbt_free(m_non_uniform_map);
-  if (m_leaders_map != nullptr)
-    xbt_free(m_leaders_map);
+  if (intra_comm_ != MPI_COMM_NULL)
+    Comm::unref(intra_comm_);
+  if (leaders_comm_ != MPI_COMM_NULL)
+    Comm::unref(leaders_comm_);
+  if (non_uniform_map_ != nullptr)
+    xbt_free(non_uniform_map_);
+  if (leaders_map_ != nullptr)
+    xbt_free(leaders_map_);
 }
 
-void Comm::unuse(){
-  if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->unuse();
-  m_refcount--;
-  m_group->unuse();
-
-  if(m_refcount==0){
-    this->cleanup_smp();
-    this->cleanup_attributes();
-    delete this;
+void Comm::unref(Comm* comm){
+  if (comm == MPI_COMM_UNINITIALIZED){
+    Comm::unref(smpi_process()->comm_world());
+    return;
+  }
+  comm->refcount_--;
+  Group::unref(comm->group_);
+
+  if(comm->refcount_==0){
+    comm->cleanup_smp();
+    comm->cleanup_attr();
+    delete comm;
   }
 }
 
@@ -327,52 +316,50 @@ void Comm::init_smp(){
   int leader = -1;
 
   if (this == MPI_COMM_UNINITIALIZED)
-    return smpi_process_comm_world()->init_smp();
+    smpi_process()->comm_world()->init_smp();
 
   int comm_size = this->size();
 
   // If we are in replay - perform an ugly hack
  // tell SimGrid we are not in replay for a while, because we need the buffers to be copied for the following calls
   bool replaying = false; //cache data to set it back again after
-  if(smpi_process_get_replaying()){
+  if(smpi_process()->replaying()){
     replaying=true;
-    smpi_process_set_replaying(false);
+    smpi_process()->set_replaying(false);
   }
 
   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
-     smpi_switch_data_segment(smpi_process_index());
+     smpi_switch_data_segment(smpi_process()->index());
   }
   //identify neighbours in comm
   //get the indexes of all processes sharing the same simix host
-  xbt_swag_t process_list = SIMIX_host_self()->processes();
-  int intra_comm_size = 0;
-  int i =0;
-  int min_index=INT_MAX;//the minimum index will be the leader
-  smx_actor_t process = nullptr;
-  xbt_swag_foreach(process, process_list) {
-    int index = process->pid -1;
+  xbt_swag_t process_list = SIMIX_host_self()->extension<simgrid::simix::Host>()->process_list;
+  int intra_comm_size = 0;
+  int min_index = INT_MAX;//the minimum index will be the leader
+  smx_actor_t actor = nullptr;
+  xbt_swag_foreach(actor, process_list) {
+    int index = actor->pid -1;
     if(this->group()->rank(index)!=MPI_UNDEFINED){
-      intra_comm_size++;
+      intra_comm_size++; //the process is in the comm
      if(index < min_index)
        min_index=index;
-      i++;
    }
   }
   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
-  MPI_Group group_intra = new simgrid::SMPI::Group(intra_comm_size);
-  i=0;
-  process = nullptr;
-  xbt_swag_foreach(process, process_list) {
-    int index = process->pid -1;
+  MPI_Group group_intra = new Group(intra_comm_size);
+  int i = 0;
+  actor = nullptr;
+  xbt_swag_foreach(actor, process_list) {
+    int index = actor->pid -1;
    if(this->group()->rank(index)!=MPI_UNDEFINED){
      group_intra->set_mapping(index, i);
      i++;
    }
   }
 
-  MPI_Comm comm_intra = new simgrid::SMPI::Comm(group_intra, nullptr);
+  MPI_Comm comm_intra = new Comm(group_intra, nullptr);
   leader=min_index;
 
   int * leaders_map= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
@@ -381,14 +368,14 @@ void Comm::init_smp(){
    leader_list[i]=-1;
   }
 
-  smpi_coll_tuned_allgather_mpich(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
+  Coll_allgather_mpich::allgather(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
 
   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
-     smpi_switch_data_segment(smpi_process_index());
+     smpi_switch_data_segment(smpi_process()->index());
   }
 
-  if(m_leaders_map==nullptr){
-    m_leaders_map= leaders_map;
+  if(leaders_map_==nullptr){
+    leaders_map_= leaders_map;
   }else{
    xbt_free(leaders_map);
   }
@@ -397,25 +384,25 @@ void Comm::init_smp(){
    for(i=0; i< leader_group_size;i++)
      leaders_group->set_mapping(leader_list[i], i);
-    leader_comm = new simgrid::SMPI::Comm(leaders_group, nullptr);
+    leader_comm = new Comm(leaders_group, nullptr);
    this->set_leaders_comm(leader_comm);
    this->set_intra_comm(comm_intra);
@@ -425,13 +412,13 @@ void Comm::init_smp(){
      leaders_group->set_mapping(leader_list[i], i);
 
    if(this->get_leaders_comm()==MPI_COMM_NULL){
-      leader_comm = new simgrid::SMPI::Comm(leaders_group, nullptr);
+      leader_comm = new Comm(leaders_group, nullptr);
      this->set_leaders_comm(leader_comm);
    }else{
      leader_comm=this->get_leaders_comm();
-      leaders_group->unuse();
+      Group::unref(leaders_group);
    }
-    smpi_process_set_comm_intra(comm_intra);
+    smpi_process()->set_comm_intra(comm_intra);
   }
 
   int is_uniform = 1;
@@ -440,7 +427,7 @@ void Comm::init_smp(){
    int my_local_size=comm_intra->size();
    if(comm_intra->rank()==0) {
      int* non_uniform_map = xbt_new0(int,leader_group_size);
-      smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT,
+      Coll_allgather_mpich::allgather(&my_local_size, 1, MPI_INT,
          non_uniform_map, 1, MPI_INT, leader_comm);
      for(i=0; i < leader_group_size; i++) {
        if(non_uniform_map[0] != non_uniform_map[i]) {
@@ -449,16 +436,16 @@ void Comm::init_smp(){
        }
      }
      if(is_uniform==0 && this->is_uniform()!=0){
-        m_non_uniform_map= non_uniform_map;
+        non_uniform_map_= non_uniform_map;
      }else{
        xbt_free(non_uniform_map);
      }
-      m_is_uniform=is_uniform;
+      is_uniform_=is_uniform;
    }
-  smpi_coll_tuned_bcast_mpich(&(m_is_uniform),1, MPI_INT, 0, comm_intra );
+  Coll_bcast_mpich::bcast(&(is_uniform_),1, MPI_INT, 0, comm_intra );
 
   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
-     smpi_switch_data_segment(smpi_process_index());
+     smpi_switch_data_segment(smpi_process()->index());
   }
   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
   int is_blocked=1;
@@ -473,109 +460,70 @@ void Comm::init_smp(){
   }
 
   int global_blocked;
-  smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, this);
+  Coll_allreduce_default::allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, this);
 
   if(MPI_COMM_WORLD==MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD){
    if(this->rank()==0){
-      m_is_blocked=global_blocked;
+      is_blocked_=global_blocked;
    }
   }else{
-    m_is_blocked=global_blocked;
+    is_blocked_=global_blocked;
   }
   xbt_free(leader_list);
 
   if(replaying)
-    smpi_process_set_replaying(true);
+    smpi_process()->set_replaying(true);
 }
 
-int Comm::attr_delete(int keyval){
-  smpi_comm_key_elem elem =
-    static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(&keyval), sizeof(int)));
-  if(elem==nullptr)
-    return MPI_ERR_ARG;
-  if(elem->delete_fn!=MPI_NULL_DELETE_FN){
-    void* value = nullptr;
-    int flag;
-    if(this->attr_get(keyval, &value, &flag)==MPI_SUCCESS){
-      int ret = elem->delete_fn(this, keyval, value, &flag);
-      if(ret!=MPI_SUCCESS)
-        return ret;
-    }
+MPI_Comm Comm::f2c(int id) {
+  if(id == -2) {
+    return MPI_COMM_SELF;
+  } else if(id==0){
+    return MPI_COMM_WORLD;
+  } else if(F2C::f2c_lookup() != nullptr && id >= 0) {
+    char key[KEY_SIZE];
+    MPI_Comm tmp = static_cast<MPI_Comm>(xbt_dict_get_or_null(F2C::f2c_lookup(),get_key_id(key, id)));
+    return tmp != nullptr ? tmp : MPI_COMM_NULL ;
+  } else {
+    return MPI_COMM_NULL;
   }
-  if(m_attributes==nullptr)
-    return MPI_ERR_ARG;
+}
 
-  xbt_dict_remove_ext(m_attributes, reinterpret_cast<const char*>(&keyval), sizeof(int));
-  return MPI_SUCCESS;
+void Comm::free_f(int id) {
+  char key[KEY_SIZE];
+  xbt_dict_remove(F2C::f2c_lookup(), id==0? get_key(key, id) : get_key_id(key, id));
 }
 
-int Comm::attr_get(int keyval, void* attr_value, int* flag){
-  smpi_comm_key_elem elem =
-    static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(&keyval), sizeof(int)));
-  if(elem==nullptr)
-    return MPI_ERR_ARG;
-  if(m_attributes==nullptr){
-    *flag=0;
-    return MPI_SUCCESS;
-  }
-  try {
-    *static_cast<void**>(attr_value) =
-      xbt_dict_get_ext(m_attributes, reinterpret_cast<const char*>(&keyval), sizeof(int));
-    *flag=1;
+int Comm::add_f() {
+  if(F2C::f2c_lookup()==nullptr){
+    F2C::set_f2c_lookup(xbt_dict_new_homogeneous(nullptr));
   }
-  catch (xbt_ex& ex) {
-    *flag=0;
-  }
-  return MPI_SUCCESS;
+  char key[KEY_SIZE];
+  xbt_dict_set(F2C::f2c_lookup(), this==MPI_COMM_WORLD? get_key(key, F2C::f2c_id()) : get_key_id(key,F2C::f2c_id()), this, nullptr);
+  f2c_id_increment();
+  return F2C::f2c_id()-1;
 }
 
-int Comm::attr_put(int keyval, void* attr_value){
-  if(smpi_comm_keyvals==nullptr)
-    smpi_comm_keyvals = xbt_dict_new_homogeneous(nullptr);
-  smpi_comm_key_elem elem =
-    static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(&keyval), sizeof(int)));
-  if(elem==nullptr)
-    return MPI_ERR_ARG;
-  int flag;
-  void* value = nullptr;
-  this->attr_get(keyval, &value, &flag);
-  if(flag!=0 && elem->delete_fn!=MPI_NULL_DELETE_FN){
-    int ret = elem->delete_fn(this, keyval, value, &flag);
-    if(ret!=MPI_SUCCESS)
-      return ret;
-  }
-  if(m_attributes==nullptr)
-    m_attributes = xbt_dict_new_homogeneous(nullptr);
-  xbt_dict_set_ext(m_attributes, reinterpret_cast<const char*>(&keyval), sizeof(int), attr_value, nullptr);
-  return MPI_SUCCESS;
+void Comm::add_rma_win(MPI_Win win){
+  rma_wins_.push_back(win);
 }
 
+void Comm::remove_rma_win(MPI_Win win){
+  rma_wins_.remove(win);
 }
-}
 
-int smpi_comm_keyval_create(MPI_Comm_copy_attr_function* copy_fn, MPI_Comm_delete_attr_function* delete_fn, int* keyval,
-                            void* extra_state){
-  if(smpi_comm_keyvals==nullptr)
-    smpi_comm_keyvals = xbt_dict_new_homogeneous(nullptr);
-  smpi_comm_key_elem value = static_cast<smpi_comm_key_elem>(xbt_new0(s_smpi_mpi_comm_key_elem_t,1));
+void Comm::finish_rma_calls(){
+  for(auto it : rma_wins_){
+    if(it->rank()==this->rank()){//is it ours (for MPI_COMM_WORLD)?
+      int finished = it->finish_comms();
+      XBT_DEBUG("Barrier for rank %d - Finished %d RMA calls",this->rank(), finished);
+    }
+  }
 }
 
-  value->copy_fn=copy_fn;
-  value->delete_fn=delete_fn;
-  *keyval = comm_keyval_id;
-  xbt_dict_set_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(keyval), sizeof(int),static_cast<void*>(value), nullptr);
-  comm_keyval_id++;
-  return MPI_SUCCESS;
 }
-
-int smpi_comm_keyval_free(int* keyval){
-  smpi_comm_key_elem elem =
-    static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(keyval), sizeof(int)));
-  if(elem==nullptr)
-    return MPI_ERR_ARG;
-  xbt_dict_remove_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(keyval), sizeof(int));
-  xbt_free(elem);
-  return MPI_SUCCESS;
 }
+
+
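Note (not part of the patch): the Comm::dup change above now walks the static keyvals_ map and invokes each element's copy_fn.comm_copy_fn before inserting the copied value into the new communicator's attribute map. The program below is a minimal sketch, written against the standard MPI C API, of user code that would exercise exactly that path; the names copy_cb and payload are illustrative and not taken from the SimGrid sources.

/* Minimal sketch: drive the refactored attribute-copy path via MPI_Comm_dup. */
#include <mpi.h>
#include <stdio.h>

/* User-supplied copy callback, registered with MPI_Comm_create_keyval.
 * SMPI stores it in keyvals_ and Comm::dup calls it for each cached attribute. */
static int copy_cb(MPI_Comm oldcomm, int keyval, void *extra_state,
                   void *attr_in, void *attr_out, int *flag)
{
  *(void **)attr_out = attr_in; /* shallow-copy the attribute pointer */
  *flag = 1;                    /* ask for the attribute to be propagated */
  return MPI_SUCCESS;
}

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);

  int keyval;
  static int payload = 42;
  MPI_Comm_create_keyval(copy_cb, MPI_COMM_NULL_DELETE_FN, &keyval, NULL);
  MPI_Comm_set_attr(MPI_COMM_WORLD, keyval, &payload);

  MPI_Comm dup;
  MPI_Comm_dup(MPI_COMM_WORLD, &dup); /* triggers the copy_fn loop in Comm::dup */

  void *val;
  int flag;
  MPI_Comm_get_attr(dup, keyval, &val, &flag);
  if (flag)
    printf("attribute carried over: %d\n", *(int *)val);

  MPI_Comm_free(&dup);
  MPI_Comm_free_keyval(&keyval);
  MPI_Finalize();
  return 0;
}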