Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Distinguish access to sender-side and receiver-side user data in smx_action_t.
[simgrid.git] / src / simix / smx_network.c
index 0da3641..ded44f2 100644 (file)
@@ -23,7 +23,8 @@ static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
                                                       double timeout);
 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
-static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
+static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+                                                                                 int (*match_fun)(void *, void *), void *);
 static void SIMIX_rdv_free(void *data);
 
 void SIMIX_network_init(void)
@@ -122,19 +123,26 @@ static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
  *  \param type The type of communication we are looking for (comm_send, comm_recv)
  *  \return The communication request if found, NULL otherwise
  */
-smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
-{
-  smx_action_t comm = (smx_action_t)
-      xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
-
-  if (comm && comm->comm.type == type) {
-    DEBUG0("Communication request found!");
-    xbt_fifo_shift(rdv->comm_fifo);
-    comm->comm.refcount++;
-    comm->comm.rdv = NULL;
-    return comm;
+smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+                                                                  int (*match_fun)(void *, void *), void *data)
+{
+  smx_action_t req;
+  xbt_fifo_item_t item;
+  void* req_data = NULL; /* user data of the queued request under examination; not reset between iterations */
+
+  xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){
+    if(req->comm.type == SIMIX_COMM_SEND) { /* queued send: compare against its sender-side data */
+      req_data = req->comm.src_data;
+    } else if(req->comm.type == SIMIX_COMM_RECEIVE) { /* queued receive: compare against its receiver-side data */
+      req_data = req->comm.dst_data;
+    }
+        if(req->comm.type == type && (!match_fun || match_fun(data, req_data))) { /* a NULL match_fun accepts any request of the wanted type */
+          xbt_fifo_remove_item(rdv->comm_fifo, item); /* take the matched request out of the rendez-vous queue */
+          req->comm.refcount++; /* the caller gets its own reference on the request */
+          req->comm.rdv = NULL; /* mark the request as no longer attached to a rendez-vous point */
+          return req;
+        }
   }
-
   DEBUG0("Communication request not found");
   return NULL;
 }
@@ -167,7 +175,7 @@ smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
 #endif
 
   DEBUG1("Create communicate action %p", act);
-  
+
   return act;
 }
 
@@ -179,8 +187,8 @@ void SIMIX_comm_destroy(smx_action_t action)
 {
   DEBUG1("Destroy action %p", action);
 
-  if(!(action->comm.refcount > 0))
-         xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
+  if (action->comm.refcount <= 0)
+    xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
 
 #ifdef HAVE_LATENCY_BOUND_TRACKING
   //save is latency limited flag to use afterwards
@@ -232,18 +240,19 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
 
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
-                              void *src_buff, size_t src_buff_size, void *data)
+                              void *src_buff, size_t src_buff_size,
+                              int (*match_fun)(void *, void *), void *data)
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
      If it is not found then create it and push it into the rendez-vous point */
-  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
+  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_SEND);
     SIMIX_rdv_push(rdv, action);
-  }else{
+  } else {
     action->state = SIMIX_READY;
     action->comm.type = SIMIX_COMM_READY;
   }
@@ -254,26 +263,27 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
   action->comm.rate = rate;
   action->comm.src_buff = src_buff;
   action->comm.src_buff_size = src_buff_size;
-  action->comm.data = data;
-#ifdef HAVE_MC
-  if(_surf_do_model_check){
+  action->comm.src_data = data;
+
+  if (MC_IS_ENABLED) {
     action->state = SIMIX_RUNNING;
     return action;
   }
-#endif
+
   SIMIX_comm_start(action);
   return action;
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
-                      void *dst_buff, size_t *dst_buff_size)
+                      void *dst_buff, size_t *dst_buff_size,
+                      int (*match_fun)(void *, void *), void *data)
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
    * If it is not found then create it and push it into the rendez-vous point
    */
-  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
+  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
@@ -287,13 +297,12 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
   action->comm.dst_proc = dst_proc;
   action->comm.dst_buff = dst_buff;
   action->comm.dst_buff_size = dst_buff_size;
+  action->comm.dst_data = data;
 
-#ifdef HAVE_MC
-  if(_surf_do_model_check){
+  if (MC_IS_ENABLED) {
     action->state = SIMIX_RUNNING;
     return action;
   }
-#endif
 
   SIMIX_comm_start(action);
   return action;
@@ -309,12 +318,10 @@ void SIMIX_pre_comm_wait(smx_req_t req)
   xbt_fifo_push(action->request_list, req);
   req->issuer->waiting_action = action;
 
-#ifdef HAVE_MC
-  if(_surf_do_model_check){
+  if (MC_IS_ENABLED){
     action->state = SIMIX_DONE;
     SIMIX_comm_finish(action);
   }
-#endif
 
   /* If the action has already finish perform the error handling, */
   /* otherwise set up a waiting timeout on the right side         */
@@ -345,6 +352,22 @@ void SIMIX_pre_comm_test(smx_req_t req)
   }
 }
 
