Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge s4u wait_any
authorGabriel Corona <gabriel.corona@loria.fr>
Fri, 22 Jul 2016 14:30:32 +0000 (16:30 +0200)
committerGabriel Corona <gabriel.corona@loria.fr>
Fri, 22 Jul 2016 14:30:32 +0000 (16:30 +0200)
1  2 
include/simgrid/simix.h
src/msg/msg_gos.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.h
src/simix/popping_bodies.cpp
src/simix/popping_generated.cpp
src/simix/simcalls.in
src/simix/smx_network.cpp
src/smpi/smpi_base.cpp

diff --combined include/simgrid/simix.h
@@@ -359,10 -359,10 +359,10 @@@ XBT_PUBLIC(smx_synchro_t) simcall_comm_
  XBT_PUBLIC(void) simcall_comm_cancel(smx_synchro_t comm);
  
  /* FIXME: waitany is going to be a vararg function, and should take a timeout */
- XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms);
+ XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms, double timeout);
  XBT_PUBLIC(void) simcall_comm_wait(smx_synchro_t comm, double timeout);
  XBT_PUBLIC(int) simcall_comm_test(smx_synchro_t comm);
 -XBT_PUBLIC(int) simcall_comm_testany(xbt_dynar_t comms);
 +XBT_PUBLIC(int) simcall_comm_testany(smx_synchro_t* comms, size_t count);
  
  /************************** Tracing handling **********************************/
  XBT_PUBLIC(void) simcall_set_category(smx_synchro_t synchro, const char *category);
