Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add support for custom communication matching to SIMIX network interface
authorcristianrosa <cristianrosa@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 7 Dec 2010 14:02:54 +0000 (14:02 +0000)
committercristianrosa <cristianrosa@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 7 Dec 2010 14:02:54 +0000 (14:02 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9063 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/simix/simix.h
src/gras/Msg/sg_msg.c
src/gras/Transport/transport_plugin_sg.c
src/msg/gos.c
src/msg/msg_mailbox.c
src/simix/network_private.h
src/simix/smurf_private.h
src/simix/smx_network.c
src/simix/smx_smurf.c
src/simix/smx_user.c
src/smpi/smpi_base.c

index 8270b26..dfe2f25 100644 (file)
@@ -164,10 +164,13 @@ XBT_PUBLIC(smx_action_t) SIMIX_req_rdv_get_head(smx_rdv_t rdv);
 XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
                                            double rate, void *src_buff,
                                            size_t src_buff_size,
 XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
                                            double rate, void *src_buff,
                                            size_t src_buff_size,
+                                           int (*match_fun)(void *, void *),
                                            void *data);
 
 XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
                                            void *data);
 
 XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
-                                           size_t * dst_buff_size);
+                                           size_t * dst_buff_size,
+                                           int (*match_fun)(void *, void *),
+                                           void *data);
 
 XBT_PUBLIC(void) SIMIX_req_comm_destroy(smx_action_t comm);
 
 
 XBT_PUBLIC(void) SIMIX_req_comm_destroy(smx_action_t comm);
 
index 1f586f3..59d8cd0 100644 (file)
@@ -183,7 +183,7 @@ gras_msg_t gras_msg_recv_any(void)
   sock_data->comm_recv =
       SIMIX_req_comm_irecv(gras_socket_im_the_server(sock) ?
                           sock_data->rdv_server : sock_data->rdv_client,
   sock_data->comm_recv =
       SIMIX_req_comm_irecv(gras_socket_im_the_server(sock) ?
                           sock_data->rdv_server : sock_data->rdv_client,
-                          NULL, 0);
+                          NULL, 0, NULL, NULL);
 
   return msg;
 }
 
   return msg;
 }
@@ -241,7 +241,7 @@ void gras_msg_send_ext(gras_socket_t sock,
                                                 payload, msg->payl);
   }
 
                                                 payload, msg->payl);
   }
 
-  comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), msg);
+  comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), NULL, msg);
   SIMIX_req_comm_wait(comm, -1);
 
   VERB0("Message sent (and received)");
   SIMIX_req_comm_wait(comm, -1);
 
   VERB0("Message sent (and received)");
index af76cb1..05590a6 100644 (file)
@@ -180,7 +180,7 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self,
   /* initialize synchronization stuff on the socket */
   data->rdv_server = pr->rdv;
   data->rdv_client = SIMIX_req_rdv_create(NULL);
   /* initialize synchronization stuff on the socket */
   data->rdv_server = pr->rdv;
   data->rdv_client = SIMIX_req_rdv_create(NULL);
-  data->comm_recv = SIMIX_req_comm_irecv(data->rdv_client, NULL, 0);
+  data->comm_recv = SIMIX_req_comm_irecv(data->rdv_client, NULL, 0, NULL, NULL);
 
   /* connect that simulation data to the socket */
   sock->data = data;
 
   /* connect that simulation data to the socket */
   sock->data = data;
@@ -227,7 +227,7 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, int port, gras_socket_t s
   data->client = NULL;
   data->rdv_server = pr->rdv;
   data->rdv_client = NULL;
   data->client = NULL;
   data->rdv_server = pr->rdv;
   data->rdv_client = NULL;
-  data->comm_recv = SIMIX_req_comm_irecv(pr->rdv, NULL, 0);
+  data->comm_recv = SIMIX_req_comm_irecv(pr->rdv, NULL, 0, NULL, NULL);
 
   sock->data = data;
 
 
   sock->data = data;
 
index 899b135..45fdce4 100644 (file)
@@ -414,7 +414,7 @@ msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
   /* Send it by calling SIMIX network layer */
 
   return SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
   /* Send it by calling SIMIX network layer */
 
   return SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
-                             t_simdata->rate, task, sizeof(void *),
+                             t_simdata->rate, task, sizeof(void *), NULL,
                              &t_simdata->comm);
 }
 
                              &t_simdata->comm);
 }
 
@@ -444,7 +444,7 @@ msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias)
         ("MSG_task_get() was asked to write in a non empty task struct.");
 
   /* Try to receive it by calling SIMIX network layer */
         ("MSG_task_get() was asked to write in a non empty task struct.");
 
   /* Try to receive it by calling SIMIX network layer */
