Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Updated/elaborated on comment about iprobes
[simgrid.git] / src / smpi / smpi_base.cpp
index 6564ccf..b64113c 100644 (file)
 #include "simgrid/sg_config.h"
 #include "smpi/smpi_utils.hpp"
 #include "colls/colls.h"
+#include <simgrid/s4u/host.hpp>
 
 #include "src/kernel/activity/SynchroComm.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
 
+extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+
+
 static int match_recv(void* a, void* b, smx_activity_t ignored) {
   MPI_Request ref = static_cast<MPI_Request>(a);
   MPI_Request req = static_cast<MPI_Request>(b);
@@ -332,7 +336,7 @@ void smpi_mpi_start(MPI_Request request)
     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
     request->real_size=request->size;
     request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf, &request->real_size, &match_recv,
-                                         ! smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+                                         ! smpi_process_get_replaying()? smpi_comm_copy_data_callback
                                          : &smpi_comm_null_copy_buffer_callback, request, -1.0);
     XBT_DEBUG("recv simcall posted");
 
@@ -343,7 +347,7 @@ void smpi_mpi_start(MPI_Request request)
 
     int rank = request->src;
     if (TRACE_smpi_view_internals()) {
-      TRACE_smpi_send(rank, rank, receiver,request->size);
+      TRACE_smpi_send(rank, rank, receiver, request->tag, request->size);
     }
     print_request("New send", request);
 
@@ -422,7 +426,7 @@ void smpi_mpi_start(MPI_Request request)
     request->action = simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0,
                                          buf, request->real_size, &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
-                         !smpi_process_get_replaying() ? &smpi_comm_copy_buffer_callback
+                         !smpi_process_get_replaying() ? smpi_comm_copy_data_callback
                          : &smpi_comm_null_copy_buffer_callback, request,
                          // detach if msg size < eager/rdv switch limit
                          request->detached);
@@ -645,7 +649,7 @@ static void finish_wait(MPI_Request * request, MPI_Status * status)
   if (TRACE_smpi_view_internals() && ((req->flags & RECV) != 0)){
     int rank = smpi_process_index();
     int src_traced = (req->src == MPI_ANY_SOURCE ? req->real_src : req->src);
-    TRACE_smpi_recv(rank, src_traced, rank);
+    TRACE_smpi_recv(rank, src_traced, rank,req->tag);
   }
 
   if(req->detached_sender != nullptr){
@@ -686,7 +690,7 @@ int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
       nsleeps=1;//reset the number of sleeps we will do next time
       if (*request != MPI_REQUEST_NULL && ((*request)->flags & PERSISTENT)==0)
       *request = MPI_REQUEST_NULL;
-    }else{
+    } else if (xbt_cfg_get_boolean("smpi/grow-injected-times")){
       nsleeps++;
     }
   }
@@ -775,10 +779,15 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
 
   // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
   // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
-  // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+  // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+  // (This can speed up the execution of certain applications by an order of magnitude, such as HPL)
   static int nsleeps = 1;
-  if(smpi_iprobe_sleep > 0)  
-    simcall_process_sleep(nsleeps*smpi_iprobe_sleep);
+  double speed       = simgrid::s4u::Actor::self()->host()->speed();
+  double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
+  if (smpi_iprobe_sleep > 0) {
+    smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
+    simcall_execution_wait(iprobe_sleep);
+  }
   // behave like a receive, but don't do it
   smx_mailbox_t mailbox;
 
@@ -812,11 +821,10 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   }
   else {
     *flag = 0;
-    nsleeps++;
+    if (xbt_cfg_get_boolean("smpi/grow-injected-times"))
+      nsleeps++;
   }
   smpi_mpi_request_free(&request);
-
-  return;
 }
 
 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
@@ -836,9 +844,14 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
     *request = MPI_REQUEST_NULL;
 }
 
+static int sort_accumulates(MPI_Request a, MPI_Request b)
+{
+  return (a->tag < b->tag);
+}
+
 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  xbt_dynar_t comms;
+  s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
   int i;
   int size = 0;
   int index = MPI_UNDEFINED;
