Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Enhance the communication filtering mechanism
author Martin Quinson <martin.quinson@loria.fr>
Fri, 27 Apr 2012 19:51:05 +0000 (21:51 +0200)
committer Martin Quinson <martin.quinson@loria.fr>
Fri, 27 Apr 2012 19:51:05 +0000 (21:51 +0200)
* Add an extra argument to the filter: a smx_action_t describing the
  other side's communication. This makes it possible to write filters
  that depend on the other side's identity.
* Ensure that if no comm matches the filters, the filter function gets
  saved in the comm object that is created and stored in the rdv point.
  Later on, when someone else tries to make a match, that stored
  function is used to ensure that the stored filter also accepts the
  new communicating party.

Thanks to Sorina Camarasu for exploring all this.

include/msg/msg.h
include/simix/simix.h
src/msg/msg_gos.c
src/simix/smx_network.c
src/simix/smx_network_private.h
src/simix/smx_private.h
src/simix/smx_smurf_private.h
src/simix/smx_user.c
src/smpi/smpi_base.c

index 4dccee2..a0d1f78 100644 (file)
@@ -12,6 +12,8 @@
 #include "msg/datatypes.h"
 #include "xbt/automaton.h"
 
+#include "simix/simix.h"
+
 SG_BEGIN_DECL()
 
 /** @brief Return code of most MSG functions
@@ -191,7 +193,7 @@ XBT_PUBLIC(MSG_error_t)
 XBT_PUBLIC(msg_comm_t) MSG_task_isend(m_task_t task, const char *alias);
 XBT_PUBLIC(msg_comm_t) MSG_task_isend_with_matching(m_task_t task,
                                                     const char *alias,
-                                                    int (*match_fun)(void*,void*),
+                                                    int (*match_fun)(void*,void*, smx_action_t),
                                                     void *match_data);
 
 XBT_PUBLIC(void) MSG_task_dsend(m_task_t task, const char *alias, void_f_pvoid_t cleanup);
index 2de942e..f34bcc3 100644 (file)
@@ -169,24 +169,24 @@ XBT_PUBLIC(xbt_dict_t) SIMIX_get_rdv_points(void);
 XBT_PUBLIC(void) simcall_comm_send(smx_rdv_t rdv, double task_size,
                                      double rate, void *src_buff,
                                      size_t src_buff_size,
-                                     int (*match_fun)(void *, void *),
+                                     int (*match_fun)(void *, void *, smx_action_t),
                                      void *data, double timeout);
 
 XBT_PUBLIC(smx_action_t) simcall_comm_isend(smx_rdv_t rdv, double task_size,
                                               double rate, void *src_buff,
                                               size_t src_buff_size,
-                                              int (*match_fun)(void *, void *),
+                                              int (*match_fun)(void *, void *, smx_action_t),
                                               void (*clean_fun)(void *),
                                               void *data, int detached);
 
 XBT_PUBLIC(void) simcall_comm_recv(smx_rdv_t rdv, void *dst_buff,
                                      size_t * dst_buff_size,
-                                     int (*match_fun)(void *, void *),
+                                     int (*match_fun)(void *, void *, smx_action_t),
                                      void *data, double timeout);
 
 XBT_PUBLIC(smx_action_t) simcall_comm_irecv(smx_rdv_t rdv, void *dst_buff,
                                               size_t * dst_buff_size,
-                                              int (*match_fun)(void *, void *),
+                                              int (*match_fun)(void *, void *, smx_action_t),
                                               void *data);
 
 XBT_PUBLIC(void) simcall_comm_destroy(smx_action_t comm);
index 8e6d1d1..7755839 100644 (file)
@@ -383,12 +383,15 @@ msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
  *
  * \param task a #m_task_t to send on another location.
  * \param alias name of the mailbox to sent the task to
- * \param match_fun boolean function taking the match_data provided by sender (here), and the one of the receiver (if any) and returning whether they match
+ * \param match_fun boolean function whose parameters are:
+ *        - match_data_provided_here
+ *        - match_data_provided_by_other_side_if_any
+ *        - the_smx_action_describing_the_other_side
  * \param match_data user provided data passed to match_fun
  * \return the msg_comm_t communication created
  */
 XBT_INLINE msg_comm_t MSG_task_isend_with_matching(m_task_t task, const char *alias,
-    int (*match_fun)(void*,void*),
+    int (*match_fun)(void*,void*, smx_action_t),
     void *match_data)
 {
   simdata_task_t t_simdata = NULL;
index db18929..82a372f 100644 (file)
@@ -20,7 +20,8 @@ static void SIMIX_comm_copy_data(smx_action_t comm);
 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
 static smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
-                                         int (*match_fun)(void *, void *), void *);
+                                         int (*match_fun)(void *, void *,smx_action_t),
+                                         void *user_data, smx_action_t my_action);
 static void SIMIX_rdv_free(void *data);
 
 void SIMIX_network_init(void)
