Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / src / smpi / smpi_base.cpp
index b7af999..aeb6422 100644 (file)
@@ -816,8 +816,6 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
       nsleeps++;
   }
   smpi_mpi_request_free(&request);
-
-  return;
 }
 
 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
@@ -837,9 +835,14 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
     *request = MPI_REQUEST_NULL;
 }
 
+static int sort_accumulates(MPI_Request a, MPI_Request b)
+{
+  return (a->tag < b->tag);
+}
+
 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  xbt_dynar_t comms;
+  s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
   int i;
   int size = 0;
   int index = MPI_UNDEFINED;
@@ -847,40 +850,47 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
   if(count > 0) {
     // Wait for a request to complete
-    comms = xbt_dynar_new(sizeof(smx_activity_t), nullptr);
+    xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
     map = xbt_new(int, count);
     XBT_DEBUG("Wait for one of %d", count);
     for(i = 0; i < count; i++) {
       if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED) && !(requests[i]->flags & FINISHED)) {
         if (requests[i]->action != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          xbt_dynar_push(comms, &requests[i]->action);
+          xbt_dynar_push(&comms, &requests[i]->action);
           map[size] = i;
           size++;
-        }else{
-         //This is a finished detached request, let's return this one
-         size=0;//so we free the dynar but don't do the waitany call
-         index=i;
-         finish_wait(&requests[i], status);//cleanup if refcount = 0
-         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-         requests[i]=MPI_REQUEST_NULL;//set to null
-         break;
-         }
+        } else {
+          // This is a finished detached request, let's return this one
+          size  = 0; // so we free the dynar but don't do the waitany call
+          index = i;
+          finish_wait(&requests[i], status); // cleanup if refcount = 0
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[i] = MPI_REQUEST_NULL; // set to null
+          break;
+        }
       }
     }
     if(size > 0) {
-      i = simcall_comm_waitany(comms, -1);
+      i = simcall_comm_waitany(&comms, -1);
 
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {
         index = map[i];
-        finish_wait(&requests[index], status);
-        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-        requests[index] = MPI_REQUEST_NULL;
+        //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
+        if ((requests[index] == MPI_REQUEST_NULL)
+             ||  (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){
+          finish_wait(&requests[index], status);
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[index] = MPI_REQUEST_NULL;
+        }else{
+            XBT_WARN("huu?");
+        }
       }
     }
+
+    xbt_dynar_free_data(&comms);
     xbt_free(map);
-    xbt_dynar_free(&comms);
   }
 
   if (index==MPI_UNDEFINED)
@@ -891,13 +901,14 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
 int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
-  int  index, c;
+  std::vector<MPI_Request> accumulates;
+  int index;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
   int retvalue = MPI_SUCCESS;
   //tag invalid requests in the set
   if (status != MPI_STATUSES_IGNORE) {
-    for (c = 0; c < count; c++) {
+    for (int c = 0; c < count; c++) {
       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL || (requests[c]->flags & PREPARED)) {
         smpi_empty_status(&status[c]);
       } else if (requests[c]->src == MPI_PROC_NULL) {
@@ -906,8 +917,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
-  for(c = 0; c < count; c++) {
-
+  for (int c = 0; c < count; c++) {
     if (MC_is_active() || MC_record_replay_is_active()) {
       smpi_mpi_wait(&requests[c], pstat);
       index = c;
@@ -915,8 +925,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       index = smpi_mpi_waitany(count, requests, pstat);
       if (index == MPI_UNDEFINED)
         break;
+
+      if (requests[index] != MPI_REQUEST_NULL
+           && (requests[index]->flags & RECV)
+           && (requests[index]->flags & ACCUMULATE))
+        accumulates.push_back(requests[index]);
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
-      requests[index]=MPI_REQUEST_NULL;
+        requests[index] = MPI_REQUEST_NULL;
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -925,6 +940,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
     }
   }
 
+  if (!accumulates.empty()) {
+    std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
+    for (auto req : accumulates) {
+      finish_wait(&req, status);
+    }
+  }
+
   return retvalue;
 }