Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Simple implementation of generalized requests in MPI.
authordegomme <adegomme@users.noreply.github.com>
Tue, 26 Mar 2019 17:20:53 +0000 (18:20 +0100)
committerdegomme <adegomme@users.noreply.github.com>
Thu, 28 Mar 2019 17:35:56 +0000 (18:35 +0100)
Also fix testsome behavior.

src/smpi/bindings/smpi_mpi.cpp
src/smpi/bindings/smpi_pmpi_request.cpp
src/smpi/include/private.hpp
src/smpi/include/smpi_request.hpp
src/smpi/internals/smpi_replay.cpp
src/smpi/mpi/smpi_request.cpp
teshsuite/smpi/mpich3-test/pt2pt/CMakeLists.txt
teshsuite/smpi/mpich3-test/pt2pt/testlist

index ac866f1..30ced0b 100644 (file)
@@ -193,6 +193,8 @@ WRAPPED_PMPI_CALL(int,MPI_Testall,(int count, MPI_Request* requests, int* flag,
 WRAPPED_PMPI_CALL(int,MPI_Testany,(int count, MPI_Request requests[], int *index, int *flag, MPI_Status * status),(count, requests, index, flag, status))
 WRAPPED_PMPI_CALL(int,MPI_Test,(MPI_Request * request, int *flag, MPI_Status * status),(request, flag, status))
 WRAPPED_PMPI_CALL(int,MPI_Testsome,(int incount, MPI_Request* requests, int* outcount, int* indices, MPI_Status* statuses) ,(incount, requests, outcount, indices, statuses))
 WRAPPED_PMPI_CALL(int,MPI_Testany,(int count, MPI_Request requests[], int *index, int *flag, MPI_Status * status),(count, requests, index, flag, status))
 WRAPPED_PMPI_CALL(int,MPI_Test,(MPI_Request * request, int *flag, MPI_Status * status),(request, flag, status))
 WRAPPED_PMPI_CALL(int,MPI_Testsome,(int incount, MPI_Request* requests, int* outcount, int* indices, MPI_Status* statuses) ,(incount, requests, outcount, indices, statuses))
+WRAPPED_PMPI_CALL(int,MPI_Grequest_complete,( MPI_Request request),( request))
+WRAPPED_PMPI_CALL(int,MPI_Grequest_start,(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request),( query_fn, free_fn, cancel_fn, extra_state, request))
 WRAPPED_PMPI_CALL(int,MPI_Type_commit,(MPI_Datatype* datatype) ,(datatype))
 WRAPPED_PMPI_CALL(int,MPI_Type_contiguous,(int count, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, old_type, newtype))
 WRAPPED_PMPI_CALL(int,MPI_Type_create_hindexed_block,(int count, int blocklength, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, blocklength, indices, old_type, newtype))
 WRAPPED_PMPI_CALL(int,MPI_Type_commit,(MPI_Datatype* datatype) ,(datatype))
 WRAPPED_PMPI_CALL(int,MPI_Type_contiguous,(int count, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, old_type, newtype))
 WRAPPED_PMPI_CALL(int,MPI_Type_create_hindexed_block,(int count, int blocklength, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* newtype) ,(count, blocklength, indices, old_type, newtype))
@@ -384,8 +386,6 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_get,(MPI_Comm comm, int maxindex,
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_map,(MPI_Comm comm_old, int nnodes, int* index, int* edges, int* newrank) ,(comm_old, nnodes, index, edges, newrank))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors_count,(MPI_Comm comm, int rank, int* nneighbors) ,(comm, rank, nneighbors))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors,(MPI_Comm comm, int rank, int maxneighbors, int* neighbors) ,(comm, rank, maxneighbors, neighbors))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_map,(MPI_Comm comm_old, int nnodes, int* index, int* edges, int* newrank) ,(comm_old, nnodes, index, edges, newrank))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors_count,(MPI_Comm comm, int rank, int* nneighbors) ,(comm, rank, nneighbors))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Graph_neighbors,(MPI_Comm comm, int rank, int maxneighbors, int* neighbors) ,(comm, rank, maxneighbors, neighbors))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Grequest_complete,( MPI_Request request),( request))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Grequest_start,(MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn,MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request),( query_fn, free_fn, cancel_fn, extra_state, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Ibsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request) ,(buf, count, datatype, dest, tag, comm, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_create,(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag,MPI_Comm* comm_out) ,(local_comm, local_leader, peer_comm, remote_leader, tag, comm_out))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_merge,(MPI_Comm comm, int high, MPI_Comm* comm_out) ,(comm, high, comm_out))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Ibsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request) ,(buf, count, datatype, dest, tag, comm, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_create,(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag,MPI_Comm* comm_out) ,(local_comm, local_leader, peer_comm, remote_leader, tag, comm_out))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Intercomm_merge,(MPI_Comm comm, int high, MPI_Comm* comm_out) ,(comm, high, comm_out))
@@ -396,11 +396,10 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pack_external,(char *datarep, void *inbu
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pack_external_size,(char *datarep, int incount, MPI_Datatype datatype, MPI_Aint *size),(datarep, incount, datatype, size))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pcontrol,(const int level, ... ),(level))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Publish_name,( char *service_name, MPI_Info info, char *port_name),( service_name, info, port_name))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pack_external_size,(char *datarep, int incount, MPI_Datatype datatype, MPI_Aint *size),(datarep, incount, datatype, size))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Pcontrol,(const int level, ... ),(level))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Publish_name,( char *service_name, MPI_Info info, char *port_name),( service_name, info, port_name))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Request_get_status,( MPI_Request request, int *flag, MPI_Status *status),( request, flag, status))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend_init,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request),(buf, count, datatype, dest, tag, comm, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) ,(buf, count, datatype, dest, tag, comm))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend_init,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request),(buf, count, datatype, dest, tag, comm, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) ,(buf, count, datatype, dest, tag, comm))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count))
+UNIMPLEMENTED_WRAPPED_PMPI_CALL_NOFAIL(int,MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag))
+UNIMPLEMENTED_WRAPPED_PMPI_CALL_NOFAIL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Topo_test,(MPI_Comm comm, int* top_type) ,(comm, top_type))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_create_darray,(int size, int rank, int ndims, int* array_of_gsizes, int* array_of_distribs, int* array_of_dargs, int* array_of_psizes,int order, MPI_Datatype oldtype, MPI_Datatype *newtype) ,(size, rank, ndims, array_of_gsizes,array_of_distribs, array_of_dargs, array_of_psizes,order,oldtype, newtype))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses, int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, MPI_Datatype *array_of_datatypes),(datatype, max_integers, max_addresses,max_datatypes, array_of_integers, array_of_addresses, array_of_datatypes))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Topo_test,(MPI_Comm comm, int* top_type) ,(comm, top_type))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_create_darray,(int size, int rank, int ndims, int* array_of_gsizes, int* array_of_distribs, int* array_of_dargs, int* array_of_psizes,int order, MPI_Datatype oldtype, MPI_Datatype *newtype) ,(size, rank, ndims, array_of_gsizes,array_of_distribs, array_of_dargs, array_of_psizes,order,oldtype, newtype))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses, int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, MPI_Datatype *array_of_datatypes),(datatype, max_integers, max_addresses,max_datatypes, array_of_integers, array_of_addresses, array_of_datatypes))
index 2153455..ab2bf56 100644 (file)
@@ -524,10 +524,9 @@ int PMPI_Test(MPI_Request * request, int *flag, MPI_Status * status)
 
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
     
 
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
     
