Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Incomplete port of MSG on top of SIMIX_network [Cristian]
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 5 Oct 2009 12:26:55 +0000 (12:26 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 5 Oct 2009 12:26:55 +0000 (12:26 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6700 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/include/simix/datatypes.h
src/include/simix/simix.h
src/msg/global.c
src/msg/gos.c
src/msg/msg_mailbox.c
src/msg/private.h
src/simix/private.h
src/simix/smx_global.c
src/simix/smx_network.c

index fd193be..dd998ac 100644 (file)
@@ -55,6 +55,9 @@ SG_BEGIN_DECL()
 /******************************* Networking ***********************************/
     typedef struct s_smx_rvpoint *smx_rdv_t;
     typedef struct s_smx_comm *smx_comm_t;
+    typedef enum {comm_send,
+                  comm_recv
+    } smx_comm_type_t;
 
 
 
index 33371a8..ac8da62 100644 (file)
@@ -175,21 +175,31 @@ XBT_PUBLIC(void) SIMIX_display_process_status(void);
 /************************** Comunication Handling *****************************/
 
 /* Public */
-XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(char *name);
+XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(const char *name);
 XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp);
-XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, double timeout, void *data, size_t data_size);
-XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void **data, size_t *data_size);
+XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
+                                    double timeout, void *data, size_t data_size,
+                                    int (filter)(smx_comm_t, void *), void *arg);
+XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data,
+                                    size_t *data_size, int (filter)(smx_comm_t, void *), void *arg);
 XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm);
 XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm);
+XBT_PUBLIC(int) SIMIX_communication_isSend(smx_comm_t comm);
+XBT_PUBLIC(int) SIMIX_communication_isRecv(smx_comm_t comm);
+
+/* FIXME: Filter function */
+int comm_filter_get(smx_comm_t comm, void *arg);
+int comm_filter_put(smx_comm_t comm, void *arg);
 
 /* These should be private to SIMIX */
-smx_comm_t SIMIX_communication_new(smx_rdv_t rdv);
+smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv);
 void SIMIX_communication_destroy(smx_comm_t comm);
 static inline void SIMIX_communication_use(smx_comm_t comm);
 static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout);
-smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src);
+smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rvp, int (filter)(smx_comm_t, void *), void *arg);
 static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm);
 static inline smx_cond_t SIMIX_rdv_get_cond(smx_rdv_t rvp);
+void SIMIX_network_copy_data(smx_comm_t comm);
 
 SG_END_DECL()
 #endif /* _SIMIX_SIMIX_H */
index 3870712..454fea7 100644 (file)
@@ -146,32 +146,13 @@ int MSG_get_channel_number(void)
  */
 MSG_error_t MSG_main(void)
 {
-  smx_action_t smx_action;
-  xbt_fifo_t actions_done = xbt_fifo_new();
-  xbt_fifo_t actions_failed = xbt_fifo_new();
-
   /* Clean IO before the run */
   fflush(stdout);
   fflush(stderr);
   SIMIX_init();
 
-  //surf_solve(); /* Takes traces into account. Returns 0.0 */
-  /* xbt_fifo_size(msg_global->process_to_run) */
-
-  while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
-
-    while ((smx_action = xbt_fifo_pop(actions_failed))) {
-      DEBUG1("** %s failed **", SIMIX_action_get_name(smx_action));
-      SIMIX_action_signal_all(smx_action);
-    }
-
-    while ((smx_action = xbt_fifo_pop(actions_done))) {
-      DEBUG1("** %s done **", SIMIX_action_get_name(smx_action));
-      SIMIX_action_signal_all(smx_action);      
-    }
-  }
-  xbt_fifo_free(actions_failed);
-  xbt_fifo_free(actions_done);
+  while (SIMIX_solve(NULL, NULL) != -1.0);
+  
   return MSG_OK;
 }
 
index fc58b2b..5e1a703 100644 (file)
@@ -44,9 +44,9 @@ MSG_error_t MSG_task_execute(m_task_t task)
   CHECK_HOST();
 
   simdata = task->simdata;
-  xbt_assert0((!simdata->compute)
+  xbt_assert1((!simdata->compute)
               && (task->simdata->refcount == 1),
-              "This task is executed somewhere else. Go fix your code!");
+              "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
 
   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
 
index c285b60..2afd750 100644 (file)
@@ -24,7 +24,8 @@ msg_mailbox_t MSG_mailbox_create(const char *alias)
   mailbox->cond = NULL;
   mailbox->alias = alias ? xbt_strdup(alias) : NULL;
   mailbox->hostname = NULL;
-
+  mailbox->rdv = SIMIX_rdv_create(alias);
+  
   return mailbox;
 }
 
