Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[Cristian] Lots and lots of small fixes to MSG to work on top of SMX net keeping...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 5 Oct 2009 12:27:48 +0000 (12:27 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 5 Oct 2009 12:27:48 +0000 (12:27 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6704 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/include/simix/simix.h
src/java/jmsg.c
src/msg/host.c
src/msg/m_process.c
src/msg/mailbox.h
src/msg/msg_mailbox.c
src/msg/private.h
src/msg/task.c
src/simix/private.h
src/simix/smx_network.c

index 9479cdb..7b8ce2d 100644 (file)
@@ -178,13 +178,17 @@ XBT_PUBLIC(void) SIMIX_display_process_status(void);
 XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(const char *name);
 XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp);
 XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
 XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(const char *name);
 XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp);
 XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
-                                    double timeout, void *data, size_t data_size);
-XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data,
-                                    size_t *data_size);
+                                    double timeout, void *src_buff,
+                                    size_t src_buff_size, smx_comm_t *comm, void *data);
+XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *dst_buff,
+                                    size_t *dst_buff_size, smx_comm_t *comm);
 XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm);
 XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm);
 XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm);
 XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm);
-XBT_PUBLIC(int) SIMIX_communication_isSend(smx_comm_t comm);
-XBT_PUBLIC(int) SIMIX_communication_isRecv(smx_comm_t comm);
+XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm);
+XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm);
+XBT_PUBLIC(int) SIMIX_rdv_get_count_waiting_comm(smx_rdv_t rdv, smx_host_t host);
+XBT_PUBLIC(smx_comm_t) SIMIX_rdv_get_head(smx_rdv_t rdv);
+XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm);
 
 /* These should be private to SIMIX */
 smx_comm_t SIMIX_communication_new(smx_comm_type_t type);
 
 /* These should be private to SIMIX */
 smx_comm_t SIMIX_communication_new(smx_comm_type_t type);
index 7faed5e..22d3a02 100644 (file)
@@ -149,10 +149,7 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv * env, jclass cls,
           process->name);
 
   mailbox = MSG_mailbox_new(alias);
           process->name);
 
   mailbox = MSG_mailbox_new(alias);
-  MSG_mailbox_set_hostname(mailbox,
-                           process->simdata->m_host->simdata->smx_host->name);
-
-
+  
 }
 
 JNIEXPORT void JNICALL
 }
 
 JNIEXPORT void JNICALL
index f5d3057..2d2edd9 100644 (file)
@@ -54,7 +54,6 @@ m_host_t __MSG_host_create(smx_host_t workstation, void *data)
 
     /* the key of the mailbox (in this case) is build from the name of the host and the channel number */
     simdata->mailboxes[i] = MSG_mailbox_create(alias);
 
     /* the key of the mailbox (in this case) is build from the name of the host and the channel number */
     simdata->mailboxes[i] = MSG_mailbox_create(alias);
-    MSG_mailbox_set_hostname(simdata->mailboxes[i], name);
     memset(alias, 0, MAX_ALIAS_NAME + 1);
   }
 
     memset(alias, 0, MAX_ALIAS_NAME + 1);
   }
 
index 68779a4..739a784 100644 (file)
@@ -195,10 +195,13 @@ void MSG_process_kill(m_process_t process)
   DEBUG3("Killing %s(%d) on %s",
          process->name, p_simdata->PID, p_simdata->m_host->name);
 
   DEBUG3("Killing %s(%d) on %s",
          process->name, p_simdata->PID, p_simdata->m_host->name);
 
-  if (p_simdata->waiting_action) {
-    DEBUG1("Canceling waiting task %s",
-           SIMIX_action_get_name(p_simdata->waiting_action));
-    SIMIX_action_cancel(p_simdata->waiting_action);
+  if (p_simdata->waiting_task) {
+    DEBUG1("Canceling waiting task %s", p_simdata->waiting_task->name);
+    if (p_simdata->waiting_task->simdata->compute) {
+      SIMIX_action_cancel(p_simdata->waiting_task->simdata->compute);
+    } else if (p_simdata->waiting_task->simdata->comm) {
+      SIMIX_communication_cancel(p_simdata->waiting_task->simdata->comm);
+    }
   }
 
   xbt_fifo_remove(msg_global->process_list, process);
   }
 
   xbt_fifo_remove(msg_global->process_list, process);
index c742cc2..6e577fe 100644 (file)
@@ -119,30 +119,6 @@ XBT_PUBLIC(smx_cond_t)
  */
 XBT_PUBLIC(void) MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond);
 
  */
 XBT_PUBLIC(void) MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond);
 
-/*! \brief MSG_mailbox_get_hostname - get the name of the host owned a mailbox.
- *
- * The function MSG_mailbox_get_hostname returns name of the host
- * owned the mailbox specified by the parameter mailbox.
- *
- * \param mailbox      The mailbox to get the name of the host.
- *
- * \return The name of the host owned the mailbox specified by the parameter mailbox.
- */
-XBT_PUBLIC(const char *) MSG_mailbox_get_hostname(msg_mailbox_t mailbox);
-
-/*! \brief MSG_mailbox_set_hostname - set the name of the host owned a mailbox.
- *
- * The function MSG_mailbox_set_hostname sets the name of the host
- * owned the mailbox specified by the parameter mailbox.
- *
- * \param mailbox      The mailbox to set the name of the host.
- * \param hostname     The name of the owner of the mailbox.
- *
- */
-XBT_PUBLIC(void)
-MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname);
-
-
 /*! \brief MSG_mailbox_is_empty - test if a mailbox is empty.
  *
  * The function MSG_mailbox_is_empty tests if a mailbox is empty
 /*! \brief MSG_mailbox_is_empty - test if a mailbox is empty.
  *
  * The function MSG_mailbox_is_empty tests if a mailbox is empty
@@ -155,15 +131,6 @@ MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname);
  */
 XBT_PUBLIC(int) MSG_mailbox_is_empty(msg_mailbox_t mailbox);
 
  */
 XBT_PUBLIC(int) MSG_mailbox_is_empty(msg_mailbox_t mailbox);
 
