#include "xbt/log.h"
#include "mc/mc.h"
#include "xbt/dict.h"
+#include "smpi/private.h"
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
"Logging specific to SIMIX (network)");
void SIMIX_network_init(void)
{
rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
- if(MC_is_active())
- MC_ignore_global_variable("smx_total_comms");
}
void SIMIX_network_exit(void)
rdv->done_comm_fifo = xbt_fifo_new();
rdv->permanent_receiver=NULL;
- XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
+ XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
if (rdv->name)
xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
xbt_fifo_free(rdv->comm_fifo);
xbt_fifo_free(rdv->done_comm_fifo);
- xbt_free(rdv);
+ xbt_free(rdv);
}
xbt_dict_t SIMIX_get_rdv_points()
return act;
}
-void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
- SIMIX_comm_destroy(action);
-}
/**
* \brief Destroy a communicate action
* \param action The communicate action to be destroyed
}
}
-void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
+void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double timeout){
- smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
- src_buff, src_buff_size, match_fun, NULL,
+ smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
+ src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
data, 0);
- simcall->mc_value = 0;
+ SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
-smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
+smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
- void (*clean_fun)(void *),
+ void (*clean_fun)(void *),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, int detached){
- return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
- src_buff_size, match_fun, clean_fun, data, detached);
+ return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
+ src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
}
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
void *data,
int detached)
{
- XBT_DEBUG("send from %p\n", rdv);
+ XBT_DEBUG("send from %p", rdv);
/* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
other_action->comm.refcount++;
xbt_fifo_push(rdv->done_comm_fifo,other_action);
other_action->comm.rdv=rdv;
- XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
+ XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_action->comm));
}else{
SIMIX_rdv_push(rdv, this_action);
}
} else {
- XBT_DEBUG("Receive already pushed\n");
+ XBT_DEBUG("Receive already pushed");
SIMIX_comm_destroy(this_action);
--smx_total_comms; // this creation was a pure waste
other_action->comm.src_data = data;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
+
if (MC_is_active()) {
other_action->state = SIMIX_RUNNING;
void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double timeout, double rate)
{
smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
- dst_buff_size, match_fun, data, rate);
- simcall->mc_value = 0;
+ dst_buff_size, match_fun, copy_data_fun, data, rate);
+ SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double rate)
{
return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
- match_fun, data, rate);
+ match_fun, copy_data_fun, data, rate);
}
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
void *data, double rate)
{
- XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
+ XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
smx_action_t other_action;
//int already_received=0;
if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
- XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
+ XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
//find a match in the already received fifo
other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
//if not found, assume the receiver came first, register it to the mailbox in the classical way
if (!other_action) {
- XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
+ XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
other_action = this_action;
SIMIX_rdv_push(rdv, this_action);
}else{
if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
{
- XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
+ XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_action->comm));
other_action->state = SIMIX_DONE;
other_action->comm.type = SIMIX_COMM_DONE;
other_action->comm.rdv = NULL;
- //SIMIX_comm_destroy(this_action);
- //--smx_total_comms; // this creation was a pure waste
- //already_received=1;
- //other_action->comm.refcount--;
}/*else{
- XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
+ XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
}*/
other_action->comm.refcount--;
SIMIX_comm_destroy(this_action);
other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
if (!other_action) {
- XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
+ XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
other_action = this_action;
SIMIX_rdv_push(rdv, this_action);
} else {
other_action->comm.rate = rate;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
/*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
}
smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
- int src, int tag,
+ int type, int src, int tag,
int (*match_fun)(void *, void *, smx_action_t),
void *data){
- return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
+ return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
}
-smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
+smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
{
- XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
- smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
-
+ XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
+ smx_action_t this_action;
+ int smx_type;
+ if(type == 1){
+ this_action=SIMIX_comm_new(SIMIX_COMM_SEND);
+ smx_type = SIMIX_COMM_RECEIVE;
+ } else{
+ this_action=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
+ smx_type = SIMIX_COMM_SEND;
+ }
smx_action_t other_action=NULL;
if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
//find a match in the already received fifo
- XBT_DEBUG("first try in the perm recv mailbox \n");
+ XBT_DEBUG("first try in the perm recv mailbox");
- other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
+ other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_action);
}
// }else{
if(!other_action){
- XBT_DEBUG("second try in the other mailbox");
- other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
+ XBT_DEBUG("try in the normal mailbox");
+ other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_action);
}
// }
if(other_action)other_action->comm.refcount--;
simcall->issuer->waiting_action = action;
if (MC_is_active()) {
- int idx = simcall->mc_value;
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
if (idx == 0) {
action->state = SIMIX_DONE;
} else {
simcall_comm_testany__set__result(simcall, -1);
if (MC_is_active()){
- int idx = simcall->mc_value;
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
if(idx == -1){
SIMIX_simcall_answer(simcall);
}else{
unsigned int cursor = 0;
if (MC_is_active()){
- int idx = simcall->mc_value;
+ int idx = SIMCALL_GET_MC_VALUE(simcall);
action = xbt_dynar_get_as(actions, idx, smx_action_t);
xbt_fifo_push(action->simcalls, simcall);
simcall_comm_waitany__set__result(simcall, idx);
it from the other actions in the waitany list. Afterwards, get the
position of the actual action in the waitany dynar and
return it as the result of the simcall */
+
+ if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
+ continue; // if process handling comm is killed
if (simcall->call == SIMCALL_COMM_WAITANY) {
SIMIX_waitany_remove_simcall_from_actions(simcall);
if (!MC_is_active())
simcall->issuer->waiting_action = NULL;
xbt_fifo_remove(simcall->issuer->comms, action);
if(action->comm.detached){
- smx_process_t proc;
- int still_alive = 0;
-
if(simcall->issuer == action->comm.src_proc){
- if(action->comm.dst_proc){
- xbt_swag_foreach(proc, simix_global->process_list)
- {
- if(proc==action->comm.dst_proc){
- still_alive=1;
- break;
- }
- }
- }
- if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
+ if(action->comm.dst_proc)
+ xbt_fifo_remove(action->comm.dst_proc->comms, action);
}
if(simcall->issuer == action->comm.dst_proc){
if(action->comm.src_proc)
- if(action->comm.dst_proc){
- xbt_swag_foreach(proc, simix_global->process_list)
- {
- if(proc==action->comm.src_proc){
- still_alive=1;
- break;
- }
- }
- }
- if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
+ xbt_fifo_remove(action->comm.src_proc->comms, action);
}
}
SIMIX_simcall_answer(simcall);
action->state = SIMIX_DST_HOST_FAILURE;
else if (action->comm.surf_comm &&
surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
- XBT_DEBUG("Puta madre. Surf says that the link broken");
+ XBT_DEBUG("Puta madre. Surf says that the link broke");
action->state = SIMIX_LINK_FAILURE;
} else
action->state = SIMIX_DONE;
/* destroy the surf actions associated with the Simix communication */
SIMIX_comm_destroy_internal_actions(action);
- /* remove the communication action from the list of pending communications
- * of both processes (if they still exist) */
- if (action->comm.src_proc) {
- xbt_fifo_remove(action->comm.src_proc->comms, action);
- }
- if (action->comm.dst_proc) {
- xbt_fifo_remove(action->comm.dst_proc->comms, action);
- }
-
/* if there are simcalls associated with the action, then answer them */
if (xbt_fifo_size(action->simcalls)) {
SIMIX_comm_finish(action);
* \brief verify if communication is latency bounded
* \param comm The communication
*/
-XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
+int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
if(!action){
return 0;
}
if (action->comm.surf_comm){
XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
- action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
+ action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
XBT_DEBUG("Action limited is %d", action->latency_limited);
}
return action->latency_limited;
if (comm->comm.dst_buff_size)
*comm->comm.dst_buff_size = buff_size;
- if (buff_size > 0)
- SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ if (buff_size > 0){
+ if(comm->comm.copy_data_fun)
+ comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
+ else
+ SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ }
+
/* Set the copied flag so we copy data only once */
/* (this function might be called from both communication ends) */