-    *flag = simgrid::smpi::Request::test(request,status);
+    retval = simgrid::smpi::Request::test(request,status, flag);
 
     TRACE_smpi_comm_out(my_proc_id);
 
     TRACE_smpi_comm_out(my_proc_id);
-    retval = MPI_SUCCESS;
   }
   smpi_bench_begin();
   return retval;
   }
   smpi_bench_begin();
   return retval;
@@ -543,9 +542,8 @@ int PMPI_Testany(int count, MPI_Request requests[], int *index, int *flag, MPI_S
   } else {
     int my_proc_id = simgrid::s4u::this_actor::get_pid();
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testany"));
   } else {
     int my_proc_id = simgrid::s4u::this_actor::get_pid();
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testany"));
-    *flag = simgrid::smpi::Request::testany(count, requests, index, status);
+    retval = simgrid::smpi::Request::testany(count, requests, index, flag, status);
     TRACE_smpi_comm_out(my_proc_id);
     TRACE_smpi_comm_out(my_proc_id);
-    retval = MPI_SUCCESS;
   }
   smpi_bench_begin();
   return retval;
   }
   smpi_bench_begin();
   return retval;
@@ -561,9 +559,8 @@ int PMPI_Testall(int count, MPI_Request* requests, int* flag, MPI_Status* status
   } else {
     int my_proc_id = simgrid::s4u::this_actor::get_pid();
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testall"));
   } else {
     int my_proc_id = simgrid::s4u::this_actor::get_pid();
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testall"));
-    *flag = simgrid::smpi::Request::testall(count, requests, statuses);
+    retval = simgrid::smpi::Request::testall(count, requests, flag, statuses);
     TRACE_smpi_comm_out(my_proc_id);
     TRACE_smpi_comm_out(my_proc_id);
-    retval = MPI_SUCCESS;
   }
   smpi_bench_begin();
   return retval;
   }
   smpi_bench_begin();
   return retval;