-/*! \brief MSG_mailbox_remove - remove a task from a mailbox.
- *
- * The MSG_mailbox_remove removes a task from a specified mailbox.
- *
- * \param mailbox      The mailbox concerned by this operation.
- * \param task         The task to remove from the mailbox.
- */
-XBT_PUBLIC(void) MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task);
-
 /*! \brief MSG_mailbox_get_head - get the task at the head of a mailbox.
  *
  * The MSG_mailbox_get_head returns the task at the head of the mailbox.
 /*! \brief MSG_mailbox_get_head - get the task at the head of a mailbox.
  *
  * The MSG_mailbox_get_head returns the task at the head of the mailbox.
@@ -177,33 +144,6 @@ XBT_PUBLIC(void) MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task);
 XBT_PUBLIC(m_task_t)
   MSG_mailbox_get_head(msg_mailbox_t mailbox);
 
 XBT_PUBLIC(m_task_t)
   MSG_mailbox_get_head(msg_mailbox_t mailbox);
 
-/*! \brief MSG_mailbox_pop_head - get the task at the head of a mailbox
- * and remove it from it.
- *
- * The MSG_mailbox_pop_head returns the task at the head of the mailbox
- * and remove it from it.
- *
- * \param mailbox      The mailbox concerned by the operation.
- *
- * \return             The task at the head of the mailbox.
- */
-XBT_PUBLIC(m_task_t)
-  MSG_mailbox_pop_head(msg_mailbox_t mailbox);
-
-/*! \brief MSG_mailbox_get_first_host_task - get the first msg task
- * of a specified mailbox, sended by a process of a specified host.
- *
- * \param mailbox      The mailbox concerned by the operation.
- * \param host         The msg host of the process that has sended the
- *                     task.
- *
- * \return             The first task in the mailbox specified by the
- *                     parameter mailbox and sended by a process located
- *                     on the host specified by the parameter host.
- */
-XBT_PUBLIC(m_task_t)
-  MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host);
-
 /*! \brief MSG_mailbox_get_count_host_waiting_tasks - Return the number of tasks
    waiting to be received in a mailbox and sent by a host.
  *
 /*! \brief MSG_mailbox_get_count_host_waiting_tasks - Return the number of tasks
    waiting to be received in a mailbox and sent by a host.
  *
index b0ea9b1..0164422 100644 (file)
@@ -20,10 +20,8 @@ msg_mailbox_t MSG_mailbox_create(const char *alias)
 {
   msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t, 1);
 
 {
   msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t, 1);
 
-  mailbox->tasks = xbt_fifo_new();
   mailbox->cond = NULL;
   mailbox->alias = alias ? xbt_strdup(alias) : NULL;
   mailbox->cond = NULL;
   mailbox->alias = alias ? xbt_strdup(alias) : NULL;
-  mailbox->hostname = NULL;
   mailbox->rdv = SIMIX_rdv_create(alias);
   
   return mailbox;
   mailbox->rdv = SIMIX_rdv_create(alias);
   
   return mailbox;
@@ -43,10 +41,6 @@ void MSG_mailbox_free(void *mailbox)
 {
   msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox;
 
 {
   msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox;
 
-  if (_mailbox->hostname)
-    free(_mailbox->hostname);
-
-  xbt_fifo_free(_mailbox->tasks);
   free(_mailbox->alias);
   SIMIX_rdv_destroy(_mailbox->rdv);
   
   free(_mailbox->alias);
   SIMIX_rdv_destroy(_mailbox->rdv);
   
@@ -58,59 +52,25 @@ smx_cond_t MSG_mailbox_get_cond(msg_mailbox_t mailbox)
   return mailbox->cond;
 }
 
   return mailbox->cond;
 }
 
-void MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)
-{
-  xbt_fifo_remove(mailbox->tasks, task);
-}
-
 int MSG_mailbox_is_empty(msg_mailbox_t mailbox)
 {
 int MSG_mailbox_is_empty(msg_mailbox_t mailbox)
 {
-  return (NULL == xbt_fifo_get_first_item(mailbox->tasks));
-}
-
-m_task_t MSG_mailbox_pop_head(msg_mailbox_t mailbox)
-{
-  return (m_task_t) xbt_fifo_shift(mailbox->tasks);
+  return (NULL == SIMIX_rdv_get_head(mailbox->rdv));
 }
 
 m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
 {
 }
 
 m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
 {
-  xbt_fifo_item_t item;
-
-  if (!(item = xbt_fifo_get_first_item(mailbox->tasks)))
-    return NULL;
-
-  return (m_task_t) xbt_fifo_get_item_content(item);
-}
-
-
-m_task_t MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)
-{
-  m_task_t task = NULL;
-  xbt_fifo_item_t item = NULL;
-
-  xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)
-    if (task->simdata->source == host) {
-    xbt_fifo_remove_item(mailbox->tasks, item);
-    return task;
-  }
+  smx_comm_t comm = SIMIX_rdv_get_head(mailbox->rdv);
 
 
-  return NULL;
+  if(!comm)
+    return NULL; 
+  
+  return (m_task_t)SIMIX_communication_get_data(comm);
 }
 
 int
 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)
 {
 }
 
 int
 MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)
 {
-  m_task_t task = NULL;
-  xbt_fifo_item_t item = NULL;
-  int count = 0;
-
-  xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) {
-    if (task->simdata->source == host)
-      count++;
-  }
-
-  return count;
+  return SIMIX_rdv_get_count_waiting_comm (mailbox->rdv, host->simdata->smx_host);
 }
 
 void MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)
 }
 
 void MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)
@@ -123,25 +83,13 @@ const char *MSG_mailbox_get_alias(msg_mailbox_t mailbox)
   return mailbox->alias;
 }
 
   return mailbox->alias;
 }
 
-const char *MSG_mailbox_get_hostname(msg_mailbox_t mailbox)
-{
-  return mailbox->hostname;
-}
-
-void MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname)
-{
-  mailbox->hostname = xbt_strdup(hostname);
-}
-
 msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
 {
 
   msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes, alias);
 
 msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
 {
 
   msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes, alias);
 
-  if (!mailbox) {
+  if (!mailbox)
     mailbox = MSG_mailbox_new(alias);
     mailbox = MSG_mailbox_new(alias);
-    MSG_mailbox_set_hostname(mailbox, MSG_host_self()->name);
-  }
 
   return mailbox;
 }
 
   return mailbox;
 }
@@ -157,15 +105,20 @@ msg_mailbox_t MSG_mailbox_get_by_channel(m_host_t host, m_channel_t channel)
 }
 
 MSG_error_t
 }
 
 MSG_error_t
-MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
-                         m_host_t host, double timeout)
+MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task, m_host_t host,
+                         double timeout)
 {
   xbt_ex_t e;
 {
   xbt_ex_t e;
-  MSG_error_t ret;
-  smx_host_t smx_host;
-  size_t task_size = sizeof(void*);
+  MSG_error_t ret = MSG_OK;
+  size_t buff_size = 0;
+  smx_comm_t comm;
   CHECK_HOST();
 
   CHECK_HOST();
 
+  /* Kept for compatibility with older implementation */
+  xbt_assert1(!MSG_mailbox_get_cond(mailbox),
+              "A process is already blocked on this channel %s", 
+              MSG_mailbox_get_alias(mailbox));
+
   /* Sanity check */
   xbt_assert0(task, "Null pointer for the task storage");
 
   /* Sanity check */
   xbt_assert0(task, "Null pointer for the task storage");
 
