Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix for [#136] on github.
[simgrid.git] / src / smpi / smpi_base.cpp
index 4960bd4..cdc04e8 100644 (file)
@@ -343,7 +343,7 @@ void smpi_mpi_start(MPI_Request request)
 
     int rank = request->src;
     if (TRACE_smpi_view_internals()) {
-      TRACE_smpi_send(rank, rank, receiver,request->size);
+      TRACE_smpi_send(rank, rank, receiver, request->tag, request->size);
     }
     print_request("New send", request);
 
@@ -645,7 +645,7 @@ static void finish_wait(MPI_Request * request, MPI_Status * status)
   if (TRACE_smpi_view_internals() && ((req->flags & RECV) != 0)){
     int rank = smpi_process_index();
     int src_traced = (req->src == MPI_ANY_SOURCE ? req->real_src : req->src);
-    TRACE_smpi_recv(rank, src_traced, rank);
+    TRACE_smpi_recv(rank, src_traced, rank,req->tag);
   }
 
   if(req->detached_sender != nullptr){
@@ -686,7 +686,7 @@ int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
       nsleeps=1;//reset the number of sleeps we will do next time
       if (*request != MPI_REQUEST_NULL && ((*request)->flags & PERSISTENT)==0)
       *request = MPI_REQUEST_NULL;
-    }else{
+    } else if (xbt_cfg_get_boolean("smpi/grow-injected-times")){
       nsleeps++;
     }
   }
@@ -812,11 +812,10 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   }
   else {
     *flag = 0;
-    nsleeps++;
+    if (xbt_cfg_get_boolean("smpi/grow-injected-times"))
+      nsleeps++;
   }
   smpi_mpi_request_free(&request);
-
-  return;
 }
 
 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
@@ -836,9 +835,15 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
     *request = MPI_REQUEST_NULL;
 }
 
+static int sort_accumulates(const void* pa, const void* pb)
+{
+  return (*static_cast<MPI_Request const*>(pa))->tag>
+                (*static_cast<MPI_Request const*>(pb))->tag;
+}
+
 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  xbt_dynar_t comms;
+  s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
   int i;
   int size = 0;
   int index = MPI_UNDEFINED;
@@ -846,14 +851,14 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
   if(count > 0) {
     // Wait for a request to complete
-    comms = xbt_dynar_new(sizeof(smx_activity_t), nullptr);
+    xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
     map = xbt_new(int, count);
     XBT_DEBUG("Wait for one of %d", count);
     for(i = 0; i < count; i++) {
       if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED) && !(requests[i]->flags & FINISHED)) {
         if (requests[i]->action != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          xbt_dynar_push(comms, &requests[i]->action);
+          xbt_dynar_push(&comms, &requests[i]->action);
           map[size] = i;
           size++;
         }else{
@@ -868,18 +873,25 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
       }
     }
     if(size > 0) {
-      i = simcall_comm_waitany(comms, -1);
+      i = simcall_comm_waitany(&comms, -1);
 
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {
         index = map[i];
-        finish_wait(&requests[index], status);
-        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-        requests[index] = MPI_REQUEST_NULL;
+        //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
+        if ((requests[index] == MPI_REQUEST_NULL)
+             ||  (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){
+          finish_wait(&requests[index], status);
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[index] = MPI_REQUEST_NULL;
+        }else{
+            XBT_WARN("huu?");
+        }
       }
     }
+
+    xbt_dynar_free_data(&comms);
     xbt_free(map);
-    xbt_dynar_free(&comms);
   }
 
   if (index==MPI_UNDEFINED)
@@ -890,6 +902,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
 int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
+  s_xbt_dynar_t accumulates;
   int  index, c;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
@@ -905,6 +918,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
+  xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
   for(c = 0; c < count; c++) {
 
     if (MC_is_active() || MC_record_replay_is_active()) {
@@ -914,8 +928,14 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       index = smpi_mpi_waitany(count, requests, pstat);
       if (index == MPI_UNDEFINED)
         break;
+
+      if (requests[index] != MPI_REQUEST_NULL
+           && (requests[index]->flags & RECV)
+           && (requests[index]->flags & ACCUMULATE))
+        xbt_dynar_push(&accumulates, &requests[index]);
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
       requests[index]=MPI_REQUEST_NULL;
+
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -924,6 +944,16 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
     }
   }
 
+  if(!xbt_dynar_is_empty(&accumulates)){
+    xbt_dynar_sort(&accumulates, sort_accumulates);
+    MPI_Request req;
+    unsigned int cursor;
+    xbt_dynar_foreach(&accumulates, cursor, req) {
+      finish_wait(&req, status);
+    }
+  }
+  xbt_dynar_free_data(&accumulates);
+
   return retvalue;
 }