From f1a4bce3b5ddd9ba8c826c36356dfa71e732309e Mon Sep 17 00:00:00 2001 From: Jonathan Rouzaud-Cornabas Date: Thu, 14 Feb 2013 13:59:13 +0100 Subject: [PATCH] New version of recv/irecv with bounded rate --- include/simgrid/simix.h | 11 +++- src/msg/msg_gos.c | 4 +- src/msg/msg_mailbox.c | 60 +++++++++++++++--- src/simix/smx_network.c | 108 ++++++++++++++++++++++++++++++++ src/simix/smx_network_private.h | 11 ++++ src/simix/smx_smurf_private.h | 2 + src/simix/smx_user.c | 44 ++++++++----- src/surf/surf_routing_cluster.c | 1 + 8 files changed, 211 insertions(+), 30 deletions(-) diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 599746406a..15b5e9a227 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -406,10 +406,19 @@ XBT_PUBLIC(smx_action_t) simcall_comm_irecv(smx_rdv_t rdv, void *dst_buff, int (*match_fun)(void *, void *, smx_action_t), void *data); +XBT_PUBLIC(void) simcall_comm_recv_bounded(smx_rdv_t rdv, void *dst_buff, + size_t * dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), + void *data, double timeout, double rate); + +XBT_PUBLIC(smx_action_t) simcall_comm_irecv_bounded(smx_rdv_t rdv, void *dst_buff, + size_t * dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), + void *data, double rate); + XBT_PUBLIC(void) simcall_comm_destroy(smx_action_t comm); XBT_PUBLIC(smx_action_t) simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag, int (*match_fun)(void *, void *, smx_action_t), void *data); -XBT_PUBLIC(double) simcall_comm_change_rate_first_action(smx_rdv_t rdv, double newrate); XBT_PUBLIC(void) simcall_comm_cancel(smx_action_t comm); /* FIXME: waitany is going to be a vararg function, and should take a timeout */ diff --git a/src/msg/msg_gos.c b/src/msg/msg_gos.c index aeb6ad343b..2b28c971cb 100644 --- a/src/msg/msg_gos.c +++ b/src/msg/msg_gos.c @@ -622,7 +622,7 @@ msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rat smx_rdv_t rdv = MSG_mailbox_get_by_alias(name); - simcall_comm_change_rate_first_action(rdv,rate); + /* FIXME: these functions are not traceable */ /* Sanity check */ @@ -637,7 +637,7 @@ msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rat comm->task_sent = NULL; comm->task_received = task; comm->status = MSG_OK; - comm->s_comm = simcall_comm_irecv(rdv, task, NULL, NULL, NULL); + comm->s_comm = simcall_comm_irecv_bounded(rdv, task, NULL, NULL, NULL, rate); return comm; } diff --git a/src/msg/msg_mailbox.c b/src/msg/msg_mailbox.c index 15d9c3ff82..f2eb97f935 100644 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@ -44,10 +44,6 @@ MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, return simcall_rdv_comm_count_by_host(mailbox, host); } -double MSG_set_rate_before_read(msg_mailbox_t mailbox, double newrate) { - return simcall_comm_change_rate_first_action(mailbox,newrate); -} - msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias) { @@ -145,8 +141,6 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, msg_task_t * task, MSG_RETURN(ret); } - - /** \ingroup msg_mailbox_management * \brief Get a task from a mailbox on a given host at a given rate * @@ -154,7 +148,7 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, msg_task_t * task, * \param task a memory location for storing a #msg_task_t. * \param host a #msg_host_t host from where the task was sent * \param timeout a timeout - * \param rate a bandwidth rate + * \param rate a rate * \return Returns * #MSG_OK if the task was successfully received, @@ -164,10 +158,56 @@ msg_error_t MSG_mailbox_get_task_ext_bounded(msg_mailbox_t mailbox, msg_task_t * task, msg_host_t host, double timeout, double rate) { - MSG_set_rate_before_read(mailbox,rate); - MSG_RETURN(MSG_mailbox_get_task_ext(mailbox,task,host,timeout)); -} + xbt_ex_t e; + msg_error_t ret = MSG_OK; + /* We no longer support getting a task from a specific host */ + if (host) + THROW_UNIMPLEMENTED; +#ifdef HAVE_TRACING + TRACE_msg_task_get_start(); + double start_time = MSG_get_clock(); +#endif + + /* Sanity check */ + xbt_assert(task, "Null pointer for the task storage"); + + if (*task) + XBT_WARN + ("Asked to write the received task in a non empty struct -- proceeding."); + + /* Try to receive it by calling SIMIX network layer */ + TRY { + simcall_comm_recv_bounded(mailbox, task, NULL, NULL, NULL, timeout, rate); + XBT_DEBUG("Got task %s from %p",(*task)->name,mailbox); + (*task)->simdata->isused=0; + } + CATCH(e) { + switch (e.category) { + case cancel_error: + ret = MSG_HOST_FAILURE; + break; + case network_error: + ret = MSG_TRANSFER_FAILURE; + break; + case timeout_error: + ret = MSG_TIMEOUT; + break; + default: + RETHROW; + } + xbt_ex_free(e); + } + +#ifdef HAVE_TRACING + if (ret != MSG_HOST_FAILURE && + ret != MSG_TRANSFER_FAILURE && + ret != MSG_TIMEOUT) { + TRACE_msg_task_get_end(start_time, *task); + } +#endif + MSG_RETURN(ret); +} msg_error_t MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, msg_task_t task, diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index eb1ee0da9f..fd1a3a47ae 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -458,6 +458,17 @@ void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv, simcall->mc_value = 0; SIMIX_pre_comm_wait(simcall, comm, timeout); } + +void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv, + void *dst_buff, size_t *dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), + void *data, double timeout, double rate){ + smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, + dst_buff_size, match_fun, data, rate); + simcall->mc_value = 0; + SIMIX_pre_comm_wait(simcall, comm, timeout); +} + smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, int (*match_fun)(void *, void *, smx_action_t), @@ -465,6 +476,7 @@ smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv, return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size, match_fun, data); } + smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, int (*match_fun)(void *, void *, smx_action_t), void *data) @@ -550,6 +562,102 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, return other_action; } +smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv, + void *dst_buff, size_t *dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), + void *data, double rate){ + return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size, + match_fun, data, rate); +} + +smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv, + void *dst_buff, size_t *dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), void *data, double rate) +{ + XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo); + smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE); + + smx_action_t other_action; + //communication already done, get it inside the fifo of completed comms + //permanent receive v1 + //int already_received=0; + if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){ + + XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n"); + //find a match in the already received fifo + other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action); + //if not found, assume the receiver came first, register it to the mailbox in the classical way + if (!other_action) { + XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n"); + other_action = this_action; + SIMIX_rdv_push(rdv, this_action); + }else{ + if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0) + { + XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm)); + other_action->state = SIMIX_DONE; + other_action->comm.type = SIMIX_COMM_DONE; + other_action->comm.rdv = NULL; + //SIMIX_comm_destroy(this_action); + //--smx_total_comms; // this creation was a pure waste + //already_received=1; + //other_action->comm.refcount--; + }/*else{ + XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo)); + }*/ + other_action->comm.refcount--; + SIMIX_comm_destroy(this_action); + --smx_total_comms; // this creation was a pure waste + } + }else{ + /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */ + + /* Look for communication action matching our needs. We also provide a description of + * ourself so that the other side also gets a chance of choosing if it wants to match with us. + * + * If it is not found then push our communication into the rendez-vous point */ + other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action); + + if (!other_action) { + XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo)); + other_action = this_action; + SIMIX_rdv_push(rdv, this_action); + } else { + SIMIX_comm_destroy(this_action); + --smx_total_comms; // this creation was a pure waste + other_action->state = SIMIX_READY; + other_action->comm.type = SIMIX_COMM_READY; + //other_action->comm.refcount--; + } + xbt_fifo_push(dst_proc->comms, other_action); + } + + /* Setup communication action */ + other_action->comm.dst_proc = dst_proc; + other_action->comm.dst_buff = dst_buff; + other_action->comm.dst_buff_size = dst_buff_size; + other_action->comm.dst_data = data; + + if (rate < other_action->comm.rate || other_action->comm.rate == -1.0) + other_action->comm.rate = rate; + + other_action->comm.match_fun = match_fun; + + + /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info + SIMIX_comm_copy_data(other_action);*/ + + + if (MC_is_active()) { + other_action->state = SIMIX_RUNNING; + return other_action; + } + + SIMIX_comm_start(other_action); + // } + return other_action; +} + smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv, int src, int tag, int (*match_fun)(void *, void *, smx_action_t), diff --git a/src/simix/smx_network_private.h b/src/simix/smx_network_private.h index 4add3efb31..85a651eff5 100644 --- a/src/simix/smx_network_private.h +++ b/src/simix/smx_network_private.h @@ -54,6 +54,13 @@ void SIMIX_comm_recv(smx_process_t dst_proc, smx_rdv_t rdv, smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, int (*)(void *, void *, smx_action_t), void *data); +void SIMIX_comm_recv_bounded(smx_process_t dst_proc, smx_rdv_t rdv, + void *dst_buff, size_t *dst_buff_size, + int (*)(void *, void *,smx_action_t), void *data, + double timeout, double rate); +smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv, + void *dst_buff, size_t *dst_buff_size, + int (*)(void *, void *, smx_action_t), void *data, double rate); void SIMIX_comm_destroy(smx_action_t action); void SIMIX_comm_destroy_internal_actions(smx_action_t action); smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src, @@ -99,6 +106,10 @@ void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, int (*match_fun)(void *, void *, smx_action_t), void *data, double timeout); +void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv, + void *dst_buff, size_t *dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), + void *data, double timeout, double rate); smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, int (*match_fun)(void *, void *, smx_action_t), diff --git a/src/simix/smx_smurf_private.h b/src/simix/smx_smurf_private.h index 601bf8ef1e..714f4561b2 100644 --- a/src/simix/smx_smurf_private.h +++ b/src/simix/smx_smurf_private.h @@ -307,6 +307,8 @@ ACTION(SIMCALL_COMM_SEND, comm_send, WITHOUT_ANSWER, TVOID(result), TSPEC(rdv, s ACTION(SIMCALL_COMM_ISEND, comm_isend, WITH_ANSWER, TSPEC(result, smx_action_t), TSPEC(rdv, smx_rdv_t), TDOUBLE(task_size), TDOUBLE(rate), TPTR(src_buff), TSIZE(src_buff_size), TSPEC(match_fun, simix_match_func_t), TSPEC(clean_fun, simix_clean_func_t), TPTR(data), TINT(detached)) sep \ ACTION(SIMCALL_COMM_RECV, comm_recv, WITHOUT_ANSWER, TVOID(result), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data), TDOUBLE(timeout)) sep \ ACTION(SIMCALL_COMM_IRECV, comm_irecv, WITH_ANSWER, TSPEC(result, smx_action_t), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data)) sep \ +ACTION(SIMCALL_COMM_RECV_BOUNDED, comm_recv_bounded, WITHOUT_ANSWER, TVOID(result), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data), TDOUBLE(timeout), TDOUBLE(rate)) sep \ +ACTION(SIMCALL_COMM_IRECV_BOUNDED, comm_irecv_bounded, WITH_ANSWER, TSPEC(result, smx_action_t), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data), TDOUBLE(rate)) sep \ ACTION(SIMCALL_COMM_DESTROY, comm_destroy, WITH_ANSWER, TVOID(result), TSPEC(comm, smx_action_t)) sep \ ACTION(SIMCALL_COMM_CANCEL, comm_cancel, WITH_ANSWER, TVOID(result), TSPEC(comm, smx_action_t)) sep \ ACTION(SIMCALL_COMM_WAITANY, comm_waitany, WITHOUT_ANSWER, TINT(result), TSPEC(comms, xbt_dynar_t)) sep \ diff --git a/src/simix/smx_user.c b/src/simix/smx_user.c index f618fdf95a..6d8a67c947 100644 --- a/src/simix/smx_user.c +++ b/src/simix/smx_user.c @@ -738,23 +738,33 @@ smx_action_t simcall_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_ /** * \ingroup simix_comm_management */ -double simcall_comm_change_rate_first_action(smx_rdv_t rdv, double newrate) -{ - xbt_assert(rdv, "No rendez-vous point defined for change_rate_first_action"); - - smx_action_t action; - xbt_fifo_item_t item; - - item = xbt_fifo_get_first_item(rdv->comm_fifo); - if (item != NULL) { - action = (smx_action_t) xbt_fifo_get_item_content(item); - if (action->comm.rate > newrate) { - action->comm.rate = newrate; - return newrate; - } else - return action->comm.rate; - } else - return -1.0; +void simcall_comm_recv_bounded(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), void *data, double timeout, double rate) +{ + xbt_assert(isfinite(timeout), "timeout is not finite!"); + xbt_assert(rdv, "No rendez-vous point defined for recv"); + + if (MC_is_active()) { + /* the model-checker wants two separate simcalls */ + smx_action_t comm = simcall_comm_irecv_bounded(rdv, dst_buff, dst_buff_size, + match_fun, data, rate); + simcall_comm_wait(comm, timeout); + } + else { + simcall_BODY_comm_recv_bounded(rdv, dst_buff, dst_buff_size, + match_fun, data, timeout, rate); + } +} +/** + * \ingroup simix_comm_management + */ +smx_action_t simcall_comm_irecv_bounded(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, + int (*match_fun)(void *, void *, smx_action_t), void *data, double rate) +{ + xbt_assert(rdv, "No rendez-vous point defined for irecv"); + + return simcall_BODY_comm_irecv_bounded(rdv, dst_buff, dst_buff_size, + match_fun, data, rate); } diff --git a/src/surf/surf_routing_cluster.c b/src/surf/surf_routing_cluster.c index dc7fdbd415..99bb075f55 100644 --- a/src/surf/surf_routing_cluster.c +++ b/src/surf/surf_routing_cluster.c @@ -122,6 +122,7 @@ static void cluster_get_graph(xbt_graph_t graph, xbt_dict_t nodes, } } + } } static void model_cluster_finalize(AS_t as) -- 2.20.1