Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add an "smpi/async_small_thres" configure option to smpi, to allow to toy with the...
author Augustin Degomme <degomme@idpann.imag.fr>
Tue, 7 Aug 2012 15:56:58 +0000 (17:56 +0200)
committer Augustin Degomme <degomme@idpann.imag.fr>
Tue, 7 Aug 2012 15:56:58 +0000 (17:56 +0200)
src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_global.c
src/surf/surf_config.c

index dbe4681..a7a7fa1 100644 (file)
@@ -47,6 +47,8 @@ smpi_process_data_t smpi_process_remote_data(int index);
 int smpi_process_count(void);
 smx_rdv_t smpi_process_mailbox(void);
 smx_rdv_t smpi_process_remote_mailbox(int index);
+smx_rdv_t smpi_process_mailbox_small(void);
+smx_rdv_t smpi_process_remote_mailbox_small(int index);
 xbt_os_timer_t smpi_process_timer(void);
 void smpi_process_simulated_start(void);
 double smpi_process_simulated_elapsed(void);
index 9b3641a..2bc6b97 100644 (file)
@@ -121,39 +121,53 @@ void smpi_mpi_start(MPI_Request request)
               "Cannot (re)start a non-finished communication");
   if(request->flags & RECV) {
     print_request("New recv", request);
+    if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
+    mailbox = smpi_process_mailbox_small();
+    else
     mailbox = smpi_process_mailbox();
+
     // FIXME: SIMIX does not yet support non-contiguous datatypes
     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
   } else {
     print_request("New send", request);
-    mailbox = smpi_process_remote_mailbox(
-        smpi_group_index(smpi_comm_group(request->comm), request->dst));
-    // FIXME: SIMIX does not yet support non-contiguous datatypes
 
-    if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
+    if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
+      mailbox = smpi_process_remote_mailbox_small(
+            smpi_group_index(smpi_comm_group(request->comm), request->dst));
+    }else{
+      XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
+      mailbox = smpi_process_remote_mailbox(
+                  smpi_group_index(smpi_comm_group(request->comm), request->dst));
+    }
+    if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
       void *oldbuf = request->buf;
       detached = 1;
       request->buf = malloc(request->size);
       memcpy(request->buf,oldbuf,request->size);
       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
-    } else {
+    }else{
       XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
+      mailbox = smpi_process_remote_mailbox(
+                  smpi_group_index(smpi_comm_group(request->comm), request->dst));
+    }
+
+      request->action =
+      simcall_comm_isend(mailbox, request->size, -1.0,
+              request->buf, request->size,
+              &match_send,
+              &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
+              request,
+              // detach if msg size < eager/rdv switch limit
+              detached);
+
+  #ifdef HAVE_TRACING
+      /* FIXME: detached sends are not traceable (request->action == NULL) */
+      if (request->action)
+        simcall_set_category(request->action, TRACE_internal_smpi_get_category());
+  #endif
+
     }
-    request->action = 
-    simcall_comm_isend(mailbox, request->size, -1.0,
-            request->buf, request->size,
-            &match_send,
-            &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
-            request,
-            // detach if msg size < eager/rdv switch limit
-            detached);
 
-#ifdef HAVE_TRACING
-    /* FIXME: detached sends are not traceable (request->action == NULL) */
-    if (request->action)
-      simcall_set_category(request->action, TRACE_internal_smpi_get_category());
-#endif
-  }
 }
 
 void smpi_mpi_startall(int count, MPI_Request * requests)
@@ -220,13 +234,15 @@ void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
   smpi_mpi_wait(&request, status);
 }
 
+
+
 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
                    int tag, MPI_Comm comm)
 {
-  MPI_Request request;
+         MPI_Request request;
 
-  request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
-  smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
+         request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
+         smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
 }
 
 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
