Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Distinguish access to sender-side and receiver-side user data in smx_action_t.
[simgrid.git] / src / simix / smx_network.c
index 3c36832..ded44f2 100644 (file)
@@ -23,7 +23,8 @@ static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
                                                       double timeout);
 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
-static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
+static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+                                                                                 int (*match_fun)(void *, void *), void *);
 static void SIMIX_rdv_free(void *data);
 
 void SIMIX_network_init(void)
@@ -122,19 +123,26 @@ static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
  *  \param type The type of communication we are looking for (comm_send, comm_recv)
  *  \return The communication request if found, NULL otherwise
  */
-smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
-{
-  smx_action_t comm = (smx_action_t)
-      xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
-
-  if (comm && comm->comm.type == type) {
-    DEBUG0("Communication request found!");
-    xbt_fifo_shift(rdv->comm_fifo);
-    comm->comm.refcount++;
-    comm->comm.rdv = NULL;
-    return comm;
+smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+                                                                  int (*match_fun)(void *, void *), void *data)
+{
+  smx_action_t req;
+  xbt_fifo_item_t item;
+  void* req_data = NULL; /* NOTE(review): not reset per iteration; keeps its previous value if a request is neither a send nor a receive */
+
+  xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){ /* walk the requests queued on this rendez-vous point */
+    if(req->comm.type == SIMIX_COMM_SEND) { /* pick the user data of the side that posted the queued request */
+      req_data = req->comm.src_data; /* queued send: sender-side data */
+    } else if(req->comm.type == SIMIX_COMM_RECEIVE) {
+      req_data = req->comm.dst_data; /* queued receive: receiver-side data */
+    }
+        if(req->comm.type == type && (!match_fun || match_fun(data, req_data))) { /* a NULL match_fun accepts any request of the wanted type */
+          xbt_fifo_remove_item(rdv->comm_fifo, item); /* the matched request leaves the queue */
+          req->comm.refcount++; /* the caller gets its own reference on the returned request */
+          req->comm.rdv = NULL; /* the request is no longer attached to a rendez-vous point */
+          return req;
+        }
   }
-
   DEBUG0("Communication request not found");
   return NULL;
 }
@@ -232,13 +240,14 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
 
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
-                              void *src_buff, size_t src_buff_size, void *data)
+                              void *src_buff, size_t src_buff_size,
+                              int (*match_fun)(void *, void *), void *data)
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
      If it is not found then create it and push it into the rendez-vous point */
-  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
+  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_SEND);
@@ -254,7 +263,7 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
   action->comm.rate = rate;
   action->comm.src_buff = src_buff;
   action->comm.src_buff_size = src_buff_size;
-  action->comm.data = data;
+  action->comm.src_data = data;
 
   if (MC_IS_ENABLED) {
     action->state = SIMIX_RUNNING;
@@ -266,14 +275,15 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
-                      void *dst_buff, size_t *dst_buff_size)
+                      void *dst_buff, size_t *dst_buff_size,
+                      int (*match_fun)(void *, void *), void *data)
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
    * If it is not found then create it and push it into the rendez-vous point
    */
-  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
+  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
@@ -287,6 +297,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
   action->comm.dst_proc = dst_proc;
   action->comm.dst_buff = dst_buff;
   action->comm.dst_buff_size = dst_buff_size;
+  action->comm.dst_data = data;
 
   if (MC_IS_ENABLED) {
     action->state = SIMIX_RUNNING;
@@ -341,6 +352,22 @@ void SIMIX_pre_comm_test(smx_req_t req)
   }
 }
 
+void SIMIX_pre_comm_testany(smx_req_t req)
+{
+  unsigned int cursor;
+  smx_action_t action;
+  req->comm_testany.result = -1; /* stays -1 when no communication of the set passes the state test below */
+  xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
+    if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) { /* neither waiting nor running */
+      req->comm_testany.result = cursor; /* index, within comms, of the first such communication */
+      xbt_fifo_push(action->request_list, req); /* register req on the action before finishing it */
+      SIMIX_comm_finish(action);
+      break; /* only the first hit is reported */
+    }
+  }
+  SIMIX_request_answer(req); /* NOTE(review): also reached after the break above; if SIMIX_comm_finish() already answers the requests in request_list, req is answered twice -- confirm */
+}
+
 void SIMIX_pre_comm_waitany(smx_req_t req)
 {
   smx_action_t action;
@@ -592,13 +619,23 @@ e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
 }
 
 /**
- *  \brief Return the user data associated to the communication
+ *  \brief Return the user data associated with the sender of the communication
+ *  \param action The communication
+ *  \return the user data
+ */
+void* SIMIX_comm_get_src_data(smx_action_t action)
+{
+  return action->comm.src_data; /* the data argument stored by SIMIX_comm_isend() */
+}
+
+/**
+ *  \brief Return the user data associated with the receiver of the communication
  *  \param action The communication
  *  \return the user data
  */
-void* SIMIX_comm_get_data(smx_action_t action)
+void* SIMIX_comm_get_dst_data(smx_action_t action)
 {
-  return action->comm.data;
+  return action->comm.dst_data; /* the data argument stored by SIMIX_comm_irecv() */
 }
 
 void* SIMIX_comm_get_src_buff(smx_action_t action)