Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a mutex to lock access to the SMPI mailboxes when a message is posted
authorAugustin Degomme <augustin.degomme@imag.fr>
Sat, 6 Dec 2014 15:05:38 +0000 (16:05 +0100)
committerAugustin Degomme <augustin.degomme@imag.fr>
Sat, 6 Dec 2014 15:05:38 +0000 (16:05 +0100)
Should avoid race condition, while keeping isolation from SIMIX

src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_global.c

index bc36a1c..7c5be9d 100644 (file)
@@ -180,6 +180,8 @@ smx_rdv_t smpi_process_mailbox(void);
 smx_rdv_t smpi_process_remote_mailbox(int index);
 smx_rdv_t smpi_process_mailbox_small(void);
 smx_rdv_t smpi_process_remote_mailbox_small(int index);
+xbt_mutex_t smpi_process_mailboxes_mutex(void);
+xbt_mutex_t smpi_process_remote_mailboxes_mutex(int index);
 xbt_os_timer_t smpi_process_timer(void);
 void smpi_process_simulated_start(void);
 double smpi_process_simulated_elapsed(void);
index e76bdc1..768b13b 100644 (file)
@@ -344,12 +344,14 @@ void smpi_mpi_start(MPI_Request request)
 
   if (request->flags & RECV) {
     print_request("New recv", request);
+        
+    xbt_mutex_t mut=smpi_process_mailboxes_mutex();
+    xbt_mutex_acquire(mut);
     
     if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")){
     //We have to check both mailboxes (because SSEND messages are sent to the large mbox). begin with the more appropriate one : the small one.
       mailbox = smpi_process_mailbox_small();
       XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
-      XBT_DEBUG("Is there a corresponding send already posted the small mailbox %p (in case of SSEND)?", mailbox);
       smx_synchro_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
     
       if(action ==NULL){
@@ -358,7 +360,6 @@ void smpi_mpi_start(MPI_Request request)
         action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
         if(action ==NULL){
           XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
-          XBT_DEBUG("Still notching, switch back to the small mailbox : %p", mailbox);
           mailbox = smpi_process_mailbox_small();
           }
       }else{
@@ -395,7 +396,7 @@ void smpi_mpi_start(MPI_Request request)
                                          request, -1.0);
         XBT_DEBUG("recv simcall posted");
 
-
+    xbt_mutex_release(mut);
   } else {
 
 
@@ -421,6 +422,9 @@ void smpi_mpi_start(MPI_Request request)
         XBT_DEBUG("sending size of %zu : sleep %f ", request->size, smpi_os(request->size));
     }
     
+    xbt_mutex_t mut=smpi_process_remote_mailboxes_mutex(receiver);
+    xbt_mutex_acquire(mut);
+    
     if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
       mailbox = smpi_process_remote_mailbox(receiver);
       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
@@ -492,7 +496,7 @@ void smpi_mpi_start(MPI_Request request)
       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
 
 #endif
-
+    xbt_mutex_release(mut);
   }
 
 }
index 8a39d33..9a647ee 100644 (file)
@@ -27,6 +27,7 @@ typedef struct s_smpi_process_data {
   char ***argv;
   smx_rdv_t mailbox;
   smx_rdv_t mailbox_small;
+  xbt_mutex_t mailboxes_mutex;
   xbt_os_timer_t timer;
   MPI_Comm comm_self;
   MPI_Comm comm_intra;
@@ -107,7 +108,6 @@ void smpi_process_init(int *argc, char ***argv)
     data->argv = argv;
     // set the process attached to the mailbox
     simcall_rdv_set_receiver(data->mailbox_small, proc);
-
     XBT_DEBUG("<%d> New process in the game: %p", index, proc);
 
     if(smpi_privatize_global_variables){
@@ -255,6 +255,12 @@ smx_rdv_t smpi_process_mailbox_small(void)
   return data->mailbox_small;
 }
 
+xbt_mutex_t smpi_process_mailboxes_mutex(void)
+{
+  smpi_process_data_t data = smpi_process_data();
+  return data->mailboxes_mutex;
+}
+
 smx_rdv_t smpi_process_remote_mailbox(int index)
 {
   smpi_process_data_t data = smpi_process_remote_data(index);
@@ -268,6 +274,12 @@ smx_rdv_t smpi_process_remote_mailbox_small(int index)
   return data->mailbox_small;
 }
 
+xbt_mutex_t smpi_process_remote_mailboxes_mutex(int index)
+{
+  smpi_process_data_t data = smpi_process_remote_data(index);
+  return data->mailboxes_mutex;
+}
+
 xbt_os_timer_t smpi_process_timer(void)
 {
   smpi_process_data_t data = smpi_process_data();
@@ -422,6 +434,7 @@ void smpi_global_init(void)
     process_data[i]->mailbox = simcall_rdv_create(get_mailbox_name(name, i));
     process_data[i]->mailbox_small =
         simcall_rdv_create(get_mailbox_name_small(name, i));
+    process_data[i]->mailboxes_mutex=xbt_mutex_init();
     process_data[i]->timer = xbt_os_timer_new();
     if (MC_is_active())
       MC_ignore_heap(process_data[i]->timer, xbt_os_timer_size());
@@ -475,6 +488,7 @@ void smpi_global_destroy(void)
     xbt_os_timer_free(process_data[i]->timer);
     simcall_rdv_destroy(process_data[i]->mailbox);
     simcall_rdv_destroy(process_data[i]->mailbox_small);
+    xbt_mutex_destroy(process_data[i]->mailboxes_mutex);
     xbt_free(process_data[i]);
   }
   xbt_free(process_data);