-/* Copyright (c) 2009-2013. The SimGrid Team.
+/* Copyright (c) 2009-2014. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
#include "xbt/log.h"
#include "mc/mc.h"
#include "xbt/dict.h"
+#include "smpi/private.h"
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
"Logging specific to SIMIX (network)");
int (*match_fun)(void *, void *,smx_action_t),
void *user_data, smx_action_t my_action);
static void SIMIX_rdv_free(void *data);
+static void SIMIX_comm_start(smx_action_t action);
/**
 * \brief Set up the rendez-vous point dictionary used by this file.
 *
 * Assigns rdv_points (declared outside this function) a new homogeneous
 * dictionary, handing SIMIX_rdv_free to xbt_dict_new_homogeneous()
 * (presumably as the per-entry free function -- confirm against xbt/dict.h).
 */
void SIMIX_network_init(void)
{
rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/* The two removed lines below used to ask the model checker, when active,
 * to ignore the global variable "smx_total_comms"; this change drops that. */
- if(MC_is_active())
- MC_ignore_global_variable("smx_total_comms");
}
void SIMIX_network_exit(void)
xbt_fifo_free(rdv->comm_fifo);
xbt_fifo_free(rdv->done_comm_fifo);
- xbt_free(rdv);
+ xbt_free(rdv);
}
xbt_dict_t SIMIX_get_rdv_points()
return act;
}
-void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
- SIMIX_comm_destroy(action);
-}
/**
* \brief Destroy a communicate action
* \param action The communicate action to be destroyed
#ifdef HAVE_LATENCY_BOUND_TRACKING
action->latency_limited = SIMIX_comm_is_latency_bounded(action);
#endif
- action->comm.surf_comm->model_obj->action_unref(action->comm.surf_comm);
+ surf_action_unref(action->comm.surf_comm);
action->comm.surf_comm = NULL;
}
if (action->comm.src_timeout){
- action->comm.src_timeout->model_obj->action_unref(action->comm.src_timeout);
+ surf_action_unref(action->comm.src_timeout);
action->comm.src_timeout = NULL;
}
if (action->comm.dst_timeout){
- action->comm.dst_timeout->model_obj->action_unref(action->comm.dst_timeout);
+ surf_action_unref(action->comm.dst_timeout);
action->comm.dst_timeout = NULL;
}
}
/**
 * \brief Simcall handler for a blocking send: post the send, then wait on it.
 *
 * With this change the sending process is passed explicitly as \a src
 * instead of being taken from simcall->issuer, and an optional
 * \a copy_data_fun is forwarded to SIMIX_comm_isend().
 *
 * \param simcall        the simcall being handled; also handed to SIMIX_pre_comm_wait()
 * \param src            process given to SIMIX_comm_isend() as the sender
 * \param rdv            rendez-vous point, forwarded to SIMIX_comm_isend()
 * \param task_size      forwarded to SIMIX_comm_isend()
 * \param rate           forwarded to SIMIX_comm_isend()
 * \param src_buff       forwarded to SIMIX_comm_isend()
 * \param src_buff_size  forwarded to SIMIX_comm_isend()
 * \param match_fun      forwarded to SIMIX_comm_isend()
 * \param copy_data_fun  forwarded to SIMIX_comm_isend()
 * \param data           forwarded to SIMIX_comm_isend()
 * \param timeout        forwarded to SIMIX_pre_comm_wait()
 */
-void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
+void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double timeout){
/* The send is posted with a NULL clean_fun and detached == 0. */
- smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
- src_buff, src_buff_size, match_fun, NULL,
+ smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
+ src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
data, 0);
/* The model-checker value is now set through the SIMCALL_SET_MC_VALUE macro
 * rather than by writing simcall->mc_value directly. */
- simcall->mc_value = 0;
+ SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
/**
 * \brief Simcall handler for a non-blocking send; thin wrapper over SIMIX_comm_isend().
 *
 * With this change the sender is the explicit \a src argument (previously
 * simcall->issuer) and \a copy_data_fun is added to the forwarded arguments.
 * All other arguments are passed through unchanged.
 *
 * \param simcall        the simcall being handled (no longer read by the new body)
 * \param src            process given to SIMIX_comm_isend() as the sender
 * \param clean_fun      forwarded to SIMIX_comm_isend()
 * \param copy_data_fun  forwarded to SIMIX_comm_isend()
 * \param detached       forwarded to SIMIX_comm_isend()
 * \return whatever SIMIX_comm_isend() returns
 */
-smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
+smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
- void (*clean_fun)(void *),
+ void (*clean_fun)(void *),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, int detached){
- return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
- src_buff_size, match_fun, clean_fun, data, detached);
+ return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
+ src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
}
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
void *data,
int detached)
{
other_action->comm.src_data = data;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
+
if (MC_is_active()) {
other_action->state = SIMIX_RUNNING;
- return other_action;
+ return (detached ? NULL : other_action);
}
SIMIX_comm_start(other_action);
}
/**
 * \brief Simcall handler for a blocking receive: post the receive, then wait on it.
 *
 * This change adds \a copy_data_fun and \a rate to the signature and removes
 * the separate SIMIX_pre_comm_recv_bounded() handler, whose only extra
 * argument was \a rate. The receiver is still simcall->issuer.
 *
 * \param simcall        the simcall being handled; its issuer is the receiving process
 * \param rdv            rendez-vous point, forwarded to SIMIX_comm_irecv()
 * \param dst_buff       forwarded to SIMIX_comm_irecv()
 * \param dst_buff_size  forwarded to SIMIX_comm_irecv()
 * \param match_fun      forwarded to SIMIX_comm_irecv()
 * \param copy_data_fun  forwarded to SIMIX_comm_irecv()
 * \param data           forwarded to SIMIX_comm_irecv()
 * \param timeout        forwarded to SIMIX_pre_comm_wait()
 * \param rate           forwarded to SIMIX_comm_irecv()
 */
void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t),
- void *data, double timeout){
+ void *dst_buff, size_t *dst_buff_size,
+ int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
+ void *data, double timeout, double rate)
+{
smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
- dst_buff_size, match_fun, data);
- simcall->mc_value = 0;
- SIMIX_pre_comm_wait(simcall, comm, timeout);
-}
-
-void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t),
- void *data, double timeout, double rate){
- smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
- dst_buff_size, match_fun, data, rate);
- simcall->mc_value = 0;
+ dst_buff_size, match_fun, copy_data_fun, data, rate);
/* The model-checker value is now set through the SIMCALL_SET_MC_VALUE macro
 * rather than by writing simcall->mc_value directly. */
+ SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
/**
 * \brief Simcall handler for a non-blocking receive; thin wrapper over SIMIX_comm_irecv().
 *
 * This change adds \a copy_data_fun and \a rate to the signature and forwards
 * both to SIMIX_comm_irecv(). The receiver is simcall->issuer.
 *
 * \param simcall        the simcall being handled; its issuer is the receiving process
 * \param rdv            rendez-vous point, forwarded to SIMIX_comm_irecv()
 * \param dst_buff       forwarded to SIMIX_comm_irecv()
 * \param dst_buff_size  forwarded to SIMIX_comm_irecv()
 * \param match_fun      forwarded to SIMIX_comm_irecv()
 * \param copy_data_fun  forwarded to SIMIX_comm_irecv()
 * \param data           forwarded to SIMIX_comm_irecv()
 * \param rate           forwarded to SIMIX_comm_irecv()
 * \return whatever SIMIX_comm_irecv() returns
 */
smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
- void *data){
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
+ void *data, double rate)
+{
return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
- match_fun, data);
+ match_fun, copy_data_fun, data, rate);
}
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t), void *data)
-{
- XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
- smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
-
- smx_action_t other_action;
- //communication already done, get it inside the fifo of completed comms
- //permanent receive v1
- //int already_received=0;
- if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
-
- XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
- //find a match in the already received fifo
- other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
- //if not found, assume the receiver came first, register it to the mailbox in the classical way
- if (!other_action) {
- XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
- other_action = this_action;
- SIMIX_rdv_push(rdv, this_action);
- }else{
- if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
- {
- XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
- other_action->state = SIMIX_DONE;
- other_action->comm.type = SIMIX_COMM_DONE;
- other_action->comm.rdv = NULL;
- //SIMIX_comm_destroy(this_action);
- //--smx_total_comms; // this creation was a pure waste
- //already_received=1;
- //other_action->comm.refcount--;
- }/*else{
- XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
- }*/
- other_action->comm.refcount--;
- SIMIX_comm_destroy(this_action);
- --smx_total_comms; // this creation was a pure waste
- }
- }else{
- /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
-
- /* Look for communication action matching our needs. We also provide a description of
- * ourself so that the other side also gets a chance of choosing if it wants to match with us.
- *
- * If it is not found then push our communication into the rendez-vous point */
- other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
-
- if (!other_action) {
- XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
- other_action = this_action;
- SIMIX_rdv_push(rdv, this_action);
- } else {
- SIMIX_comm_destroy(this_action);
- --smx_total_comms; // this creation was a pure waste
- other_action->state = SIMIX_READY;
- other_action->comm.type = SIMIX_COMM_READY;
- //other_action->comm.refcount--;
- }
- xbt_fifo_push(dst_proc->comms, other_action);
- }
-
- /* Setup communication action */
- other_action->comm.dst_proc = dst_proc;
- other_action->comm.dst_buff = dst_buff;
- other_action->comm.dst_buff_size = dst_buff_size;
- other_action->comm.dst_data = data;
-
- other_action->comm.match_fun = match_fun;
-
-
- /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
- SIMIX_comm_copy_data(other_action);*/
-
-
- if (MC_is_active()) {
- other_action->state = SIMIX_RUNNING;
- return other_action;
- }
-
- SIMIX_comm_start(other_action);
- // }
- return other_action;
-}
-
-smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t),
- void *data, double rate){
- return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
- match_fun, data, rate);
-}
-
-smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
+ int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
+ void *data, double rate)
{
XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
other_action->comm.dst_buff_size = dst_buff_size;
other_action->comm.dst_data = data;
- if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
- other_action->comm.rate = rate;
+ if (rate != -1.0 &&
+ (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
+ other_action->comm.rate = rate;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
/*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
simcall->issuer->waiting_action = action;
if (MC_is_active()) {
- int idx = simcall->mc_value;
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
if (idx == 0) {
action->state = SIMIX_DONE;
} else {
if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
SIMIX_comm_finish(action);
} else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
- sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
- surf_workstation_model->action_data_set(sleep, action);
+ sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
+ surf_action_set_data(sleep, action);
if (simcall->issuer == action->comm.src_proc)
action->comm.src_timeout = sleep;
simcall_comm_testany__set__result(simcall, -1);
if (MC_is_active()){
- int idx = simcall->mc_value;
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
if(idx == -1){
SIMIX_simcall_answer(simcall);
}else{
unsigned int cursor = 0;
if (MC_is_active()){
- int idx = simcall->mc_value;
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
action = xbt_dynar_get_as(actions, idx, smx_action_t);
xbt_fifo_push(action->simcalls, simcall);
simcall_comm_waitany__set__result(simcall, idx);
* \brief Starts the simulation of a communication action.
* \param action the communication action
*/
-XBT_INLINE void SIMIX_comm_start(smx_action_t action)
+static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
{
/* If both the sender and the receiver are already there, start the communication */
if (action->state == SIMIX_READY) {
XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
- action->comm.surf_comm = surf_workstation_model->extension.workstation.
- communicate(sender, receiver, action->comm.task_size, action->comm.rate);
+ action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
+ sender, receiver,
+ action->comm.task_size, action->comm.rate);
- surf_workstation_model->action_data_set(action->comm.surf_comm, action);
+ surf_action_set_data(action->comm.surf_comm, action);
action->state = SIMIX_RUNNING;
/* If a link is failed, detect it immediately */
- if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+ if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
action->state = SIMIX_LINK_FAILURE;
XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
- surf_workstation_model->suspend(action->comm.surf_comm);
+ surf_action_suspend(action->comm.surf_comm);
}
}
unsigned int destroy_count = 0;
smx_simcall_t simcall;
+
while ((simcall = xbt_fifo_shift(action->simcalls))) {
/* If a waitany simcall is waiting for this action to finish, then remove
it from the other actions in the waitany list. Afterwards, get the
position of the actual action in the waitany dynar and
return it as the result of the simcall */
+
+ if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
+ continue; // if process handling comm is killed
if (simcall->call == SIMCALL_COMM_WAITANY) {
SIMIX_waitany_remove_simcall_from_actions(simcall);
if (!MC_is_active())
}
}
- if (surf_workstation_model->extension.
- workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
+ if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
simcall->issuer->context->iwannadie = 1;
}
{
/* Update action state */
if (action->comm.src_timeout &&
- surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
+ surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
action->state = SIMIX_SRC_TIMEOUT;
else if (action->comm.dst_timeout &&
- surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
+ surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
action->state = SIMIX_DST_TIMEOUT;
else if (action->comm.src_timeout &&
- surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
+ surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
action->state = SIMIX_SRC_HOST_FAILURE;
else if (action->comm.dst_timeout &&
- surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
+ surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
action->state = SIMIX_DST_HOST_FAILURE;
else if (action->comm.surf_comm &&
- surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+ surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
XBT_DEBUG("Puta madre. Surf says that the link broke");
action->state = SIMIX_LINK_FAILURE;
} else
else if (!MC_is_active() /* when running the MC there are no surf actions */
&& (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
- surf_workstation_model->action_cancel(action->comm.surf_comm);
+ surf_action_cancel(action->comm.surf_comm);
}
}
{
/*FIXME: shall we suspend also the timeout actions? */
if (action->comm.surf_comm)
- surf_workstation_model->suspend(action->comm.surf_comm);
+ surf_action_suspend(action->comm.surf_comm);
/* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
}
{
/*FIXME: check what happens with the timeouts */
if (action->comm.surf_comm)
- surf_workstation_model->resume(action->comm.surf_comm);
+ surf_action_resume(action->comm.surf_comm);
/* in the other case, the action was not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
}
switch (action->state) {
case SIMIX_RUNNING:
- remains = surf_workstation_model->get_remains(action->comm.surf_comm);
+ remains = surf_action_get_remains(action->comm.surf_comm);
break;
case SIMIX_WAITING:
* \brief verify if communication is latency bounded
* \param action The communication
*/
-XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
+int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
if(!action){
return 0;
}
if (action->comm.surf_comm){
XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
- action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
+ action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
XBT_DEBUG("Action limited is %d", action->latency_limited);
}
return action->latency_limited;
if (comm->comm.dst_buff_size)
*comm->comm.dst_buff_size = buff_size;
- if (buff_size > 0)
- SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ if (buff_size > 0){
+ if(comm->comm.copy_data_fun)
+ comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
+ else
+ SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ }
+
/* Set the copied flag so we copy data only once */
/* (this function might be called from both communication ends) */