From: Gabriel Corona Date: Fri, 22 Jul 2016 14:30:32 +0000 (+0200) Subject: Merge s4u wait_any X-Git-Tag: v3_14~729^2 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/b51da37243dc16575499f4cb7729fe8bdd7fa514?hp=-c Merge s4u wait_any --- b51da37243dc16575499f4cb7729fe8bdd7fa514 diff --combined include/simgrid/simix.h index f996f13c96,dd789b2c3e..6b7f31b086 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@@ -359,10 -359,10 +359,10 @@@ XBT_PUBLIC(smx_synchro_t) simcall_comm_ XBT_PUBLIC(void) simcall_comm_cancel(smx_synchro_t comm); /* FIXME: waitany is going to be a vararg function, and should take a timeout */ - XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms); + XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms, double timeout); XBT_PUBLIC(void) simcall_comm_wait(smx_synchro_t comm, double timeout); XBT_PUBLIC(int) simcall_comm_test(smx_synchro_t comm); -XBT_PUBLIC(int) simcall_comm_testany(xbt_dynar_t comms); +XBT_PUBLIC(int) simcall_comm_testany(smx_synchro_t* comms, size_t count); /************************** Tracing handling **********************************/ XBT_PUBLIC(void) simcall_set_category(smx_synchro_t synchro, const char *category); diff --combined src/msg/msg_gos.cpp index 5a1f0b5a64,0ec8312b71..1d5ebf1271 --- a/src/msg/msg_gos.cpp +++ b/src/msg/msg_gos.cpp @@@ -504,18 -504,17 +504,18 @@@ int MSG_comm_testany(xbt_dynar_t comms { int finished_index = -1; - /* create the equivalent dynar with SIMIX objects */ - xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_synchro_t), nullptr); + /* Create the equivalent array with SIMIX objects: */ + std::vector s_comms; + s_comms.reserve(xbt_dynar_length(comms)); msg_comm_t comm; unsigned int cursor; xbt_dynar_foreach(comms, cursor, comm) { - xbt_dynar_push(s_comms, &comm->s_comm); + s_comms.push_back(comm->s_comm); } msg_error_t status = MSG_OK; try { - finished_index = simcall_comm_testany(s_comms); + finished_index = simcall_comm_testany(s_comms.data(), s_comms.size()); } catch (xbt_ex& e) { switch (e.category) { @@@ -531,6 -530,7 +531,6 @@@ throw; } } - xbt_dynar_free(&s_comms); if (finished_index != -1) { comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t); @@@ -627,7 -627,7 +627,7 @@@ int MSG_comm_waitany(xbt_dynar_t comms msg_error_t status = MSG_OK; try { - finished_index = simcall_comm_waitany(s_comms); + finished_index = simcall_comm_waitany(s_comms, -1); } catch(xbt_ex& e) { switch (e.category) { diff --combined src/simix/libsmx.cpp index 10177cc30d,2742d6d324..8cf08c4a39 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@@ -765,19 -765,19 +765,19 @@@ void simcall_comm_cancel(smx_synchro_t /** * \ingroup simix_comm_management */ - unsigned int simcall_comm_waitany(xbt_dynar_t comms) + unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout) { - return simcall_BODY_comm_waitany(comms); + return simcall_BODY_comm_waitany(comms, timeout); } /** * \ingroup simix_comm_management */ -int simcall_comm_testany(xbt_dynar_t comms) +int simcall_comm_testany(smx_synchro_t* comms, size_t count) { - if (xbt_dynar_is_empty(comms)) + if (count == 0) return -1; - return simcall_BODY_comm_testany(comms); + return simcall_BODY_comm_testany(comms, count); } /** diff --combined src/simix/popping_accessors.h index 64db9d15c8,e74bb3b4e9..dede08b530 --- a/src/simix/popping_accessors.h +++ b/src/simix/popping_accessors.h @@@ -657,6 -657,12 +657,12 @@@ static inline xbt_dynar_t simcall_comm_ static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) { simgrid::simix::marshal(simcall->args[0], arg); } + static inline double simcall_comm_waitany__get__timeout(smx_simcall_t simcall) { + return simgrid::simix::unmarshal(simcall->args[1]); + } + static inline void simcall_comm_waitany__set__timeout(smx_simcall_t simcall, double arg) { + simgrid::simix::marshal(simcall->args[1], arg); + } static inline int simcall_comm_waitany__get__result(smx_simcall_t simcall){ return simgrid::simix::unmarshal(simcall->result); } @@@ -690,17 -696,11 +696,17 @@@ static inline void simcall_comm_test__s simgrid::simix::marshal(simcall->result, result); } -static inline xbt_dynar_t simcall_comm_testany__get__comms(smx_simcall_t simcall) { - return simgrid::simix::unmarshal(simcall->args[0]); +static inline smx_synchro_t* simcall_comm_testany__get__comms(smx_simcall_t simcall) { + return simgrid::simix::unmarshal(simcall->args[0]); } -static inline void simcall_comm_testany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) { - simgrid::simix::marshal(simcall->args[0], arg); +static inline void simcall_comm_testany__set__comms(smx_simcall_t simcall, smx_synchro_t* arg) { + simgrid::simix::marshal(simcall->args[0], arg); +} +static inline size_t simcall_comm_testany__get__count(smx_simcall_t simcall) { + return simgrid::simix::unmarshal(simcall->args[1]); +} +static inline void simcall_comm_testany__set__count(smx_simcall_t simcall, size_t arg) { + simgrid::simix::marshal(simcall->args[1], arg); } static inline int simcall_comm_testany__get__result(smx_simcall_t simcall){ return simgrid::simix::unmarshal(simcall->result); @@@ -1185,10 -1185,10 +1191,10 @@@ XBT_PRIVATE void simcall_HANDLER_comm_s XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached); XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate); XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate); - XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms); + XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms, double timeout); XBT_PRIVATE void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t comm, double timeout); XBT_PRIVATE void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t comm); -XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t comms); +XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, smx_synchro_t* comms, size_t count); XBT_PRIVATE smx_mutex_t simcall_HANDLER_mutex_init(smx_simcall_t simcall); XBT_PRIVATE void simcall_HANDLER_mutex_lock(smx_simcall_t simcall, smx_mutex_t mutex); XBT_PRIVATE int simcall_HANDLER_mutex_trylock(smx_simcall_t simcall, smx_mutex_t mutex); diff --combined src/simix/popping_bodies.cpp index f830047458,e1cbb57b33..191080887b --- a/src/simix/popping_bodies.cpp +++ b/src/simix/popping_bodies.cpp @@@ -221,10 -221,10 +221,10 @@@ inline static smx_synchro_t simcall_BOD return simcall(SIMCALL_COMM_IRECV, receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate); } - inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms) { + inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms, double timeout) { /* Go to that function to follow the code flow through the simcall barrier */ - if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms); - return simcall(SIMCALL_COMM_WAITANY, comms); + if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, timeout); + return simcall(SIMCALL_COMM_WAITANY, comms, timeout); } inline static void simcall_BODY_comm_wait(smx_synchro_t comm, double timeout) { @@@ -239,10 -239,10 +239,10 @@@ inline static int simcall_BODY_comm_tes return simcall(SIMCALL_COMM_TEST, comm); } -inline static int simcall_BODY_comm_testany(xbt_dynar_t comms) { +inline static int simcall_BODY_comm_testany(smx_synchro_t* comms, size_t count) { /* Go to that function to follow the code flow through the simcall barrier */ - if (0) simcall_HANDLER_comm_testany(&SIMIX_process_self()->simcall, comms); - return simcall(SIMCALL_COMM_TESTANY, comms); + if (0) simcall_HANDLER_comm_testany(&SIMIX_process_self()->simcall, comms, count); + return simcall(SIMCALL_COMM_TESTANY, comms, count); } inline static smx_mutex_t simcall_BODY_mutex_init() { diff --combined src/simix/popping_generated.cpp index e5bd57b15d,226dc13059..3e13053753 --- a/src/simix/popping_generated.cpp +++ b/src/simix/popping_generated.cpp @@@ -254,7 -254,7 +254,7 @@@ case SIMCALL_COMM_IRECV break; case SIMCALL_COMM_WAITANY: - simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal(simcall->args[0])); + simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal(simcall->args[0]), simgrid::simix::unmarshal(simcall->args[1])); break; case SIMCALL_COMM_WAIT: @@@ -266,7 -266,7 +266,7 @@@ case SIMCALL_COMM_TEST break; case SIMCALL_COMM_TESTANY: - simcall_HANDLER_comm_testany(simcall, simgrid::simix::unmarshal(simcall->args[0])); + simcall_HANDLER_comm_testany(simcall, simgrid::simix::unmarshal(simcall->args[0]), simgrid::simix::unmarshal(simcall->args[1])); break; case SIMCALL_MUTEX_INIT: diff --combined src/simix/simcalls.in index dbef0797a4,185f440048..476dd575b8 --- a/src/simix/simcalls.in +++ b/src/simix/simcalls.in @@@ -72,10 -72,10 +72,10 @@@ void comm_send(smx_process_t s smx_synchro_t comm_isend(smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached); void comm_recv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate) [[block]]; smx_synchro_t comm_irecv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate); - int comm_waitany(xbt_dynar_t comms) [[block]]; + int comm_waitany(xbt_dynar_t comms, double timeout) [[block]]; void comm_wait(smx_synchro_t comm, double timeout) [[block]]; int comm_test(smx_synchro_t comm) [[block]]; -int comm_testany(xbt_dynar_t comms) [[block]]; +int comm_testany(smx_synchro_t* comms, size_t count) [[block]]; smx_mutex_t mutex_init(); void mutex_lock(smx_mutex_t mutex) [[block]]; diff --combined src/simix/smx_network.cpp index 0b47f73f02,dd86baa844..25783eb9ad --- a/src/simix/smx_network.cpp +++ b/src/simix/smx_network.cpp @@@ -3,8 -3,6 +3,8 @@@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include + #include #include @@@ -445,9 -443,10 +445,9 @@@ void simcall_HANDLER_comm_test(smx_simc } } -void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros) +void simcall_HANDLER_comm_testany( + smx_simcall_t simcall, simgrid::simix::Synchro* comms[], size_t count) { - unsigned int cursor; - smx_synchro_t synchro; // The default result is -1 -- this means, "nothing is ready". // It can be changed below, but only if something matches. simcall_comm_testany__set__result(simcall, -1); @@@ -457,7 -456,7 +457,7 @@@ if(idx == -1){ SIMIX_simcall_answer(simcall); }else{ - synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t); + simgrid::simix::Synchro* synchro = comms[idx]; simcall_comm_testany__set__result(simcall, idx); synchro->simcalls.push_back(simcall); synchro->state = SIMIX_DONE; @@@ -466,10 -465,9 +466,10 @@@ return; } - xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) { + for (std::size_t i = 0; i != count; ++i) { + simgrid::simix::Synchro* synchro = comms[i]; if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) { - simcall_comm_testany__set__result(simcall, cursor); + simcall_comm_testany__set__result(simcall, i); synchro->simcalls.push_back(simcall); SIMIX_comm_finish(synchro); return; @@@ -478,12 -476,14 +478,14 @@@ SIMIX_simcall_answer(simcall); } - void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros) + void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout) { smx_synchro_t synchro; unsigned int cursor = 0; if (MC_is_active() || MC_record_replay_is_active()){ + if (timeout != -1) + xbt_die("Timeout not implemented for waitany in the model-checker"); int idx = SIMCALL_GET_MC_VALUE(simcall); synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t); synchro->simcalls.push_back(simcall); @@@ -492,7 -492,17 +494,17 @@@ SIMIX_comm_finish(synchro); return; } - + + if (timeout == -1 ){ + simcall->timer = NULL; + } else { + simcall->timer = SIMIX_timer_set(timeout, [simcall]() { + SIMIX_waitany_remove_simcall_from_actions(simcall); + simcall_comm_waitany__set__result(simcall, -1); + SIMIX_simcall_answer(simcall); + }); + } + xbt_dynar_foreach(synchros, cursor, synchro){ /* associate this simcall to the the synchro */ synchro->simcalls.push_back(simcall); @@@ -584,6 -594,10 +596,10 @@@ void SIMIX_comm_finish(smx_synchro_t sy continue; // if process handling comm is killed if (simcall->call == SIMCALL_COMM_WAITANY) { SIMIX_waitany_remove_simcall_from_actions(simcall); + if (simcall->timer) { + SIMIX_timer_remove(simcall->timer); + simcall->timer = nullptr; + } if (!MC_is_active() && !MC_record_replay_is_active()) simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro)); } @@@ -671,14 -685,7 +687,14 @@@ e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro); } else if (simcall->call == SIMCALL_COMM_TESTANY) { - e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro); + e.value = -1; + auto comms = simcall_comm_testany__get__comms(simcall); + auto count = simcall_comm_testany__get__count(simcall); + auto element = std::find(comms, comms + count, synchro); + if (element == comms + count) + e.value = -1; + else + e.value = element - comms; } simcall->issuer->exception = std::make_exception_ptr(e); } diff --combined src/smpi/smpi_base.cpp index b1389d334e,3a987711f9..d60b2bff16 --- a/src/smpi/smpi_base.cpp +++ b/src/smpi/smpi_base.cpp @@@ -773,18 -773,16 +773,18 @@@ int smpi_mpi_test(MPI_Request * request int smpi_mpi_testany(int count, MPI_Request requests[], int *index, MPI_Status * status) { - xbt_dynar_t comms; + std::vector comms; + comms.reserve(count); + int i; int flag = 0; *index = MPI_UNDEFINED; - comms = xbt_dynar_new(sizeof(smx_synchro_t), nullptr); + std::vector map; /** Maps all matching comms back to their location in requests **/ for(i = 0; i < count; i++) { if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action && !(requests[i]->flags & PREPARED)) { - xbt_dynar_push(comms, &requests[i]->action); + comms.push_back(requests[i]->action); map.push_back(i); } } @@@ -794,7 -792,7 +794,7 @@@ if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); - i = simcall_comm_testany(comms); // The i-th element in comms matches! + i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches! if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches) *index = map[i]; finish_wait(&requests[*index], status); @@@ -811,6 -809,7 +811,6 @@@ flag = 1; smpi_empty_status(status); } - xbt_dynar_free(&comms); return flag; } @@@ -946,7 -945,7 +946,7 @@@ int smpi_mpi_waitany(int count, MPI_Req } } if(size > 0) { - i = simcall_comm_waitany(comms); + i = simcall_comm_waitany(comms, -1); // not MPI_UNDEFINED, as this is a simix return code if (i != -1) {