Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SIMIX]add timeout to simcall_comm_waitany
authoradfaure <adrien.faure2@gmail.com>
Thu, 21 Jul 2016 07:48:33 +0000 (09:48 +0200)
committeradfaure <adrien.faure2@gmail.com>
Fri, 22 Jul 2016 08:55:52 +0000 (10:55 +0200)
include/simgrid/simix.h
src/msg/msg_gos.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.h
src/simix/popping_bodies.cpp
src/simix/popping_generated.cpp
src/simix/popping_private.h
src/simix/simcalls.in
src/simix/smx_network.cpp
src/smpi/smpi_base.cpp

index d9b6007..dd789b2 100644 (file)
@@ -359,7 +359,7 @@ XBT_PUBLIC(smx_synchro_t) simcall_comm_iprobe(smx_mailbox_t mbox, int type, int
 XBT_PUBLIC(void) simcall_comm_cancel(smx_synchro_t comm);
 
 /* FIXME: waitany is going to be a vararg function, and should take a timeout */
-XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms);
+XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms, double timeout);
 XBT_PUBLIC(void) simcall_comm_wait(smx_synchro_t comm, double timeout);
 XBT_PUBLIC(int) simcall_comm_test(smx_synchro_t comm);
 XBT_PUBLIC(int) simcall_comm_testany(xbt_dynar_t comms);
index 9e0378e..0ec8312 100644 (file)
@@ -627,7 +627,7 @@ int MSG_comm_waitany(xbt_dynar_t comms)
 
   msg_error_t status = MSG_OK;
   try {
-    finished_index = simcall_comm_waitany(s_comms);
+    finished_index = simcall_comm_waitany(s_comms, -1);
   }
   catch(xbt_ex& e) {
     switch (e.category) {
index 8f41f62..2742d6d 100644 (file)
@@ -765,9 +765,9 @@ void simcall_comm_cancel(smx_synchro_t synchro)
 /**
  * \ingroup simix_comm_management
  */
-unsigned int simcall_comm_waitany(xbt_dynar_t comms)
+unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout)
 {
-  return simcall_BODY_comm_waitany(comms);
+  return simcall_BODY_comm_waitany(comms, timeout);
 }
 
 /**
index 32e906e..e74bb3b 100644 (file)
@@ -657,6 +657,12 @@ static inline xbt_dynar_t simcall_comm_waitany__get__comms(smx_simcall_t simcall
 static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) {
     simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
 }
+static inline double simcall_comm_waitany__get__timeout(smx_simcall_t simcall) {
+  return simgrid::simix::unmarshal<double>(simcall->args[1]);
+}
+static inline void simcall_comm_waitany__set__timeout(smx_simcall_t simcall, double arg) {
+    simgrid::simix::marshal<double>(simcall->args[1], arg);
+}
 static inline int simcall_comm_waitany__get__result(smx_simcall_t simcall){
     return simgrid::simix::unmarshal<int>(simcall->result);
 }
@@ -1179,7 +1185,7 @@ XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t
 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate);
 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
-XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms);
+XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms, double timeout);
 XBT_PRIVATE void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t comm, double timeout);
 XBT_PRIVATE void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t comm);
 XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t comms);
index ba83d70..e1cbb57 100644 (file)
@@ -221,10 +221,10 @@ inline static smx_synchro_t simcall_BODY_comm_irecv(smx_process_t receiver, smx_
     return simcall<smx_synchro_t, smx_process_t, smx_mailbox_t, void*, size_t*, simix_match_func_t, simix_copy_data_func_t, void*, double>(SIMCALL_COMM_IRECV, receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
   }
   
-inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms) {
+inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms, double timeout) {
     /* Go to that function to follow the code flow through the simcall barrier */
-    if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms);
-    return simcall<int, xbt_dynar_t>(SIMCALL_COMM_WAITANY, comms);
+    if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, timeout);
+    return simcall<int, xbt_dynar_t, double>(SIMCALL_COMM_WAITANY, comms, timeout);
   }
   
 inline static void simcall_BODY_comm_wait(smx_synchro_t comm, double timeout) {
index 1ba8754..226dc13 100644 (file)
@@ -254,7 +254,7 @@ case SIMCALL_COMM_IRECV:
       break;
 
 case SIMCALL_COMM_WAITANY:
-      simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]));
+      simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]), simgrid::simix::unmarshal<double>(simcall->args[1]));
       break;
 
 case SIMCALL_COMM_WAIT:
index d6a3a57..d532dbd 100644 (file)
@@ -45,6 +45,7 @@ union u_smx_scalar {
 struct s_smx_simcall {
   e_smx_simcall_t call;
   smx_process_t issuer;
+  smx_timer_t timer;
   int mc_value;
   union u_smx_scalar args[11];
   union u_smx_scalar result;
index 55858b2..185f440 100644 (file)
@@ -72,7 +72,7 @@ void          comm_send(smx_process_t sender, smx_mailbox_t mbox, double task_si
 smx_synchro_t comm_isend(smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
 void          comm_recv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate) [[block]];
 smx_synchro_t comm_irecv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
-int           comm_waitany(xbt_dynar_t comms) [[block]];
+int           comm_waitany(xbt_dynar_t comms, double timeout) [[block]];
 void          comm_wait(smx_synchro_t comm, double timeout) [[block]];
 int           comm_test(smx_synchro_t comm) [[block]];
 int           comm_testany(xbt_dynar_t comms) [[block]];
index db3b4fd..9810cbf 100644 (file)
@@ -476,12 +476,14 @@ void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
   SIMIX_simcall_answer(simcall);
 }
 
-void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
+void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
 {
   smx_synchro_t synchro;
   unsigned int cursor = 0;
 
   if (MC_is_active() || MC_record_replay_is_active()){
+    if (timeout != -1)
+      xbt_die("Timeout not implemented for waitany in the model-checker"); 
     int idx = SIMCALL_GET_MC_VALUE(simcall);
     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
     synchro->simcalls.push_back(simcall);
@@ -490,7 +492,17 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
     SIMIX_comm_finish(synchro);
     return;
   }
-
+  
+  if (timeout == -1 ){
+    simcall->timer = NULL;
+  } else {
+    simcall->timer = SIMIX_timer_set(timeout, [simcall]() {
+      SIMIX_waitany_remove_simcall_from_actions(simcall);
+      simcall_comm_waitany__set__result(simcall, -1);
+      SIMIX_simcall_answer(simcall);
+    });
+  }
+  
   xbt_dynar_foreach(synchros, cursor, synchro){
     /* associate this simcall to the the synchro */
     synchro->simcalls.push_back(simcall);
@@ -582,6 +594,8 @@ void SIMIX_comm_finish(smx_synchro_t synchro)
       continue; // if process handling comm is killed
     if (simcall->call == SIMCALL_COMM_WAITANY) {
       SIMIX_waitany_remove_simcall_from_actions(simcall);
+      if (simcall->timer) 
+        SIMIX_timer_remove(simcall->timer);
       if (!MC_is_active() && !MC_record_replay_is_active())
         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
     }
index 0170851..3a98771 100644 (file)
@@ -945,7 +945,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
       }
     }
     if(size > 0) {
-      i = simcall_comm_waitany(comms);
+      i = simcall_comm_waitany(comms, -1);
 
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {