X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/cf07616d00639f39bad49729f9e63b7533eed9a4..2dba4579bf4c7f7e8cde2b36ff308645a3cc891b:/src/smpi/smpi_request.cpp diff --git a/src/smpi/smpi_request.cpp b/src/smpi/smpi_request.cpp index 89bab5ae68..ed83759424 100644 --- a/src/smpi/smpi_request.cpp +++ b/src/smpi/smpi_request.cpp @@ -1,21 +1,17 @@ -/* Copyright (c) 2007-2015. The SimGrid Team. - * All rights reserved. */ +/* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include #include - #include "private.h" -#include "xbt/replay.h" #include "mc/mc.h" #include "src/mc/mc_replay.h" #include "src/simix/smx_private.h" #include "simgrid/sg_config.h" #include "smpi/smpi_utils.hpp" #include - #include "src/kernel/activity/SynchroComm.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (reques)"); @@ -106,28 +102,30 @@ static double smpi_or(size_t size) return current; } - namespace simgrid{ namespace smpi{ -Request::Request(){} + Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) { void *old_buf = nullptr; - s_smpi_subtype_t *subtype = static_cast(datatype->substruct); - - if((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->sizeof_substruct != 0)){ +// FIXME Handle the case of a partial shared malloc. + if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED))) { // && (!smpi_is_shared(buf_))){ // This part handles the problem of non-contiguous memory old_buf = buf; - buf_ = count==0 ? nullptr : xbt_malloc(count*smpi_datatype_size(datatype)); - if ((datatype->sizeof_substruct != 0) && ((flags & SEND) != 0)) { - subtype->serialize(old_buf, buf_, count, datatype->substruct); + if (count==0){ + buf_ = nullptr; + }else { + buf_ = xbt_malloc(count*datatype->size()); + if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) { + datatype->serialize(old_buf, buf_, count); + } } } // This part handles the problem of non-contiguous memory (for the unserialisation at the reception) old_buf_ = old_buf; - size_ = smpi_datatype_size(datatype) * count; - smpi_datatype_use(datatype); - comm_->use(); + size_ = datatype->size() * count; + datatype->ref(); + comm_->ref(); action_ = nullptr; detached_ = 0; detached_sender_ = nullptr; @@ -142,13 +140,6 @@ Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, op_ = MPI_REPLACE; } - -//Request::destroy(void* request) -//{ -// MPI_Request req = static_cast(request); -// delete(req); -//} - MPI_Comm Request::comm(){ return comm_; } @@ -173,16 +164,22 @@ int Request::detached(){ return detached_; } +size_t Request::size(){ + return size_; +} + +size_t Request::real_size(){ + return real_size_; +} -void Request::unuse(MPI_Request* request) +void Request::unref(MPI_Request* request) { if((*request) != MPI_REQUEST_NULL){ (*request)->refcount_--; if((*request)->refcount_<0) xbt_die("wrong refcount"); - if((*request)->refcount_==0){ - smpi_datatype_unuse((*request)->old_type_); - (*request)->comm_->unuse(); + Datatype::unref((*request)->old_type_); + Comm::unref((*request)->comm_); (*request)->print_request("Destroying"); delete *request; *request = MPI_REQUEST_NULL; @@ -194,7 +191,6 @@ void Request::unuse(MPI_Request* request) } } - int Request::match_recv(void* a, void* b, smx_activity_t ignored) { MPI_Request ref = static_cast(a); MPI_Request req = static_cast(b); @@ -251,26 +247,21 @@ void Request::print_request(const char *message) /* factories, to hide the internal flags from the caller */ MPI_Request Request::send_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(), + + return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm, PERSISTENT | SEND | PREPARED); - return request; } MPI_Request Request::ssend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(), + return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm, PERSISTENT | SSEND | SEND | PREPARED); - return request; } MPI_Request Request::isend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, smpi_process_index(), + return new Request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, smpi_process()->index(), comm->group()->index(dst), tag,comm, PERSISTENT | ISEND | SEND | PREPARED); - return request; } @@ -291,11 +282,9 @@ MPI_Request Request::rma_send_init(void *buf, int count, MPI_Datatype datatype, MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) { - MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, + return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src), - smpi_process_index(), tag, comm, PERSISTENT | RECV | PREPARED); - return request; + smpi_process()->index(), tag, comm, PERSISTENT | RECV | PREPARED); } MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, @@ -315,17 +304,15 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) { - MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : - comm->group()->index(src), smpi_process_index(), tag, + return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : + comm->group()->index(src), smpi_process()->index(), tag, comm, PERSISTENT | RECV | PREPARED); - return request; } MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(), + request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm, NON_PERSISTENT | ISEND | SEND); request->start(); return request; @@ -334,7 +321,7 @@ MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, MPI_Request Request::issend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(), + request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag,comm, NON_PERSISTENT | ISEND | SSEND | SEND); request->start(); return request; @@ -345,7 +332,7 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, { MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : - comm->group()->index(src), smpi_process_index(), tag, comm, + comm->group()->index(src), smpi_process()->index(), tag, comm, NON_PERSISTENT | RECV); request->start(); return request; @@ -362,7 +349,7 @@ void Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag void Request::send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(), + request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm, NON_PERSISTENT | SEND); request->start(); @@ -373,7 +360,7 @@ void Request::send(void *buf, int count, MPI_Datatype datatype, int dst, int tag void Request::ssend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */ - request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(), + request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm, NON_PERSISTENT | SSEND | SEND); request->start(); @@ -381,33 +368,29 @@ void Request::ssend(void *buf, int count, MPI_Datatype datatype, int dst, int ta request = nullptr; } - - void Request::sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status * status) { MPI_Request requests[2]; MPI_Status stats[2]; - int myid=smpi_process_index(); + int myid=smpi_process()->index(); if ((comm->group()->index(dst) == myid) && (comm->group()->index(src) == myid)){ - smpi_datatype_copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype); + Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype); return; } requests[0] = isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm); requests[1] = irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm); startall(2, requests); waitall(2, requests, stats); - unuse(&requests[0]); - unuse(&requests[1]); + unref(&requests[0]); + unref(&requests[1]); if(status != MPI_STATUS_IGNORE) { // Copy receive status *status = stats[1]; } } - - void Request::start() { smx_mailbox_t mailbox; @@ -420,42 +403,44 @@ void Request::start() if ((flags_ & RECV) != 0) { this->print_request("New recv"); + simgrid::smpi::Process* process = smpi_process_remote(dst_); + int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh"); - xbt_mutex_t mut = smpi_process_mailboxes_mutex(); + xbt_mutex_t mut = process->mailboxes_mutex(); if (async_small_thresh != 0 || (flags_ & RMA) != 0) xbt_mutex_acquire(mut); if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) { - mailbox = smpi_process_mailbox(); + mailbox = process->mailbox(); } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { //We have to check both mailboxes (because SSEND messages are sent to the large mbox). //begin with the more appropriate one : the small one. - mailbox = smpi_process_mailbox_small(); + mailbox = process->mailbox_small(); XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox); smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); if (action == nullptr) { - mailbox = smpi_process_mailbox(); + mailbox = process->mailbox(); XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox); action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); if (action == nullptr) { XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox); - mailbox = smpi_process_mailbox_small(); + mailbox = process->mailbox_small(); } } else { XBT_DEBUG("yes there was something for us in the large mailbox"); } } else { - mailbox = smpi_process_mailbox_small(); + mailbox = process->mailbox_small(); XBT_DEBUG("Is there a corresponding send already posted the small mailbox?"); smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); if (action == nullptr) { XBT_DEBUG("No, nothing in the permanent receive mailbox"); - mailbox = smpi_process_mailbox(); + mailbox = process->mailbox(); } else { XBT_DEBUG("yes there was something for us in the small mailbox"); } @@ -463,19 +448,18 @@ void Request::start() // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later real_size_=size_; - action_ = simcall_comm_irecv(SIMIX_process_self(), mailbox, buf_, &real_size_, &match_recv, - ! smpi_process_get_replaying()? smpi_comm_copy_data_callback + action_ = simcall_comm_irecv(process->process(), mailbox, buf_, &real_size_, &match_recv, + ! process->replaying()? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this, -1.0); XBT_DEBUG("recv simcall posted"); if (async_small_thresh != 0 || (flags_ & RMA) != 0 ) xbt_mutex_release(mut); } else { /* the RECV flag was not set, so this is a send */ - int receiver = dst_; - + simgrid::smpi::Process* process = smpi_process_remote(dst_); int rank = src_; if (TRACE_smpi_view_internals()) { - TRACE_smpi_send(rank, rank, receiver, tag_, size_); + TRACE_smpi_send(rank, rank, dst_, tag_, size_); } this->print_request("New send"); @@ -486,10 +470,10 @@ void Request::start() detached_ = 1; XBT_DEBUG("Send request %p is detached", this); refcount_++; - if(old_type_->sizeof_substruct == 0){ + if(!(old_type_->flags() & DT_FLAG_DERIVED)){ oldbuf = buf_; - if (!smpi_process_get_replaying() && oldbuf != nullptr && size_!=0){ - if((smpi_privatize_global_variables != 0) + if (!process->replaying() && oldbuf != nullptr && size_!=0){ + if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP) && (static_cast(buf_) >= smpi_start_data_exe) && (static_cast(buf_) < smpi_start_data_exe + smpi_size_data_exe )){ XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment "); @@ -516,36 +500,36 @@ void Request::start() int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh"); - xbt_mutex_t mut=smpi_process_remote_mailboxes_mutex(receiver); + xbt_mutex_t mut=process->mailboxes_mutex(); if (async_small_thresh != 0 || (flags_ & RMA) != 0) xbt_mutex_acquire(mut); if (!(async_small_thresh != 0 || (flags_ & RMA) !=0)) { - mailbox = smpi_process_remote_mailbox(receiver); + mailbox = process->mailbox(); } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { // eager mode - mailbox = smpi_process_remote_mailbox(receiver); + mailbox = process->mailbox(); XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox); smx_activity_t action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast(this)); if (action == nullptr) { if ((flags_ & SSEND) == 0){ - mailbox = smpi_process_remote_mailbox_small(receiver); + mailbox = process->mailbox_small(); XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox); } else { - mailbox = smpi_process_remote_mailbox_small(receiver); + mailbox = process->mailbox_small(); XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox); action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast(this)); if (action == nullptr) { XBT_DEBUG("No, we are first, send to large mailbox"); - mailbox = smpi_process_remote_mailbox(receiver); + mailbox = process->mailbox(); } } } else { XBT_DEBUG("Yes there was something for us in the large mailbox"); } } else { - mailbox = smpi_process_remote_mailbox(receiver); + mailbox = process->mailbox(); XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, this,buf_); } @@ -554,7 +538,7 @@ void Request::start() action_ = simcall_comm_isend(SIMIX_process_from_PID(src_+1), mailbox, size_, -1.0, buf, real_size_, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails - !smpi_process_get_replaying() ? smpi_comm_copy_data_callback + !process->replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this, // detach if msg size < eager/rdv switch limit detached_); @@ -563,13 +547,11 @@ void Request::start() /* FIXME: detached sends are not traceable (action_ == nullptr) */ if (action_ != nullptr) simcall_set_category(action_, TRACE_internal_smpi_get_category()); - if (async_small_thresh != 0 || ((flags_ & RMA)!=0)) xbt_mutex_release(mut); } } - void Request::startall(int count, MPI_Request * requests) { if(requests== nullptr) @@ -582,17 +564,15 @@ void Request::startall(int count, MPI_Request * requests) int Request::test(MPI_Request * request, MPI_Status * status) { //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before) - // to avoid deadlocks if used as a break condition, such as - // while (MPI_Test(request, flag, status) && flag) { - // } + // while (MPI_Test(request, flag, status) && flag) dostuff... // because the time will not normally advance when only calls to MPI_Test are made -> deadlock // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it static int nsleeps = 1; if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); - smpi_empty_status(status); + Status::empty(status); int flag = 1; if (((*request)->flags_ & PREPARED) == 0) { if ((*request)->action_ != nullptr) @@ -609,7 +589,6 @@ int Request::test(MPI_Request * request, MPI_Status * status) { return flag; } - int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[]) { int i; @@ -638,7 +617,6 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta else return count; } - int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status) { std::vector comms; @@ -677,13 +655,12 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * } else { //all requests are null or inactive, return true flag = 1; - smpi_empty_status(status); + Status::empty(status); } return flag; } - int Request::testall(int count, MPI_Request requests[], MPI_Status status[]) { MPI_Status stat; @@ -697,7 +674,7 @@ int Request::testall(int count, MPI_Request requests[], MPI_Status status[]) requests[i]=MPI_REQUEST_NULL; } }else{ - smpi_empty_status(pstat); + Status::empty(pstat); } if(status != MPI_STATUSES_IGNORE) { status[i] = *pstat; @@ -706,9 +683,6 @@ int Request::testall(int count, MPI_Request requests[], MPI_Status status[]) return flag; } - - - void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){ int flag=0; //FIXME find another way to avoid busy waiting ? @@ -720,16 +694,15 @@ void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){ } void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){ - MPI_Request request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : - comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV); - // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls - // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... ) + // especially when used as a break condition, such as while (MPI_Iprobe(...)) dostuff... // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it - // (This can speed up the execution of certain applications by an order of magnitude, such as HPL) + // This can speed up the execution of certain applications by an order of magnitude, such as HPL static int nsleeps = 1; double speed = simgrid::s4u::Actor::self()->host()->speed(); double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage"); + MPI_Request request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : + comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV); if (smpi_iprobe_sleep > 0) { smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed); simcall_execution_wait(iprobe_sleep); @@ -740,14 +713,14 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* request->print_request("New iprobe"); // We have to test both mailboxes as we don't know if we will receive one one or another if (xbt_cfg_get_int("smpi/async-small-thresh") > 0){ - mailbox = smpi_process_mailbox_small(); + mailbox = smpi_process()->mailbox_small(); XBT_DEBUG("Trying to probe the perm recv mailbox"); request->action_ = simcall_comm_iprobe(mailbox, 0, request->src_, request->tag_, &match_recv, static_cast(request)); } if (request->action_ == nullptr){ - mailbox = smpi_process_mailbox(); + mailbox = smpi_process()->mailbox(); XBT_DEBUG("trying to probe the other mailbox"); request->action_ = simcall_comm_iprobe(mailbox, 0, request->src_,request->tag_, &match_recv, static_cast(request)); @@ -770,14 +743,13 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* if (xbt_cfg_get_boolean("smpi/grow-injected-times")) nsleeps++; } - unuse(&request); + unref(&request); } - void Request::finish_wait(MPI_Request* request, MPI_Status * status) { MPI_Request req = *request; - smpi_empty_status(status); + Status::empty(status); if(!((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)){ if(status != MPI_STATUS_IGNORE) { @@ -792,32 +764,34 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) req->print_request("Finishing"); MPI_Datatype datatype = req->old_type_; - if(((req->flags_ & ACCUMULATE) != 0) || (datatype->sizeof_substruct != 0)){ - if (!smpi_process_get_replaying()){ - if( smpi_privatize_global_variables != 0 && (static_cast(req->old_buf_) >= smpi_start_data_exe) +// FIXME Handle the case of a partial shared malloc. + if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED))){// && (!smpi_is_shared(req->old_buf_))){ + + if (!smpi_process()->replaying()){ + if( smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP && (static_cast(req->old_buf_) >= smpi_start_data_exe) && ((char*)req->old_buf_ < smpi_start_data_exe + smpi_size_data_exe )){ XBT_VERB("Privatization : We are unserializing to a zone in global memory Switch data segment "); - smpi_switch_data_segment(smpi_process_index()); + smpi_switch_data_segment(smpi_process()->index()); } } - if(datatype->sizeof_substruct != 0){ + if(datatype->flags() & DT_FLAG_DERIVED){ // This part handles the problem of non-contignous memory the unserialization at the reception - s_smpi_subtype_t *subtype = static_cast(datatype->substruct); - if(req->flags_ & RECV) - subtype->unserialize(req->buf_, req->old_buf_, req->real_size_/smpi_datatype_size(datatype) , - datatype->substruct, req->op_); + if((req->flags_ & RECV) && datatype->size()!=0) + datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_); xbt_free(req->buf_); }else if(req->flags_ & RECV){//apply op on contiguous buffer for accumulate - int n =req->real_size_/smpi_datatype_size(datatype); - req->op_->apply(req->buf_, req->old_buf_, &n, &datatype); + if(datatype->size()!=0){ + int n =req->real_size_/datatype->size(); + req->op_->apply(req->buf_, req->old_buf_, &n, datatype); + } xbt_free(req->buf_); } } } if (TRACE_smpi_view_internals() && ((req->flags_ & RECV) != 0)){ - int rank = smpi_process_index(); + int rank = smpi_process()->index(); int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_); TRACE_smpi_recv(rank, src_traced, rank,req->tag_); } @@ -828,20 +802,19 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) simcall_process_sleep(sleeptime); XBT_DEBUG("receiving size of %zu : sleep %f ", req->real_size_, sleeptime); } - unuse(&(req->detached_sender_)); + unref(&(req->detached_sender_)); } if(req->flags_ & PERSISTENT) req->action_ = nullptr; req->flags_ |= FINISHED; - unuse(request); + unref(request); } - void Request::wait(MPI_Request * request, MPI_Status * status) { (*request)->print_request("Waiting"); if ((*request)->flags_ & PREPARED) { - smpi_empty_status(status); + Status::empty(status); return; } @@ -897,8 +870,6 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) finish_wait(&requests[index],status); if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT)) requests[index] = MPI_REQUEST_NULL; - }else{ - XBT_WARN("huu?"); } } } @@ -908,14 +879,14 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) } if (index==MPI_UNDEFINED) - smpi_empty_status(status); + Status::empty(status); return index; } static int sort_accumulates(MPI_Request a, MPI_Request b) { - return (a->tag() < b->tag()); + return (a->tag() > b->tag()); } int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) @@ -929,9 +900,9 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) if (status != MPI_STATUSES_IGNORE) { for (int c = 0; c < count; c++) { if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst_ == MPI_PROC_NULL || (requests[c]->flags_ & PREPARED)) { - smpi_empty_status(&status[c]); + Status::empty(&status[c]); } else if (requests[c]->src_ == MPI_PROC_NULL) { - smpi_empty_status(&status[c]); + Status::empty(&status[c]); status[c].MPI_SOURCE = MPI_PROC_NULL; } } @@ -995,6 +966,28 @@ int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Sta return count; } +MPI_Request Request::f2c(int id) { + char key[KEY_SIZE]; + if(id==MPI_FORTRAN_REQUEST_NULL) + return static_cast(MPI_REQUEST_NULL); + return static_cast(xbt_dict_get(F2C::f2c_lookup(), get_key_id(key, id))); +} + +int Request::add_f() { + if(F2C::f2c_lookup()==nullptr){ + F2C::set_f2c_lookup(xbt_dict_new_homogeneous(nullptr)); + } + char key[KEY_SIZE]; + xbt_dict_set(F2C::f2c_lookup(), get_key_id(key, F2C::f2c_id()), this, nullptr); + F2C::f2c_id_increment(); + return F2C::f2c_id()-1; +} + +void Request::free_f(int id) { + char key[KEY_SIZE]; + if(id!=MPI_FORTRAN_REQUEST_NULL) + xbt_dict_remove(F2C::f2c_lookup(), get_key_id(key, id)); +} } }