-  return SIMIX_req_comm_irecv(rdv, task, NULL);
+  return SIMIX_req_comm_irecv(rdv, task, NULL, NULL, NULL);
 }
 
 /** \ingroup msg_gos_functions
 }
 
 /** \ingroup msg_gos_functions
index ab9e1ec..dda4eb5 100644 (file)
@@ -95,7 +95,7 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
 
   /* Try to receive it by calling SIMIX network layer */
   TRY {
 
   /* Try to receive it by calling SIMIX network layer */
   TRY {
-    comm = SIMIX_req_comm_irecv(mailbox, task, NULL);
+    comm = SIMIX_req_comm_irecv(mailbox, task, NULL, NULL, NULL);
     SIMIX_req_comm_wait(comm, timeout);
     SIMIX_req_comm_destroy(comm);
     DEBUG2("Got task %s from %p",(*task)->name,mailbox);
     SIMIX_req_comm_wait(comm, timeout);
     SIMIX_req_comm_destroy(comm);
     DEBUG2("Got task %s from %p",(*task)->name,mailbox);
@@ -162,7 +162,7 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
   /* Try to send it by calling SIMIX network layer */
   TRY {
     t_simdata->comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
   /* Try to send it by calling SIMIX network layer */
   TRY {
     t_simdata->comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
-                       t_simdata->rate, task, sizeof(void *), task);
+                       t_simdata->rate, task, sizeof(void *), NULL, task);
 #ifdef HAVE_TRACING
     SIMIX_req_set_category(t_simdata->comm, task->category);
 #endif
 #ifdef HAVE_TRACING
     SIMIX_req_set_category(t_simdata->comm, task->category);
 #endif
index 721bc91..32b8777 100644 (file)
@@ -31,9 +31,11 @@ int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host);
 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv);
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv);
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
-                              void *src_buff, size_t src_buff_size, void *data);
+                              void *src_buff, size_t src_buff_size,
+                              int (*)(void *, void *), void *data);
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
-                              void *dst_buff, size_t *dst_buff_size);
+                              void *dst_buff, size_t *dst_buff_size,
+                              int (*)(void *, void *), void *data);
 void SIMIX_comm_destroy(smx_action_t action);
 void SIMIX_comm_destroy_internal_actions(smx_action_t action);
 void SIMIX_pre_comm_wait(smx_req_t req);
 void SIMIX_comm_destroy(smx_action_t action);
 void SIMIX_comm_destroy_internal_actions(smx_action_t action);
 void SIMIX_pre_comm_wait(smx_req_t req);
index c031459..8e65753 100644 (file)
@@ -277,14 +277,17 @@ typedef struct s_smx_req {
       double rate;
       void *src_buff;
       size_t src_buff_size;
       double rate;
       void *src_buff;
       size_t src_buff_size;
+      int (*match_fun)(void *, void *);
       void *data;
       void *data;
-      smx_action_t result;      
+      smx_action_t result;
     } comm_isend;
 
     struct {
       smx_rdv_t rdv;
       void *dst_buff;
       size_t *dst_buff_size;
     } comm_isend;
 
     struct {
       smx_rdv_t rdv;
       void *dst_buff;
       size_t *dst_buff_size;
+      int (*match_fun)(void *, void *);
+         void *data;
       smx_action_t result;
     } comm_irecv;
 
       smx_action_t result;
     } comm_irecv;
 
index 47e52a0..f874dcc 100644 (file)
@@ -23,7 +23,8 @@ static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
                                                       double timeout);
 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
                                                       double timeout);
 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
-static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
+static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+                                                                                 int (*match_fun)(void *, void *), void *);
 static void SIMIX_rdv_free(void *data);
 
 void SIMIX_network_init(void)
 static void SIMIX_rdv_free(void *data);
 
 void SIMIX_network_init(void)
@@ -122,17 +123,19 @@ static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
  *  \param type The type of communication we are looking for (comm_send, comm_recv)
  *  \return The communication request if found, NULL otherwise
  */
  *  \param type The type of communication we are looking for (comm_send, comm_recv)
  *  \return The communication request if found, NULL otherwise
  */
-smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
-{
-  smx_action_t comm = (smx_action_t)
-      xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
-
-  if (comm && comm->comm.type == type) {
-    DEBUG0("Communication request found!");
-    xbt_fifo_shift(rdv->comm_fifo);
-    comm->comm.refcount++;
-    comm->comm.rdv = NULL;
-    return comm;
+smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+                                                                  int (*match_fun)(void *, void *), void *data)
+{
+  smx_action_t req;
+  xbt_fifo_item_t item;
+
+  xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){
+       if(req->comm.type == type && (!match_fun || match_fun(data, req->comm.data))){
+         xbt_fifo_remove_item(rdv->comm_fifo, item);
+         req->comm.refcount++;
+         req->comm.rdv = NULL;
+         return req;
+       }
   }
 
   DEBUG0("Communication request not found");
   }
 
   DEBUG0("Communication request not found");
