class ActorExt {
private:
double simulated_ = 0; /* Used to time with simulated_start/elapsed */
- simgrid::s4u::MailboxPtr mailbox_;
- simgrid::s4u::MailboxPtr mailbox_small_;
- xbt_mutex_t mailboxes_mutex_;
+ s4u::MailboxPtr mailbox_;
+ s4u::MailboxPtr mailbox_small_;
+ s4u::MutexPtr mailboxes_mutex_;
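+ // s4u::MutexPtr is a smart pointer to an s4u::Mutex; it replaces the raw xbt_mutex_t handle.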
xbt_os_timer_t timer_;
MPI_Comm comm_self_ = MPI_COMM_NULL;
MPI_Comm comm_intra_ = MPI_COMM_NULL;
int sampling_ = 0; /* inside an SMPI_SAMPLE_ block? */
std::string instance_id_;
bool replaying_ = false; /* is the process replaying a trace */
- simgrid::s4u::Barrier* finalization_barrier_;
+ s4u::Barrier* finalization_barrier_;
smpi_trace_call_location_t trace_call_loc_;
- simgrid::s4u::ActorPtr actor_ = nullptr;
+ s4u::ActorPtr actor_ = nullptr;
smpi_privatization_region_t privatized_region_ = nullptr;
int optind = 0; /* for getopt replacement */
#if HAVE_PAPI
  /** Contains hardware data as read by PAPI **/
  int papi_event_set_;
  papi_counter_t papi_counter_data_;
#endif
smpi_privatization_region_t privatized_region();
s4u::MailboxPtr mailbox() { return mailbox_; }
s4u::MailboxPtr mailbox_small() { return mailbox_small_; }
- xbt_mutex_t mailboxes_mutex();
+ s4u::MutexPtr mailboxes_mutex();
#if HAVE_PAPI
int papi_event_set();
papi_counter_t& papi_counters();
#endif
opened_ = 0;
group_ = MPI_GROUP_NULL;
requests_ = new std::vector<MPI_Request>();
- mut_ = xbt_mutex_init();
- lock_mut_ = xbt_mutex_init();
- atomic_mut_ = xbt_mutex_init();
+ mut_ = s4u::Mutex::create();
+ lock_mut_ = s4u::Mutex::create();
+ atomic_mut_ = s4u::Mutex::create();
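+ // s4u::Mutex::create() replaces xbt_mutex_init() and returns a reference-counted MutexPtr.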
connected_wins_ = new MPI_Win[comm_size];
connected_wins_[rank_] = this;
count_ = 0;
if (rank_ == 0)
delete bar_;
- xbt_mutex_destroy(mut_);
- xbt_mutex_destroy(lock_mut_);
- xbt_mutex_destroy(atomic_mut_);
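+ // No matching destroy calls are needed anymore: the MutexPtr members release their mutexes automatically when the Win goes away.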
if (allocated_ != 0)
  xbt_free(base_);
if (assert != MPI_MODE_NOPRECEDE) {
// This is not the first fence => finalize what came before
bar_->wait();
- xbt_mutex_acquire(mut_);
+ mut_->lock();
// This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
// Without it, the vector could be resized when another process pushes a new request,
// which would invalidate the array that Request::waitall() is iterating over.
std::vector<MPI_Request>* reqs = requests_;
int size = static_cast<int>(reqs->size());
if (size > 0) {
  MPI_Request* treqs = &(*reqs)[0]; // contiguous array view expected by Request::waitall()
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
}
count_ = 0;
- xbt_mutex_release(mut_);
+ mut_->unlock();
}
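// Side note (a sketch, not part of this change): s4u::Mutex exposes lock()/unlock(), so it
// satisfies C++'s BasicLockable requirements. The manual lock/unlock pairs in this file could
// therefore be replaced by a scoped guard, which also releases the mutex on early return:
//   std::lock_guard<simgrid::s4u::Mutex> guard(*mut_);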
if (assert == MPI_MODE_NOSUCCEED) // no operation should come after this one, so mark the window as closed
if (request != nullptr) {
  *request = sreq;
} else {
- xbt_mutex_acquire(mut_);
+ mut_->lock();
requests_->push_back(sreq);
- xbt_mutex_release(mut_);
+ mut_->unlock();
}
// push request to receiver's win
- xbt_mutex_acquire(recv_win->mut_);
+ recv_win->mut_->lock();
recv_win->requests_->push_back(rreq);
rreq->start();
- xbt_mutex_release(recv_win->mut_);
+ recv_win->mut_->unlock();
} else {
  XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
  // start the send, with a process other than ourselves as the sender
  sreq->start();
  // push request to receiver's win
- xbt_mutex_acquire(send_win->mut_);
+ send_win->mut_->lock();
send_win->requests_->push_back(sreq);
- xbt_mutex_release(send_win->mut_);
+ send_win->mut_->unlock();
// start recv
rreq->start();
if (request != nullptr) {
  *request = rreq;
} else {
- xbt_mutex_acquire(mut_);
+ mut_->lock();
requests_->push_back(rreq);
- xbt_mutex_release(mut_);
+ mut_->unlock();
}
} else {
// start send
sreq->start();
// push request to receiver's win
- xbt_mutex_acquire(recv_win->mut_);
+ recv_win->mut_->lock();
recv_win->requests_->push_back(rreq);
rreq->start();
- xbt_mutex_release(recv_win->mut_);
+ recv_win->mut_->unlock();
if (request != nullptr) {
*request = sreq;
} else {
- xbt_mutex_acquire(mut_);
+ mut_->lock();
requests_->push_back(sreq);
- xbt_mutex_release(mut_);
+ mut_->unlock();
}
XBT_DEBUG("Leaving MPI_Win_Accumulate");
XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
// We need to be sure the operations are correctly ordered, so finish each request here; this is slow, but safe.
MPI_Request req;
- xbt_mutex_acquire(send_win->atomic_mut_);
+ send_win->atomic_mut_->lock();
get(result_addr, result_count, result_datatype, target_rank,
target_disp, target_count, target_datatype, &req);
if (req != MPI_REQUEST_NULL)
  Request::wait(&req, MPI_STATUS_IGNORE);
if (op != MPI_NO_OP)
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
if (req != MPI_REQUEST_NULL)
Request::wait(&req, MPI_STATUS_IGNORE);
- xbt_mutex_release(send_win->atomic_mut_);
+ send_win->atomic_mut_->unlock();
return MPI_SUCCESS;
}
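// Note: atomic_mut_ serializes the whole get+accumulate pair on the target window, so
// concurrent get_accumulate (and compare_and_swap, below) calls behave atomically.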
XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
MPI_Request req = MPI_REQUEST_NULL;
- xbt_mutex_acquire(send_win->atomic_mut_);
+ send_win->atomic_mut_->lock();
get(result_addr, 1, datatype, target_rank,
target_disp, 1, datatype, &req);
if (req != MPI_REQUEST_NULL)
  Request::wait(&req, MPI_STATUS_IGNORE);
// only swap if the target's current value matches the comparison buffer
if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
put(origin_addr, 1, datatype, target_rank,
target_disp, 1, datatype);
}
- xbt_mutex_release(send_win->atomic_mut_);
+ send_win->atomic_mut_->unlock();
return MPI_SUCCESS;
}
MPI_Win target_win = connected_wins_[rank];
if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
- xbt_mutex_acquire(target_win->lock_mut_);
+ target_win->lock_mut_->lock();
target_win->mode_ += lock_type; // add the lock_type to distinguish the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive; it is now shared
- xbt_mutex_release(target_win->lock_mut_);
+ target_win->lock_mut_->unlock();
}
} else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
target_win->mode_ = 0;
target_win->lockers_.remove(comm_->rank());
if (target_mode == MPI_LOCK_EXCLUSIVE) {
- xbt_mutex_release(target_win->lock_mut_);
+ target_win->lock_mut_->unlock();
}
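// Because Win::lock() added lock_type into mode_, unlock() can tell how the window was held:
// only an MPI_LOCK_EXCLUSIVE holder still owns lock_mut_ and must release it here.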
int finished = finish_comms(rank);
}
int Win::finish_comms() {
- xbt_mutex_acquire(mut_);
+ mut_->lock();
// Finish our own requests
std::vector<MPI_Request>* reqqs = requests_;
int size = static_cast<int>(reqqs->size());
if (size > 0) {
  MPI_Request* treqs = &(*reqqs)[0]; // contiguous array view expected by Request::waitall()
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  reqqs->clear();
}
- xbt_mutex_release(mut_);
+ mut_->unlock();
return size;
}
int Win::finish_comms(int rank) {
- xbt_mutex_acquire(mut_);
+ mut_->lock();
// Finish our own requests
std::vector<MPI_Request>* reqqs = requests_;
int size = static_cast<int>(reqqs->size());
myreqqs.clear();
}
}
- xbt_mutex_release(mut_);
+ mut_->unlock();
return size;
}
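// Summary of the xbt -> s4u mutex mapping applied throughout this change:
//   xbt_mutex_t m = xbt_mutex_init();  ->  s4u::MutexPtr m = s4u::Mutex::create();
//   xbt_mutex_acquire(m);              ->  m->lock();
//   xbt_mutex_release(m);              ->  m->unlock();
//   xbt_mutex_destroy(m);              ->  (nothing to do: MutexPtr is reference-counted)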