Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a way to ensure MPI_Accumulate are applied in order, as per the *** standard
authordegomme <augustin.degomme@unibas.ch>
Sun, 12 Feb 2017 00:21:11 +0000 (01:21 +0100)
committerdegomme <augustin.degomme@unibas.ch>
Sun, 12 Feb 2017 01:08:28 +0000 (02:08 +0100)
src/smpi/smpi_base.cpp
src/smpi/smpi_rma.cpp

index ece561e..cdc04e8 100644 (file)
@@ -835,6 +835,12 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
     *request = MPI_REQUEST_NULL;
 }
 
+static int sort_accumulates(const void* pa, const void* pb)
+{
+  return (*static_cast<MPI_Request const*>(pa))->tag>
+                (*static_cast<MPI_Request const*>(pb))->tag;
+}
+
 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
   s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
@@ -872,11 +878,18 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {
         index = map[i];
-        finish_wait(&requests[index], status);
-        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-        requests[index] = MPI_REQUEST_NULL;
+        //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
+        if ((requests[index] == MPI_REQUEST_NULL)
+             ||  (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){
+          finish_wait(&requests[index], status);
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[index] = MPI_REQUEST_NULL;
+        }else{
+            XBT_WARN("huu?");
+        }
       }
     }
+
     xbt_dynar_free_data(&comms);
     xbt_free(map);
   }
@@ -889,6 +902,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
 int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
+  s_xbt_dynar_t accumulates;
   int  index, c;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
@@ -904,6 +918,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
+  xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
   for(c = 0; c < count; c++) {
 
     if (MC_is_active() || MC_record_replay_is_active()) {
@@ -913,8 +928,14 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       index = smpi_mpi_waitany(count, requests, pstat);
       if (index == MPI_UNDEFINED)
         break;
+
+      if (requests[index] != MPI_REQUEST_NULL
+           && (requests[index]->flags & RECV)
+           && (requests[index]->flags & ACCUMULATE))
+        xbt_dynar_push(&accumulates, &requests[index]);
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
       requests[index]=MPI_REQUEST_NULL;
+
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -923,6 +944,16 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
     }
   }
 
+  if(!xbt_dynar_is_empty(&accumulates)){
+    xbt_dynar_sort(&accumulates, sort_accumulates);
+    MPI_Request req;
+    unsigned int cursor;
+    xbt_dynar_foreach(&accumulates, cursor, req) {
+      finish_wait(&req, status);
+    }
+  }
+  xbt_dynar_free_data(&accumulates);
+
   return retvalue;
 }
 
index def9c73..68d8507 100644 (file)
@@ -24,6 +24,7 @@ typedef struct s_smpi_mpi_win{
   char* name;
   int opened;
   MPI_Group group;
+  int count; //for ordering the accs
 } s_smpi_mpi_win_t;
 
 
@@ -49,7 +50,7 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
   win->requests = new std::vector<MPI_Request>();
   win->connected_wins = xbt_new0(MPI_Win, comm_size);
   win->connected_wins[rank] = win;
-
+  win->count = 0;
   if(rank==0){
     win->bar = MSG_barrier_init(comm_size);
   }
@@ -123,6 +124,7 @@ int smpi_mpi_win_fence( int assert,  MPI_Win win){
 
     MPI_Request* treqs = &(*reqs)[0];
     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+    win->count=0;
   }
   win->assert = assert;
 
@@ -219,14 +221,16 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
 
   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
-
+    //As the tag will be used for ordering of the operations, add count to it
     //prepare send_request
     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
-        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, win->comm, op);
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
 
     //prepare receiver request
     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
-        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, recv_win->comm, op);
+        smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
+
+    win->count++;
     //push request to receiver's win
     recv_win->requests->push_back(rreq);
     //start send