@@ -173,10 +126,13 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
     CRITICAL0
       ("MSG_task_get() was asked to write in a non empty task struct.");
 
     CRITICAL0
       ("MSG_task_get() was asked to write in a non empty task struct.");
 
-  smx_host = host ? host->simdata->smx_host : NULL;
-  
+  /* We no loger support getting a task from a specific host */
+  if(host)
+    THROW_UNIMPLEMENTED;
+
+  /* Try to receive it by calling SIMIX network layer */
   TRY{
   TRY{
-    SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size);
+    SIMIX_network_recv(mailbox->rdv, timeout, NULL, &buff_size, &comm);
   }
   CATCH(e){
     switch(e.category){
   }
   CATCH(e){
     switch(e.category){
@@ -190,16 +146,18 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
         ret = MSG_TRANSFER_FAILURE;
         break;      
       default:
         ret = MSG_TRANSFER_FAILURE;
         break;      
       default:
-        ret = MSG_OK;
-        RETHROW;
-        break;
-        /*xbt_die("Unhandled SIMIX network exception");*/
+        xbt_die("Unhandled SIMIX network exception");
     }
     }
-    xbt_ex_free(e);
-    MSG_RETURN(ret);        
+    xbt_ex_free(e);        
   }
   }
-  MSG_RETURN (MSG_OK);
+
+  *task = SIMIX_communication_get_data(comm);
+
+  /* If the sender didn't decremented the refcount so far then do it */
+  if (*task && (*task)->simdata->refcount > 1)
+    (*task)->simdata->refcount--;
+  
+  MSG_RETURN(ret);        
 }
 
 MSG_error_t
 }
 
 MSG_error_t
