-/* Copyright (c) 2009, 2010. The SimGrid Team.
+/* Copyright (c) 2009-2014. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
#include "xbt/log.h"
#include "mc/mc.h"
#include "xbt/dict.h"
+#include "smpi/private.h"
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
"Logging specific to SIMIX (network)");
static xbt_dict_t rdv_points = NULL;
-XBT_IMPORT_NO_EXPORT(unsigned long int) smx_total_comms = 0;
+XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
static void SIMIX_comm_copy_data(smx_action_t comm);
int (*match_fun)(void *, void *,smx_action_t),
void *user_data, smx_action_t my_action);
static void SIMIX_rdv_free(void *data);
+static void SIMIX_comm_start(smx_action_t action);
/**
 * \brief Initialize the network module.
 *
 * Creates the file-level rdv_points dictionary, handing SIMIX_rdv_free to
 * xbt_dict_new_homogeneous() (presumably as the per-entry free function --
 * confirm against the xbt_dict API).
 * The lines marked as removed below used to register smx_total_comms with
 * MC_ignore_data_bss() when the model checker was active.
 */
void SIMIX_network_init(void)
{
rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
- if(MC_is_active())
- MC_ignore_data_bss(&smx_total_comms, sizeof(smx_total_comms));
}
void SIMIX_network_exit(void)
xbt_fifo_free(rdv->comm_fifo);
xbt_fifo_free(rdv->done_comm_fifo);
- xbt_free(rdv);
+ xbt_free(rdv);
}
xbt_dict_t SIMIX_get_rdv_points()
xbt_fifo_remove_item(fifo, item);
xbt_fifo_free_item(item);
action->comm.refcount++;
+#ifdef HAVE_MC
+ action->comm.rdv_cpy = action->comm.rdv;
+#endif
action->comm.rdv = NULL;
return action;
}
/* set communication */
act->comm.type = type;
act->comm.refcount = 1;
+ act->comm.src_data=NULL;
+ act->comm.dst_data=NULL;
+
#ifdef HAVE_LATENCY_BOUND_TRACKING
//initialize with unknown value
return act;
}
-void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
- SIMIX_comm_destroy(action);
-}
/**
* \brief Destroy a communicate action
* \param action The communicate action to be destroyed
}
action->comm.refcount--;
if (action->comm.refcount > 0)
- return;
+ return;
XBT_DEBUG("Really free communication %p; refcount is now %d", action,
action->comm.refcount);
action->comm.src_buff = NULL;
}
+ if(action->comm.rdv)
+ SIMIX_rdv_remove(action->comm.rdv, action);
+
xbt_mallocator_release(simix_global->action_mallocator, action);
}
#ifdef HAVE_LATENCY_BOUND_TRACKING
action->latency_limited = SIMIX_comm_is_latency_bounded(action);
#endif
- action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
+ surf_action_unref(action->comm.surf_comm);
action->comm.surf_comm = NULL;
}
if (action->comm.src_timeout){
- action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
+ surf_action_unref(action->comm.src_timeout);
action->comm.src_timeout = NULL;
}
if (action->comm.dst_timeout){
- action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
+ surf_action_unref(action->comm.dst_timeout);
action->comm.dst_timeout = NULL;
}
}
/**
 * \brief Simcall handler for a blocking send.
 *
 * Posts a send through SIMIX_comm_isend() with no clean_fun (NULL) and
 * detached set to 0, sets the MC value of the simcall to 0, then hands the
 * resulting communication to SIMIX_pre_comm_wait() with the given timeout.
 * In the added (+) signature the sending process is passed explicitly as
 * \a src instead of being read from simcall->issuer, and a custom
 * \a copy_data_fun is forwarded to SIMIX_comm_isend().
 *
 * \param simcall the simcall being handled
 * \param src the sending process (added signature only)
 * \param rdv the rendez-vous point to send to
 * \param task_size forwarded unchanged to SIMIX_comm_isend()
 * \param rate forwarded unchanged to SIMIX_comm_isend()
 * \param src_buff the source buffer
 * \param src_buff_size size of \a src_buff
 * \param match_fun matching callback forwarded to SIMIX_comm_isend()
 * \param copy_data_fun data-copy callback forwarded to SIMIX_comm_isend() (added signature only)
 * \param data user data forwarded to SIMIX_comm_isend()
 * \param timeout timeout passed to SIMIX_pre_comm_wait()
 */
