#include "xbt/log.h"
#include "mc/mc.h"
#include "xbt/dict.h"
+#include "smpi/private.h"
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
"Logging specific to SIMIX (network)");
xbt_fifo_free(rdv->comm_fifo);
xbt_fifo_free(rdv->done_comm_fifo);
- xbt_free(rdv);
+ xbt_free(rdv);
}
xbt_dict_t SIMIX_get_rdv_points()
}
}
-void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
+void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double timeout){
- smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
- src_buff, src_buff_size, match_fun, NULL,
+ smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
+ src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
data, 0);
SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
-smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
+smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
- void (*clean_fun)(void *),
+ void (*clean_fun)(void *),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, int detached){
- return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
- src_buff_size, match_fun, clean_fun, data, detached);
+ return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
+ src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
}
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *,smx_action_t),
void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
void *data,
int detached)
{
other_action->comm.src_data = data;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
+
if (MC_is_active()) {
other_action->state = SIMIX_RUNNING;
void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double timeout, double rate)
{
smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
- dst_buff_size, match_fun, data, rate);
+ dst_buff_size, match_fun, copy_data_fun, data, rate);
SIMCALL_SET_MC_VALUE(simcall, 0);
SIMIX_pre_comm_wait(simcall, comm, timeout);
}
smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t),
void *data, double rate)
{
return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
- match_fun, data, rate);
+ match_fun, copy_data_fun, data, rate);
}
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*match_fun)(void *, void *, smx_action_t),
+ void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
void *data, double rate)
{
XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
other_action->comm.rate = rate;
other_action->comm.match_fun = match_fun;
+ other_action->comm.copy_data_fun = copy_data_fun;
/*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
it from the other actions in the waitany list. Afterwards, get the
position of the actual action in the waitany dynar and
return it as the result of the simcall */
+
+ if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
+ continue; // if process handling comm is killed
if (simcall->call == SIMCALL_COMM_WAITANY) {
SIMIX_waitany_remove_simcall_from_actions(simcall);
if (!MC_is_active())
simcall->issuer->waiting_action = NULL;
xbt_fifo_remove(simcall->issuer->comms, action);
if(action->comm.detached){
- smx_process_t proc;
- int still_alive = 0;
-
if(simcall->issuer == action->comm.src_proc){
- if(action->comm.dst_proc){
- xbt_swag_foreach(proc, simix_global->process_list)
- {
- if(proc==action->comm.dst_proc){
- still_alive=1;
- break;
- }
- }
- }
- if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
+ if(action->comm.dst_proc)
+ xbt_fifo_remove(action->comm.dst_proc->comms, action);
}
if(simcall->issuer == action->comm.dst_proc){
if(action->comm.src_proc)
- if(action->comm.dst_proc){
- xbt_swag_foreach(proc, simix_global->process_list)
- {
- if(proc==action->comm.src_proc){
- still_alive=1;
- break;
- }
- }
- }
- if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
+ xbt_fifo_remove(action->comm.src_proc->comms, action);
}
}
SIMIX_simcall_answer(simcall);
if (comm->comm.dst_buff_size)
*comm->comm.dst_buff_size = buff_size;
- if (buff_size > 0)
- SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ if (buff_size > 0){
+ if(comm->comm.copy_data_fun)
+ comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
+ else
+ SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+ }
+
/* Set the copied flag so we copy data only once */
/* (this function might be called from both communication ends) */