@@ -47,7 +48,8 @@ void MSG_mailbox_free(void *mailbox)
 
   xbt_fifo_free(_mailbox->tasks);
   free(_mailbox->alias);
-
+  SIMIX_rdv_destroy(_mailbox->rdv);
+  
   free(_mailbox);
 }
 
@@ -155,18 +157,13 @@ msg_mailbox_t MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)
 }
 
 MSG_error_t
-MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
+MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
                          m_host_t host, double timeout)
 {
-  m_process_t process = MSG_process_self();
-  m_task_t t = NULL;
-  m_host_t h = NULL;
-  simdata_task_t t_simdata = NULL;
-  simdata_host_t h_simdata = NULL;
-  double start_time = SIMIX_get_clock();
-
-  smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
-
+  xbt_ex_t e;
+  MSG_error_t ret;
+  smx_host_t smx_host;
+  size_t task_size = sizeof(void*);
   CHECK_HOST();
 
   /* Sanity check */
@@ -176,138 +173,44 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
     CRITICAL0
       ("MSG_task_get() was asked to write in a non empty task struct.");
 
-  /* Get the task */
-  h = MSG_host_self();
-  h_simdata = h->simdata;
-
-  SIMIX_mutex_lock(h_simdata->mutex);   //FIXME: lock the mailbox instead
-
-  if (MSG_mailbox_get_cond(mailbox)) {
-    CRITICAL1
-      ("A process is already blocked on the channel %s (meaning that someone is already doing a get on this)",
-       MSG_mailbox_get_alias(mailbox));
-    SIMIX_cond_display_info(MSG_mailbox_get_cond(mailbox));
-    xbt_die("Go fix your code!");
+  smx_host = host ? host->simdata->smx_host : NULL;
+  
+  TRY{
+    SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size, 
+                       comm_filter_get, smx_host);
   }
-
-  while (1) {
-    /* if the mailbox is not empty (has a task) */
-    if (!MSG_mailbox_is_empty(mailbox)) {
-      if (!host) {
-        /* pop the head of the mailbox */
-        t = MSG_mailbox_pop_head(mailbox);
+  CATCH(e){
+    switch(e.category){
+      case host_error:
+        ret = MSG_HOST_FAILURE;
         break;
-      } else {
-        /* get the first task of the host */
-        if ((t = MSG_mailbox_get_first_host_task(mailbox, host)))
-          break;
-      }
-    }
-
-    if ((timeout > 0) && (SIMIX_get_clock() - start_time >= timeout)) { // Timeout already elapsed
-      SIMIX_mutex_unlock(h_simdata->mutex);
-      MSG_mailbox_set_cond(mailbox, NULL);
-      SIMIX_cond_destroy(cond);
-      MSG_RETURN(MSG_TRANSFER_FAILURE);
-    }
-
-    if (!cond) {
-      cond = SIMIX_cond_init();
-      MSG_mailbox_set_cond(mailbox, cond);
-    }
-
-    if (timeout > 0)
-      SIMIX_cond_wait_timeout(cond, h_simdata->mutex,
-                              timeout - start_time + SIMIX_get_clock());
-    else
-      SIMIX_cond_wait(cond, h_simdata->mutex);
-
-    if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
-      SIMIX_mutex_unlock(h_simdata->mutex);
-      MSG_mailbox_set_cond(mailbox, NULL);
-      SIMIX_cond_destroy(cond);
-      MSG_RETURN(MSG_HOST_FAILURE);
+      case network_error:
+        ret = MSG_TRANSFER_FAILURE;
+        break;
+      case timeout_error:
+        ret = MSG_TRANSFER_FAILURE;
+        break;      
+      default:
+        xbt_die("Unhandled SIMIX network exception");       
     }
+    xbt_ex_free(e);
+    MSG_RETURN(ret);        
   }
-
-
-  DEBUG1("OK, got a task (%s)", t->name);
-  /* clean conditional */
-  if (cond) {
-    MSG_mailbox_set_cond(mailbox, NULL);
-    SIMIX_cond_destroy(cond);
-  }
-
-  SIMIX_mutex_unlock(h_simdata->mutex);
-
-  t_simdata = t->simdata;
-  t_simdata->receiver = process;
-  *task = t;
-
-  SIMIX_mutex_lock(t_simdata->mutex);
-
-  /* Transfer */
-  /* create SIMIX action to the communication */
-  t_simdata->comm =
-    SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
-                             simdata->smx_host,
-                             process->simdata->m_host->simdata->smx_host,
-                             t->name, t_simdata->message_size,
-                             t_simdata->rate);
-
-  SIMIX_action_use(t_simdata->comm);
-
-  /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
-  if (MSG_process_is_suspended(t_simdata->sender)) {
-    DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
-    SIMIX_action_set_priority(t_simdata->comm, 0);
-  }
-  SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
-  // breaking point if asynchrounous
-  process->simdata->waiting_action = t_simdata->comm;
-
-  while (1) {
-    SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
-
-    if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
-      break;
-    if (!SIMIX_host_get_state(h_simdata->smx_host))
-      break;
-    if (!SIMIX_host_get_state(process->simdata->m_host->simdata->smx_host))
-      break;
-  }
-
-  SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
-  process->simdata->waiting_action = NULL;
-
-  /* for this process, don't need to change in get function */
-  SIMIX_mutex_unlock(t_simdata->mutex);
-
-  if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
-    if (SIMIX_action_destroy(t_simdata->comm))
-      t_simdata->comm = NULL;
-    MSG_RETURN(MSG_OK);
-  } else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
-    if (SIMIX_action_destroy(t_simdata->comm))
-      t_simdata->comm = NULL;
-    MSG_RETURN(MSG_HOST_FAILURE);
-  } else {
-    if (SIMIX_action_destroy(t_simdata->comm))
-      t_simdata->comm = NULL;
-    MSG_RETURN(MSG_TRANSFER_FAILURE);
-  }
+  MSG_RETURN (MSG_OK);
 }
 
 MSG_error_t
 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
                              double timeout)
 {
+  xbt_ex_t e;
+  MSG_error_t ret;
   m_process_t process = MSG_process_self();
   const char *hostname;
   simdata_task_t t_simdata = NULL;
   m_host_t local_host = NULL;
   m_host_t remote_host = NULL;
-  smx_cond_t cond = NULL;
 
   CHECK_HOST();
 
@@ -332,110 +235,33 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
   if (!remote_host)
     THROW1(not_found_error, 0, "Host %s not fount", hostname);
 
-
   DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel %s",
          t_simdata->message_size / 1000, local_host->name,
          remote_host->name, MSG_mailbox_get_alias(mailbox));
 
