int (*match_fun)(void *, void *, smx_action_t),
void *data);
+/* Blocking receive on rendez-vous point rdv with a timeout; rate is handed
+ * down to the communication as the receiver-side rate. */
+XBT_PUBLIC(void) simcall_comm_recv_bounded(smx_rdv_t rdv, void *dst_buff,
+ size_t * dst_buff_size,
+ int (*match_fun)(void *, void *, smx_action_t),
+ void *data, double timeout, double rate);
+
+/* Posts a receive on rendez-vous point rdv and returns the resulting
+ * communication action; rate is handed down as the receiver-side rate. */
+XBT_PUBLIC(smx_action_t) simcall_comm_irecv_bounded(smx_rdv_t rdv, void *dst_buff,
+ size_t * dst_buff_size,
+ int (*match_fun)(void *, void *, smx_action_t),
+ void *data, double rate);
+
XBT_PUBLIC(void) simcall_comm_destroy(smx_action_t comm);
XBT_PUBLIC(smx_action_t) simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag,
int (*match_fun)(void *, void *, smx_action_t), void *data);
-XBT_PUBLIC(double) simcall_comm_change_rate_first_action(smx_rdv_t rdv, double newrate);
XBT_PUBLIC(void) simcall_comm_cancel(smx_action_t comm);
/* FIXME: waitany is going to be a vararg function, and should take a timeout */
smx_rdv_t rdv = MSG_mailbox_get_by_alias(name);
- simcall_comm_change_rate_first_action(rdv,rate);
+
/* FIXME: these functions are not traceable */
/* Sanity check */
comm->task_sent = NULL;
comm->task_received = task;
comm->status = MSG_OK;
- comm->s_comm = simcall_comm_irecv(rdv, task, NULL, NULL, NULL);
+ comm->s_comm = simcall_comm_irecv_bounded(rdv, task, NULL, NULL, NULL, rate);
return comm;
}
return simcall_rdv_comm_count_by_host(mailbox, host);
}
-double MSG_set_rate_before_read(msg_mailbox_t mailbox, double newrate) {
- return simcall_comm_change_rate_first_action(mailbox,newrate);
-}
-
msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
{
MSG_RETURN(ret);
}
-
-
/** \ingroup msg_mailbox_management
* \brief Get a task from a mailbox on a given host at a given rate
*
* \param task a memory location for storing a #msg_task_t.
* \param host a #msg_host_t host from where the task was sent
* \param timeout a timeout
- * \param rate a bandwidth rate
+ * \param rate the maximum rate at which the task may be received
* \return Returns
* #MSG_OK if the task was successfully received,
MSG_mailbox_get_task_ext_bounded(msg_mailbox_t mailbox, msg_task_t * task,
msg_host_t host, double timeout, double rate)
{
- MSG_set_rate_before_read(mailbox,rate);
- MSG_RETURN(MSG_mailbox_get_task_ext(mailbox,task,host,timeout));
-}
+ /* Receives a task through simcall_comm_recv_bounded(), forwarding both the
+  * timeout and the rate, and translates the exceptions raised by the
+  * receive into msg_error_t codes. */
+ xbt_ex_t e;
+ msg_error_t ret = MSG_OK;
+ /* We no longer support getting a task from a specific host */
+ if (host)
+ THROW_UNIMPLEMENTED;
+#ifdef HAVE_TRACING
+ TRACE_msg_task_get_start();
+ /* Recorded now so it can be handed to TRACE_msg_task_get_end() below */
+ double start_time = MSG_get_clock();
+#endif
+
+ /* Sanity check */
+ xbt_assert(task, "Null pointer for the task storage");
+
+ /* A non-NULL *task is only warned about; the receive still goes ahead */
+ if (*task)
+ XBT_WARN
+ ("Asked to write the received task in a non empty struct -- proceeding.");
+
+ /* Try to receive it by calling SIMIX network layer */
+ TRY {
+ simcall_comm_recv_bounded(mailbox, task, NULL, NULL, NULL, timeout, rate);
+ XBT_DEBUG("Got task %s from %p",(*task)->name,mailbox);
+ /* Clear the in-use flag of the task that was just received */
+ (*task)->simdata->isused=0;
+ }
+ CATCH(e) {
+ /* Map the known exception categories to MSG error codes; any other
+  * category is rethrown to the caller. */
+ switch (e.category) {
+ case cancel_error:
+ ret = MSG_HOST_FAILURE;
+ break;
+ case network_error:
+ ret = MSG_TRANSFER_FAILURE;
+ break;
+ case timeout_error:
+ ret = MSG_TIMEOUT;
+ break;
+ default:
+ RETHROW;
+ }
+ xbt_ex_free(e);
+ }
+
+#ifdef HAVE_TRACING
+ /* Only trace the end of the reception when none of the failure codes
+  * above was recorded. */
+ if (ret != MSG_HOST_FAILURE &&
+ ret != MSG_TRANSFER_FAILURE &&
+ ret != MSG_TIMEOUT) {
+ TRACE_msg_task_get_end(start_time, *task);
+ }
+#endif
+ MSG_RETURN(ret);
+}
msg_error_t
MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, msg_task_t task,
simcall->mc_value = 0;
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
+
+/**
+ * Handles the blocking bounded receive: posts a bounded receive on \a rdv on
+ * behalf of the simcall issuer, then waits on the resulting communication
+ * for at most \a timeout.
+ */
+void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
+                                 void *dst_buff, size_t *dst_buff_size,
+                                 int (*match_fun)(void *, void *, smx_action_t),
+                                 void *data, double timeout, double rate)
+{
+  smx_action_t recv_action;
+
+  recv_action = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
+                                         dst_buff_size, match_fun, data, rate);
+
+  /* Reset the simcall's mc_value before handing over to the wait handler */
+  simcall->mc_value = 0;
+  SIMIX_pre_comm_wait(simcall, recv_action, timeout);
+}
+
smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
match_fun, data);
}
+
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t), void *data)
return other_action;
}
+/**
+ * Handles the non-blocking bounded receive: forwards the request to
+ * SIMIX_comm_irecv_bounded() with the simcall issuer as the receiving
+ * process and returns the resulting communication action.
+ */
+smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
+                                          void *dst_buff, size_t *dst_buff_size,
+                                          int (*match_fun)(void *, void *, smx_action_t),
+                                          void *data, double rate)
+{
+  smx_action_t recv_action;
+
+  recv_action = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
+                                         dst_buff_size, match_fun, data, rate);
+  return recv_action;
+}
+
+/**
+ * Posts a receive request on \a rdv for \a dst_proc, optionally lowering the
+ * rate of the communication.
+ *
+ * A fresh receive action is created and matched against a pending send: in
+ * rdv->done_comm_fifo when the rendez-vous point has a permanent receiver and
+ * that fifo is not empty, in rdv->comm_fifo otherwise. When no send matches,
+ * the fresh action is pushed on the rendez-vous point and becomes the
+ * communication; when one matches, the fresh action is destroyed and the
+ * matching send is reused.
+ *
+ * \param dst_proc      receiving process
+ * \param rdv           rendez-vous point to receive from
+ * \param dst_buff      destination buffer, stored on the communication
+ * \param dst_buff_size destination buffer size, stored on the communication
+ * \param match_fun     matching filter passed to the fifo lookup
+ * \param data          receiver data, passed to the lookup and stored
+ * \param rate          receiver-side rate. -1.0 (the value this function
+ *                      treats as "no rate set") never overrides the rate
+ *                      already stored on the communication; any other value
+ *                      replaces it when the stored rate is unset or larger.
+ * \return the action representing the communication
+ */
+smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
+                                      void *dst_buff, size_t *dst_buff_size,
+                                      int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
+{
+  XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
+  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
+
+  smx_action_t other_action;
+  if (rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo) != 0) {
+    /* Permanent receiver: the communication may already be complete, so
+     * look for a match among the already received comms first. */
+    XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
+    other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
+    if (!other_action) {
+      /* Not found: assume the receiver came first and register it on the
+       * mailbox in the classical way. */
+      XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
+      other_action = this_action;
+      SIMIX_rdv_push(rdv, this_action);
+    } else {
+      if (other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action) == 0.0) {
+        XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n", &(other_action->comm));
+        other_action->state = SIMIX_DONE;
+        other_action->comm.type = SIMIX_COMM_DONE;
+        other_action->comm.rdv = NULL;
+      }
+      other_action->comm.refcount--;
+      SIMIX_comm_destroy(this_action);
+      --smx_total_comms; // this creation was a pure waste
+    }
+  } else {
+    /* Look for a communication action matching our needs. this_action is
+     * passed along as a description of ourself, so that the other side also
+     * gets a chance of choosing whether it wants to match with us.
+     *
+     * If none is found, push our communication into the rendez-vous point. */
+    other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
+
+    if (!other_action) {
+      XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
+      other_action = this_action;
+      SIMIX_rdv_push(rdv, this_action);
+    } else {
+      SIMIX_comm_destroy(this_action);
+      --smx_total_comms; // this creation was a pure waste
+      other_action->state = SIMIX_READY;
+      other_action->comm.type = SIMIX_COMM_READY;
+    }
+    xbt_fifo_push(dst_proc->comms, other_action);
+  }
+
+  /* Setup communication action */
+  other_action->comm.dst_proc = dst_proc;
+  other_action->comm.dst_buff = dst_buff;
+  other_action->comm.dst_buff_size = dst_buff_size;
+  other_action->comm.dst_data = data;
+
+  /* Only a real receiver-side rate may tighten the communication: -1.0
+   * compares smaller than any positive rate, so without this guard it would
+   * erase a bound already stored on the matched action. */
+  if (rate != -1.0 &&
+      (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
+    other_action->comm.rate = rate;
+
+  other_action->comm.match_fun = match_fun;
+
+  /* With the model checker active, the action is only marked as running;
+   * SIMIX_comm_start() is not called. */
+  if (MC_is_active()) {
+    other_action->state = SIMIX_RUNNING;
+    return other_action;
+  }
+
+  SIMIX_comm_start(other_action);
+  return other_action;
+}
+
smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
int src, int tag,
int (*match_fun)(void *, void *, smx_action_t),
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*)(void *, void *, smx_action_t), void *data);
+/* NOTE(review): no definition of SIMIX_comm_recv_bounded is visible in this
+ * change (only SIMIX_pre_comm_recv_bounded is defined) -- confirm that this
+ * prototype is actually needed. */
+void SIMIX_comm_recv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
+ void *dst_buff, size_t *dst_buff_size,
+ int (*)(void *, void *,smx_action_t), void *data,
+ double timeout, double rate);
+/* Posts a receive on rdv for dst_proc and returns the communication action;
+ * rate is the receiver-side rate applied to that communication. */
+smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
+ void *dst_buff, size_t *dst_buff_size,
+ int (*)(void *, void *, smx_action_t), void *data, double rate);
void SIMIX_comm_destroy(smx_action_t action);
void SIMIX_comm_destroy_internal_actions(smx_action_t action);
smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
void *data, double timeout);
+/* Blocking bounded receive handler: posts the bounded receive for the
+ * simcall issuer, then waits on it for at most timeout. */
+void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
+ void *dst_buff, size_t *dst_buff_size,
+ int (*match_fun)(void *, void *, smx_action_t),
+ void *data, double timeout, double rate);
smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
ACTION(SIMCALL_COMM_ISEND, comm_isend, WITH_ANSWER, TSPEC(result, smx_action_t), TSPEC(rdv, smx_rdv_t), TDOUBLE(task_size), TDOUBLE(rate), TPTR(src_buff), TSIZE(src_buff_size), TSPEC(match_fun, simix_match_func_t), TSPEC(clean_fun, simix_clean_func_t), TPTR(data), TINT(detached)) sep \
ACTION(SIMCALL_COMM_RECV, comm_recv, WITHOUT_ANSWER, TVOID(result), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data), TDOUBLE(timeout)) sep \
ACTION(SIMCALL_COMM_IRECV, comm_irecv, WITH_ANSWER, TSPEC(result, smx_action_t), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data)) sep \
+ACTION(SIMCALL_COMM_RECV_BOUNDED, comm_recv_bounded, WITHOUT_ANSWER, TVOID(result), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data), TDOUBLE(timeout), TDOUBLE(rate)) sep \
+ACTION(SIMCALL_COMM_IRECV_BOUNDED, comm_irecv_bounded, WITH_ANSWER, TSPEC(result, smx_action_t), TSPEC(rdv, smx_rdv_t), TPTR(dst_buff), TSPEC(dst_buff_size, size_t*), TSPEC(match_fun, simix_match_func_t), TPTR(data), TDOUBLE(rate)) sep \
ACTION(SIMCALL_COMM_DESTROY, comm_destroy, WITH_ANSWER, TVOID(result), TSPEC(comm, smx_action_t)) sep \
ACTION(SIMCALL_COMM_CANCEL, comm_cancel, WITH_ANSWER, TVOID(result), TSPEC(comm, smx_action_t)) sep \
ACTION(SIMCALL_COMM_WAITANY, comm_waitany, WITHOUT_ANSWER, TINT(result), TSPEC(comms, xbt_dynar_t)) sep \
/**
* \ingroup simix_comm_management
*/
-double simcall_comm_change_rate_first_action(smx_rdv_t rdv, double newrate)
-{
- xbt_assert(rdv, "No rendez-vous point defined for change_rate_first_action");
-
- smx_action_t action;
- xbt_fifo_item_t item;
-
- item = xbt_fifo_get_first_item(rdv->comm_fifo);
- if (item != NULL) {
- action = (smx_action_t) xbt_fifo_get_item_content(item);
- if (action->comm.rate > newrate) {
- action->comm.rate = newrate;
- return newrate;
- } else
- return action->comm.rate;
- } else
- return -1.0;
+/* Blocking receive on rdv with a timeout and a receiver-side rate.
+ * Requires a finite timeout and a non-NULL rendez-vous point. */
+void simcall_comm_recv_bounded(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
+                               int (*match_fun)(void *, void *, smx_action_t), void *data, double timeout, double rate)
+{
+  xbt_assert(isfinite(timeout), "timeout is not finite!");
+  xbt_assert(rdv, "No rendez-vous point defined for recv");
+
+  if (!MC_is_active()) {
+    /* Regular case: a single simcall performs the whole receive */
+    simcall_BODY_comm_recv_bounded(rdv, dst_buff, dst_buff_size,
+                                   match_fun, data, timeout, rate);
+    return;
+  }
+
+  /* the model-checker wants two separate simcalls */
+  smx_action_t recv_action = simcall_comm_irecv_bounded(rdv, dst_buff, dst_buff_size,
+                                                        match_fun, data, rate);
+  simcall_comm_wait(recv_action, timeout);
+}
+/**
+ * \ingroup simix_comm_management
+ *
+ * Posts a receive on \a rdv with a receiver-side \a rate and returns the
+ * resulting communication action. Requires a non-NULL rendez-vous point.
+ */
+smx_action_t simcall_comm_irecv_bounded(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size,
+                                        int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
+{
+  smx_action_t recv_action;
+
+  xbt_assert(rdv, "No rendez-vous point defined for irecv");
+
+  recv_action = simcall_BODY_comm_irecv_bounded(rdv, dst_buff, dst_buff_size,
+                                                match_fun, data, rate);
+  return recv_action;
}
}
}
+ }
}
static void model_cluster_finalize(AS_t as)