@@ -579,9 +576,8 @@ int PMPI_Testsome(int incount, MPI_Request requests[], int* outcount, int* indic
   } else {
     int my_proc_id = simgrid::s4u::this_actor::get_pid();
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testsome"));
   } else {
     int my_proc_id = simgrid::s4u::this_actor::get_pid();
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("testsome"));
-    *outcount = simgrid::smpi::Request::testsome(incount, requests, indices, status);
+    retval = simgrid::smpi::Request::testsome(incount, requests, outcount, indices, status);
     TRACE_smpi_comm_out(my_proc_id);
     TRACE_smpi_comm_out(my_proc_id);
-    retval    = MPI_SUCCESS;
   }
   smpi_bench_begin();
   return retval;
   }
   smpi_bench_begin();
   return retval;
@@ -662,7 +658,8 @@ int PMPI_Wait(MPI_Request * request, MPI_Status * status)
   } else {
     //for tracing, save the handle which might get overriden before we can use the helper on it
     MPI_Request savedreq = *request;
   } else {
     //for tracing, save the handle which might get overriden before we can use the helper on it
     MPI_Request savedreq = *request;
-    if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED))
+    if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED)
+    && not(savedreq->flags() & MPI_REQ_GENERALIZED))
       savedreq->ref();//don't erase te handle in Request::wait, we'll need it later
     else
       savedreq = MPI_REQUEST_NULL;
       savedreq->ref();//don't erase te handle in Request::wait, we'll need it later
     else
       savedreq = MPI_REQUEST_NULL;
@@ -673,8 +670,7 @@ int PMPI_Wait(MPI_Request * request, MPI_Status * status)
     TRACE_smpi_comm_in(my_proc_id, __func__,
                        new simgrid::instr::WaitTIData((*request)->src(), (*request)->dst(), (*request)->tag()));
 
     TRACE_smpi_comm_in(my_proc_id, __func__,
                        new simgrid::instr::WaitTIData((*request)->src(), (*request)->dst(), (*request)->tag()));
 
-    simgrid::smpi::Request::wait(request, status);
-    retval = MPI_SUCCESS;
+    retval = simgrid::smpi::Request::wait(request, status);
 
     //the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
     TRACE_smpi_comm_out(my_proc_id);
 
     //the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
     TRACE_smpi_comm_out(my_proc_id);
@@ -793,6 +789,15 @@ int PMPI_Test_cancelled(MPI_Status* status, int* flag){
   return MPI_SUCCESS;  
 }
 
   return MPI_SUCCESS;  
 }
 
+
+int PMPI_Grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request){
+  return simgrid::smpi::Request::grequest_start(query_fn, free_fn,cancel_fn, extra_state, request);
+}
+
+int PMPI_Grequest_complete( MPI_Request request){
+  return simgrid::smpi::Request::grequest_complete(request);
+}
+
 MPI_Request PMPI_Request_f2c(MPI_Fint request){
   return static_cast<MPI_Request>(simgrid::smpi::Request::f2c(request));
 }
 MPI_Request PMPI_Request_f2c(MPI_Fint request){
   return static_cast<MPI_Request>(simgrid::smpi::Request::f2c(request));
 }
index 4c6cf91..9940976 100644 (file)
@@ -24,6 +24,8 @@ constexpr unsigned MPI_REQ_PREPARED       = 0x80;
 constexpr unsigned MPI_REQ_FINISHED       = 0x100;
 constexpr unsigned MPI_REQ_RMA            = 0x200;
 constexpr unsigned MPI_REQ_ACCUMULATE     = 0x400;
 constexpr unsigned MPI_REQ_FINISHED       = 0x100;
 constexpr unsigned MPI_REQ_RMA            = 0x200;
 constexpr unsigned MPI_REQ_ACCUMULATE     = 0x400;