@@ -846,40 +859,47 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
   if(count > 0) {
     // Wait for a request to complete
-    comms = xbt_dynar_new(sizeof(smx_activity_t), nullptr);
+    xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
     map = xbt_new(int, count);
     XBT_DEBUG("Wait for one of %d", count);
     for(i = 0; i < count; i++) {
       if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED) && !(requests[i]->flags & FINISHED)) {
         if (requests[i]->action != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          xbt_dynar_push(comms, &requests[i]->action);
+          xbt_dynar_push(&comms, &requests[i]->action);
           map[size] = i;
           size++;
-        }else{
-         //This is a finished detached request, let's return this one
-         size=0;//so we free the dynar but don't do the waitany call
-         index=i;
-         finish_wait(&requests[i], status);//cleanup if refcount = 0
-         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-         requests[i]=MPI_REQUEST_NULL;//set to null
-         break;
-         }
+        } else {
+          // This is a finished detached request, let's return this one
+          size  = 0; // so we free the dynar but don't do the waitany call
+          index = i;
+          finish_wait(&requests[i], status); // cleanup if refcount = 0
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[i] = MPI_REQUEST_NULL; // set to null
+          break;
+        }
       }
     }
     if(size > 0) {
-      i = simcall_comm_waitany(comms, -1);
+      i = simcall_comm_waitany(&comms, -1);
 
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {
         index = map[i];
-        finish_wait(&requests[index], status);
-        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-        requests[index] = MPI_REQUEST_NULL;
+        //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
+        if ((requests[index] == MPI_REQUEST_NULL)
+             ||  (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){
+          finish_wait(&requests[index], status);
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[index] = MPI_REQUEST_NULL;
+        }else{
+            XBT_WARN("huu?");
+        }
       }
     }
+
+    xbt_dynar_free_data(&comms);
     xbt_free(map);
-    xbt_dynar_free(&comms);
   }
 
   if (index==MPI_UNDEFINED)
@@ -890,13 +910,14 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
 int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