@@ -232,13 +235,14 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
 
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
 
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
-                              void *src_buff, size_t src_buff_size, void *data)
+                              void *src_buff, size_t src_buff_size,
+                              int (*match_fun)(void *, void *), void *data)
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
      If it is not found then create it and push it into the rendez-vous point */
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
      If it is not found then create it and push it into the rendez-vous point */
-  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
+  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_SEND);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_SEND);
@@ -266,14 +270,15 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
-                      void *dst_buff, size_t *dst_buff_size)
+                      void *dst_buff, size_t *dst_buff_size,
+                      int (*match_fun)(void *, void *), void *data)
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
    * If it is not found then create it and push it into the rendez-vous point
    */
 {
   smx_action_t action;
 
   /* Look for communication request matching our needs.
    * If it is not found then create it and push it into the rendez-vous point
    */
-  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
+  action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
 
   if (!action) {
     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
index b6c476d..b01ce7b 100644 (file)
@@ -311,6 +311,7 @@ void SIMIX_request_pre(smx_req_t req)
          req->comm_isend.rate,
          req->comm_isend.src_buff,
          req->comm_isend.src_buff_size,
          req->comm_isend.rate,
          req->comm_isend.src_buff,
          req->comm_isend.src_buff_size,
+         req->comm_isend.match_fun,
          req->comm_isend.data);
       SIMIX_request_answer(req);
       break;
          req->comm_isend.data);
       SIMIX_request_answer(req);
       break;
@@ -320,7 +321,9 @@ void SIMIX_request_pre(smx_req_t req)
          req->issuer,
          req->comm_irecv.rdv,
          req->comm_irecv.dst_buff,
          req->issuer,
          req->comm_irecv.rdv,
          req->comm_irecv.dst_buff,
-         req->comm_irecv.dst_buff_size);
+         req->comm_irecv.dst_buff_size,
+         req->comm_irecv.match_fun,
+         req->comm_irecv.data);
       SIMIX_request_answer(req);
       break;
 
       SIMIX_request_answer(req);
       break;
 
index cb83fd9..1280e30 100644 (file)
@@ -597,7 +597,8 @@ smx_action_t SIMIX_req_rdv_get_head(smx_rdv_t rdv)
 }
 
 smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
 }
 
 smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
-                              void *src_buff, size_t src_buff_size, void *data)
+                              void *src_buff, size_t src_buff_size,
+                              int (*match_fun)(void *, void *), void *data)
 {
   s_smx_req_t req;
 
 {
   s_smx_req_t req;
 
@@ -609,13 +610,15 @@ smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
   req.comm_isend.rate = rate;
   req.comm_isend.src_buff = src_buff;
   req.comm_isend.src_buff_size = src_buff_size;
   req.comm_isend.rate = rate;
   req.comm_isend.src_buff = src_buff;
   req.comm_isend.src_buff_size = src_buff_size;
+  req.comm_isend.match_fun = match_fun;
   req.comm_isend.data = data;
 
   SIMIX_request_push(&req);
   return req.comm_isend.result;
 }
 
   req.comm_isend.data = data;
 
   SIMIX_request_push(&req);
   return req.comm_isend.result;
 }
 
-smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size)
+smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
+                                                                 int (*match_fun)(void *, void *), void *data)
 {
   s_smx_req_t req;
 
 {
   s_smx_req_t req;
 
@@ -625,6 +628,8 @@ smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_bu
   req.comm_irecv.rdv = rdv;
   req.comm_irecv.dst_buff = dst_buff;
   req.comm_irecv.dst_buff_size = dst_buff_size;
   req.comm_irecv.rdv = rdv;
   req.comm_irecv.dst_buff = dst_buff;
   req.comm_irecv.dst_buff_size = dst_buff_size;
+  req.comm_irecv.match_fun = match_fun;
+  req.comm_irecv.data = data;
 
   SIMIX_request_push(&req);
   return req.comm_irecv.result;
 
   SIMIX_request_push(&req);
   return req.comm_irecv.result;
index e1c5fef..0581cfe 100644 (file)
@@ -73,13 +73,13 @@ void smpi_mpi_start(MPI_Request request)
     smpi_process_post_recv(request);
     print_request("New recv", request);
     request->pair =
     smpi_process_post_recv(request);
     print_request("New recv", request);
     request->pair =
-        SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size);
+        SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size, NULL, NULL);
   } else {
     smpi_process_post_send(request->comm, request);     // FIXME
     print_request("New send", request);
     request->pair =
         SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
   } else {
     smpi_process_post_send(request->comm, request);     // FIXME
     print_request("New send", request);
     request->pair =
         SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
-                            request->buf, request->size, NULL);
+                            request->buf, request->size, NULL, NULL);
 #ifdef HAVE_TRACING
     SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category());
 #endif
 #ifdef HAVE_TRACING
     SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category());
 #endif