Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MSG_task_dsend function to MSG.
authorcristianrosa <cristianrosa@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 31 Jan 2011 13:24:36 +0000 (13:24 +0000)
committercristianrosa <cristianrosa@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 31 Jan 2011 13:24:36 +0000 (13:24 +0000)
MSG_task_dsend performs a best effort asynchronous send or dettached send.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9529 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/msg/msg.h
include/simix/simix.h
src/gras/Msg/sg_msg.c
src/msg/gos.c
src/msg/msg_mailbox.c
src/simix/network_private.h
src/simix/smurf_private.h
src/simix/smx_network.c
src/simix/smx_smurf.c
src/simix/smx_user.c
src/smpi/smpi_base.c

index 48c42b0..6764b1a 100644 (file)
@@ -178,6 +178,7 @@ XBT_PUBLIC(MSG_error_t)
     MSG_task_receive(m_task_t * task, const char *alias);
 
 XBT_PUBLIC(msg_comm_t) MSG_task_isend(m_task_t task, const char *alias);
     MSG_task_receive(m_task_t * task, const char *alias);
 
 XBT_PUBLIC(msg_comm_t) MSG_task_isend(m_task_t task, const char *alias);
+XBT_PUBLIC(void) MSG_task_dsend(m_task_t task, const char *alias);
 XBT_PUBLIC(msg_comm_t) MSG_task_irecv(m_task_t * task, const char *alias);
 XBT_PUBLIC(int) MSG_comm_test(msg_comm_t comm);
 XBT_PUBLIC(int) MSG_comm_testany(xbt_dynar_t comms);
 XBT_PUBLIC(msg_comm_t) MSG_task_irecv(m_task_t * task, const char *alias);
 XBT_PUBLIC(int) MSG_comm_test(msg_comm_t comm);
 XBT_PUBLIC(int) MSG_comm_testany(xbt_dynar_t comms);
index 956902e..0340009 100644 (file)
@@ -161,7 +161,7 @@ XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
                                            double rate, void *src_buff,
                                            size_t src_buff_size,
                                            int (*match_fun)(void *, void *),
                                            double rate, void *src_buff,
                                            size_t src_buff_size,
                                            int (*match_fun)(void *, void *),
-                                           void *data);
+                                           void *data, int detached);
 
 XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
                                            size_t * dst_buff_size,
 
 XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
                                            size_t * dst_buff_size,
index b2e4dfd..06bebc5 100644 (file)
@@ -241,7 +241,7 @@ void gras_msg_send_ext(gras_socket_t sock,
                                                 payload, msg->payl);
   }
 
                                                 payload, msg->payl);
   }
 
-  comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), NULL, msg);
+  comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), NULL, msg, 0);
   SIMIX_req_comm_wait(comm, -1);
 
   VERB0("Message sent (and received)");
   SIMIX_req_comm_wait(comm, -1);
 
   VERB0("Message sent (and received)");
index e581ab0..9ab4dc0 100644 (file)
@@ -407,12 +407,53 @@ msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
   comm->status = MSG_OK;
   comm->s_comm =
     SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
   comm->status = MSG_OK;
   comm->s_comm =
     SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
-                         t_simdata->rate, task, sizeof(void *), NULL, NULL);
+                         t_simdata->rate, task, sizeof(void *), NULL, NULL, 0);
   t_simdata->comm = comm->s_comm; /* FIXME: is the field t_simdata->comm still useful? */
 
   return comm;
 }
 
   t_simdata->comm = comm->s_comm; /* FIXME: is the field t_simdata->comm still useful? */
 
   return comm;
 }
 
+/** \ingroup msg_gos_functions
+ * \brief Sends a task on a mailbox.
+ *
+ * This is a non blocking detached send function.
+ * Think of it as a best effort send. The communication
+ * object will be destroyed by the receiver (if any).
+ *
+ * \param task a #m_task_t to send on another location.
+ * \param alias name of the mailbox to sent the task to
+ */
+void MSG_task_dsend(m_task_t task, const char *alias)
+{
+  simdata_task_t t_simdata = NULL;
+  m_process_t process = MSG_process_self();
+  msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
+
+  CHECK_HOST();
+
+  /* FIXME: these functions are not traceable */
+
+  /* Prepare the task to send */
+  t_simdata = task->simdata;
+  t_simdata->sender = process;
+  t_simdata->source = MSG_host_self();
+
+  xbt_assert0(t_simdata->isused == 0,
+              "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
+
+  t_simdata->isused = 1;
+  msg_global->sent_msg++;
+
+  /* Send it by calling SIMIX network layer */
+  msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
+  comm->task_sent = task;
+  comm->task_received = NULL;
+  comm->status = MSG_OK;
+    SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
+                         t_simdata->rate, task, sizeof(void *), NULL, NULL, 1);
+    /*t_simdata->comm = comm->s_comm;  FIXME: is the field t_simdata->comm still useful? */
+}
+
 /** \ingroup msg_gos_functions
  * \brief Starts listening for receiving a task from an asynchronous communication.
  *
 /** \ingroup msg_gos_functions
  * \brief Starts listening for receiving a task from an asynchronous communication.
  *
index 3aa3a90..07e19c4 100644 (file)
@@ -164,7 +164,7 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
   /* Try to send it by calling SIMIX network layer */
   TRY {
     comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
   /* Try to send it by calling SIMIX network layer */
   TRY {
     comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
-             t_simdata->rate, task, sizeof(void *), NULL, NULL);
+             t_simdata->rate, task, sizeof(void *), NULL, NULL, 0);
     t_simdata->comm = comm;
 #ifdef HAVE_TRACING
     SIMIX_req_set_category(comm, task->category);
     t_simdata->comm = comm;
 #ifdef HAVE_TRACING
     SIMIX_req_set_category(comm, task->category);