@@ -207,43 +165,33 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
                              double timeout)
 {
   xbt_ex_t e;
                              double timeout)
 {
   xbt_ex_t e;
-  MSG_error_t ret;
-  m_process_t process = MSG_process_self();
-  const char *hostname;
+  MSG_error_t ret = MSG_OK;
   simdata_task_t t_simdata = NULL;
   simdata_task_t t_simdata = NULL;
-  m_host_t local_host = NULL;
-  m_host_t remote_host = NULL;
-
+  m_process_t process = MSG_process_self();
+  
   CHECK_HOST();
 
   CHECK_HOST();
 
+  /* Prepare the task to send */
   t_simdata = task->simdata;
   t_simdata->sender = process;
   t_simdata = task->simdata;
   t_simdata->sender = process;
-  t_simdata->source = MSG_process_get_host(process);
+  t_simdata->source = MSG_host_self();
 
   xbt_assert0(t_simdata->refcount == 1,
               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
 
 
   xbt_assert0(t_simdata->refcount == 1,
               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
 
-  t_simdata->comm = NULL;
-
-  /*t_simdata->refcount++;*/
-  local_host = ((simdata_process_t) process->simdata)->m_host;
+  t_simdata->refcount++;
   msg_global->sent_msg++;
 
   msg_global->sent_msg++;
 
-  /* get the host name containing the mailbox */
-  hostname = MSG_mailbox_get_hostname(mailbox);
-
-  remote_host = MSG_get_host_by_name(hostname);
-
-  if (!remote_host)
-    THROW1(not_found_error, 0, "Host %s not fount", hostname);
-
-  DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel %s",
-         t_simdata->message_size / 1000, local_host->name,
-         remote_host->name, MSG_mailbox_get_alias(mailbox));
+  process->simdata->waiting_task = task;
 
 
+  /* Try to send it by calling SIMIX network layer */
   TRY{
   TRY{
+    /* Kept for semantical compatibility with older implementation */
+    if(mailbox->cond)
+      SIMIX_cond_signal(mailbox->cond);
+
     SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
     SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
-                       timeout, &task, sizeof(void *));
+                       timeout, NULL, 0, &t_simdata->comm, task);
   }
 
   CATCH(e){
   }
 
   CATCH(e){
@@ -258,15 +206,19 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
         ret = MSG_TRANSFER_FAILURE;
         break;      
       default:
         ret = MSG_TRANSFER_FAILURE;
         break;      
       default:
-        ret = MSG_OK;
-        RETHROW;
-        break;
         xbt_die("Unhandled SIMIX network exception");
     }
     xbt_ex_free(e);
         xbt_die("Unhandled SIMIX network exception");
     }
     xbt_ex_free(e);
-    MSG_RETURN(ret);        
+    /* If the receiver end didn't decremented the refcount so far then do it */
+    if (t_simdata->refcount > 1)
+      t_simdata->refcount--;
   }
 
   }
 
-  /*t_simdata->refcount--;*/
-  MSG_RETURN (MSG_OK);
+  process->simdata->waiting_task = NULL;
+
+  /* If the receiver end didn't decremented the refcount so far then do it */
+  if (t_simdata->refcount > 1)
+    t_simdata->refcount--;
+  
+  MSG_RETURN(ret);        
 }
 }
index 29f7a41..b8f05b1 100644 (file)
@@ -26,9 +26,7 @@ SG_BEGIN_DECL()
 /* this structure represents a mailbox */
      typedef struct s_msg_mailbox {
        char *alias;             /* the key of the mailbox in the global dictionary */
 /* this structure represents a mailbox */
      typedef struct s_msg_mailbox {
        char *alias;             /* the key of the mailbox in the global dictionary */
-       xbt_fifo_t tasks;        /* the list of the tasks in the mailbox */
        smx_cond_t cond;         /* the condition on the mailbox */
        smx_cond_t cond;         /* the condition on the mailbox */
-       char *hostname;          /* the name of the host containing the mailbox */
        smx_rdv_t rdv;           /* SIMIX rendez-vous point */
      } s_msg_mailbox_t;
 
        smx_rdv_t rdv;           /* SIMIX rendez-vous point */
      } s_msg_mailbox_t;
 
@@ -42,7 +40,7 @@ SG_BEGIN_DECL()
 
      typedef struct simdata_task {
        smx_action_t compute;    /* SURF modeling of computation  */
 
      typedef struct simdata_task {
        smx_action_t compute;    /* SURF modeling of computation  */
-       smx_action_t comm;       /* SURF modeling of communication  */
+       smx_comm_t comm;         /* SIMIX communication  */
        double message_size;     /* Data size  */
        double computation_amount;       /* Computation size  */
        smx_cond_t cond;
        double message_size;     /* Data size  */
        double computation_amount;       /* Computation size  */
        smx_cond_t cond;
index 60082dc..a54b159 100644 (file)
@@ -148,9 +148,7 @@ MSG_error_t MSG_task_destroy(m_task_t task)
   action = task->simdata->compute;
   if (action)
     SIMIX_action_destroy(action);
   action = task->simdata->compute;
   if (action)
     SIMIX_action_destroy(action);
-  action = task->simdata->comm;
-  if (action)
-    SIMIX_action_destroy(action);
+  
   /* parallel tasks only */
   if (task->simdata->host_list)
     xbt_free(task->simdata->host_list);
   /* parallel tasks only */
   if (task->simdata->host_list)
     xbt_free(task->simdata->host_list);
@@ -177,7 +175,7 @@ MSG_error_t MSG_task_cancel(m_task_t task)
     return MSG_OK;
   }
   if (task->simdata->comm) {
     return MSG_OK;
   }
   if (task->simdata->comm) {
-    SIMIX_action_cancel(task->simdata->comm);
+    SIMIX_communication_cancel(task->simdata->comm);
     return MSG_OK;
   }
 
     return MSG_OK;
   }
 
