Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
mem leak
[simgrid.git] / src / smpi / colls / reduce-arrival-pattern-aware.c
index cf5df9e..1e25250 100644 (file)
@@ -1,4 +1,4 @@
-#include "colls.h"
+#include "colls_private.h"
 //#include <star-reduction.c>
 
 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
@@ -19,7 +19,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
                                                  MPI_Comm comm)
 {
   int rank;
-  MPI_Comm_rank(comm, &rank);
+  rank = smpi_comm_rank(comm);
 
   int tag = 50;
   MPI_Status status;
@@ -72,7 +72,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
   }
 
   char *tmp_buf;
-  tmp_buf = (char *) malloc(count * extent);
+  tmp_buf = (char *) xbt_malloc(count * extent);
 
   smpi_mpi_sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
                tag, comm, &status);
@@ -88,9 +88,11 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
       while (sent_count < (size - 1)) {
 
         for (i = 1; i < size; i++) {
-          if (already_received[i] == 0)
+          if (already_received[i] == 0) {
             smpi_mpi_iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
-                       MPI_STATUSES_IGNORE);
+                             MPI_STATUSES_IGNORE);
+            simcall_process_sleep(0.0001);
+            }
         }
 
         header_index = 0;
@@ -127,7 +129,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
 
           smpi_mpi_send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
           smpi_mpi_recv(tmp_buf, count, datatype, from, tag, comm, &status);
-          star_reduction(op, tmp_buf, rbuf, &count, &datatype);
+          smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype);
         }
       }                         /* while loop */
     }
@@ -142,7 +144,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
       /* wait for header and data, forward when required */
       smpi_mpi_recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
                &status);
-      //      MPI_Recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
+      //      smpi_mpi_recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
 
       /* search for where it is */
       int myordering = 0;
@@ -179,7 +181,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
         from = header_buf[myordering - 1];
         smpi_mpi_recv(tmp_buf, count, datatype, header_buf[myordering - 1], tag,
                  comm, &status);
-        star_reduction(op, tmp_buf, rbuf, &count, &datatype);
+        smpi_op_apply(op, tmp_buf, rbuf, &count, &datatype);
         smpi_mpi_send(rbuf, count, datatype, to, tag, comm);
       }
     }                           /* non-root */
@@ -189,13 +191,13 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
     //    printf("node %d start\n",rank);
 
     send_request_array =
-        (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
+        (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
     recv_request_array =
-        (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
+        (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
     send_status_array =
-        (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
+        (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
     recv_status_array =
-        (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
+        (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
 
     if (rank == 0) {
       sent_count = 0;
@@ -255,7 +257,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
           for (i = 0; i < pipe_length; i++) {
             smpi_mpi_recv(tmp_buf + (i * increment), segment, datatype, from, tag,
                      comm, &status);
-            star_reduction(op, tmp_buf + (i * increment),
+            smpi_op_apply(op, tmp_buf + (i * increment),
                            (char *)rbuf + (i * increment), &segment, &datatype);
           }
         }
@@ -309,7 +311,7 @@ int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
         }
         for (i = 0; i < pipe_length; i++) {
           smpi_mpi_wait(&recv_request_array[i], MPI_STATUS_IGNORE);
-          star_reduction(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
+          smpi_op_apply(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
                          &segment, &datatype);
           send_request_array[i]=smpi_mpi_isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
         }