-  SIMIX_mutex_lock(remote_host->simdata->mutex);        /* FIXME: lock the mailbox instead */
-
-  /* put the task in the mailbox */
-  xbt_fifo_push(mailbox->tasks, task);
-
-  if ((cond = MSG_mailbox_get_cond(mailbox))) {
-    DEBUG0("Somebody is listening. Let's wake him up!");
-    SIMIX_cond_signal(cond);
+  TRY{
+    SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
+                       timeout, &task, sizeof(void *), comm_filter_put, NULL);
   }
 
-  SIMIX_mutex_unlock(remote_host->simdata->mutex);
-
-  SIMIX_mutex_lock(t_simdata->mutex);
-
-  process->simdata->waiting_action = t_simdata->comm;   // for debugging and status displaying purpose
-
-  if (timeout > 0) {
-    xbt_ex_t e;
-    double time;
-    double time_elapsed;
-    time = SIMIX_get_clock();
-
-    TRY {
-      /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
-      while (1) {
-        time_elapsed = SIMIX_get_clock() - time;
-        SIMIX_cond_wait_timeout(t_simdata->cond, t_simdata->mutex,
-                                timeout - time_elapsed);
-
-        if (t_simdata->comm)
-          SIMIX_action_use(t_simdata->comm);
-        if (t_simdata->comm && (SIMIX_action_get_state(t_simdata->comm) !=
-                                SURF_ACTION_RUNNING))
-          break;
-        if (!SIMIX_host_get_state(local_host->simdata->smx_host))
-          break;
-        if (!SIMIX_host_get_state(remote_host->simdata->smx_host))
-          break;
-      }
-    }
-    CATCH(e) {
-      if (e.category == timeout_error) {
-        xbt_ex_free(e);
-        /* verify if the timeout happened and the communication didn't started yet */
-        if (t_simdata->comm == NULL) {
-          DEBUG1("Action terminated %s (there was a timeout)", task->name);
-          process->simdata->waiting_action = NULL;
-
-          /* remove the task from the mailbox */
-          MSG_mailbox_remove(mailbox, task);
-
-/*           if (t_simdata->receiver && t_simdata->receiver->simdata) {    /\* receiver still around *\/ */
-/*             t_simdata->receiver->simdata->waiting_task = NULL; */
-/*           } */
-
-          SIMIX_mutex_unlock(t_simdata->mutex);
-          MSG_RETURN(MSG_TRANSFER_FAILURE);
-        }
-      } else {
-        RETHROW;
-      }
-    }
-  } else {
-    while (1) {                 //FIXME: factorize with the code right above
-      SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
-
-      if (t_simdata->comm)
-        SIMIX_action_use(t_simdata->comm);
-      if (t_simdata->comm
-          && SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
+  CATCH(e){
+    switch(e.category){
+      case host_error:
+        ret = MSG_HOST_FAILURE;
         break;
-      if (!SIMIX_host_get_state(local_host->simdata->smx_host))
-        break;
-      if (!SIMIX_host_get_state(remote_host->simdata->smx_host))
+      case network_error:
+        ret = MSG_TRANSFER_FAILURE;
         break;
+      case timeout_error:
+        ret = MSG_TRANSFER_FAILURE;
+        break;      
+      default:
+        xbt_die("Unhandled SIMIX network exception");       
     }
+    xbt_ex_free(e);
+    MSG_RETURN(ret);        
   }
 
-  DEBUG1("Action terminated %s", task->name);
-  process->simdata->waiting_action = NULL;
-/*   if (t_simdata->receiver && t_simdata->receiver->simdata) {    /\* receiver still around *\/ */
-/*     t_simdata->receiver->simdata->waiting_task = NULL; */
-/*   } */
-
-  SIMIX_mutex_unlock(t_simdata->mutex);
-
-  if (t_simdata->comm
-      && SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
-    if (SIMIX_action_destroy(t_simdata->comm))
-      t_simdata->comm = NULL;
-    t_simdata->refcount--;
-    MSG_RETURN(MSG_OK);
-  } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) {
-    if (t_simdata->comm && SIMIX_action_destroy(t_simdata->comm))
-      t_simdata->comm = NULL;
-    MSG_RETURN(MSG_HOST_FAILURE);
-  } else {
-    if (t_simdata->comm && SIMIX_action_destroy(t_simdata->comm))
-      t_simdata->comm = NULL;
-    MSG_RETURN(MSG_TRANSFER_FAILURE);
-  }
+  t_simdata->refcount--;
+  MSG_RETURN (MSG_OK);
 }
index 63d2703..29f7a41 100644 (file)
@@ -25,10 +25,11 @@ SG_BEGIN_DECL()
 /**************** datatypes **********************************/
 /* this structure represents a mailbox */
      typedef struct s_msg_mailbox {
-       char *alias;             /* the key of the mailbox in the global dictionary                      */
-       xbt_fifo_t tasks;        /* the list of the tasks in the mailbox                                         */
-       smx_cond_t cond;         /* the condition on the mailbox                                                         */
-       char *hostname;          /* the name of the host containing the mailbox                          */
+       char *alias;             /* the key of the mailbox in the global dictionary */
+       xbt_fifo_t tasks;        /* the list of the tasks in the mailbox */
+       smx_cond_t cond;         /* the condition on the mailbox */
+       char *hostname;          /* the name of the host containing the mailbox */
+       smx_rdv_t rdv;           /* SIMIX rendez-vous point */
      } s_msg_mailbox_t;
 
      typedef struct simdata_host {
index 2d00b7f..8d40e0e 100644 (file)
@@ -129,6 +129,7 @@ typedef struct s_smx_rvpoint {
 } s_smx_rvpoint_t;
 
 typedef struct s_smx_comm {
+  smx_comm_type_t type;
   smx_host_t src_host;
   smx_host_t dst_host;
   smx_rdv_t rdv;
@@ -136,7 +137,7 @@ typedef struct s_smx_comm {
   smx_action_t act;
   void *data;
   size_t data_size;
-  void **dest_buff;
+  void *dest_buff;
   size_t *dest_buff_size;
   double rate;
   double task_size;
index 2c09bb6..26e0c5e 100644 (file)
@@ -409,13 +409,17 @@ double SIMIX_solve(xbt_fifo_t actions_done, xbt_fifo_t actions_failed)
       while ((action = xbt_swag_extract(model->states.failed_action_set))) {
         smx_action = action->data;
         if (smx_action) {
-          xbt_fifo_unshift(actions_failed, smx_action);
+          SIMIX_action_signal_all(smx_action);
         }
       }
       while ((action = xbt_swag_extract(model->states.done_action_set))) {
         smx_action = action->data;
         if (smx_action) {
-          xbt_fifo_unshift(actions_done, smx_action);
+          /* Copy the transfered data of the completed communication actions */
+          /* FIXME: find a better way to determine if its a comm action */
+          if(smx_action->data != NULL)
+            SIMIX_network_copy_data((smx_comm_t)smx_action->data);
+          SIMIX_action_signal_all(smx_action);      
         }
       }
     }
index 4ba18f5..f9ac690 100644 (file)
  *  \param name The name of the rendez-vous point
  *  \return The created rendez-vous point
  */
-smx_rdv_t SIMIX_rdv_create(char *name)
+smx_rdv_t SIMIX_rdv_create(const char *name)
 {
   smx_rdv_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
-  rvp->name = xbt_strdup(name);
+  rvp->name = name ? xbt_strdup(name) : NULL;
   rvp->read = SIMIX_mutex_init();
   rvp->write = SIMIX_mutex_init();
   rvp->comm_fifo = xbt_fifo_new();
@@ -35,7 +35,8 @@ smx_rdv_t SIMIX_rdv_create(char *name)
  */
 void SIMIX_rdv_destroy(smx_rdv_t rvp)
 {
-  xbt_free(rvp->name);
+  if(rvp->name)
+    xbt_free(rvp->name);
   SIMIX_mutex_destroy(rvp->read);
   SIMIX_mutex_destroy(rvp->write);
   xbt_fifo_free(rvp->comm_fifo);
@@ -59,29 +60,22 @@ static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm)
  *  \param look_for_src boolean. True: we are receiver looking for sender; False: other way round
  *  \return The communication request if found, or a newly created one otherwise.
  */
-smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src) {
-  /* Get a communication request from the rendez-vous queue. If it is the kind
-     of request we are looking for then return it, otherwise put it again in the
-     queue.
-   */
-  smx_comm_t comm = xbt_fifo_shift(rvp->comm_fifo);
-
-  if(comm != NULL) {
-    if (( look_for_src && comm->src_host != NULL) ||
-        (!look_for_src && comm->dst_host != NULL)) {
+smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rvp, int (filter)(smx_comm_t, void*), void *arg) {
+  smx_comm_t comm;
+  xbt_fifo_item_t item;
 
+  /* Traverse the rendez-vous queue looking for a comm request matching the
+     filter conditions. If found return it and remove it from the list. */
+  xbt_fifo_foreach(rvp->comm_fifo, item, comm, smx_comm_t) {
+    if(filter(comm, arg)){
       SIMIX_communication_use(comm);
+      xbt_fifo_remove_item(rvp->comm_fifo, item);
       return comm;
     }
   }
-  xbt_fifo_unshift(rvp->comm_fifo, comm);
-
-  /* no relevant request found. Create a new comm action and set it up */
-  comm = SIMIX_communication_new(rvp);
-  SIMIX_rdv_push(rvp, comm);
-  SIMIX_communication_use(comm);
 
-  return comm;
+  /* no relevant request found. Return NULL */
+  return NULL;
 }
 
 /******************************************************************************/
@@ -94,13 +88,15 @@ smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src) {
  *  \param receiver The process receiving the communication (by recv)
  *  \return the communication request
  */  
-smx_comm_t SIMIX_communication_new(smx_rdv_t rdv)
+smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv)
 {
   /* alloc structures */
   smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
+  comm->type = type;
   comm->cond = SIMIX_cond_init();
   comm->rdv = rdv;
-
+  comm->refcount = 1;
+  
   return comm;
 }
 
@@ -139,10 +135,15 @@ static inline void SIMIX_communication_use(smx_comm_t comm)
  */
 static inline void SIMIX_communication_start(smx_comm_t comm)
 {
-  comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, 
-      comm->task_size, comm->rate);
-
-  SIMIX_register_action_to_condition(comm->act, comm->cond);
+  /* If both the sender and the receiver are already there, start the communication */
+  if(comm->src_host != NULL && comm->dst_host != NULL){
+    comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, 
+                                         comm->task_size, comm->rate);
+    /* Add the communication as user data into the action, so it can be reached from it later */
+    comm->act->data = comm;
+    
+    SIMIX_register_action_to_condition(comm->act, comm->cond);
+  }
 }
 
 /**
@@ -186,6 +187,57 @@ static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, doub
   }
 }
 
+/**
+ *  \brief Copy the communication data from the sender's buffer to the receiver's one
+ *  \param comm The communication
+ */
+void SIMIX_network_copy_data(smx_comm_t comm)
+{
+  /* Copy the minimum between the size of the sender's message and the size of the
+     receiver's buffer */
+  *comm->dest_buff_size = *comm->dest_buff_size < comm->data_size ? 
+                            *comm->dest_buff_size : comm->data_size;
+
+  memcpy(comm->dest_buff, comm->data, *comm->dest_buff_size);
+}
+
+/**
+ *  \brief Checks if a communication is a send request
+ *  \param comm The communication
+ *  \return Boolean value
+ */
+int SIMIX_communication_isSend(smx_comm_t comm)
+{
+  return comm->type == comm_send ? TRUE : FALSE;
+}
+
+/**
+ *  \brief Checks if a communication is a recv request
+ *  \param comm The communication
+ *  \return Boolean value
+ */
+int SIMIX_communication_isRecv(smx_comm_t comm)
+{
+  return comm->type == comm_recv ? TRUE : FALSE;
+}
+
+/* FIXME: move to some other place */
+int comm_filter_get(smx_comm_t comm, void *arg)
+{
+  if(comm->type == comm_send){
+    if(arg && comm->src_host != (smx_host_t)arg)
+     return FALSE;
+    else
+     return TRUE;
+  }else{
+    return FALSE;
+  }
+}
+
+int comm_filter_put(smx_comm_t comm, void *arg)
+{
+  return comm->type == comm_recv ? TRUE : FALSE;
+}
 /******************************************************************************/
 /*                        Synchronous Communication                           */
 /******************************************************************************/
@@ -194,22 +246,29 @@ static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, doub
  *   - timeout_error if communication reached the timeout specified
  *   - network_error if network failed or peer issued a timeout
  */
-void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, double timeout, void *data, size_t data_size)
+void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, 
+                        double timeout, void *data, size_t data_size,
+                        int (filter)(smx_comm_t, void *), void *arg)
 {
   smx_comm_t comm;
 
-  /* Setup communication request */
-  comm = SIMIX_rdv_get_request_or_create(rdv, 0);
+  /* Look for communication request matching our needs. 
+     If it is not found then create it and push it into the rendez-vous point */
+  comm = SIMIX_rdv_get_request(rdv, filter, arg);
+
+  if(comm == NULL){
+    comm = SIMIX_communication_new(comm_send, rdv);
+    SIMIX_rdv_push(rdv, comm);
+  }
+
+  /* Setup the communication request */
   comm->src_host = SIMIX_host_self();
   comm->task_size = task_size;
   comm->rate = rate;
   comm->data = data;
   comm->data_size = data_size;
 
-  /* If the receiver is already there, start the communication */
-  /* If it's not here already, it will start that communication itself later on */
-  if(comm->dst_host != NULL)
-    SIMIX_communication_start(comm);
+  SIMIX_communication_start(comm);
 
   /* Wait for communication completion */
   /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
@@ -224,20 +283,26 @@ void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, double tim
  *   - timeout_error if communication reached the timeout specified
  *   - network_error if network failed or peer issued a timeout
  */
-void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void **data, size_t *data_size)
+void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, 
+                        size_t *data_size, int (filter)(smx_comm_t, void *), void *arg)
 {
   smx_comm_t comm;
 
+  /* Look for communication request matching our needs. 
+     If it is not found then create it and push it into the rendez-vous point */
+  comm = SIMIX_rdv_get_request(rdv, filter, arg);
+
+  if(comm == NULL){
+    comm = SIMIX_communication_new(comm_send, rdv);
+    SIMIX_rdv_push(rdv, comm);
+  }
+
   /* Setup communication request */
-  comm = SIMIX_rdv_get_request_or_create(rdv, 1);
   comm->dst_host = SIMIX_host_self();
   comm->dest_buff = data;
   comm->dest_buff_size = data_size;
 
-  /* If the sender is already there, start the communication */
-  /* If it's not here already, it will start that communication itself later on */
-  if (comm->src_host != NULL)
-    SIMIX_communication_start(comm);
+  SIMIX_communication_start(comm);
 
   /* Wait for communication completion */
   /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
@@ -272,10 +337,6 @@ XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm)
 
 
 
-  /* FIXME: MOVE DATA COPY TO MAESTRO AT ACTION SIGNAL TIME 
-   We are OK, let's copy the message to receiver's buffer 
-  *size = *size < comm->data_size ? *size : comm->data_size;
-  memcpy(*data, comm->data, *size);*/