-  int  index, c;
+  std::vector<MPI_Request> accumulates;
+  int index;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
   int retvalue = MPI_SUCCESS;
   //tag invalid requests in the set
   if (status != MPI_STATUSES_IGNORE) {
-    for (c = 0; c < count; c++) {
+    for (int c = 0; c < count; c++) {
       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL || (requests[c]->flags & PREPARED)) {
         smpi_empty_status(&status[c]);
       } else if (requests[c]->src == MPI_PROC_NULL) {
@@ -905,8 +926,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
-  for(c = 0; c < count; c++) {
-
+  for (int c = 0; c < count; c++) {
     if (MC_is_active() || MC_record_replay_is_active()) {
       smpi_mpi_wait(&requests[c], pstat);
       index = c;
@@ -914,8 +934,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       index = smpi_mpi_waitany(count, requests, pstat);
       if (index == MPI_UNDEFINED)
         break;
+
+      if (requests[index] != MPI_REQUEST_NULL
+           && (requests[index]->flags & RECV)
+           && (requests[index]->flags & ACCUMULATE))
+        accumulates.push_back(requests[index]);
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
-      requests[index]=MPI_REQUEST_NULL;
+        requests[index] = MPI_REQUEST_NULL;
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -924,6 +949,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
     }
   }
 
+  if (!accumulates.empty()) {
+    std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
+    for (auto req : accumulates) {
+      finish_wait(&req, status);
+    }
+  }
+
   return retvalue;
 }
 
@@ -995,10 +1027,8 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
   int system_tag = COLL_TAG_GATHER;
-  int src, index;
   MPI_Aint lb = 0;
   MPI_Aint recvext = 0;
-  MPI_Request *requests;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1011,9 +1041,9 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
     smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
                        recvcount, recvtype);
     // Receive buffers from senders
-    requests = xbt_new(MPI_Request, size - 1);
-    index = 0;
-    for(src = 0; src < size; src++) {
+    MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+    int index = 0;
+    for (int src = 0; src < size; src++) {
       if(src != root) {
         requests[index] = smpi_irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
                                           src, system_tag, comm);
@@ -1023,7 +1053,7 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
     // Wait for completion of irecv's.
     smpi_mpi_startall(size - 1, requests);
     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
-    for(src = 0; src < size-1; src++) {
+    for (int src = 0; src < size-1; src++) {
       smpi_mpi_request_free(&requests[src]);
     }
     xbt_free(requests);
@@ -1033,20 +1063,17 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
                              MPI_Comm comm)
 {
-  int i;
-  int *displs;
   int rank = smpi_comm_rank(comm);
-  void *tmpbuf;
 
   /* arbitrarily choose root as rank 0 */
   int size = smpi_comm_size(comm);
   int count = 0;
-  displs = xbt_new(int, size);
-  for (i = 0; i < size; i++) {
+  int *displs = xbt_new(int, size);
+  for (int i = 0; i < size; i++) {
     displs[i] = count;
     count += recvcounts[i];
   }
-  tmpbuf=static_cast<void*>(smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype)));
+  void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype)));
 
   mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
   smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
@@ -1058,14 +1085,12 @@ void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void
                       MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
   int system_tag = COLL_TAG_GATHERV;
-  int src, index;
   MPI_Aint lb = 0;
   MPI_Aint recvext = 0;
-  MPI_Request *requests;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
-  if(rank != root) {
+  if (rank != root) {
     // Send buffer to root
     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
   } else {
@@ -1074,9 +1099,9 @@ void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void
     smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
                        recvcounts[root], recvtype);
     // Receive buffers from senders
-    requests = xbt_new(MPI_Request, size - 1);
-    index = 0;
-    for(src = 0; src < size; src++) {
+    MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+    int index = 0;
+    for (int src = 0; src < size; src++) {
       if(src != root) {
         requests[index] = smpi_irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
                           recvcounts[src], recvtype, src, system_tag, comm);
@@ -1086,7 +1111,7 @@ void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void
     // Wait for completion of irecv's.
     smpi_mpi_startall(size - 1, requests);
     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
-    for(src = 0; src < size-1; src++) {
+    for (int src = 0; src < size-1; src++) {
       smpi_mpi_request_free(&requests[src]);
     }
     xbt_free(requests);
@@ -1097,7 +1122,6 @@ void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                         void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
 {
   int system_tag = COLL_TAG_ALLGATHER;
-  int other, index;
   MPI_Aint lb = 0;
   MPI_Aint recvext = 0;
   MPI_Request *requests;
@@ -1111,8 +1135,8 @@ void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                      recvtype);
   // Send/Recv buffers to/from others;
   requests = xbt_new(MPI_Request, 2 * (size - 1));
-  index = 0;
-  for(other = 0; other < size; other++) {
+  int index = 0;
+  for (int other = 0; other < size; other++) {
     if(other != rank) {
       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
       index++;
@@ -1124,7 +1148,7 @@ void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
   // Wait for completion of all comms.
   smpi_mpi_startall(2 * (size - 1), requests);
   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
-  for(other = 0; other < 2*(size-1); other++) {
+  for (int other = 0; other < 2*(size-1); other++) {
     smpi_mpi_request_free(&requests[other]);
   }
   xbt_free(requests);
@@ -1134,10 +1158,8 @@ void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, vo
                          int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
 {
   int system_tag = COLL_TAG_ALLGATHERV;
-  int other, index;
   MPI_Aint lb = 0;
   MPI_Aint recvext = 0;
-  MPI_Request *requests;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1146,9 +1168,9 @@ void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, vo
   smpi_datatype_copy(sendbuf, sendcount, sendtype,
                      static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
   // Send buffers to others;
-  requests = xbt_new(MPI_Request, 2 * (size - 1));
-  index = 0;
-  for(other = 0; other < size; other++) {
+  MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
+  int index = 0;
+  for (int other = 0; other < size; other++) {
     if(other != rank) {
       requests[index] =
         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
@@ -1161,7 +1183,7 @@ void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, vo
   // Wait for completion of all comms.
   smpi_mpi_startall(2 * (size - 1), requests);
   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
-  for(other = 0; other < 2*(size-1); other++) {
+  for (int other = 0; other < 2*(size-1); other++) {
     smpi_mpi_request_free(&requests[other]);
   }
   xbt_free(requests);
@@ -1171,7 +1193,6 @@ void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
   int system_tag = COLL_TAG_SCATTER;
-  int dst;
   MPI_Aint lb = 0;
   MPI_Aint sendext = 0;
   MPI_Request *requests;
@@ -1191,7 +1212,7 @@ void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
     // Send buffers to receivers
     requests = xbt_new(MPI_Request, size - 1);
     int index = 0;
-    for(dst = 0; dst < size; dst++) {
+    for(int dst = 0; dst < size; dst++) {
       if(dst != root) {
         requests[index] = smpi_isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
                                           dst, system_tag, comm);
@@ -1201,7 +1222,7 @@ void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
     // Wait for completion of isend's.
     smpi_mpi_startall(size - 1, requests);
     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
-    for(dst = 0; dst < size-1; dst++) {
+    for (int dst = 0; dst < size-1; dst++) {
       smpi_mpi_request_free(&requests[dst]);
     }
     xbt_free(requests);
@@ -1212,10 +1233,8 @@ void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype
                        MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
   int system_tag = COLL_TAG_SCATTERV;
-  int dst;
   MPI_Aint lb = 0;
   MPI_Aint sendext = 0;
-  MPI_Request *requests;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1230,10 +1249,10 @@ void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype
                        sendtype, recvbuf, recvcount, recvtype);
     }
     // Send buffers to receivers
-    requests = xbt_new(MPI_Request, size - 1);
+    MPI_Request *requests = xbt_new(MPI_Request, size - 1);
     int index = 0;
-    for(dst = 0; dst < size; dst++) {
-      if(dst != root) {
+    for (int dst = 0; dst < size; dst++) {
+      if (dst != root) {
         requests[index] = smpi_isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
                             sendtype, dst, system_tag, comm);
         index++;
@@ -1242,7 +1261,7 @@ void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype
     // Wait for completion of isend's.
     smpi_mpi_startall(size - 1, requests);
     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
-    for(dst = 0; dst < size-1; dst++) {
+    for (int dst = 0; dst < size-1; dst++) {
       smpi_mpi_request_free(&requests[dst]);
     }
     xbt_free(requests);
@@ -1253,11 +1272,8 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
                      MPI_Comm comm)
 {
   int system_tag = COLL_TAG_REDUCE;
-  int src, index;
   MPI_Aint lb = 0;
   MPI_Aint dataext = 0;
-  MPI_Request *requests;
-  void **tmpbufs;
 
   char* sendtmpbuf = static_cast<char *>(sendbuf);
 
@@ -1283,11 +1299,11 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
     if (sendtmpbuf != nullptr && recvbuf != nullptr)
       smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
     // Receive buffers from senders
-    requests = xbt_new(MPI_Request, size - 1);
-    tmpbufs = xbt_new(void *, size - 1);
-    index = 0;
-    for(src = 0; src < size; src++) {
-      if(src != root) {
+    MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+    void **tmpbufs = xbt_new(void *, size - 1);
+    int index = 0;
+    for (int src = 0; src < size; src++) {
+      if (src != root) {
          if (!smpi_process_get_replaying())
           tmpbufs[index] = xbt_malloc(count * dataext);
          else
@@ -1299,7 +1315,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
     }
     // Wait for completion of irecv's.
     smpi_mpi_startall(size - 1, requests);
-    for(src = 0; src < size - 1; src++) {
+    for (int src = 0; src < size - 1; src++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
       XBT_DEBUG("finished waiting any request with index %d", index);
       if(index == MPI_UNDEFINED) {
@@ -1331,10 +1347,8 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype da
 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  int other, index;
-  MPI_Aint lb = 0, dataext = 0;
-  MPI_Request *requests;
-  void **tmpbufs;
+  MPI_Aint lb      = 0;
+  MPI_Aint dataext = 0;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1345,15 +1359,15 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
 
   // Send/Recv buffers to/from others;
-  requests = xbt_new(MPI_Request, size - 1);
-  tmpbufs = xbt_new(void *, rank);
-  index = 0;
-  for(other = 0; other < rank; other++) {
+  MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+  void **tmpbufs = xbt_new(void *, rank);
+  int index = 0;
+  for (int other = 0; other < rank; other++) {
     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
     index++;
   }
-  for(other = rank + 1; other < size; other++) {
+  for (int other = rank + 1; other < size; other++) {
     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
     index++;
   }
@@ -1361,7 +1375,7 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
   smpi_mpi_startall(size - 1, requests);
 
   if(smpi_op_is_commute(op)){
-    for(other = 0; other < size - 1; other++) {
+    for (int other = 0; other < size - 1; other++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
       if(index == MPI_UNDEFINED) {
         break;
@@ -1373,7 +1387,7 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
     }
   }else{
     //non commutative case, wait in order
-    for(other = 0; other < size - 1; other++) {
+    for (int other = 0; other < size - 1; other++) {
       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
       if(index < rank) {
         smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
@@ -1393,10 +1407,8 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  int other, index;
-  MPI_Aint lb = 0, dataext = 0;
-  MPI_Request *requests;
-  void **tmpbufs;
+  MPI_Aint lb         = 0;
+  MPI_Aint dataext    = 0;
   int recvbuf_is_empty=1;
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1404,22 +1416,23 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
   smpi_datatype_extent(datatype, &lb, &dataext);
 
   // Send/Recv buffers to/from others;
-  requests = xbt_new(MPI_Request, size - 1);
-  tmpbufs = xbt_new(void *, rank);
-  index = 0;
-  for(other = 0; other < rank; other++) {
+  MPI_Request *requests = xbt_new(MPI_Request, size - 1);
+  void **tmpbufs = xbt_new(void *, rank);
+  int index = 0;
+  for (int other = 0; other < rank; other++) {
     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
     index++;
   }
-  for(other = rank + 1; other < size; other++) {
+  for (int other = rank + 1; other < size; other++) {
     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
     index++;
   }
   // Wait for completion of all comms.
   smpi_mpi_startall(size - 1, requests);
+
   if(smpi_op_is_commute(op)){
-    for(other = 0; other < size - 1; other++) {
+    for (int other = 0; other < size - 1; other++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
       if(index == MPI_UNDEFINED) {
         break;
@@ -1435,14 +1448,14 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
     }
   }else{
     //non commutative case, wait in order
-    for(other = 0; other < size - 1; other++) {
+    for (int other = 0; other < size - 1; other++) {
       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
       if(index < rank) {
-          if(recvbuf_is_empty){
-            smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
-            recvbuf_is_empty=0;
-          } else
-            smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
+        if (recvbuf_is_empty) {
+          smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
+          recvbuf_is_empty = 0;
+        } else
+          smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
       }
     }
   }