+constexpr unsigned MPI_REQ_GENERALIZED    = 0x800;
+constexpr unsigned MPI_REQ_COMPLETE       = 0x1000;
 
 enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED, FINALIZED };
 
 
 enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED, FINALIZED };
 
index 4876237..0c166fd 100644 (file)
 namespace simgrid{
 namespace smpi{
 
 namespace simgrid{
 namespace smpi{
 
+typedef struct s_smpi_mpi_generalized_request_funcs {
+  MPI_Grequest_query_function *query_fn;
+  MPI_Grequest_free_function *free_fn;
+  MPI_Grequest_cancel_function *cancel_fn;
+  void* extra_state;
+  s4u::ConditionVariablePtr cond;
+  s4u::MutexPtr mutex;
+} s_smpi_mpi_generalized_request_funcs_t; 
+typedef struct s_smpi_mpi_generalized_request_funcs *smpi_mpi_generalized_request_funcs;
+
 class Request : public F2C {
   void* buf_;
   /* in the case of non-contiguous memory the user address should be keep
 class Request : public F2C {
   void* buf_;
   /* in the case of non-contiguous memory the user address should be keep
@@ -38,6 +48,7 @@ class Request : public F2C {
   int refcount_;
   MPI_Op op_;
   int cancelled_;
   int refcount_;
   MPI_Op op_;
   int cancelled_;
+  smpi_mpi_generalized_request_funcs generalized_funcs;
 
 public:
   Request() = default;
 
 public:
   Request() = default;
@@ -57,7 +68,7 @@ public:
   void ref();
   static void finish_wait(MPI_Request* request, MPI_Status* status);
   static void unref(MPI_Request* request);
   void ref();
   static void finish_wait(MPI_Request* request, MPI_Status* status);
   static void unref(MPI_Request* request);
-  static void wait(MPI_Request* req, MPI_Status* status);
+  static int wait(MPI_Request* req, MPI_Status* status);
   static MPI_Request send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
   static MPI_Request isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
   static MPI_Request ssend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
   static MPI_Request send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
   static MPI_Request isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
   static MPI_Request ssend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm);
@@ -80,10 +91,10 @@ public:
 
   static void startall(int count, MPI_Request* requests);
 
 
   static void startall(int count, MPI_Request* requests);
 
-  static int test(MPI_Request* request, MPI_Status* status);
-  static int testsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]);
-  static int testany(int count, MPI_Request requests[], int* index, MPI_Status* status);
-  static int testall(int count, MPI_Request requests[], MPI_Status status[]);
+  static int test(MPI_Request* request, MPI_Status* status, int* flag);
+  static int testsome(int incount, MPI_Request requests[], int* outcounts, int* indices, MPI_Status status[]);
+  static int testany(int count, MPI_Request requests[], int* index, int* flag, MPI_Status* status);
+  static int testall(int count, MPI_Request requests[], int* flag, MPI_Status status[]);
 
   static void probe(int source, int tag, MPI_Comm comm, MPI_Status* status);
   static void iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status);
 
   static void probe(int source, int tag, MPI_Comm comm, MPI_Status* status);
   static void iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status);
@@ -95,6 +106,8 @@ public:
   static int match_send(void* a, void* b, kernel::activity::CommImpl* ignored);
   static int match_recv(void* a, void* b, kernel::activity::CommImpl* ignored);
 
   static int match_send(void* a, void* b, kernel::activity::CommImpl* ignored);
   static int match_recv(void* a, void* b, kernel::activity::CommImpl* ignored);
 
+  static int grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request);
+  static int grequest_complete( MPI_Request request);
   int add_f() override;
   static void free_f(int id);
   static Request* f2c(int);
   int add_f() override;
   static void free_f(int id);
   static Request* f2c(int);
index 878b3b1..9d3aae3 100644 (file)
@@ -493,7 +493,8 @@ void TestAction::kernel(simgrid::xbt::ReplayAction&)
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
 
     MPI_Status status;
     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
 
     MPI_Status status;
-    int flag = Request::test(&request, &status);
+    int flag = 0;
+    Request::test(&request, &status, &flag);
 
     XBT_DEBUG("MPI_Test result: %d", flag);
     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
 
     XBT_DEBUG("MPI_Test result: %d", flag);
     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
index 759f0f5..b398ff0 100644 (file)
@@ -2,7 +2,8 @@
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
-
+#include "simgrid/s4u/Mutex.hpp"
+#include "simgrid/s4u/ConditionVariable.hpp"
 #include "smpi_request.hpp"
 
 #include "mc/mc.h"
 #include "smpi_request.hpp"
 
 #include "mc/mc.h"
@@ -18,6 +19,7 @@
 #include "src/smpi/include/smpi_actor.hpp"
 #include "xbt/config.hpp"
 
 #include "src/smpi/include/smpi_actor.hpp"
 #include "xbt/config.hpp"
 
+
 #include <algorithm>
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
 #include <algorithm>
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
@@ -84,11 +86,15 @@ void Request::unref(MPI_Request* request)
       xbt_die("Whoops, wrong refcount");
     }
     if((*request)->refcount_==0){
       xbt_die("Whoops, wrong refcount");
     }
     if((*request)->refcount_==0){
-        Datatype::unref((*request)->old_type_);
+      if ((*request)->flags_ & MPI_REQ_GENERALIZED){
+        ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
+      }else{
         Comm::unref((*request)->comm_);
         Comm::unref((*request)->comm_);
-        (*request)->print_request("Destroying");
-        delete *request;
-        *request = MPI_REQUEST_NULL;
+        Datatype::unref((*request)->old_type_);
+      }
+      (*request)->print_request("Destroying");
+      delete *request;
+      *request = MPI_REQUEST_NULL;
     }else{
       (*request)->print_request("Decrementing");
     }
     }else{
       (*request)->print_request("Decrementing");
     }
@@ -506,28 +512,46 @@ void Request::cancel()
     (boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(this->action_))->cancel();
 }
 
     (boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(this->action_))->cancel();
 }
 
-int Request::test(MPI_Request * request, MPI_Status * status) {
+int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
   // to avoid deadlocks if used as a break condition, such as
   //     while (MPI_Test(request, flag, status) && flag) dostuff...
   // because the time will not normally advance when only calls to MPI_Test are made -> deadlock
   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
   static int nsleeps = 1;
   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
   // to avoid deadlocks if used as a break condition, such as
   //     while (MPI_Test(request, flag, status) && flag) dostuff...
   // because the time will not normally advance when only calls to MPI_Test are made -> deadlock
   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
   static int nsleeps = 1;
+  int ret = MPI_SUCCESS;
   if(smpi_test_sleep > 0)
     simcall_process_sleep(nsleeps*smpi_test_sleep);
 
   if(smpi_test_sleep > 0)
     simcall_process_sleep(nsleeps*smpi_test_sleep);
 
+  MPI_Status* mystatus;
   Status::empty(status);
   Status::empty(status);
-  int flag = 1;
+  *flag = 1;
   if (((*request)->flags_ & MPI_REQ_PREPARED) == 0) {
     if ((*request)->action_ != nullptr){
       try{
   if (((*request)->flags_ & MPI_REQ_PREPARED) == 0) {
     if ((*request)->action_ != nullptr){
       try{
-        flag = simcall_comm_test((*request)->action_);
+        *flag = simcall_comm_test((*request)->action_);
       }catch (xbt_ex& e) {
       }catch (xbt_ex& e) {
-        return 0;
+        *flag = 0;
+        return ret;
       }
     }
       }
     }
-    if (flag) {
+    if (*request != MPI_REQUEST_NULL && 
+        ((*request)->flags_ & MPI_REQ_GENERALIZED)
+        && !((*request)->flags_ & MPI_REQ_COMPLETE)) 
+      *flag=0;
+    if (*flag) {
       finish_wait(request,status);
       finish_wait(request,status);
+      if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+        if(status==MPI_STATUS_IGNORE){
+          mystatus=new MPI_Status();
+          Status::empty(mystatus);
+        }else{
+          mystatus=status;
+        }
+        ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
+        if(status==MPI_STATUS_IGNORE) 
+          delete mystatus;
+      }
       nsleeps=1;//reset the number of sleeps we will do next time
       if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
         *request = MPI_REQUEST_NULL;
       nsleeps=1;//reset the number of sleeps we will do next time
       if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
         *request = MPI_REQUEST_NULL;
@@ -535,21 +559,27 @@ int Request::test(MPI_Request * request, MPI_Status * status) {
       nsleeps++;
     }
   }
       nsleeps++;
     }
   }
-  return flag;
+  return ret;
 }
 
 }
 
-int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
+int Request::testsome(int incount, MPI_Request requests[], int *count, int *indices, MPI_Status status[])
 {
 {
-  int count = 0;
+  int ret = MPI_SUCCESS;
+  int error=0;
   int count_dead = 0;
   int count_dead = 0;
+  int flag = 0;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
+  *count = 0;
   for (int i = 0; i < incount; i++) {
     if (requests[i] != MPI_REQUEST_NULL) {
   for (int i = 0; i < incount; i++) {
     if (requests[i] != MPI_REQUEST_NULL) {
-      if (test(&requests[i], pstat)) {
+      ret = test(&requests[i], pstat, &flag);
+      if(ret!=MPI_SUCCESS)
+        error = 1;
+      if(flag) {
         indices[i] = 1;
         indices[i] = 1;
-        count++;
+        (*count)++;
         if (status != MPI_STATUSES_IGNORE)
           status[i] = *pstat;
         if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
         if (status != MPI_STATUSES_IGNORE)
           status[i] = *pstat;
         if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
@@ -559,19 +589,22 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta
       count_dead++;
     }
   }
       count_dead++;
     }
   }
-  if(count_dead==incount)
-    return MPI_UNDEFINED;
-  else return count;
+  if(count_dead==incount)*count=MPI_UNDEFINED;
+  if(error!=0)
+    return MPI_ERR_IN_STATUS;
+  else
+    return MPI_SUCCESS;
 }
 
 }
 
-int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
+int Request::testany(int count, MPI_Request requests[], int *index, int* flag, MPI_Status * status)
 {
   std::vector<simgrid::kernel::activity::CommImpl*> comms;
   comms.reserve(count);
 
   int i;
 {
   std::vector<simgrid::kernel::activity::CommImpl*> comms;
   comms.reserve(count);
 
   int i;
-  int flag = 0;
-
+  *flag = 0;
+  int ret = MPI_SUCCESS;
+  MPI_Status* mystatus;
   *index = MPI_UNDEFINED;
 
   std::vector<int> map; /** Maps all matching comms back to their location in requests **/
   *index = MPI_UNDEFINED;
 
   std::vector<int> map; /** Maps all matching comms back to their location in requests **/
@@ -594,36 +627,59 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status *
     
     if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
       *index = map[i];
     
     if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
       *index = map[i];
-      finish_wait(&requests[*index],status);
-      flag             = 1;
-      nsleeps          = 1;
-      if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT)) {
-        requests[*index] = MPI_REQUEST_NULL;
+      if (requests[*index] != MPI_REQUEST_NULL && 
+          (requests[*index]->flags_ & MPI_REQ_GENERALIZED)
+          && !(requests[*index]->flags_ & MPI_REQ_COMPLETE)) {
+        *flag=0;
+      } else {
+        finish_wait(&requests[*index],status);
+      if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_GENERALIZED)){
+        if(status==MPI_STATUS_IGNORE){
+          mystatus=new MPI_Status();
+          Status::empty(mystatus);
+        }else{
+          mystatus=status;
+        }
+        ret=(requests[*index]->generalized_funcs)->query_fn((requests[*index]->generalized_funcs)->extra_state, mystatus);
+        if(status==MPI_STATUS_IGNORE) 
+          delete mystatus;
+      }
+
+        if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT)) 
+          requests[*index] = MPI_REQUEST_NULL;
+        *flag=1;
       }
       }