@@ -348,7 +364,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[],
     XBT_DEBUG("Wait for one of");
     for(i = 0; i < count; i++) {
       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
-        print_request("   ", requests[i]);
+        print_request("Waiting any ", requests[i]);
         xbt_dynar_push(comms, &requests[i]->action);
         map[size] = i;
         size++;
@@ -356,10 +372,12 @@ int smpi_mpi_waitany(int count, MPI_Request requests[],
     }
     if(size > 0) {
       i = simcall_comm_waitany(comms);
+
       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
       if (i != MPI_UNDEFINED) {
         index = map[i];
         finish_wait(&requests[index], status);
+
       }
     }
     xbt_free(map);
@@ -692,6 +710,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
     smpi_mpi_startall(size - 1, requests);
     for(src = 0; src < size - 1; src++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
+      XBT_VERB("finished waiting any request with index %d", index);
       if(index == MPI_UNDEFINED) {
         break;
       }
index 5a2c2ce..cdbdc96 100644 (file)
@@ -12,6 +12,8 @@
 #include "smpi_mpi_dt_private.h"
 #include "mc/mc.h"
 #include "surf/surf.h"
+#include "simix/smx_private.h"
+
 
 XBT_LOG_NEW_CATEGORY(smpi, "All SMPI categories");
 
@@ -23,6 +25,7 @@ typedef struct s_smpi_process_data {
   int* argc;
   char*** argv;
   smx_rdv_t mailbox;
+  smx_rdv_t mailbox_small;
   xbt_os_timer_t timer;
   double simulated;
   MPI_Comm comm_self;
@@ -40,6 +43,11 @@ static char* get_mailbox_name(char* str, int index) {
   return str;
 }
 
+static char* get_mailbox_name_small(char* str, int index) {
+  snprintf(str, MAILBOX_NAME_MAXLEN, "small%0*x", (int)(sizeof(int) * 2), index);
+  return str;
+}
+
 void smpi_process_init(int *argc, char ***argv)
 {
   int index;
@@ -59,6 +67,7 @@ void smpi_process_init(int *argc, char ***argv)
     (*argc)--;
     data->argc = argc;
     data->argv = argv;
+    data->mailbox_small->permanent_receiver=proc;// set the process attached to the mailbox
     XBT_DEBUG("<%d> New process in the game: %p", index, proc);
   }
 }
@@ -77,7 +86,7 @@ void smpi_process_finalize(void)
 {
   // wait for all pending asynchronous comms to finish
   while (SIMIX_process_has_pending_comms(SIMIX_process_self())) {
-    simcall_process_sleep(1);
+    simcall_process_sleep(0.01);
   }
 }
 
@@ -144,13 +153,27 @@ smx_rdv_t smpi_process_mailbox(void) {
   return data->mailbox;
 }
 
+smx_rdv_t smpi_process_mailbox_small(void) {
+  smpi_process_data_t data = smpi_process_data();
+
+  return data->mailbox_small;
+}
+
 smx_rdv_t smpi_process_remote_mailbox(int index) {
   smpi_process_data_t data = smpi_process_remote_data(index);
 
   return data->mailbox;
 }
 
-xbt_os_timer_t smpi_process_timer(void) {
+
+smx_rdv_t smpi_process_remote_mailbox_small(int index) {
+  smpi_process_data_t data = smpi_process_remote_data(index);
+
+  return data->mailbox_small;
+}
+
+xbt_os_timer_t smpi_process_timer(void)
+{
   smpi_process_data_t data = smpi_process_data();
 
   return data->timer;
@@ -195,6 +218,7 @@ void smpi_global_init(void)
     process_data[i]->argc = NULL;
     process_data[i]->argv = NULL;
     process_data[i]->mailbox = simcall_rdv_create(get_mailbox_name(name, i));
+    process_data[i]->mailbox_small = simcall_rdv_create(get_mailbox_name_small(name, i));
     process_data[i]->timer = xbt_os_timer_new();
     group = smpi_group_new(1);
     process_data[i]->comm_self = smpi_comm_new(group);
@@ -219,6 +243,7 @@ void smpi_global_destroy(void)
     smpi_comm_destroy(process_data[i]->comm_self);
     xbt_os_timer_free(process_data[i]->timer);
     simcall_rdv_destroy(process_data[i]->mailbox);
+    simcall_rdv_destroy(process_data[i]->mailbox_small);
     xbt_free(process_data[i]);
   }
   xbt_free(process_data);
index 24cc5e4..4d286c2 100644 (file)
@@ -610,6 +610,12 @@ void surf_config_init(int *argc, char **argv)
                      xbt_cfgelm_double, &default_threshold, 1, 1, NULL,
                      NULL);
 
+    int default_small_messages_threshold = 0;
+    xbt_cfg_register(&_surf_cfg_set, "smpi/async_small_thres",
+                     "Maximal size of messages that are to be sent asynchronously, without waiting for the receiver",
+                     xbt_cfgelm_int, &default_small_messages_threshold, 1, 1, NULL,
+                     NULL);
+
     //For smpi/bw_factor and smpi/lat_factor
     //Default value have to be "threshold0:value0;threshold1:value1;...;thresholdN:valueN"
     //test is if( size >= thresholdN ) return valueN;