XBT_PUBLIC(void) simcall_comm_cancel(smx_synchro_t comm);
/* FIXME: waitany is going to be a vararg function (the timeout parameter is now in place) */
- XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms);
+ XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms, double timeout);
XBT_PUBLIC(void) simcall_comm_wait(smx_synchro_t comm, double timeout);
XBT_PUBLIC(int) simcall_comm_test(smx_synchro_t comm);
-XBT_PUBLIC(int) simcall_comm_testany(xbt_dynar_t comms);
+XBT_PUBLIC(int) simcall_comm_testany(smx_synchro_t* comms, size_t count);
/************************** Tracing handling **********************************/
XBT_PUBLIC(void) simcall_set_category(smx_synchro_t synchro, const char *category);
{
int finished_index = -1;
- /* create the equivalent dynar with SIMIX objects */
- xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_synchro_t), nullptr);
+ /* Create the equivalent array with SIMIX objects: */
+ std::vector<simgrid::simix::Synchro*> s_comms;
+ s_comms.reserve(xbt_dynar_length(comms));
msg_comm_t comm;
unsigned int cursor;
xbt_dynar_foreach(comms, cursor, comm) {
- xbt_dynar_push(s_comms, &comm->s_comm);
+ s_comms.push_back(comm->s_comm);
}
msg_error_t status = MSG_OK;
try {
- finished_index = simcall_comm_testany(s_comms);
+ finished_index = simcall_comm_testany(s_comms.data(), s_comms.size());
}
catch (xbt_ex& e) {
switch (e.category) {
throw;
}
}
- xbt_dynar_free(&s_comms);
if (finished_index != -1) {
comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
msg_error_t status = MSG_OK;
try {
- finished_index = simcall_comm_waitany(s_comms);
+ finished_index = simcall_comm_waitany(s_comms, -1);
}
catch(xbt_ex& e) {
switch (e.category) {
/**
* \ingroup simix_comm_management
+ * \brief Issue the COMM_WAITANY simcall on a set of communications.
+ *
+ * \param comms   dynar of communications, handed unchanged to the simcall
+ * \param timeout handed unchanged to the simcall; the kernel-side handler arms
+ *                no timer when it is -1
+ * \return the int result of the simcall, converted to unsigned int.
+ *         NOTE(review): when the handler's timer fires it stores -1 as the
+ *         result, which this return type turns into UINT_MAX; callers that
+ *         test for -1 depend on converting the value back to int.
*/
- unsigned int simcall_comm_waitany(xbt_dynar_t comms)
+ unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout)
{
- return simcall_BODY_comm_waitany(comms);
+ return simcall_BODY_comm_waitany(comms, timeout);
}
/**
* \ingroup simix_comm_management
+ * \brief Issue the COMM_TESTANY simcall on an array of communications.
+ *
+ * \param comms pointer to the first of \a count communications
+ * \param count number of entries in \a comms
+ * \return -1 right away, without any simcall, when \a count is 0; otherwise
+ *         the result of the simcall (the handler's default is -1, "nothing is
+ *         ready", else an index into \a comms)
+ *
+ * NOTE(review): only the pointer is marshalled into the simcall arguments, so
+ * the array presumably has to stay alive until the simcall returns; confirm.
*/
-int simcall_comm_testany(xbt_dynar_t comms)
+int simcall_comm_testany(smx_synchro_t* comms, size_t count)
{
- if (xbt_dynar_is_empty(comms))
+ if (count == 0) /* nothing to test: answer locally instead of entering the simcall */
return -1;
- return simcall_BODY_comm_testany(comms);
+ return simcall_BODY_comm_testany(comms, count);
}
/**
/* Setter: marshals the `comms` dynar into argument slot 0 of a comm_waitany simcall. */
static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) {
simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
}
+ static inline double simcall_comm_waitany__get__timeout(smx_simcall_t simcall) { /* getter: `timeout` lives in argument slot 1 */
+ return simgrid::simix::unmarshal<double>(simcall->args[1]);
+ }
+ static inline void simcall_comm_waitany__set__timeout(smx_simcall_t simcall, double arg) { /* setter: marshals `timeout` into argument slot 1 */
+ simgrid::simix::marshal<double>(simcall->args[1], arg);
+ }
/* Getter: unmarshals the int stored in simcall->result for a comm_waitany simcall. */
static inline int simcall_comm_waitany__get__result(smx_simcall_t simcall){
return simgrid::simix::unmarshal<int>(simcall->result);
}
simgrid::simix::marshal<int>(simcall->result, result);
}
-static inline xbt_dynar_t simcall_comm_testany__get__comms(smx_simcall_t simcall) {
- return simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]);
+static inline smx_synchro_t* simcall_comm_testany__get__comms(smx_simcall_t simcall) { /* getter: slot 0 now holds a raw array pointer instead of a dynar */
+ return simgrid::simix::unmarshal<smx_synchro_t*>(simcall->args[0]);
}
-static inline void simcall_comm_testany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) {
- simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
+static inline void simcall_comm_testany__set__comms(smx_simcall_t simcall, smx_synchro_t* arg) { /* setter: marshals the array pointer into slot 0 (the pointer only, not the elements) */
+ simgrid::simix::marshal<smx_synchro_t*>(simcall->args[0], arg);
+}
+static inline size_t simcall_comm_testany__get__count(smx_simcall_t simcall) { /* getter: `count` lives in argument slot 1 */
+ return simgrid::simix::unmarshal<size_t>(simcall->args[1]);
+}
+static inline void simcall_comm_testany__set__count(smx_simcall_t simcall, size_t arg) { /* setter: marshals `count` into argument slot 1 */
+ simgrid::simix::marshal<size_t>(simcall->args[1], arg);
}
static inline int simcall_comm_testany__get__result(smx_simcall_t simcall){
return simgrid::simix::unmarshal<int>(simcall->result);
XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate);
XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
- XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms);
+ XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms, double timeout);
XBT_PRIVATE void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t comm, double timeout);
XBT_PRIVATE void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t comm);
-XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t comms);
+XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, smx_synchro_t* comms, size_t count);
XBT_PRIVATE smx_mutex_t simcall_HANDLER_mutex_init(smx_simcall_t simcall);
XBT_PRIVATE void simcall_HANDLER_mutex_lock(smx_simcall_t simcall, smx_mutex_t mutex);
XBT_PRIVATE int simcall_HANDLER_mutex_trylock(smx_simcall_t simcall, smx_mutex_t mutex);
return simcall<smx_synchro_t, smx_process_t, smx_mailbox_t, void*, size_t*, simix_match_func_t, simix_copy_data_func_t, void*, double>(SIMCALL_COMM_IRECV, receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
}
- inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms) {
+ inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms, double timeout) { /* forwards both arguments to the generic simcall<>() helper under SIMCALL_COMM_WAITANY */
/* Go to that function to follow the code flow through the simcall barrier */
- if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms);
- return simcall<int, xbt_dynar_t>(SIMCALL_COMM_WAITANY, comms);
+ if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, timeout); /* dead call: never runs, it only points the reader at the handler */
+ return simcall<int, xbt_dynar_t, double>(SIMCALL_COMM_WAITANY, comms, timeout); /* presumably <result type, argument types...>: the new double is the timeout */
}
inline static void simcall_BODY_comm_wait(smx_synchro_t comm, double timeout) {
return simcall<int, smx_synchro_t>(SIMCALL_COMM_TEST, comm);
}
-inline static int simcall_BODY_comm_testany(xbt_dynar_t comms) {
+inline static int simcall_BODY_comm_testany(smx_synchro_t* comms, size_t count) { /* forwards the array pointer and its length to the generic simcall<>() helper under SIMCALL_COMM_TESTANY */
/* Go to that function to follow the code flow through the simcall barrier */
- if (0) simcall_HANDLER_comm_testany(&SIMIX_process_self()->simcall, comms);
- return simcall<int, xbt_dynar_t>(SIMCALL_COMM_TESTANY, comms);
+ if (0) simcall_HANDLER_comm_testany(&SIMIX_process_self()->simcall, comms, count); /* dead call: never runs, it only points the reader at the handler */
+ return simcall<int, smx_synchro_t*, size_t>(SIMCALL_COMM_TESTANY, comms, count); /* presumably <result type, argument types...> */
}
inline static smx_mutex_t simcall_BODY_mutex_init() {
break;
case SIMCALL_COMM_WAITANY:
- simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]));
+ simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]), simgrid::simix::unmarshal<double>(simcall->args[1]));
break;
case SIMCALL_COMM_WAIT:
break;
case SIMCALL_COMM_TESTANY:
- simcall_HANDLER_comm_testany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]));
+ simcall_HANDLER_comm_testany(simcall, simgrid::simix::unmarshal<smx_synchro_t*>(simcall->args[0]), simgrid::simix::unmarshal<size_t>(simcall->args[1]));
break;
case SIMCALL_MUTEX_INIT:
smx_synchro_t comm_isend(smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
void comm_recv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate) [[block]];
smx_synchro_t comm_irecv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
- int comm_waitany(xbt_dynar_t comms) [[block]];
+ int comm_waitany(xbt_dynar_t comms, double timeout) [[block]];
void comm_wait(smx_synchro_t comm, double timeout) [[block]];
int comm_test(smx_synchro_t comm) [[block]];
-int comm_testany(xbt_dynar_t comms) [[block]];
+int comm_testany(smx_synchro_t* comms, size_t count) [[block]];
smx_mutex_t mutex_init();
void mutex_lock(smx_mutex_t mutex) [[block]];
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include <algorithm>
+
#include <boost/range/algorithm.hpp>
#include <xbt/ex.hpp>
}
}
-void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
+void simcall_HANDLER_comm_testany(
+ smx_simcall_t simcall, simgrid::simix::Synchro* comms[], size_t count)
{
- unsigned int cursor;
- smx_synchro_t synchro;
// The default result is -1 -- this means, "nothing is ready".
// It can be changed below, but only if something matches.
simcall_comm_testany__set__result(simcall, -1);
if(idx == -1){
SIMIX_simcall_answer(simcall);
}else{
- synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
+ simgrid::simix::Synchro* synchro = comms[idx];
simcall_comm_testany__set__result(simcall, idx);
synchro->simcalls.push_back(simcall);
synchro->state = SIMIX_DONE;
return;
}
- xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
+ for (std::size_t i = 0; i != count; ++i) {
+ simgrid::simix::Synchro* synchro = comms[i];
if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
- simcall_comm_testany__set__result(simcall, cursor);
+ simcall_comm_testany__set__result(simcall, i);
synchro->simcalls.push_back(simcall);
SIMIX_comm_finish(synchro);
return;
SIMIX_simcall_answer(simcall);
}
- void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
+ /**
+ * Handler of the comm_waitany simcall.
+ *
+ * \param simcall  the simcall being handled
+ * \param synchros dynar of the communications to wait for
+ * \param timeout  relative delay after which the simcall is answered with
+ *                 result -1; any negative value (callers pass -1) disables it
+ *
+ * With the model-checker or record/replay active, timeouts are rejected with
+ * xbt_die() and the communication picked by SIMCALL_GET_MC_VALUE() is finished
+ * immediately.
+ */
+ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
{
smx_synchro_t synchro;
unsigned int cursor = 0;
if (MC_is_active() || MC_record_replay_is_active()){
+ if (timeout >= 0)
+ xbt_die("Timeout not implemented for waitany in the model-checker");
int idx = SIMCALL_GET_MC_VALUE(simcall);
synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
synchro->simcalls.push_back(simcall);
SIMIX_comm_finish(synchro);
return;
}
-
+
+ if (timeout < 0) {
+ /* No timeout requested: make sure no stale timer stays attached to the simcall. */
+ simcall->timer = nullptr;
+ } else {
+ /* SIMIX_timer_set() takes an absolute simulated date, so the relative
+ * timeout has to be added to the current clock. */
+ simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
+ SIMIX_waitany_remove_simcall_from_actions(simcall);
+ simcall_comm_waitany__set__result(simcall, -1);
+ SIMIX_simcall_answer(simcall);
+ });
+ }
+
xbt_dynar_foreach(synchros, cursor, synchro){
/* associate this simcall to the synchro */
synchro->simcalls.push_back(simcall);
continue; // if process handling comm is killed
if (simcall->call == SIMCALL_COMM_WAITANY) {
SIMIX_waitany_remove_simcall_from_actions(simcall);
+ if (simcall->timer) {
+ SIMIX_timer_remove(simcall->timer);
+ simcall->timer = nullptr;
+ }
if (!MC_is_active() && !MC_record_replay_is_active())
simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
}
e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
}
else if (simcall->call == SIMCALL_COMM_TESTANY) {
- e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
+ e.value = -1;
+ auto comms = simcall_comm_testany__get__comms(simcall);
+ auto count = simcall_comm_testany__get__count(simcall);
+ auto element = std::find(comms, comms + count, synchro);
+ if (element == comms + count)
+ e.value = -1;
+ else
+ e.value = element - comms;
}
simcall->issuer->exception = std::make_exception_ptr(e);
}
int smpi_mpi_testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
{
- xbt_dynar_t comms;
+ std::vector<simgrid::simix::Synchro*> comms;
+ comms.reserve(count);
+
int i;
int flag = 0;
*index = MPI_UNDEFINED;
- comms = xbt_dynar_new(sizeof(smx_synchro_t), nullptr);
+
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
for(i = 0; i < count; i++) {
if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action && !(requests[i]->flags & PREPARED)) {
- xbt_dynar_push(comms, &requests[i]->action);
+ comms.push_back(requests[i]->action);
map.push_back(i);
}
}
if(smpi_test_sleep > 0)
simcall_process_sleep(nsleeps*smpi_test_sleep);
- i = simcall_comm_testany(comms); // The i-th element in comms matches!
+ i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
*index = map[i];
finish_wait(&requests[*index], status);
flag = 1;
smpi_empty_status(status);
}
- xbt_dynar_free(&comms);
return flag;
}
}
}
if(size > 0) {
- i = simcall_comm_waitany(comms);
+ i = simcall_comm_waitany(comms, -1);
// not MPI_UNDEFINED, as this is a simix return code
if (i != -1) {