+void SIMIX_pre_comm_testany(smx_req_t req)
+{
+  unsigned int cursor;
+  smx_action_t action;
+  req->comm_testany.result = -1; /* stays -1 when no communication in the list passes the state test below */
+  xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
+    if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) { /* neither waiting nor running */
+      req->comm_testany.result = cursor; /* index, in the comms dynar, of the first such communication */
+      xbt_fifo_push(action->request_list, req); /* SIMIX_comm_finish consumes the requests queued in request_list */
+      SIMIX_comm_finish(action);
+      break; /* only the first qualifying communication is handled */
+    }
+  }
+  SIMIX_request_answer(req); /* NOTE(review): also reached after the break; if SIMIX_comm_finish already answers req this answers it twice -- confirm */
+}
+
 void SIMIX_pre_comm_waitany(smx_req_t req)
 {
   smx_action_t action;
@@ -353,7 +376,7 @@ void SIMIX_pre_comm_waitany(smx_req_t req)
   xbt_dynar_foreach(actions, cursor, action){
     /* Associate this request to the action */
     xbt_fifo_push(action->request_list, req);
-    if(action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
+    if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
       SIMIX_comm_finish(action);
       break;
     }
@@ -383,19 +406,17 @@ static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
     smx_host_t receiver = action->comm.dst_proc->smx_host;
 
     DEBUG3("Starting communication %p from '%s' to '%s'", action,
-         SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
+           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
 
-    action->comm.surf_comm =
-      surf_workstation_model->extension.workstation.
-      communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
+    action->comm.surf_comm = surf_workstation_model->extension.workstation.
+        communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
 
     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
 
     action->state = SIMIX_RUNNING;
 
 #ifdef HAVE_TRACING
-    TRACE_smx_action_communicate(comm, comm->comm.src_proc);
-    TRACE_surf_action(comm->surf_action, comm->category);
+    TRACE_smx_action_communicate(action, action->comm.src_proc);
 #endif
 
     /* If a link is failed, detect it immediately */
@@ -420,19 +441,19 @@ void SIMIX_comm_finish(smx_action_t action)
 {
   smx_req_t req;
 
-  while((req = xbt_fifo_shift(action->request_list))){
+  while ((req = xbt_fifo_shift(action->request_list))) {
 
     /* If a waitany request is waiting for this action to finish, then remove
        it from the other actions in the waitany list. Afterwards, get the
        position of the actual action in the waitany request's actions dynar and
        return it as the result of the call */
-    if(req->call == REQ_COMM_WAITANY){
+    if (req->call == REQ_COMM_WAITANY) {
       SIMIX_waitany_req_remove_from_actions(req);
       req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
     }
 
     /* If the action is still in a rendez-vous point then remove from it */
-    if(action->comm.rdv)
+    if (action->comm.rdv)
       SIMIX_rdv_remove(action->comm.rdv, action);
 
     DEBUG1("SIMIX_comm_finish: action state = %d", action->state);
@@ -465,7 +486,7 @@ void SIMIX_comm_finish(smx_action_t action)
 
       case SIMIX_SRC_HOST_FAILURE:
         TRY {
-          if(req->issuer == action->comm.src_proc)
+          if (req->issuer == action->comm.src_proc)
             THROW0(host_error, 0, "Host failed");
           else
             THROW0(network_error, 0, "Remote peer failed");
@@ -510,19 +531,19 @@ void SIMIX_comm_finish(smx_action_t action)
 void SIMIX_post_comm(smx_action_t action)
 {
   /* Update action state */
-  if(action->comm.src_timeout &&
+  if (action->comm.src_timeout &&
      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
      action->state = SIMIX_SRC_TIMEOUT;
-  else if(action->comm.dst_timeout &&
+  else if (action->comm.dst_timeout &&
           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
      action->state = SIMIX_DST_TIMEOUT;
-  else if(action->comm.src_timeout &&
+  else if (action->comm.src_timeout &&
           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
      action->state = SIMIX_SRC_HOST_FAILURE;
-  else if(action->comm.dst_timeout &&
+  else if (action->comm.dst_timeout &&
           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
      action->state = SIMIX_DST_HOST_FAILURE;
-  else if(action->comm.surf_comm &&
+  else if (action->comm.surf_comm &&
           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
      action->state = SIMIX_LINK_FAILURE;
   else
@@ -535,7 +556,7 @@ void SIMIX_post_comm(smx_action_t action)
   SIMIX_comm_destroy_internal_actions(action);
 
   /* If there are requests associated with the action, then answer them */
-  if(xbt_fifo_size(action->request_list))
+  if (xbt_fifo_size(action->request_list))
     SIMIX_comm_finish(action);
 }
 
@@ -598,13 +619,23 @@ e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
 }
 
 /**
- *  \brief Return the user data associated to the communication
+ *  \brief Return the user data associated to the sender of the communication
+ *  \param action The communication
+ *  \return the user data
+ */
+void* SIMIX_comm_get_src_data(smx_action_t action)
+{
+  return action->comm.src_data; /* field written by SIMIX_comm_isend from its data argument */
+}
+
+/**
+ *  \brief Return the user data associated to the receiver of the communication
  *  \param action The communication
  *  \return the user data
  */
-void* SIMIX_comm_get_data(smx_action_t action)
+void* SIMIX_comm_get_dst_data(smx_action_t action)
 {
-  return action->comm.data;
+  return action->comm.dst_data; /* field written by SIMIX_comm_irecv from its data argument */
 }
 
 void* SIMIX_comm_get_src_buff(smx_action_t action)
@@ -719,6 +750,7 @@ void SIMIX_comm_copy_data(smx_action_t comm)
 
   if (buff_size == 0)
     return;
+
   (*SIMIX_comm_copy_data_callback) (comm, buff_size);
 
   /* Set the copied flag so we copy data only once */