Also fix MPI_Testsome behavior.
WRAPPED_PMPI_CALL(int,MPI_Testany,(int count, MPI_Request requests[], int *index, int *flag, MPI_Status * status),(count, requests, index, flag, status))
WRAPPED_PMPI_CALL(int,MPI_Test,(MPI_Request * request, int *flag, MPI_Status * status),(request, flag, status))
WRAPPED_PMPI_CALL(int,MPI_Testsome,(int incount, MPI_Request* requests, int* outcount, int* indices, MPI_Status* statuses) ,(incount, requests, outcount, indices, statuses))
+WRAPPED_PMPI_CALL(int,MPI_Grequest_complete,( MPI_Request request),( request))
+WRAPPED_PMPI_CALL(int,MPI_Grequest_start,(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request),( query_fn, free_fn, cancel_fn, extra_state, request))
WRAPPED_PMPI_CALL(int,MPI_Type_commit,(MPI_Datatype* datatype) ,(datatype))
WRAPPED_PMPI_CALL(int,MPI_Type_contiguous,(int count, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, old_type, newtype))
WRAPPED_PMPI_CALL(int,MPI_Type_create_hindexed_block,(int count, int blocklength, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, blocklength, indices, old_type, newtype))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_map,(MPI_Comm comm_old, int nnodes, int* index, int* edges, int* newrank) ,(comm_old, nnodes, index, edges, newrank))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors_count,(MPI_Comm comm, int rank, int* nneighbors) ,(comm, rank, nneighbors))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors,(MPI_Comm comm, int rank, int maxneighbors, int* neighbors) ,(comm, rank, maxneighbors, neighbors))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Grequest_complete,( MPI_Request request),( request))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Grequest_start,(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request),( query_fn, free_fn, cancel_fn, extra_state, request))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Ibsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request) ,(buf, count, datatype, dest, tag, comm, request))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_create,(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag,MPI_Comm* comm_out) ,(local_comm, local_leader, peer_comm, remote_leader, tag, comm_out))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_merge,(MPI_Comm comm, int high, MPI_Comm* comm_out) ,(comm, high, comm_out))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pack_external_size,(char *datarep, int incount, MPI_Datatype datatype, MPI_Aint *size),(datarep, incount, datatype, size))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pcontrol,(const int level, ... ),(level))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Publish_name,( char *service_name, MPI_Info info, char *port_name),( service_name, info, port_name))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Request_get_status,( MPI_Request request, int *flag, MPI_Status *status),( request, flag, status))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend_init,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request),(buf, count, datatype, dest, tag, comm, request))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) ,(buf, count, datatype, dest, tag, comm))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count))
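+// Assumption behind the NOFAIL downgrade: a generalized request's query_fn callback
+// may call these (e.g. the greq1 test enabled below), so they must warn, not abort.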
+UNIMPLEMENTED_WRAPPED_PMPI_CALL_NOFAIL(int,MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag))
+UNIMPLEMENTED_WRAPPED_PMPI_CALL_NOFAIL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Topo_test,(MPI_Comm comm, int* top_type) ,(comm, top_type))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_create_darray,(int size, int rank, int ndims, int* array_of_gsizes, int* array_of_distribs, int* array_of_dargs, int* array_of_psizes,int order, MPI_Datatype oldtype, MPI_Datatype *newtype) ,(size, rank, ndims, array_of_gsizes,array_of_distribs, array_of_dargs, array_of_psizes,order,oldtype, newtype))
UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses, int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, MPI_Datatype *array_of_datatypes),(datatype, max_integers, max_addresses,max_datatypes, array_of_integers, array_of_addresses, array_of_datatypes))
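
// The Request::test/testany/testall/testsome helpers now return an MPI error code and
// report completion through an explicit flag/outcount argument, so the PMPI wrappers
// below forward that return value instead of unconditionally returning MPI_SUCCESS: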
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
- *flag = simgrid::smpi::Request::test(request,status);
+ retval = simgrid::smpi::Request::test(request,status, flag);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
int my_proc_id = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testany"));
- *flag = simgrid::smpi::Request::testany(count, requests, index, status);
+ retval = simgrid::smpi::Request::testany(count, requests, index, flag, status);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
int my_proc_id = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testall"));
- *flag = simgrid::smpi::Request::testall(count, requests, statuses);
+ retval = simgrid::smpi::Request::testall(count, requests, flag, statuses);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
int my_proc_id = simgrid::s4u::this_actor::get_pid();
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testsome"));
- *outcount = simgrid::smpi::Request::testsome(incount, requests, indices, statuses);
+ retval = simgrid::smpi::Request::testsome(incount, requests, outcount, indices, statuses);
TRACE_smpi_comm_out(my_proc_id);
- retval = MPI_SUCCESS;
}
smpi_bench_begin();
return retval;
} else {
// for tracing, save the handle which might get overridden before we can use the helper on it
MPI_Request savedreq = *request;
- if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED))
+ if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED)
+ && not(savedreq->flags() & MPI_REQ_GENERALIZED))
    savedreq->ref(); // don't erase the handle in Request::wait, we'll need it later
else
savedreq = MPI_REQUEST_NULL;
TRACE_smpi_comm_in(my_proc_id, __func__,
new simgrid::instr::WaitTIData((*request)->src(), (*request)->dst(), (*request)->tag()));
- simgrid::smpi::Request::wait(request, status);
- retval = MPI_SUCCESS;
+ retval = simgrid::smpi::Request::wait(request, status);
//the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
TRACE_smpi_comm_out(my_proc_id);
    return retval;
}
+
+int PMPI_Grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request){
+ return simgrid::smpi::Request::grequest_start(query_fn, free_fn,cancel_fn, extra_state, request);
+}
+
+int PMPI_Grequest_complete( MPI_Request request){
+ return simgrid::smpi::Request::grequest_complete(request);
+}
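+
+// Illustration only (not part of the patch): a minimal sketch of the standard MPI-2
+// generalized-request protocol that these entry points implement. All calls below are
+// portable MPI API; the demo_* helper names are hypothetical.
+//
+//   static int demo_query(void* /*extra_state*/, MPI_Status* s) {
+//     MPI_Status_set_elements(s, MPI_BYTE, 0); // warn-only no-ops under SMPI for now
+//     MPI_Status_set_cancelled(s, 0);
+//     return MPI_SUCCESS;
+//   }
+//   static int demo_free(void* /*extra_state*/) { return MPI_SUCCESS; }
+//   static int demo_cancel(void* /*extra_state*/, int /*complete*/) { return MPI_SUCCESS; }
+//
+//   MPI_Request req;
+//   MPI_Grequest_start(demo_query, demo_free, demo_cancel, nullptr, &req);
+//   MPI_Grequest_complete(req);        // normally called by whatever performs the work
+//   MPI_Wait(&req, MPI_STATUS_IGNORE); // returns once complete; demo_query fills the status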
+
MPI_Request PMPI_Request_f2c(MPI_Fint request){
return static_cast<MPI_Request>(simgrid::smpi::Request::f2c(request));
}
constexpr unsigned MPI_REQ_FINISHED = 0x100;
constexpr unsigned MPI_REQ_RMA = 0x200;
constexpr unsigned MPI_REQ_ACCUMULATE = 0x400;
+constexpr unsigned MPI_REQ_GENERALIZED = 0x800; // request created by MPI_Grequest_start
+constexpr unsigned MPI_REQ_COMPLETE = 0x1000;   // generalized request marked complete
enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED, FINALIZED };
namespace simgrid{
namespace smpi{
+/* Callbacks and synchronization state attached to a generalized request */
+typedef struct s_smpi_mpi_generalized_request_funcs {
+  MPI_Grequest_query_function *query_fn;   // fills the status handed back by test/wait
+  MPI_Grequest_free_function *free_fn;     // called when the request is freed
+  MPI_Grequest_cancel_function *cancel_fn; // called by MPI_Cancel
+  void* extra_state;
+  s4u::ConditionVariablePtr cond;          // signaled by grequest_complete
+  s4u::MutexPtr mutex;                     // protects the complete/wait handshake
+} s_smpi_mpi_generalized_request_funcs_t;
+typedef struct s_smpi_mpi_generalized_request_funcs *smpi_mpi_generalized_request_funcs;
+
class Request : public F2C {
void* buf_;
  /* in the case of non-contiguous memory the user address should be kept here */
int refcount_;
MPI_Op op_;
int cancelled_;
+ smpi_mpi_generalized_request_funcs generalized_funcs;
public:
Request() = default;
void ref();
static void finish_wait(MPI_Request* request, MPI_Status* status);
static void unref(MPI_Request* request);
- static void wait(MPI_Request* req, MPI_Status* status);
+ static int wait(MPI_Request* req, MPI_Status* status);
static MPI_Request send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
static MPI_Request isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
static MPI_Request ssend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
static void startall(int count, MPI_Request* requests);
- static int test(MPI_Request* request, MPI_Status* status);
- static int testsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]);
- static int testany(int count, MPI_Request requests[], int* index, MPI_Status* status);
- static int testall(int count, MPI_Request requests[], MPI_Status status[]);
+ static int test(MPI_Request* request, MPI_Status* status, int* flag);
+ static int testsome(int incount, MPI_Request requests[], int* outcounts, int* indices, MPI_Status status[]);
+ static int testany(int count, MPI_Request requests[], int* index, int* flag, MPI_Status* status);
+ static int testall(int count, MPI_Request requests[], int* flag, MPI_Status status[]);
static void probe(int source, int tag, MPI_Comm comm, MPI_Status* status);
static void iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status);
static int match_send(void* a, void* b, kernel::activity::CommImpl* ignored);
static int match_recv(void* a, void* b, kernel::activity::CommImpl* ignored);
+ static int grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request);
+ static int grequest_complete( MPI_Request request);
int add_f() override;
static void free_f(int id);
static Request* f2c(int);
TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
MPI_Status status;
- int flag = Request::test(&request, &status);
+ int flag = 0;
+ Request::test(&request, &status, &flag);
XBT_DEBUG("MPI_Test result: %d", flag);
  /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now MPI_REQUEST_NULL */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-
+#include "simgrid/s4u/Mutex.hpp"
+#include "simgrid/s4u/ConditionVariable.hpp"
#include "smpi_request.hpp"
#include "mc/mc.h"
#include "src/smpi/include/smpi_actor.hpp"
#include "xbt/config.hpp"
+
#include <algorithm>
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
xbt_die("Whoops, wrong refcount");
}
if((*request)->refcount_==0){
- Datatype::unref((*request)->old_type_);
+ if ((*request)->flags_ & MPI_REQ_GENERALIZED){
+ ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
+ }else{
Comm::unref((*request)->comm_);
- (*request)->print_request("Destroying");
- delete *request;
- *request = MPI_REQUEST_NULL;
+ Datatype::unref((*request)->old_type_);
+ }
+ (*request)->print_request("Destroying");
+ delete *request;
+ *request = MPI_REQUEST_NULL;
}else{
(*request)->print_request("Decrementing");
}
(boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(this->action_))->cancel();
}
-int Request::test(MPI_Request * request, MPI_Status * status) {
+int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
//assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
// to avoid deadlocks if used as a break condition, such as
// while (MPI_Test(request, flag, status) && flag) dostuff...
// because the time will not normally advance when only calls to MPI_Test are made -> deadlock
// multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
static int nsleeps = 1;
+ int ret = MPI_SUCCESS;
if(smpi_test_sleep > 0)
simcall_process_sleep(nsleeps*smpi_test_sleep);
+ MPI_Status* mystatus;
Status::empty(status);
- int flag = 1;
+ *flag = 1;
if (((*request)->flags_ & MPI_REQ_PREPARED) == 0) {
if ((*request)->action_ != nullptr){
try{
- flag = simcall_comm_test((*request)->action_);
+ *flag = simcall_comm_test((*request)->action_);
}catch (xbt_ex& e) {
- return 0;
+ *flag = 0;
+ return ret;
}
}
- if (flag) {
+ if (*request != MPI_REQUEST_NULL &&
+ ((*request)->flags_ & MPI_REQ_GENERALIZED)
+ && !((*request)->flags_ & MPI_REQ_COMPLETE))
+ *flag=0;
+ if (*flag) {
finish_wait(request,status);
+ if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+ if(status==MPI_STATUS_IGNORE){
+ mystatus=new MPI_Status();
+ Status::empty(mystatus);
+ }else{
+ mystatus=status;
+ }
+ ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
+ if(status==MPI_STATUS_IGNORE)
+ delete mystatus;
+ }
nsleeps=1;//reset the number of sleeps we will do next time
if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
        *request = MPI_REQUEST_NULL;
    } else {
      nsleeps++;
    }
}
- return flag;
+ return ret;
}
-int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
+int Request::testsome(int incount, MPI_Request requests[], int *count, int *indices, MPI_Status status[])
{
- int count = 0;
+ int ret = MPI_SUCCESS;
+ int error=0;
int count_dead = 0;
+ int flag = 0;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+ *count = 0;
for (int i = 0; i < incount; i++) {
if (requests[i] != MPI_REQUEST_NULL) {
- if (test(&requests[i], pstat)) {
+ ret = test(&requests[i], pstat, &flag);
+ if(ret!=MPI_SUCCESS)
+ error = 1;
+ if(flag) {
indices[i] = 1;
- count++;
+ (*count)++;
if (status != MPI_STATUSES_IGNORE)
status[i] = *pstat;
        if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
          requests[i] = MPI_REQUEST_NULL;
      }
    } else {
      count_dead++;
    }
  }
- if(count_dead==incount)
- return MPI_UNDEFINED;
- else return count;
+ if(count_dead==incount)*count=MPI_UNDEFINED;
+ if(error!=0)
+ return MPI_ERR_IN_STATUS;
+ else
+ return MPI_SUCCESS;
}
-int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
+int Request::testany(int count, MPI_Request requests[], int *index, int* flag, MPI_Status * status)
{
std::vector<simgrid::kernel::activity::CommImpl*> comms;
comms.reserve(count);
int i;
- int flag = 0;
-
+ *flag = 0;
+ int ret = MPI_SUCCESS;
+ MPI_Status* mystatus;
*index = MPI_UNDEFINED;
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
*index = map[i];
- finish_wait(&requests[*index],status);
- flag = 1;
- nsleeps = 1;
- if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT)) {
- requests[*index] = MPI_REQUEST_NULL;
+ if (requests[*index] != MPI_REQUEST_NULL &&
+ (requests[*index]->flags_ & MPI_REQ_GENERALIZED)
+ && !(requests[*index]->flags_ & MPI_REQ_COMPLETE)) {
+ *flag=0;
+ } else {
+ finish_wait(&requests[*index],status);
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_GENERALIZED)){
+ if(status==MPI_STATUS_IGNORE){
+ mystatus=new MPI_Status();
+ Status::empty(mystatus);
+ }else{
+ mystatus=status;
+ }
+ ret=(requests[*index]->generalized_funcs)->query_fn((requests[*index]->generalized_funcs)->extra_state, mystatus);
+ if(status==MPI_STATUS_IGNORE)
+ delete mystatus;
+ }
+
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT))
+ requests[*index] = MPI_REQUEST_NULL;
+ *flag=1;
}
+ nsleeps = 1;
} else {
nsleeps++;
}
} else {
//all requests are null or inactive, return true
- flag = 1;
+ *flag = 1;
Status::empty(status);
}
- return flag;
+ return ret;
}
-int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
+int Request::testall(int count, MPI_Request requests[], int* outflag, MPI_Status status[])
{
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
- int flag=1;
+ int flag, error=0;
+ int ret=MPI_SUCCESS;
+ *outflag = 1;
for(int i=0; i<count; i++){
if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
- if (test(&requests[i], pstat)!=1){
+ ret = test(&requests[i], pstat, &flag);
+ if (flag){
flag=0;
+ requests[i]=MPI_REQUEST_NULL;
}else{
- requests[i]=MPI_REQUEST_NULL;
+ *outflag=0;
}
+ if (ret != MPI_SUCCESS)
+ error = 1;
}else{
Status::empty(pstat);
}
    if (status != MPI_STATUSES_IGNORE) {
      status[i] = *pstat;
}
}
- return flag;
+ if(error==1)
+ return MPI_ERR_IN_STATUS;
+ else
+ return MPI_SUCCESS;
}
void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
return;
}
- if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0)) && ((req->flags_ & MPI_REQ_PREPARED) == 0)) {
+ if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0))
+ && ((req->flags_ & MPI_REQ_PREPARED) == 0)
+ && ((req->flags_ & MPI_REQ_GENERALIZED) == 0)) {
if(status != MPI_STATUS_IGNORE) {
int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
status->MPI_SOURCE = req->comm_->group()->rank(src);
unref(request);
}
-void Request::wait(MPI_Request * request, MPI_Status * status)
+int Request::wait(MPI_Request * request, MPI_Status * status)
{
+ int ret=MPI_SUCCESS;
(*request)->print_request("Waiting");
if ((*request)->flags_ & MPI_REQ_PREPARED) {
Status::empty(status);
- return;
+ return ret;
}
if ((*request)->action_ != nullptr){
}
}
+ if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+ MPI_Status* mystatus;
+ if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
+ ((*request)->generalized_funcs)->mutex->lock();
+ ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
+ ((*request)->generalized_funcs)->mutex->unlock();
+ }
+ if(status==MPI_STATUS_IGNORE){
+ mystatus=new MPI_Status();
+ Status::empty(mystatus);
+ }else{
+ mystatus=status;
+ }
+ ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
+ if(status==MPI_STATUS_IGNORE)
+ delete mystatus;
+ }
finish_wait(request,status);
if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
*request = MPI_REQUEST_NULL;
+ return ret;
}
int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
{
int count = 0;
+ int flag = 0;
+ int index = 0;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+ index = waitany(incount, (MPI_Request*)requests, pstat);
+ if(index==MPI_UNDEFINED) return MPI_UNDEFINED;
+ if(status != MPI_STATUSES_IGNORE) {
+ status[count] = *pstat;
+ }
+ indices[count] = index;
+ count++;
for (int i = 0; i < incount; i++) {
- int index = waitany(incount, requests, pstat);
- if(index!=MPI_UNDEFINED){
- indices[count] = index;
- count++;
- if(status != MPI_STATUSES_IGNORE) {
- status[index] = *pstat;
+ if((requests[i] != MPI_REQUEST_NULL)) {
+ test(&requests[i], pstat,&flag);
+ if (flag==1){
+ indices[count] = i;
+ if(status != MPI_STATUSES_IGNORE) {
+ status[count] = *pstat;
+ }
+ if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
+ requests[i]=MPI_REQUEST_NULL;
+ count++;
}
- if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
- requests[index] = MPI_REQUEST_NULL;
- }else{
- return MPI_UNDEFINED;
}
}
return count;
}
}
+
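+// Completion handshake: grequest_complete() sets MPI_REQ_COMPLETE and signals the
+// condition variable while holding the mutex; Request::wait() (above) checks the flag
+// and, if it is not yet set, blocks on that same condition variable.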
+int Request::grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request){
+
+ *request = new Request();
+ (*request)->flags_ |= MPI_REQ_GENERALIZED;
+ (*request)->flags_ |= MPI_REQ_PERSISTENT;
+ (*request)->refcount_ = 1;
+ ((*request)->generalized_funcs)=xbt_new0(s_smpi_mpi_generalized_request_funcs_t ,1);
+ ((*request)->generalized_funcs)->query_fn=query_fn;
+ ((*request)->generalized_funcs)->free_fn=free_fn;
+ ((*request)->generalized_funcs)->cancel_fn=cancel_fn;
+ ((*request)->generalized_funcs)->extra_state=extra_state;
+ ((*request)->generalized_funcs)->cond = simgrid::s4u::ConditionVariable::create();
+ ((*request)->generalized_funcs)->mutex = simgrid::s4u::Mutex::create();
+ return MPI_SUCCESS;
+}
+
+int Request::grequest_complete( MPI_Request request){
+ if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex==NULL)
+ return MPI_ERR_REQUEST;
+ request->generalized_funcs->mutex->lock();
+ request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
+ request->generalized_funcs->cond->notify_one();
+ request->generalized_funcs->mutex->unlock();
+ return MPI_SUCCESS;
+}
+
}
}
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../include/")
foreach(file anyall bottom eagerdt huge_anysrc huge_underflow inactivereq isendself isendirecv isendselfprobe issendselfcancel cancelanysrc pingping probenull
- dtype_send probe-unexp sendall sendflood sendrecv1 sendrecv2 sendrecv3 waitany-null waittestnull many_isend manylmt recv_any scancel scancel2 rcancel)
+ dtype_send greq1 probe-unexp sendall sendflood sendrecv1 sendrecv2 sendrecv3 waitany-null waittestnull many_isend manylmt recv_any scancel scancel2 rcancel)
# not compiled files: big_count_status bsend1 bsend2 bsend3 bsend4 bsend5 bsendalign bsendfrag bsendpending mprobe
- # cancelrecv greq1 icsend large_message pscancel rqfreeb rqstatus sendself scancel_unmatch
+ # cancelrecv icsend large_message pscancel rqfreeb rqstatus sendself scancel_unmatch
add_executable(${file} EXCLUDE_FROM_ALL ${file}.c)
add_dependencies(tests ${file})
target_link_libraries(${file} simgrid mtest_c)
#needs MPI_Pack, MPI_Buffer_attach, MPI_Buffer_detach, MPI_Irsend, MPI_Ibsend
#rqfreeb 4
#needs MPI_Grequest_start MPI_Grequest_complete
-#greq1 1
+greq1 1
probe-unexp 4
probenull 1
# For testing, scancel will run with 1 process as well