+      nsleeps = 1;
     } else {
       nsleeps++;
     }
   } else {
       //all requests are null or inactive, return true
     } else {
       nsleeps++;
     }
   } else {
       //all requests are null or inactive, return true
-      flag = 1;
+      *flag = 1;
       Status::empty(status);
   }
 
       Status::empty(status);
   }
 
-  return flag;
+  return ret;
 }
 
 }
 
-int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
+int Request::testall(int count, MPI_Request requests[], int* outflag, MPI_Status status[])
 {
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 {
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
-  int flag=1;
+  int flag, error=0;
+  int ret=MPI_SUCCESS;
+  *outflag = 1;
   for(int i=0; i<count; i++){
     if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
   for(int i=0; i<count; i++){
     if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
-      if (test(&requests[i], pstat)!=1){
+      ret = test(&requests[i], pstat, &flag);
+      if (flag){
         flag=0;
         flag=0;
+        requests[i]=MPI_REQUEST_NULL;
       }else{
       }else{
-          requests[i]=MPI_REQUEST_NULL;
+        *outflag=0;
       }
       }
+      if (ret != MPI_SUCCESS) 
+        error = 1;
     }else{
       Status::empty(pstat);
     }
     }else{
       Status::empty(pstat);
     }
@@ -631,7 +687,10 @@ int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
       status[i] = *pstat;
     }
   }
       status[i] = *pstat;
     }
   }
