#include "src/smpi/include/smpi_actor.hpp"
#include "src/surf/HostImpl.hpp"
-#include <climits>
+#include <limits>
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)");
}
int Comm::dup(MPI_Comm* newcomm){
  // We must switch the data segment first: the copy callbacks invoked below
  // may silently touch global variables (privatized per MPI rank).
  smpi_switch_data_segment(s4u::Actor::self());

  auto* cp   = new Group(this->group());
  (*newcomm) = new Comm(cp, this->topo());

  // Propagate the cached attributes, honoring the copy callback registered
  // with each keyval. C and Fortran callbacks are stored in distinct slots.
  for (auto const& it : attributes()) {
    auto elem_it = keyvals_.find(it.first);
    xbt_assert(elem_it != keyvals_.end(), "Keyval not found for Comm: %d", it.first);

    smpi_key_elem& elem = elem_it->second;
    int ret         = MPI_SUCCESS;
    int flag        = 0;
    void* value_out = nullptr;
    if (elem.copy_fn.comm_copy_fn == MPI_COMM_DUP_FN) {
      // C-style MPI_COMM_DUP_FN: share the attribute value pointer as-is.
      value_out = it.second;
      flag      = 1;
    } else if (elem.copy_fn.comm_copy_fn != MPI_NULL_COPY_FN) {
      // User-provided C copy callback decides value_out and flag.
      ret = elem.copy_fn.comm_copy_fn(this, it.first, elem.extra_state, it.second, &value_out, &flag);
    }
    if (elem.copy_fn.comm_copy_fn_fort != MPI_NULL_COPY_FN) {
      // Fortran callbacks always operate on a freshly allocated integer.
      value_out = xbt_new(int, 1);
      if (*(int*)*elem.copy_fn.comm_copy_fn_fort == 1) { // MPI_COMM_DUP_FN
        memcpy(value_out, it.second, sizeof(int));
        flag = 1;
      } else { // not null, nor dup
        elem.copy_fn.comm_copy_fn_fort(this, it.first, elem.extra_state, it.second, value_out, &flag, &ret);
      }
      if (ret != MPI_SUCCESS)
        xbt_free(value_out); // do not leak the buffer when the callback fails
    }
    if (ret != MPI_SUCCESS) {
      // A copy callback failed: roll the duplication back entirely, as
      // mandated by the MPI standard for MPI_Comm_dup.
      Comm::destroy(*newcomm);
      *newcomm = MPI_COMM_NULL;
      return ret;
    }
    if (flag) {
      elem.refcount++;
      (*newcomm)->attributes().emplace(it.first, value_out);
    }
  }
  // Duplicate the error handler.
  // NOTE(review): the 'if' guarding the errhandlers_ branch was lost in the
  // diff residue; reconstructed as a null check (errhandlers_ is the per-rank
  // handler array used for MPI_COMM_WORLD) — confirm against upstream.
  if (errhandlers_ != nullptr) // MPI_COMM_WORLD: only grab our own handler
    (*newcomm)->set_errhandler(errhandlers_[this->rank()]);
  else
    (*newcomm)->set_errhandler(errhandler_);
  return MPI_SUCCESS;
}
int Comm::dup_with_info(MPI_Info info, MPI_Comm* newcomm){
MPI_Comm Comm::find_intra_comm(int * leader){
//get the indices of all processes sharing the same simix host
int intra_comm_size = 0;
- int min_index = INT_MAX; // the minimum index will be the leader
+ aid_t min_index = std::numeric_limits<aid_t>::max(); // the minimum index will be the leader
sg_host_self()->get_impl()->foreach_actor([this, &intra_comm_size, &min_index](auto& actor) {
- if (this->group()->rank(actor.get_pid()) != MPI_UNDEFINED) { // Is this process in the current group?
+ aid_t index = actor.get_pid();
+ if (this->group()->rank(index) != MPI_UNDEFINED) { // Is this process in the current group?
intra_comm_size++;
- int index = actor.get_pid();
if (index < min_index)
min_index = index;
}
smpi_process()->set_replaying(false);
}
- if (smpi_cfg_privatization() == SmpiPrivStrategies::MMAP) {
- // we need to switch as the called function may silently touch global variables
- smpi_switch_data_segment(s4u::Actor::self());
- }
+ // we need to switch as the called function may silently touch global variables
+ smpi_switch_data_segment(s4u::Actor::self());
+
// identify neighbors in comm
MPI_Comm comm_intra = find_intra_comm(&leader);
allgather__ring(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
- if (smpi_cfg_privatization() == SmpiPrivStrategies::MMAP) {
- // we need to switch as the called function may silently touch global variables
- smpi_switch_data_segment(s4u::Actor::self());
- }
-
if(leaders_map_==nullptr){
leaders_map_= leaders_map;
}else{
}
bcast__scatter_LR_allgather(&is_uniform_, 1, MPI_INT, 0, comm_intra);
- if (smpi_cfg_privatization() == SmpiPrivStrategies::MMAP) {
- // we need to switch as the called function may silently touch global variables
- smpi_switch_data_segment(s4u::Actor::self());
- }
+ // we need to switch as the called function may silently touch global variables
+ smpi_switch_data_segment(s4u::Actor::self());
+
// Are the ranks blocked ? = allocated contiguously on the SMP nodes
int is_blocked=1;
int prev = this->group()->rank(comm_intra->group()->actor(0));
void Comm::finish_rma_calls() const
{
+ const int myrank = rank();
for (auto const& it : rma_wins_) {
- if(it->rank()==this->rank()){//is it ours (for MPI_COMM_WORLD)?
+ if (it->rank() == myrank) { // is it ours (for MPI_COMM_WORLD)?
int finished = it->finish_comms();
- XBT_DEBUG("Barrier for rank %d - Finished %d RMA calls",this->rank(), finished);
+ XBT_DEBUG("Barrier for rank %d - Finished %d RMA calls", myrank, finished);
}
}
}