diff --combined src/msg/msg_gos.cpp
@@@ -504,18 -504,17 +504,18 @@@ int MSG_comm_testany(xbt_dynar_t comms
  {
    int finished_index = -1;
  
 -  /* create the equivalent dynar with SIMIX objects */
 -  xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_synchro_t), nullptr);
 +  /* Create the equivalent array with SIMIX objects: */
 +  std::vector<simgrid::simix::Synchro*> s_comms;
 +  s_comms.reserve(xbt_dynar_length(comms));
    msg_comm_t comm;
    unsigned int cursor;
    xbt_dynar_foreach(comms, cursor, comm) {
 -    xbt_dynar_push(s_comms, &comm->s_comm);
 +    s_comms.push_back(comm->s_comm);
    }
  
    msg_error_t status = MSG_OK;
    try {
 -    finished_index = simcall_comm_testany(s_comms);
 +    finished_index = simcall_comm_testany(s_comms.data(), s_comms.size());
    }
    catch (xbt_ex& e) {
      switch (e.category) {
          throw;
      }
    }
 -  xbt_dynar_free(&s_comms);
  
    if (finished_index != -1) {
      comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
@@@ -627,7 -627,7 +627,7 @@@ int MSG_comm_waitany(xbt_dynar_t comms
  
    msg_error_t status = MSG_OK;
    try {
-     finished_index = simcall_comm_waitany(s_comms);
+     finished_index = simcall_comm_waitany(s_comms, -1);
    }
    catch(xbt_ex& e) {
      switch (e.category) {
diff --combined src/simix/libsmx.cpp
@@@ -765,19 -765,19 +765,19 @@@ void simcall_comm_cancel(smx_synchro_t 
  /**
   * \ingroup simix_comm_management
   */
- unsigned int simcall_comm_waitany(xbt_dynar_t comms)
+ unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout)
  {
-   return simcall_BODY_comm_waitany(comms);
+   return simcall_BODY_comm_waitany(comms, timeout);
  }
  
  /**
   * \ingroup simix_comm_management
   */
 -int simcall_comm_testany(xbt_dynar_t comms)
 +int simcall_comm_testany(smx_synchro_t* comms, size_t count)
  {
 -  if (xbt_dynar_is_empty(comms))
 +  if (count == 0)
      return -1;
 -  return simcall_BODY_comm_testany(comms);
 +  return simcall_BODY_comm_testany(comms, count);
  }
  
  /**
@@@ -657,6 -657,12 +657,12 @@@ static inline xbt_dynar_t simcall_comm_
  static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) {
      simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
  }
+ static inline double simcall_comm_waitany__get__timeout(smx_simcall_t simcall) {
+   return simgrid::simix::unmarshal<double>(simcall->args[1]);
+ }
+ static inline void simcall_comm_waitany__set__timeout(smx_simcall_t simcall, double arg) {
+     simgrid::simix::marshal<double>(simcall->args[1], arg);
+ }
  static inline int simcall_comm_waitany__get__result(smx_simcall_t simcall){
      return simgrid::simix::unmarshal<int>(simcall->result);
  }
@@@ -690,17 -696,11 +696,17 @@@ static inline void simcall_comm_test__s
      simgrid::simix::marshal<int>(simcall->result, result);
  }
  
 -static inline xbt_dynar_t simcall_comm_testany__get__comms(smx_simcall_t simcall) {
 -  return simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]);
 +static inline smx_synchro_t* simcall_comm_testany__get__comms(smx_simcall_t simcall) {
 +  return simgrid::simix::unmarshal<smx_synchro_t*>(simcall->args[0]);
  }
 -static inline void simcall_comm_testany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) {
 -    simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
 +static inline void simcall_comm_testany__set__comms(smx_simcall_t simcall, smx_synchro_t* arg) {
 +    simgrid::simix::marshal<smx_synchro_t*>(simcall->args[0], arg);
 +}
 +static inline size_t simcall_comm_testany__get__count(smx_simcall_t simcall) {
 +  return simgrid::simix::unmarshal<size_t>(simcall->args[1]);
 +}
 +static inline void simcall_comm_testany__set__count(smx_simcall_t simcall, size_t arg) {
 +    simgrid::simix::marshal<size_t>(simcall->args[1], arg);
  }
  static inline int simcall_comm_testany__get__result(smx_simcall_t simcall){
      return simgrid::simix::unmarshal<int>(simcall->result);
@@@ -1185,10 -1185,10 +1191,10 @@@ XBT_PRIVATE void simcall_HANDLER_comm_s
  XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
  XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate);
  XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
- XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms);
+ XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms, double timeout);
  XBT_PRIVATE void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t comm, double timeout);
  XBT_PRIVATE void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t comm);
 -XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t comms);
 +XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, smx_synchro_t* comms, size_t count);
  XBT_PRIVATE smx_mutex_t simcall_HANDLER_mutex_init(smx_simcall_t simcall);
  XBT_PRIVATE void simcall_HANDLER_mutex_lock(smx_simcall_t simcall, smx_mutex_t mutex);
  XBT_PRIVATE int simcall_HANDLER_mutex_trylock(smx_simcall_t simcall, smx_mutex_t mutex);
@@@ -221,10 -221,10 +221,10 @@@ inline static smx_synchro_t simcall_BOD
      return simcall<smx_synchro_t, smx_process_t, smx_mailbox_t, void*, size_t*, simix_match_func_t, simix_copy_data_func_t, void*, double>(SIMCALL_COMM_IRECV, receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
    }
    
- inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms) {
+ inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms, double timeout) {
      /* Go to that function to follow the code flow through the simcall barrier */
-     if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms);
-     return simcall<int, xbt_dynar_t>(SIMCALL_COMM_WAITANY, comms);
+     if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, timeout);
+     return simcall<int, xbt_dynar_t, double>(SIMCALL_COMM_WAITANY, comms, timeout);
    }
    
  inline static void simcall_BODY_comm_wait(smx_synchro_t comm, double timeout) {
@@@ -239,10 -239,10 +239,10 @@@ inline static int simcall_BODY_comm_tes
      return simcall<int, smx_synchro_t>(SIMCALL_COMM_TEST, comm);
    }
    
 -inline static int simcall_BODY_comm_testany(xbt_dynar_t comms) {
 +inline static int simcall_BODY_comm_testany(smx_synchro_t* comms, size_t count) {
      /* Go to that function to follow the code flow through the simcall barrier */
 -    if (0) simcall_HANDLER_comm_testany(&SIMIX_process_self()->simcall, comms);
 -    return simcall<int, xbt_dynar_t>(SIMCALL_COMM_TESTANY, comms);
 +    if (0) simcall_HANDLER_comm_testany(&SIMIX_process_self()->simcall, comms, count);
 +    return simcall<int, smx_synchro_t*, size_t>(SIMCALL_COMM_TESTANY, comms, count);
    }
    
  inline static smx_mutex_t simcall_BODY_mutex_init() {
@@@ -254,7 -254,7 +254,7 @@@ case SIMCALL_COMM_IRECV
        break;
  
  case SIMCALL_COMM_WAITANY:
-       simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]));
+       simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]), simgrid::simix::unmarshal<double>(simcall->args[1]));
        break;
  
  case SIMCALL_COMM_WAIT:
@@@ -266,7 -266,7 +266,7 @@@ case SIMCALL_COMM_TEST
        break;
  
  case SIMCALL_COMM_TESTANY:
 -      simcall_HANDLER_comm_testany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]));
 +      simcall_HANDLER_comm_testany(simcall, simgrid::simix::unmarshal<smx_synchro_t*>(simcall->args[0]), simgrid::simix::unmarshal<size_t>(simcall->args[1]));
        break;
  
  case SIMCALL_MUTEX_INIT:
diff --combined src/simix/simcalls.in
@@@ -72,10 -72,10 +72,10 @@@ void          comm_send(smx_process_t s
  smx_synchro_t comm_isend(smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
  void          comm_recv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate) [[block]];
  smx_synchro_t comm_irecv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
- int           comm_waitany(xbt_dynar_t comms) [[block]];
+ int           comm_waitany(xbt_dynar_t comms, double timeout) [[block]];
  void          comm_wait(smx_synchro_t comm, double timeout) [[block]];
  int           comm_test(smx_synchro_t comm) [[block]];
 -int           comm_testany(xbt_dynar_t comms) [[block]];
 +int           comm_testany(smx_synchro_t* comms, size_t count) [[block]];
  
  smx_mutex_t mutex_init();
  void        mutex_lock(smx_mutex_t mutex) [[block]];
@@@ -3,8 -3,6 +3,8 @@@
  /* This program is free software; you can redistribute it and/or modify it
   * under the terms of the license (GNU LGPL) which comes with this package. */
  
 +#include <algorithm>
 +
  #include <boost/range/algorithm.hpp>
  
  #include <xbt/ex.hpp>
@@@ -445,9 -443,10 +445,9 @@@ void simcall_HANDLER_comm_test(smx_simc
    }
  }
  
 -void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
 +void simcall_HANDLER_comm_testany(
 +  smx_simcall_t simcall, simgrid::simix::Synchro* comms[], size_t count)
  {
 -  unsigned int cursor;
 -  smx_synchro_t synchro;
    // The default result is -1 -- this means, "nothing is ready".
    // It can be changed below, but only if something matches.
    simcall_comm_testany__set__result(simcall, -1);
      if(idx == -1){
        SIMIX_simcall_answer(simcall);
      }else{
 -      synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
 +      simgrid::simix::Synchro* synchro = comms[idx];
        simcall_comm_testany__set__result(simcall, idx);
        synchro->simcalls.push_back(simcall);
        synchro->state = SIMIX_DONE;
      return;
    }
  
 -  xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
 +  for (std::size_t i = 0; i != count; ++i) {
 +    simgrid::simix::Synchro* synchro = comms[i];
      if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
 -      simcall_comm_testany__set__result(simcall, cursor);
 +      simcall_comm_testany__set__result(simcall, i);
        synchro->simcalls.push_back(simcall);
        SIMIX_comm_finish(synchro);
        return;
    SIMIX_simcall_answer(simcall);
  }
  
- void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
+ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
  {
    smx_synchro_t synchro;
    unsigned int cursor = 0;
  
    if (MC_is_active() || MC_record_replay_is_active()){
+     if (timeout != -1)
+       xbt_die("Timeout not implemented for waitany in the model-checker"); 
      int idx = SIMCALL_GET_MC_VALUE(simcall);
      synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
      synchro->simcalls.push_back(simcall);
      SIMIX_comm_finish(synchro);
      return;
    }
+   
+   if (timeout == -1 ){
+     simcall->timer = NULL;
+   } else {
+     simcall->timer = SIMIX_timer_set(timeout, [simcall]() {
+       SIMIX_waitany_remove_simcall_from_actions(simcall);
+       simcall_comm_waitany__set__result(simcall, -1);
+       SIMIX_simcall_answer(simcall);
+     });
+   }
+   
    xbt_dynar_foreach(synchros, cursor, synchro){
      /* associate this simcall to the the synchro */
      synchro->simcalls.push_back(simcall);
@@@ -584,6 -594,10 +596,10 @@@ void SIMIX_comm_finish(smx_synchro_t sy
        continue; // if process handling comm is killed
      if (simcall->call == SIMCALL_COMM_WAITANY) {
        SIMIX_waitany_remove_simcall_from_actions(simcall);
+       if (simcall->timer) {
+         SIMIX_timer_remove(simcall->timer);
+         simcall->timer = nullptr;
+       }
        if (!MC_is_active() && !MC_record_replay_is_active())
          simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
      }
            e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
          }
          else if (simcall->call == SIMCALL_COMM_TESTANY) {
 -          e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
 +          e.value = -1;
 +          auto comms = simcall_comm_testany__get__comms(simcall);
 +          auto count = simcall_comm_testany__get__count(simcall);
 +          auto element = std::find(comms, comms + count, synchro);
 +          if (element == comms + count)
 +            e.value = -1;
 +          else
 +            e.value = element - comms;
          }
          simcall->issuer->exception = std::make_exception_ptr(e);
        }
diff --combined src/smpi/smpi_base.cpp
@@@ -773,18 -773,16 +773,18 @@@ int smpi_mpi_test(MPI_Request * request
  
  int smpi_mpi_testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
  {
 -  xbt_dynar_t comms;
 +  std::vector<simgrid::simix::Synchro*> comms;
 +  comms.reserve(count);
 +
    int i;
    int flag = 0;
  
    *index = MPI_UNDEFINED;
 -  comms = xbt_dynar_new(sizeof(smx_synchro_t), nullptr);
 +
    std::vector<int> map; /** Maps all matching comms back to their location in requests **/
    for(i = 0; i < count; i++) {
      if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action && !(requests[i]->flags & PREPARED)) {
 -       xbt_dynar_push(comms, &requests[i]->action);
 +       comms.push_back(requests[i]->action);
         map.push_back(i);
      }
    }
      if(smpi_test_sleep > 0) 
        simcall_process_sleep(nsleeps*smpi_test_sleep);
  
 -    i = simcall_comm_testany(comms); // The i-th element in comms matches!
 +    i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
      if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
        *index = map[i]; 
        finish_wait(&requests[*index], status);
        flag = 1;
        smpi_empty_status(status);
    }
 -  xbt_dynar_free(&comms);
  
    return flag;
  }
@@@ -946,7 -945,7 +946,7 @@@ int smpi_mpi_waitany(int count, MPI_Req
        }
      }
      if(size > 0) {
-       i = simcall_comm_waitany(comms);
+       i = simcall_comm_waitany(comms, -1);
  
        // not MPI_UNDEFINED, as this is a simix return code
        if (i != -1) {