@@ -223,7 +221,7 @@ double MSG_task_get_remaining_communication(m_task_t task)
   xbt_assert0((task != NULL)
               && (task->simdata != NULL), "Invalid parameter");
 
   xbt_assert0((task != NULL)
               && (task->simdata != NULL), "Invalid parameter");
 
-  return SIMIX_action_get_remains(task->simdata->comm);
+  return SIMIX_communication_get_remains(task->simdata->comm);
 }
 
 /** \ingroup m_task_management
 }
 
 /** \ingroup m_task_management
index ee9c9ce..e4cb08d 100644 (file)
@@ -148,6 +148,7 @@ typedef struct s_smx_comm {
   size_t src_buff_size;
   void *dst_buff;
   size_t *dst_buff_size;
   size_t src_buff_size;
   void *dst_buff;
   size_t *dst_buff_size;
+  void *data;             /* User data associated to communication */
 } s_smx_comm_t;
 
 /********************************* Action *************************************/
 } s_smx_comm_t;
 
 /********************************* Action *************************************/
index 0f7fd68..41a86d0 100644 (file)
@@ -90,6 +90,39 @@ smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type)
   return NULL;
 }
 
   return NULL;
 }
 
+/**
+ *  \brief counts the number of communication requests of a given host pending
+ *         on a rendez-vous point
+ *  \param rdv The rendez-vous point
+ *  \param host The host to be counted
+ *  \return The number of comm request pending in the rdv
+ */
+int 
+SIMIX_rdv_get_count_waiting_comm(smx_rdv_t rdv, smx_host_t host)
+{
+  smx_comm_t comm = NULL;
+  xbt_fifo_item_t item = NULL;
+  int count = 0;
+
+  xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_comm_t) {
+    if (comm->src_proc->smx_host == host)
+      count++;
+  }
+
+  return count;
+}
+
+/**
+ *  \brief returns the communication at the head of the rendez-vous
+ *  \param rdv The rendez-vous point
+ *  \return The communication or NULL if empty
+ */
+smx_comm_t SIMIX_rdv_get_head(smx_rdv_t rdv)
+{
+  return (smx_comm_t)xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
+}
+
+
 /******************************************************************************/
 /*                           Communication Requests                           */
 /******************************************************************************/ 
 /******************************************************************************/
 /*                           Communication Requests                           */
 /******************************************************************************/ 
@@ -193,7 +226,7 @@ static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, doub
       if(e.category == timeout_error){
         DEBUG1("Communication timeout! %p", comm);
         if(comm->act && SIMIX_action_get_state(comm->act) == SURF_ACTION_RUNNING)
       if(e.category == timeout_error){
         DEBUG1("Communication timeout! %p", comm);
         if(comm->act && SIMIX_action_get_state(comm->act) == SURF_ACTION_RUNNING)
-          SIMIX_action_cancel(comm->act);
+          SIMIX_communication_cancel(comm);
         else
           SIMIX_rdv_remove(comm->rdv, comm);
           
         else
           SIMIX_rdv_remove(comm->rdv, comm);
           
@@ -220,6 +253,16 @@ static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, doub
   SIMIX_unregister_action_to_condition(comm->act, comm->cond);
 }
 
   SIMIX_unregister_action_to_condition(comm->act, comm->cond);
 }
 