@@ -118,33 +119,28 @@ XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
   comm->comm.rdv = NULL;
 }
 
-/**
- *  \brief Wrapper to SIMIX_rdv_get_comm
- */
-smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
-   return SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
-}
-
 /**
  *  \brief Checks if there is a communication action queued in a rendez-vous matching our needs
  *  \param type The type of communication we are looking for (comm_send, comm_recv)
  *  \return The communication action if found, NULL otherwise
  */
 smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
-                                   int (*match_fun)(void *, void *), void *data)
+                                   int (*match_fun)(void *, void *,smx_action_t),
+                                   void *this_user_data, smx_action_t my_action)
 {
-  // FIXME rewrite this function by using SIMIX_rdv_has_send/recv_match
   smx_action_t action;
   xbt_fifo_item_t item;
-  void* comm_data = NULL;
+  void* other_user_data = NULL;
 
   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
     if (action->comm.type == SIMIX_COMM_SEND) {
-      comm_data = action->comm.src_data;
+      other_user_data = action->comm.src_data;
     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
-      comm_data = action->comm.dst_data;
+      other_user_data = action->comm.dst_data;
     }
-    if (action->comm.type == type && (!match_fun || match_fun(data, comm_data))) {
+    if (action->comm.type == type &&
+        (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
+        (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
       XBT_DEBUG("Found a matching communication action %p", action);
       xbt_fifo_remove_item(rdv->comm_fifo, item);
       xbt_fifo_free_item(item);
@@ -153,63 +149,22 @@ smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
       return action;
     }
     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
-           " its type is %d but we are looking for a comm of type %d",
+           " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
            action, (int)action->comm.type, (int)type);
   }
   XBT_DEBUG("No matching communication action found");
   return NULL;
 }
 
-/**
- *  \brief Checks if there is a send communication action
- *  queued in a rendez-vous matching our needs.
- *  \return 1 if found, 0 otherwise
- */
-int SIMIX_comm_has_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
-
-  smx_action_t action;
-  xbt_fifo_item_t item;
-
-  xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
-    if (action->comm.type == SIMIX_COMM_SEND
-        && (!match_fun || match_fun(data, action->comm.src_data))) {
-      XBT_DEBUG("Found a matching communication action %p", action);
-      return 1;
-    }
-  }
-  XBT_DEBUG("No matching communication action found");
-  return 0;
-}
-
-/**
- *  \brief Checks if there is a recv communication action
- *  queued in a rendez-vous matching our needs.
- *  \return 1 if found, 0 otherwise
- */
-int SIMIX_comm_has_recv_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
-
-  smx_action_t action;
-  xbt_fifo_item_t item;
-
-  xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
-    if (action->comm.type == SIMIX_COMM_RECEIVE
-        && (!match_fun || match_fun(data, action->comm.dst_data))) {
-      XBT_DEBUG("Found a matching communication action %p", action);
-      return 1;
-    }
-  }
-  XBT_DEBUG("No matching communication action found");
-  return 0;
-}
 
 /******************************************************************************/