-void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
+void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double timeout){
- smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
- src_buff, src_buff_size, match_fun, NULL,
+ smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
+ src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
data, 0);
- simcall->mc_value = 0;
+ SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
/**
 * \brief Simcall handler for a non-blocking send.
 *
 * Thin wrapper: forwards every argument except \a simcall to
 * SIMIX_comm_isend() and returns its result. In the added (+) version the
 * sender is the explicit \a src argument rather than simcall->issuer, and
 * \a copy_data_fun is passed through as well.
 *
 * \param simcall the simcall being handled (only read by the removed version)
 * \param src the sending process (added signature only)
 * \param rdv the rendez-vous point to send to
 * \param task_size forwarded unchanged to SIMIX_comm_isend()
 * \param rate forwarded unchanged to SIMIX_comm_isend()
 * \param src_buff the source buffer
 * \param src_buff_size size of \a src_buff
 * \param match_fun matching callback
 * \param clean_fun cleanup callback forwarded to SIMIX_comm_isend()
 * \param copy_data_fun data-copy callback (added signature only)
 * \param data user data forwarded to SIMIX_comm_isend()
 * \param detached forwarded unchanged to SIMIX_comm_isend()
 * \return whatever SIMIX_comm_isend() returns
 */
-smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
+smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
- void (*clean_fun)(void *),
+ void (*clean_fun)(void *),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, int detached){
- return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
- src_buff_size, match_fun, clean_fun, data, detached);
+ return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
+ src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
}
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
void *data,
int detached)
{
other_action->state = SIMIX_READY;
other_action->comm.dst_proc=rdv->permanent_receiver;
other_action->comm.refcount++;
- other_action->comm.rdv = rdv;
xbt_fifo_push(rdv->done_comm_fifo,other_action);
other_action->comm.rdv=rdv;
XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
other_action->comm.src_data = data;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
+
if (MC_is_active()) {
other_action->state = SIMIX_RUNNING;
- return other_action;
+ return (detached ? NULL : other_action);
}
SIMIX_comm_start(other_action);
}
/**
 * \brief Simcall handler for a blocking receive.
 *
 * Posts a receive through SIMIX_comm_irecv() with simcall->issuer as the
 * receiving process, sets the MC value of the simcall to 0, then hands the
 * resulting communication to SIMIX_pre_comm_wait() with the given timeout.
 * The added (+) signature additionally forwards \a copy_data_fun and
 * \a rate to SIMIX_comm_irecv().
 *
 * \param simcall the simcall being handled; its issuer is the receiver
 * \param rdv the rendez-vous point to receive from
 * \param dst_buff the destination buffer
 * \param dst_buff_size pointer to the destination buffer size
 * \param match_fun matching callback forwarded to SIMIX_comm_irecv()
 * \param copy_data_fun data-copy callback (added signature only)
 * \param data user data forwarded to SIMIX_comm_irecv()
 * \param timeout timeout passed to SIMIX_pre_comm_wait()
 * \param rate forwarded unchanged to SIMIX_comm_irecv() (added signature only)
 */
void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t),
- void *data, double timeout){
+ void *dst_buff, size_t *dst_buff_size,
+ int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
+ void *data, double timeout, double rate)
+{
smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
- dst_buff_size, match_fun, data);
- simcall->mc_value = 0;
+ dst_buff_size, match_fun, copy_data_fun, data, rate);
+ SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
+
/**
 * \brief Simcall handler for a non-blocking receive.
 *
 * Thin wrapper: calls SIMIX_comm_irecv() with simcall->issuer as the
 * receiving process and returns its result. The added (+) signature also
 * passes \a copy_data_fun and \a rate through.
 *
 * \param simcall the simcall being handled; its issuer is the receiver
 * \param rdv the rendez-vous point to receive from
 * \param dst_buff the destination buffer
 * \param dst_buff_size pointer to the destination buffer size
 * \param match_fun matching callback forwarded to SIMIX_comm_irecv()
 * \param copy_data_fun data-copy callback (added signature only)
 * \param data user data forwarded to SIMIX_comm_irecv()
 * \param rate forwarded unchanged to SIMIX_comm_irecv() (added signature only)
 * \return whatever SIMIX_comm_irecv() returns
 */
smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
- void *data){
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
+ void *data, double rate)
+{
return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
- match_fun, data);
+ match_fun, copy_data_fun, data, rate);
}
+
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
- int (*match_fun)(void *, void *, smx_action_t), void *data)
+ int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
+ void *data, double rate)
{
XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
}/*else{
XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
}*/
- // other_action->comm.refcount--;
+ other_action->comm.refcount--;
SIMIX_comm_destroy(this_action);
--smx_total_comms; // this creation was a pure waste
}
--smx_total_comms; // this creation was a pure waste
other_action->state = SIMIX_READY;
other_action->comm.type = SIMIX_COMM_READY;
- // other_action->comm.refcount--;
+ //other_action->comm.refcount--;
}
xbt_fifo_push(dst_proc->comms, other_action);
}
other_action->comm.dst_buff_size = dst_buff_size;
other_action->comm.dst_data = data;
+ if (rate != -1.0 &&
+ (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
+ other_action->comm.rate = rate;
+
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
/*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
{
- int idx = simcall->mc_value;
/* the simcall may be a wait, a send or a recv */
surf_action_t sleep;
simcall->issuer->waiting_action = action;
if (MC_is_active()) {
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
if (idx == 0) {
action->state = SIMIX_DONE;
} else {
if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
SIMIX_comm_finish(action);
} else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
- sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
- surf_workstation_model->action_data_set(sleep, action);
+ sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
+ surf_action_set_data(sleep, action);
if (simcall->issuer == action->comm.src_proc)
action->comm.src_timeout = sleep;
void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
{
- int idx = simcall->mc_value;
unsigned int cursor;
smx_action_t action;
simcall_comm_testany__set__result(simcall, -1);
if (MC_is_active()){
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
if(idx == -1){
SIMIX_simcall_answer(simcall);
}else{
void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
{
- int idx = simcall->mc_value;
smx_action_t action;
unsigned int cursor = 0;
if (MC_is_active()){
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
action = xbt_dynar_get_as(actions, idx, smx_action_t);
xbt_fifo_push(action->simcalls, simcall);
simcall_comm_waitany__set__result(simcall, idx);
* \brief Starts the simulation of a communication action.
* \param action the communication action
*/
-XBT_INLINE void SIMIX_comm_start(smx_action_t action)
+static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
{
/* If both the sender and the receiver are already there, start the communication */
if (action->state == SIMIX_READY) {
XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
- action->comm.surf_comm = surf_workstation_model->extension.workstation.
- communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
+ action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
+ sender, receiver,
+ action->comm.task_size, action->comm.rate);
- surf_workstation_model->action_data_set(action->comm.surf_comm, action);
+ surf_action_set_data(action->comm.surf_comm, action);
action->state = SIMIX_RUNNING;
/* If a link is failed, detect it immediately */
- if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+ if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
action->state = SIMIX_LINK_FAILURE;
XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
- surf_workstation_model->suspend(action->comm.surf_comm);
+ surf_action_suspend(action->comm.surf_comm);
}
}
unsigned int destroy_count = 0;
smx_simcall_t simcall;
+
while ((simcall = xbt_fifo_shift(action->simcalls))) {
/* If a waitany simcall is waiting for this action to finish, then remove
it from the other actions in the waitany list. Afterwards, get the
position of the actual action in the waitany dynar and
return it as the result of the simcall */
+
+ if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
+ continue; // if process handling comm is killed
if (simcall->call == SIMCALL_COMM_WAITANY) {
SIMIX_waitany_remove_simcall_from_actions(simcall);
if (!MC_is_active())
case SIMIX_LINK_FAILURE:
XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
action,
- action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
- action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
+ action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
+ action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
simcall->issuer->name, simcall->issuer, action->comm.detached);
if (action->comm.src_proc == simcall->issuer) {
XBT_DEBUG("I'm source");
}
}
- if (surf_workstation_model->extension.
- workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
+ if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
simcall->issuer->context->iwannadie = 1;
}
simcall->issuer->waiting_action = NULL;
xbt_fifo_remove(simcall->issuer->comms, action);
+ if(action->comm.detached){
+ if(simcall->issuer == action->comm.src_proc){
+ if(action->comm.dst_proc)
+ xbt_fifo_remove(action->comm.dst_proc->comms, action);
+ }
+ if(simcall->issuer == action->comm.dst_proc){
+ if(action->comm.src_proc)
+ xbt_fifo_remove(action->comm.src_proc->comms, action);
+ }
+ }
SIMIX_simcall_answer(simcall);
destroy_count++;
}
{
/* Update action state */
if (action->comm.src_timeout &&
- surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
+ surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
action->state = SIMIX_SRC_TIMEOUT;
else if (action->comm.dst_timeout &&
- surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
+ surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
action->state = SIMIX_DST_TIMEOUT;
else if (action->comm.src_timeout &&
- surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
+ surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
action->state = SIMIX_SRC_HOST_FAILURE;
else if (action->comm.dst_timeout &&
- surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
+ surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
action->state = SIMIX_DST_HOST_FAILURE;
else if (action->comm.surf_comm &&
- surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+ surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
XBT_DEBUG("Puta madre. Surf says that the link broke");
action->state = SIMIX_LINK_FAILURE;
} else
/* destroy the surf actions associated with the Simix communication */
SIMIX_comm_destroy_internal_actions(action);
- /* remove the communication action from the list of pending communications
- * of both processes (if they still exist) */
- if (action->comm.src_proc) {
- xbt_fifo_remove(action->comm.src_proc->comms, action);
- }
- if (action->comm.dst_proc) {
- xbt_fifo_remove(action->comm.dst_proc->comms, action);
- }
-
/* if there are simcalls associated with the action, then answer them */
if (xbt_fifo_size(action->simcalls)) {
SIMIX_comm_finish(action);
else if (!MC_is_active() /* when running the MC there are no surf actions */
&& (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
- surf_workstation_model->action_cancel(action->comm.surf_comm);
+ surf_action_cancel(action->comm.surf_comm);
}
}
{
/*FIXME: shall we suspend also the timeout actions? */
if (action->comm.surf_comm)
- surf_workstation_model->suspend(action->comm.surf_comm);
+ surf_action_suspend(action->comm.surf_comm);
/* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
}
{
/*FIXME: check what happens with the timeouts */
if (action->comm.surf_comm)
- surf_workstation_model->resume(action->comm.surf_comm);
+ surf_action_resume(action->comm.surf_comm);
/* in the other case, the action was not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
}
switch (action->state) {
case SIMIX_RUNNING:
- remains = surf_workstation_model->get_remains(action->comm.surf_comm);
+ remains = surf_action_get_remains(action->comm.surf_comm);
break;
case SIMIX_WAITING:
}
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 * \brief Simcall handler wrapping SIMIX_comm_is_latency_bounded().
 *
 * Only compiled when HAVE_LATENCY_BOUND_TRACKING is defined. The
 * \a simcall argument is not used; the call is forwarded as is.
 *
 * \param simcall the simcall being handled (unused here)
 * \param action the communication to query
 * \return the value returned by SIMIX_comm_is_latency_bounded(action)
 */
+int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
+{
+ return SIMIX_comm_is_latency_bounded(action);
+}
+
/**
* \brief verify if communication is latency bounded
* \param comm The communication
*/
-XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
+int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
if(!action){
return 0;
}
if (action->comm.surf_comm){
XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
- action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
+ action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
XBT_DEBUG("Action limited is %d", action->latency_limited);
}
return action->latency_limited;
XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
comm,
- comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
+ comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
comm->comm.src_buff,
- comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
+ comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
comm->comm.dst_buff, buff_size);
/* Copy at most dst_buff_size bytes of the message to receiver's buffer */
if (comm->comm.dst_buff_size)
*comm->comm.dst_buff_size = buff_size;
- if (buff_size > 0)
- SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ if (buff_size > 0){
+ if(comm->comm.copy_data_fun)
+ comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
+ else
+ SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ }
+
/* Set the copied flag so we copy data only once */
/* (this function might be called from both communication ends) */