+void SIMIX_communication_cancel(smx_comm_t comm)
+{
+  SIMIX_action_cancel(comm->act);
+}
+
+double SIMIX_communication_get_remains(smx_comm_t comm)
+{
+  return SIMIX_action_get_remains(comm->act);
+}  
+
 /**
  *  \brief Copy the communication data from the sender's buffer to the receiver's one
  *  \param comm The communication
 /**
  *  \brief Copy the communication data from the sender's buffer to the receiver's one
  *  \param comm The communication
@@ -228,13 +271,16 @@ void SIMIX_network_copy_data(smx_comm_t comm)
 {
   size_t src_buff_size = comm->src_buff_size;
   size_t dst_buff_size = *comm->dst_buff_size;
 {
   size_t src_buff_size = comm->src_buff_size;
   size_t dst_buff_size = *comm->dst_buff_size;
-
+  
   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
   dst_buff_size = MIN(dst_buff_size, src_buff_size);
   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
   dst_buff_size = MIN(dst_buff_size, src_buff_size);
-
+  
   /* Update the receiver's buffer size to the copied amount */
   *comm->dst_buff_size = dst_buff_size;
 
   /* Update the receiver's buffer size to the copied amount */
   *comm->dst_buff_size = dst_buff_size;
 
+  if(dst_buff_size == 0)
+    return;
+
   memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
 
   DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", 
   memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
 
   DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", 
@@ -242,6 +288,16 @@ void SIMIX_network_copy_data(smx_comm_t comm)
          dst_buff_size);
 }
 
          dst_buff_size);
 }
 
+/**
+ *  \brief Return the user data associated to the communication
+ *  \param comm The communication
+ *  \return the user data
+ */
+void *SIMIX_communication_get_data(smx_comm_t comm)
+{
+  return comm->data;
+}
+
 /******************************************************************************/
 /*                        Synchronous Communication                           */
 /******************************************************************************/
 /******************************************************************************/
 /*                        Synchronous Communication                           */
 /******************************************************************************/
@@ -251,7 +307,8 @@ void SIMIX_network_copy_data(smx_comm_t comm)
  *   - network_error if network failed or peer issued a timeout
  */
 void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, 
  *   - network_error if network failed or peer issued a timeout
  */
 void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, 
-                        double timeout, void *data, size_t data_size)
+                        double timeout, void *src_buff, size_t src_buff_size,
+                        smx_comm_t *comm_ref, void *data)
 {
   smx_comm_t comm;
   
 {
   smx_comm_t comm;
   
@@ -264,12 +321,16 @@ void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
     SIMIX_rdv_push(rdv, comm);
   }
 
     SIMIX_rdv_push(rdv, comm);
   }
 
+  /* Update the communication reference with the comm to be used */
+  *comm_ref = comm;
+  
   /* Setup the communication request */
   comm->src_proc = SIMIX_process_self();
   comm->task_size = task_size;
   comm->rate = rate;
   /* Setup the communication request */
   comm->src_proc = SIMIX_process_self();
   comm->task_size = task_size;
   comm->rate = rate;
-  comm->src_buff = data;
-  comm->src_buff_size = data_size;
+  comm->src_buff = src_buff;
+  comm->src_buff_size = src_buff_size;
+  comm->data = data;
 
   SIMIX_communication_start(comm);
 
 
   SIMIX_communication_start(comm);
 
@@ -284,7 +345,8 @@ void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
  *   - timeout_error if communication reached the timeout specified
  *   - network_error if network failed or peer issued a timeout
  */
  *   - timeout_error if communication reached the timeout specified
  *   - network_error if network failed or peer issued a timeout
  */
-void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, size_t *data_size)
+void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *dst_buff, 
+                        size_t *dst_buff_size, smx_comm_t *comm_ref)
 {
   smx_comm_t comm;
 
 {
   smx_comm_t comm;
 
@@ -297,10 +359,13 @@ void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, size_t *data_
     SIMIX_rdv_push(rdv, comm);
   }
 
     SIMIX_rdv_push(rdv, comm);
   }
 
+  /* Update the communication reference with the comm to be used */
+  *comm_ref = comm;
   /* Setup communication request */
   comm->dst_proc = SIMIX_process_self();
   /* Setup communication request */
   comm->dst_proc = SIMIX_process_self();
-  comm->dst_buff = data;
-  comm->dst_buff_size = data_size;
+  comm->dst_buff = dst_buff;
+  comm->dst_buff_size = dst_buff_size;
 
   SIMIX_communication_start(comm);
 
 
   SIMIX_communication_start(comm);