-/*                            Comunication Actions                            */
+/*                            Communication Actions                            */
 /******************************************************************************/
 
 /**
- *  \brief Creates a new comunicate action
+ *  \brief Creates a new communicate action
  *  \param type The direction of communication (comm_send, comm_recv)
- *  \return The new comunicate action
+ *  \return The new communicate action
  */
 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
 {
@@ -303,86 +258,101 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*match_fun)(void *, void *),
+                              int (*match_fun)(void *, void *,smx_action_t),
                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
                               void *data,
                               int detached)
 {
-  smx_action_t action;
+  /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
+  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
 
-  /* Look for communication action matching our needs.
-     If it is not found then create it and push it into the rendez-vous point */
-  action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
+  /* Look for communication action matching our needs. We also provide a description of
+   * ourself so that the other side also gets a chance of choosing if it wants to match with us.
+   *
+   * If it is not found then push our communication into the rendez-vous point */
+  smx_action_t other_action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
 
-  if (!action) {
-    action = SIMIX_comm_new(SIMIX_COMM_SEND);
-    SIMIX_rdv_push(rdv, action);
+  if (!other_action) {
+    other_action = this_action;
+    SIMIX_rdv_push(rdv, this_action);
   } else {
-    action->state = SIMIX_READY;
-    action->comm.type = SIMIX_COMM_READY;
+    SIMIX_comm_destroy(this_action);
+    --smx_total_comms; // this creation was a pure waste
+
+    other_action->state = SIMIX_READY;
+    other_action->comm.type = SIMIX_COMM_READY;
   }
-  xbt_fifo_push(src_proc->comms, action);
+  xbt_fifo_push(src_proc->comms, other_action);
 
   /* if the communication action is detached then decrease the refcount
    * by one, so it will be eliminated by the receiver's destroy call */
   if (detached) {
-    action->comm.detached = 1;
-    action->comm.refcount--;
-    action->comm.clean_fun = clean_fun;
+    other_action->comm.detached = 1;
+    other_action->comm.refcount--;
+    other_action->comm.clean_fun = clean_fun;
   } else {
-    action->comm.clean_fun = NULL;
+    other_action->comm.clean_fun = NULL;
   }
 
   /* Setup the communication action */
-  action->comm.src_proc = src_proc;
-  action->comm.task_size = task_size;
-  action->comm.rate = rate;
-  action->comm.src_buff = src_buff;
-  action->comm.src_buff_size = src_buff_size;
-  action->comm.src_data = data;
+  other_action->comm.src_proc = src_proc;
+  other_action->comm.task_size = task_size;
+  other_action->comm.rate = rate;
+  other_action->comm.src_buff = src_buff;
+  other_action->comm.src_buff_size = src_buff_size;
+  other_action->comm.src_data = data;
+
+  other_action->comm.match_fun = match_fun;
 
   if (MC_IS_ENABLED) {
-    action->state = SIMIX_RUNNING;
-    return action;
+    other_action->state = SIMIX_RUNNING;
+    return other_action;
   }
 
-  SIMIX_comm_start(action);
-  return (detached ? NULL : action);
+  SIMIX_comm_start(other_action);
+  return (detached ? NULL : other_action);
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                       void *dst_buff, size_t *dst_buff_size,
-                      int (*match_fun)(void *, void *), void *data)
+                      int (*match_fun)(void *, void *, smx_action_t), void *data)
 {
-  smx_action_t action;
+  /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
+  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
 
-  /* Look for communication action matching our needs.
-   * If it is not found then create it and push it into the rendez-vous point
-   */
-  action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
+  /* Look for communication action matching our needs. We also provide a description of
+   * ourself so that the other side also gets a chance of choosing if it wants to match with us.
+   *
+   * If it is not found then push our communication into the rendez-vous point */
+  smx_action_t other_action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data, this_action);
 
-  if (!action) {
-    action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
-    SIMIX_rdv_push(rdv, action);
+  if (!other_action) {
+    other_action = this_action;
+    SIMIX_rdv_push(rdv, this_action);
   } else {
-    action->state = SIMIX_READY;
-    action->comm.type = SIMIX_COMM_READY;
+    SIMIX_comm_destroy(this_action);
+    --smx_total_comms; // this creation was a pure waste
+
+    other_action->state = SIMIX_READY;
+    other_action->comm.type = SIMIX_COMM_READY;
   }
-  xbt_fifo_push(dst_proc->comms, action);
+  xbt_fifo_push(dst_proc->comms, other_action);
 
   /* Setup communication action */
-  action->comm.dst_proc = dst_proc;
-  action->comm.dst_buff = dst_buff;
-  action->comm.dst_buff_size = dst_buff_size;
-  action->comm.dst_data = data;
+  other_action->comm.dst_proc = dst_proc;
+  other_action->comm.dst_buff = dst_buff;
+  other_action->comm.dst_buff_size = dst_buff_size;
+  other_action->comm.dst_data = data;
+
+  other_action->comm.match_fun = match_fun;
 
   if (MC_IS_ENABLED) {
-    action->state = SIMIX_RUNNING;
-    return action;
+    other_action->state = SIMIX_RUNNING;
+    return other_action;
   }
 
-  SIMIX_comm_start(action);
-  return action;
+  SIMIX_comm_start(other_action);
+  return other_action;
 }
 
 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
index d9e2fcf..ac8dcdc 100644 (file)
@@ -34,22 +34,22 @@ void SIMIX_comm_start(smx_action_t action);
 void SIMIX_comm_send(smx_process_t src_proc, smx_rdv_t rdv,
                      double task_size, double rate,
                      void *src_buff, size_t src_buff_size,
-                     int (*)(void *, void *), void *data,
+                     int (*match_fun)(void *, void *,smx_action_t), void *data,
                      double timeout);
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*)(void *, void *),
+                              int (*match_fun)(void *, void *, smx_action_t),
                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
                               void *data,
                               int detached);
 void SIMIX_comm_recv(smx_process_t dst_proc, smx_rdv_t rdv,
                      void *dst_buff, size_t *dst_buff_size,
-                     int (*)(void *, void *), void *data,
+                     int (*)(void *, void *,smx_action_t), void *data,
                      double timeout);
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                               void *dst_buff, size_t *dst_buff_size,
-                              int (*)(void *, void *), void *data);
+                              int (*)(void *, void *, smx_action_t), void *data);
 void SIMIX_comm_destroy(smx_action_t action);
 void SIMIX_comm_destroy_internal_actions(smx_action_t action);
 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx);