-  return flag;
+  if(error==1) 
+    return MPI_ERR_IN_STATUS;
+  else 
+    return MPI_SUCCESS;
 }
 
 void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
 }
 
 void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
@@ -720,7 +779,9 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
     return;
   }
 
     return;
   }
 
-  if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0)) && ((req->flags_ & MPI_REQ_PREPARED) == 0)) {
+  if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0)) 
+  && ((req->flags_ & MPI_REQ_PREPARED) == 0)
+  && ((req->flags_ & MPI_REQ_GENERALIZED) == 0)) {
     if(status != MPI_STATUS_IGNORE) {
       int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
       status->MPI_SOURCE = req->comm_->group()->rank(src);
     if(status != MPI_STATUS_IGNORE) {
       int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
       status->MPI_SOURCE = req->comm_->group()->rank(src);
@@ -780,12 +841,13 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
   unref(request);
 }
 
   unref(request);
 }
 
-void Request::wait(MPI_Request * request, MPI_Status * status)
+int Request::wait(MPI_Request * request, MPI_Status * status)
 {
 {
+  int ret=MPI_SUCCESS;
   (*request)->print_request("Waiting");
   if ((*request)->flags_ & MPI_REQ_PREPARED) {
     Status::empty(status);
   (*request)->print_request("Waiting");
   if ((*request)->flags_ & MPI_REQ_PREPARED) {
     Status::empty(status);
-    return;
+    return ret;
   }
 
   if ((*request)->action_ != nullptr){
   }
 
   if ((*request)->action_ != nullptr){
@@ -797,10 +859,28 @@ void Request::wait(MPI_Request * request, MPI_Status * status)
       }
   }
 
       }
   }
 
+  if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+    MPI_Status* mystatus;
+    if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
+      ((*request)->generalized_funcs)->mutex->lock();
+      ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
+      ((*request)->generalized_funcs)->mutex->unlock();
+      }
+    if(status==MPI_STATUS_IGNORE){
+      mystatus=new MPI_Status();
+      Status::empty(mystatus);
+    }else{
+      mystatus=status;
+    }
+    ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
+    if(status==MPI_STATUS_IGNORE) 
+      delete mystatus;
+  }
 
   finish_wait(request,status);
   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
     *request = MPI_REQUEST_NULL;
 
   finish_wait(request,status);
   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
     *request = MPI_REQUEST_NULL;
