double timeout);
static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
-static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
+static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *), void *);
static void SIMIX_rdv_free(void *data);
void SIMIX_network_init(void)
* \param type The type of communication we are looking for (comm_send, comm_recv)
* \return The communication request if found, NULL otherwise
*/
-smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
-{
- smx_action_t comm = (smx_action_t)
- xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
-
- if (comm && comm->comm.type == type) {
- DEBUG0("Communication request found!");
- xbt_fifo_shift(rdv->comm_fifo);
- comm->comm.refcount++;
- comm->comm.rdv = NULL;
- return comm;
+smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *), void *data)
+{
+ smx_action_t req;
+ xbt_fifo_item_t item;
+ void* req_data = NULL;
+
+ xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){
+ if(req->comm.type == SIMIX_COMM_SEND) {
+ req_data = req->comm.src_data;
+ } else if(req->comm.type == SIMIX_COMM_RECEIVE) {
+ req_data = req->comm.dst_data;
+ }
+ if(req->comm.type == type && (!match_fun || match_fun(data, req_data))) {
+ xbt_fifo_remove_item(rdv->comm_fifo, item);
+ req->comm.refcount++;
+ req->comm.rdv = NULL;
+ return req;
+ }
}
-
DEBUG0("Communication request not found");
return NULL;
}
#endif
DEBUG1("Create communicate action %p", act);
-
+
return act;
}
{
DEBUG1("Destroy action %p", action);
- if(!(action->comm.refcount > 0))
- xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
+ if (action->comm.refcount <= 0)
+ xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
#ifdef HAVE_LATENCY_BOUND_TRACKING
//save is latency limited flag to use afterwards
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
double task_size, double rate,
- void *src_buff, size_t src_buff_size, void *data)
+ void *src_buff, size_t src_buff_size,
+ int (*match_fun)(void *, void *), void *data)
{
smx_action_t action;
/* Look for communication request matching our needs.
If it is not found then create it and push it into the rendez-vous point */
- action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
+ action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
if (!action) {
action = SIMIX_comm_new(SIMIX_COMM_SEND);
SIMIX_rdv_push(rdv, action);
- }else{
+ } else {
action->state = SIMIX_READY;
action->comm.type = SIMIX_COMM_READY;
}
action->comm.rate = rate;
action->comm.src_buff = src_buff;
action->comm.src_buff_size = src_buff_size;
- action->comm.data = data;
-#ifdef HAVE_MC
- if(_surf_do_model_check){
+ action->comm.src_data = data;
+
+ if (MC_IS_ENABLED) {
action->state = SIMIX_RUNNING;
return action;
}
-#endif
+
SIMIX_comm_start(action);
return action;
}
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size)
+ void *dst_buff, size_t *dst_buff_size,
+ int (*match_fun)(void *, void *), void *data)
{
smx_action_t action;
/* Look for communication request matching our needs.
* If it is not found then create it and push it into the rendez-vous point
*/
- action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
+ action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
if (!action) {
action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
action->comm.dst_proc = dst_proc;
action->comm.dst_buff = dst_buff;
action->comm.dst_buff_size = dst_buff_size;
+ action->comm.dst_data = data;
-#ifdef HAVE_MC
- if(_surf_do_model_check){
+ if (MC_IS_ENABLED) {
action->state = SIMIX_RUNNING;
return action;
}
-#endif
SIMIX_comm_start(action);
return action;
xbt_fifo_push(action->request_list, req);
req->issuer->waiting_action = action;
-#ifdef HAVE_MC
- if(_surf_do_model_check){
+ if (MC_IS_ENABLED){
action->state = SIMIX_DONE;
SIMIX_comm_finish(action);
+ return;
}
-#endif
/* If the action has already finish perform the error handling, */
/* otherwise set up a waiting timeout on the right side */
}
}
+void SIMIX_pre_comm_testany(smx_req_t req)
+{
+ unsigned int cursor;
+ smx_action_t action;
+ req->comm_testany.result = -1;
+ xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
+ if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
+ req->comm_testany.result = cursor;
+ xbt_fifo_push(action->request_list, req);
+ SIMIX_comm_finish(action);
+ break;
+ }
+ }
+ SIMIX_request_answer(req);
+}
+
void SIMIX_pre_comm_waitany(smx_req_t req)
{
smx_action_t action;
xbt_dynar_foreach(actions, cursor, action){
/* Associate this request to the action */
xbt_fifo_push(action->request_list, req);
- if(action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
+ if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
SIMIX_comm_finish(action);
break;
}
smx_host_t receiver = action->comm.dst_proc->smx_host;
DEBUG3("Starting communication %p from '%s' to '%s'", action,
- SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
+ SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
- action->comm.surf_comm =
- surf_workstation_model->extension.workstation.
- communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
+ action->comm.surf_comm = surf_workstation_model->extension.workstation.
+ communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
surf_workstation_model->action_data_set(action->comm.surf_comm, action);
action->state = SIMIX_RUNNING;
#ifdef HAVE_TRACING
- TRACE_smx_action_communicate(comm, comm->comm.src_proc);
- TRACE_surf_action(comm->surf_action, comm->category);
+ TRACE_smx_action_communicate(action, action->comm.src_proc);
#endif
/* If a link is failed, detect it immediately */
{
smx_req_t req;
- while((req = xbt_fifo_shift(action->request_list))){
+ while ((req = xbt_fifo_shift(action->request_list))) {
/* If a waitany request is waiting for this action to finish, then remove
it from the other actions in the waitany list. Afterwards, get the
position of the actual action in the waitany request's actions dynar and
return it as the result of the call */
- if(req->call == REQ_COMM_WAITANY){
+ if (req->call == REQ_COMM_WAITANY) {
SIMIX_waitany_req_remove_from_actions(req);
req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
}
/* If the action is still in a rendez-vous point then remove from it */
- if(action->comm.rdv)
+ if (action->comm.rdv)
SIMIX_rdv_remove(action->comm.rdv, action);
DEBUG1("SIMIX_comm_finish: action state = %d", action->state);
case SIMIX_SRC_HOST_FAILURE:
TRY {
- if(req->issuer == action->comm.src_proc)
+ if (req->issuer == action->comm.src_proc)
THROW0(host_error, 0, "Host failed");
else
THROW0(network_error, 0, "Remote peer failed");
void SIMIX_post_comm(smx_action_t action)
{
/* Update action state */
- if(action->comm.src_timeout &&
+ if (action->comm.src_timeout &&
surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
action->state = SIMIX_SRC_TIMEOUT;
- else if(action->comm.dst_timeout &&
+ else if (action->comm.dst_timeout &&
surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
action->state = SIMIX_DST_TIMEOUT;
- else if(action->comm.src_timeout &&
+ else if (action->comm.src_timeout &&
surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
action->state = SIMIX_SRC_HOST_FAILURE;
- else if(action->comm.dst_timeout &&
+ else if (action->comm.dst_timeout &&
surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
action->state = SIMIX_DST_HOST_FAILURE;
- else if(action->comm.surf_comm &&
+ else if (action->comm.surf_comm &&
surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
action->state = SIMIX_LINK_FAILURE;
else
SIMIX_comm_destroy_internal_actions(action);
/* If there are requests associated with the action, then answer them */
- if(xbt_fifo_size(action->request_list))
+ if (xbt_fifo_size(action->request_list))
SIMIX_comm_finish(action);
}
}
/**
- * \brief Return the user data associated to the communication
+ * \brief Return the user data associated to the sender of the communication
+ * \param action The communication
+ * \return the user data
+ */
+void* SIMIX_comm_get_src_data(smx_action_t action)
+{
+ return action->comm.src_data;
+}
+
+/**
+ * \brief Return the user data associated to the receiver of the communication
* \param action The communication
* \return the user data
*/
-void* SIMIX_comm_get_data(smx_action_t action)
+void* SIMIX_comm_get_dst_data(smx_action_t action)
{
- return action->comm.data;
+ return action->comm.dst_data;
}
void* SIMIX_comm_get_src_buff(smx_action_t action)
if (buff_size == 0)
return;
+
(*SIMIX_comm_copy_data_callback) (comm, buff_size);
/* Set the copied flag so we copy data only once */