index 824239f..943d114 100644 (file)
@@ -123,6 +123,9 @@ typedef struct s_smx_action {
       int detached;                   /* If detached or not */
 
       void (*clean_fun)(void*);       /* Function to clean the detached src_buf if something goes wrong */
+      int (*match_fun)(void*,void*,smx_action_t);  /* Filter function used by the other side. It is used when
+                                         looking if a given communication matches my needs. For that, myself must match the
+                                         expectations of the other side, too. See SIMIX_rdv_get_comm(). */
 
       /* Surf action data */
       surf_action_t surf_comm;        /* The Surf communication action encapsulated */
index a0888c1..dbf4d2c 100644 (file)
@@ -319,7 +319,7 @@ typedef struct s_smx_simcall {
       double rate;
       void *src_buff;
       size_t src_buff_size;
-      int (*match_fun)(void *, void *);
+      int (*match_fun)(void *, void *, smx_action_t);
       void *data;
       double timeout;
     } comm_send;
@@ -330,7 +330,7 @@ typedef struct s_smx_simcall {
       double rate;
       void *src_buff;
       size_t src_buff_size;
-      int (*match_fun)(void *, void *);
+      int (*match_fun)(void *, void *, smx_action_t);
       void (*clean_fun)(void *);
       void *data;
       int detached;
@@ -341,7 +341,7 @@ typedef struct s_smx_simcall {
       smx_rdv_t rdv;
       void *dst_buff;
       size_t *dst_buff_size;
-      int (*match_fun)(void *, void *);
+      int (*match_fun)(void *, void *, smx_action_t);
       void *data;
       double timeout;
     } comm_recv;
@@ -350,8 +350,8 @@ typedef struct s_smx_simcall {
       smx_rdv_t rdv;
       void *dst_buff;
       size_t *dst_buff_size;
-      int (*match_fun)(void *, void *);
-          void *data;
+      int (*match_fun)(void *, void *, smx_action_t);
+      void *data;
       smx_action_t result;
     } comm_irecv;
 
index 8c08f25..f21e5b9 100644 (file)
@@ -697,7 +697,7 @@ smx_action_t simcall_rdv_get_head(smx_rdv_t rdv)
 
 void simcall_comm_send(smx_rdv_t rdv, double task_size, double rate,
                          void *src_buff, size_t src_buff_size,
-                         int (*match_fun)(void *, void *), void *data,
+                         int (*match_fun)(void *, void *, smx_action_t), void *data,
                          double timeout)
 {
   /* checking for infinite values */
@@ -732,7 +732,7 @@ void simcall_comm_send(smx_rdv_t rdv, double task_size, double rate,
 
 smx_action_t simcall_comm_isend(smx_rdv_t rdv, double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*match_fun)(void *, void *),
+                              int (*match_fun)(void *, void *, smx_action_t),
                               void (*clean_fun)(void *),
                               void *data,
                               int detached)
@@ -761,7 +761,7 @@ smx_action_t simcall_comm_isend(smx_rdv_t rdv, double task_size, double rate,
 }
 
 void simcall_comm_recv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
-                         int (*match_fun)(void *, void *), void *data, double timeout)
+                         int (*match_fun)(void *, void *, smx_action_t), void *data, double timeout)
 {
   xbt_assert(isfinite(timeout), "timeout is not finite!");
   xbt_assert(rdv, "No rendez-vous point defined for recv");
@@ -788,7 +788,7 @@ void simcall_comm_recv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
 }
 
 smx_action_t simcall_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
-                                  int (*match_fun)(void *, void *), void *data)
+                                  int (*match_fun)(void *, void *, smx_action_t), void *data)
 {
   xbt_assert(rdv, "No rendez-vous point defined for irecv");
 
index d27f40c..d98b98a 100644 (file)
@@ -11,7 +11,7 @@
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
                                 "Logging specific to SMPI (base)");
 
-static int match_recv(void* a, void* b) {
+static int match_recv(void* a, void* b, smx_action_t ignored) {
    MPI_Request ref = (MPI_Request)a;
    MPI_Request req = (MPI_Request)b;
 
@@ -21,7 +21,7 @@ static int match_recv(void* a, void* b) {
           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
 }
 
-static int match_send(void* a, void* b) {
+static int match_send(void* a, void* b,smx_action_t ignored) {
    MPI_Request ref = (MPI_Request)a;
    MPI_Request req = (MPI_Request)b;