+  return ret;
 }
 
 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
 }
 
 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
@@ -922,21 +1002,30 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
 int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
 {
   int count = 0;
 int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
 {
   int count = 0;
+  int flag = 0;
+  int index = 0;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
 
+  index = waitany(incount, (MPI_Request*)requests, pstat);
+  if(index==MPI_UNDEFINED) return MPI_UNDEFINED;
+  if(status != MPI_STATUSES_IGNORE) {
+    status[count] = *pstat;
+  }
+  indices[count] = index;
+  count++;
   for (int i = 0; i < incount; i++) {
   for (int i = 0; i < incount; i++) {
-    int index = waitany(incount, requests, pstat);
-    if(index!=MPI_UNDEFINED){
-      indices[count] = index;
-      count++;
-      if(status != MPI_STATUSES_IGNORE) {
-        status[index] = *pstat;
+    if((requests[i] != MPI_REQUEST_NULL)) {
+      test(&requests[i], pstat,&flag);
+      if (flag==1){
+        indices[count] = i;
+        if(status != MPI_STATUSES_IGNORE) {
+          status[count] = *pstat;
+        }
+        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
+          requests[i]=MPI_REQUEST_NULL;
+        count++;
       }
       }
-      if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
-        requests[index] = MPI_REQUEST_NULL;
-    }else{
-      return MPI_UNDEFINED;
     }
   }
   return count;
     }
   }
   return count;
@@ -968,5 +1057,32 @@ void Request::free_f(int id)
   }
 }
 
   }
 }
 