index 8d65b75..2f68f85 100644 (file)
@@ -32,7 +32,8 @@ smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv);
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*)(void *, void *), void *data);
+                              int (*)(void *, void *), void *data,
+                              int detached);
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                               void *dst_buff, size_t *dst_buff_size,
                               int (*)(void *, void *), void *data);
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                               void *dst_buff, size_t *dst_buff_size,
                               int (*)(void *, void *), void *data);
index 972c6bc..f7d17fc 100644 (file)
@@ -313,6 +313,7 @@ typedef struct s_smx_req {
       size_t src_buff_size;
       int (*match_fun)(void *, void *);
       void *data;
       size_t src_buff_size;
       int (*match_fun)(void *, void *);
       void *data;
+      int detached;
       smx_action_t result;
     } comm_isend;
 
       smx_action_t result;
     } comm_isend;
 
index d1c83d3..11b84c6 100644 (file)
@@ -246,7 +246,8 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*match_fun)(void *, void *), void *data)
+                              int (*match_fun)(void *, void *), void *data,
+                              int detached)
 {
   smx_action_t action;
 
 {
   smx_action_t action;
 
@@ -262,6 +263,11 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
     action->comm.type = SIMIX_COMM_READY;
   }
 
     action->comm.type = SIMIX_COMM_READY;
   }
 
+  /* If the communication action is detached then decrease the refcount
+   * by one, so it will be eliminated by the receivers destroy call */
+  if(detached)
+    action->comm.refcount--;
+
   /* Setup the communication request */
   action->comm.src_proc = src_proc;
   action->comm.task_size = task_size;
   /* Setup the communication request */
   action->comm.src_proc = src_proc;
   action->comm.task_size = task_size;
index bfc0443..139d06b 100644 (file)
@@ -100,7 +100,8 @@ void SIMIX_request_pre(smx_req_t req, int value)
           req->comm_isend.src_buff,
           req->comm_isend.src_buff_size,
           req->comm_isend.match_fun,
           req->comm_isend.src_buff,
           req->comm_isend.src_buff_size,
           req->comm_isend.match_fun,
-          req->comm_isend.data);
+          req->comm_isend.data,
+          req->comm_isend.detached);
       SIMIX_request_answer(req);
       break;
 
       SIMIX_request_answer(req);
       break;
 
index c426229..3096246 100644 (file)
@@ -618,7 +618,8 @@ smx_action_t SIMIX_req_rdv_get_head(smx_rdv_t rdv)
 
 smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
 
 smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*match_fun)(void *, void *), void *data)
+                              int (*match_fun)(void *, void *), void *data,
+                              int detached)
 {
   smx_req_t req = SIMIX_req_mine();
 
 {
   smx_req_t req = SIMIX_req_mine();
 
@@ -632,6 +633,7 @@ smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
   req->comm_isend.src_buff_size = src_buff_size;
   req->comm_isend.match_fun = match_fun;
   req->comm_isend.data = data;
   req->comm_isend.src_buff_size = src_buff_size;
   req->comm_isend.match_fun = match_fun;
   req->comm_isend.data = data;
+  req->comm_isend.detached = detached;
 
   SIMIX_request_push();
   return req->comm_isend.result;
 
   SIMIX_request_push();
   return req->comm_isend.result;
index 6822762..8bb4d4f 100644 (file)
@@ -99,7 +99,7 @@ void smpi_mpi_start(MPI_Request request)
     print_request("New send", request);
     mailbox = smpi_process_remote_mailbox(request->dst);
     request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
     print_request("New send", request);
     mailbox = smpi_process_remote_mailbox(request->dst);
     request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
-                                           request->buf, request->size, &match_send, request);
+                                           request->buf, request->size, &match_send, request, 0);
 #ifdef HAVE_TRACING
     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
 #endif
 #ifdef HAVE_TRACING
     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
 #endif