+
+int Request::grequest_start( MPI_Grequest_query_function *query_fn, MPI_Grequest_free_function *free_fn, MPI_Grequest_cancel_function *cancel_fn, void *extra_state, MPI_Request *request){
+
+  *request = new Request();
+  (*request)->flags_ |= MPI_REQ_GENERALIZED;
+  (*request)->flags_ |= MPI_REQ_PERSISTENT;
+  (*request)->refcount_ = 1;
+  ((*request)->generalized_funcs)=xbt_new0(s_smpi_mpi_generalized_request_funcs_t ,1);
+  ((*request)->generalized_funcs)->query_fn=query_fn;
+  ((*request)->generalized_funcs)->free_fn=free_fn;
+  ((*request)->generalized_funcs)->cancel_fn=cancel_fn;
+  ((*request)->generalized_funcs)->extra_state=extra_state;
+  ((*request)->generalized_funcs)->cond = simgrid::s4u::ConditionVariable::create();
+  ((*request)->generalized_funcs)->mutex = simgrid::s4u::Mutex::create();
+  return MPI_SUCCESS;
+}
+
+int Request::grequest_complete( MPI_Request request){
+  if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex==NULL) 
+    return MPI_ERR_REQUEST;
+  request->generalized_funcs->mutex->lock();
+  request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
+  request->generalized_funcs->cond->notify_one();
+  request->generalized_funcs->mutex->unlock();
+  return MPI_SUCCESS;
+}
+
 }
 }
 }
 }
index 7c537a4..6fef696 100644 (file)
@@ -10,9 +10,9 @@ if(enable_smpi AND enable_smpi_MPICH3_testsuite)
   include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../include/")
 
   foreach(file anyall bottom eagerdt huge_anysrc huge_underflow inactivereq isendself isendirecv isendselfprobe issendselfcancel cancelanysrc pingping probenull
   include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../include/")
 
   foreach(file anyall bottom eagerdt huge_anysrc huge_underflow inactivereq isendself isendirecv isendselfprobe issendselfcancel cancelanysrc pingping probenull
-          dtype_send probe-unexp sendall sendflood sendrecv1 sendrecv2 sendrecv3 waitany-null waittestnull many_isend manylmt recv_any scancel scancel2 rcancel)
+          dtype_send greq1 probe-unexp sendall sendflood sendrecv1 sendrecv2 sendrecv3 waitany-null waittestnull many_isend manylmt recv_any scancel scancel2 rcancel)
     # not compiled files: big_count_status bsend1 bsend2 bsend3 bsend4 bsend5 bsendalign bsendfrag bsendpending mprobe
     # not compiled files: big_count_status bsend1 bsend2 bsend3 bsend4 bsend5 bsendalign bsendfrag bsendpending mprobe
-    # cancelrecv greq1 icsend large_message pscancel  rqfreeb rqstatus  sendself scancel_unmatch
+    # cancelrecv  icsend large_message pscancel  rqfreeb rqstatus  sendself scancel_unmatch
     add_executable(${file} EXCLUDE_FROM_ALL ${file}.c)
     add_dependencies(tests ${file})
     target_link_libraries(${file} simgrid mtest_c)
     add_executable(${file} EXCLUDE_FROM_ALL ${file}.c)
     add_dependencies(tests ${file})
     target_link_libraries(${file} simgrid mtest_c)
index 80e84cd..030f2ec 100644 (file)
@@ -29,7 +29,7 @@ isendirecv 10
 #needs MPI_Pack, MPI_Buffer_attach, MPI_Buffer_detach, MPI_Irsend, MPI_Ibsend
 #rqfreeb 4
 #needs MPI_Grequest_start MPI_Grequest_complete
 #needs MPI_Pack, MPI_Buffer_attach, MPI_Buffer_detach, MPI_Irsend, MPI_Ibsend
 #rqfreeb 4
 #needs MPI_Grequest_start MPI_Grequest_complete
-#greq1 1
+greq1 1
 probe-unexp 4
 probenull 1
 # For testing, scancel will run with 1 process as well
 probe-unexp 4
 probenull 1
 # For testing, scancel will run with 1 process as well