Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Further investigate the dsend issue, in vain so far
authorMartin Quinson <martin.quinson@loria.fr>
Mon, 26 Dec 2011 21:55:19 +0000 (22:55 +0100)
committerMartin Quinson <martin.quinson@loria.fr>
Mon, 26 Dec 2011 21:55:19 +0000 (22:55 +0100)
- Ensure that it still compiles after the messed up merge (sorry)
- Add a simple dsend example
- Stop using the src_data to store the cleanup function since that
  field may be clear in MSG, but we store the MPI_status in there. So,
  add a cleanup_fun field down in the stacks so that both the cleanup
  function and the MPI_status can be stored
- Make that shit much more verbose (some messages should be removed
  once the bug is found, sorry for sharing my half backed changes)

16 files changed:
examples/smpi/CMakeLists.txt
examples/smpi/dsend.c [new file with mode: 0644]
include/simix/simix.h
src/gras/Msg/sg_msg.c
src/msg/msg_gos.c
src/msg/msg_mailbox.c
src/simix/network_private.h
src/simix/private.h
src/simix/smurf_private.h
src/simix/smx_network.c
src/simix/smx_process.c
src/simix/smx_smurf.c
src/simix/smx_user.c
src/smpi/smpi_base.c
src/surf/network.c
src/surf/surf.c

index 8300bfa..410e492 100644 (file)
@@ -21,6 +21,7 @@ add_executable(pingpong pingpong.c)
 add_executable(scatter scatter.c)
 add_executable(reduce reduce.c)
 add_executable(split split.c)
+add_executable(dsend dsend.c)
 add_executable(mvmul mvmul.c)
 add_executable(smpi_sendrecv sendrecv.c)
 add_executable(smpi_traced smpi_traced.c)
@@ -41,6 +42,7 @@ target_link_libraries(pingpong m simgrid smpi )
 target_link_libraries(scatter m simgrid smpi )
 target_link_libraries(reduce m simgrid smpi )
 target_link_libraries(split m simgrid smpi )
+target_link_libraries(dsend m simgrid smpi )
 target_link_libraries(mvmul m simgrid smpi )
 target_link_libraries(smpi_sendrecv m simgrid smpi )
 target_link_libraries(smpi_traced m simgrid smpi )
diff --git a/examples/smpi/dsend.c b/examples/smpi/dsend.c
new file mode 100644 (file)
index 0000000..511cf32
--- /dev/null
@@ -0,0 +1,31 @@
+/* Copyright (c) 2011. The SimGrid Team. All rights reserved.               */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This program simply does a very small exchange to test whether using SIMIX dsend to model the eager mode works */
+
+#include <stdio.h>
+#include <mpi.h>
+
+int main(int argc, char *argv[]) {
+  int rank;
+  int data=11;
+   
+  MPI_Init(&argc, &argv);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+   
+  if (rank==1) {
+         data=22;
+         MPI_Send(&data,1,MPI_INT,(rank+1)%2,666,MPI_COMM_WORLD);
+  } else {
+         MPI_Recv(&data,1,MPI_INT,-1,666,MPI_COMM_WORLD,NULL);
+         if (data !=22) {
+                 printf("rank %d: Damn, data does not match (got %d)\n",rank, data);
+         }
+  }
+       
+  printf("rank %d: data exchanged\n", rank);
+  MPI_Finalize();
+  return 0;
+}
index cb702df..f25bf3d 100644 (file)
@@ -172,6 +172,7 @@ XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
                                               double rate, void *src_buff,
                                               size_t src_buff_size,
                                               int (*match_fun)(void *, void *),
+                                              void (*clean_fun)(void *),
                                               void *data, int detached);
 
 XBT_PUBLIC(void) SIMIX_req_comm_recv(smx_rdv_t rdv, void *dst_buff,
index 6cc6aa1..d404ecc 100644 (file)
@@ -238,7 +238,7 @@ void gras_msg_send_ext(gras_socket_t sock,
                                                 payload, msg->payl);
   }
 
-  comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, msg, sizeof(void *), NULL, msg, 0);
+  comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, msg, sizeof(void *), NULL,NULL, msg, 0);
   SIMIX_req_comm_wait(comm, -1);
 
   XBT_VERB("Message sent (and received)");
index 740412c..5346028 100644 (file)
@@ -416,7 +416,7 @@ XBT_INLINE msg_comm_t MSG_task_isend_with_matching(m_task_t task, const char *al
   comm->status = MSG_OK;
   comm->s_comm =
     SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
-                         t_simdata->rate, task, sizeof(void *), match_fun, match_data, 0);
+                         t_simdata->rate, task, sizeof(void *), match_fun, NULL, match_data, 0);
   t_simdata->comm = comm->s_comm; /* FIXME: is the field t_simdata->comm still useful? */
 
   return comm;
@@ -466,7 +466,7 @@ void MSG_task_dsend(m_task_t task, const char *alias, void_f_pvoid_t cleanup)
 
   /* Send it by calling SIMIX network layer */
   smx_action_t comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
-                       t_simdata->rate, task, sizeof(void *), NULL, cleanup, 1);
+                       t_simdata->rate, task, sizeof(void *), NULL,cleanup, NULL, 1);
   t_simdata->comm = comm;
 }
 
index 9872c2b..b537c04 100644 (file)
@@ -155,7 +155,7 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
   TRY {
       smx_action_t comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
                                   t_simdata->rate, task, sizeof(void *),
-                                  NULL, NULL, 0);
+                                  NULL, NULL, NULL, 0);
 #ifdef HAVE_TRACING
     if (TRACE_is_enabled()) {
       SIMIX_req_set_category(comm, task->category);
index 80c0a7c..b3c7478 100644 (file)
@@ -38,7 +38,9 @@ void SIMIX_comm_send(smx_process_t src_proc, smx_rdv_t rdv,
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*)(void *, void *), void *data,
+                              int (*)(void *, void *),
+                              void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+                              void *data,
                               int detached);
 void SIMIX_comm_recv(smx_process_t dst_proc, smx_rdv_t rdv,
                      void *dst_buff, size_t *dst_buff_size,
index 41694a8..4032673 100644 (file)
@@ -94,6 +94,8 @@ typedef struct s_smx_action {
       int refcount;                   /* Number of processes involved in the cond */
       int detached;                   /* If detached or not */
 
+      void (*clean_fun)(void*);       /* Function to clean the detached src_buf if something goes wrong */
+
       /* Surf action data */
       surf_action_t surf_comm;        /* The Surf communication action encapsulated */
       surf_action_t src_timeout;      /* Surf's actions to instrument the timeouts */
index 40ad900..6284d5b 100644 (file)
@@ -325,6 +325,7 @@ typedef struct s_smx_req {
       void *src_buff;
       size_t src_buff_size;
       int (*match_fun)(void *, void *);
+      void (*clean_fun)(void *);
       void *data;
       int detached;
       smx_action_t result;
index 7d629df..c359576 100644 (file)
@@ -266,7 +266,8 @@ void SIMIX_comm_destroy(smx_action_t action)
   if (action->comm.detached && action->state != SIMIX_DONE) {
     /* the communication has failed and was detached:
      * we have to free the buffer */
-    ((void_f_pvoid_t) action->comm.src_data)(action->comm.src_buff);
+    action->comm.clean_fun(action->comm.src_buff);
+    action->comm.src_buff = NULL;
   }
 
   xbt_mallocator_release(simix_global->action_mallocator, action);
@@ -296,7 +297,9 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*match_fun)(void *, void *), void *data,
+                              int (*match_fun)(void *, void *),
+                              void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+                              void *data,
                               int detached)
 {
   smx_action_t action;
@@ -319,6 +322,9 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
   if (detached) {
     action->comm.detached = 1;
     action->comm.refcount--;
+    action->comm.clean_fun = clean_fun;
+  } else {
+    action->comm.clean_fun = NULL;
   }
 
   /* Setup the communication request */
@@ -375,6 +381,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
 
 void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int idx)
 {
+
   /* the request may be a wait, a send or a recv */
   surf_action_t sleep;
 
@@ -400,12 +407,15 @@ void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int
     SIMIX_comm_finish(action);
     return;
   }
+       XBT_INFO("Comm_wait. state:%d; I'm %s",action->state,
+                       req->issuer == action->comm.src_proc?"sender":"receiver");
 
   /* If the action has already finish perform the error handling, */
   /* otherwise set up a waiting timeout on the right side         */
   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
     SIMIX_comm_finish(action);
   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
+       XBT_INFO("Not done, we need a sleep action");
     sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
     surf_workstation_model->action_data_set(sleep, action);
 
@@ -628,11 +638,18 @@ void SIMIX_comm_finish(smx_action_t action)
 
       case SIMIX_LINK_FAILURE:
         TRY {
-         XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
+         XBT_INFO("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
              action,
              action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
              action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
-             req->issuer->name, req->issuer);
+             req->issuer->name, req->issuer,action->comm.detached);
+         if (action->comm.src_proc == req->issuer) {
+                 XBT_INFO("I'm source");
+         } else if (action->comm.dst_proc == req->issuer) {
+                 XBT_INFO("I'm dest");
+         } else {
+                 XBT_INFO("I'm neither source nor dest");
+         }
           THROWF(network_error, 0, "Link failure");
         }
        CATCH(req->issuer->running_ctx->exception) {
@@ -698,9 +715,10 @@ void SIMIX_post_comm(smx_action_t action)
           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
      action->state = SIMIX_DST_HOST_FAILURE;
   else if (action->comm.surf_comm &&
-          surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
+          surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+         XBT_INFO("Puta madre. Surf says that the link broke");
      action->state = SIMIX_LINK_FAILURE;
-  else
+  else
     action->state = SIMIX_DONE;
 
   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
@@ -864,9 +882,12 @@ void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
 
 void smpi_comm_copy_data_callback(smx_action_t comm, size_t buff_size)
 {
+  XBT_INFO("Copy the data over");
   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
-  if (comm->comm.detached) // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
-         free(comm->comm.src_buff);
+  if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
+         comm->comm.clean_fun(comm->comm.src_buff);
+         comm->comm.src_buff = NULL;
+  }
 }
 
 /**
@@ -893,15 +914,8 @@ void SIMIX_comm_copy_data(smx_action_t comm)
   if (comm->comm.dst_buff_size)
     *comm->comm.dst_buff_size = buff_size;
 
-<<<<<<< HEAD
   if (buff_size > 0)
-    (*SIMIX_comm_copy_data_callback) (comm, buff_size);
-=======
-  if (buff_size == 0)
-    return;
-
-  SIMIX_comm_copy_data_callback(comm, buff_size);
->>>>>>> master
+    SIMIX_comm_copy_data_callback (comm, buff_size);
 
   /* Set the copied flag so we copy data only once */
   /* (this function might be called from both communication ends) */
index dcdadd2..a13e565 100644 (file)
@@ -51,11 +51,13 @@ void SIMIX_process_cleanup(smx_process_t process)
 
       if (action->comm.detached) {
          if (action->comm.refcount == 0) {
+           XBT_DEBUG("Increase the refcount before destroying it");
            /* I'm not supposed to destroy a detached comm from the sender side,
             * unless there is no receiver matching the rdv */
            action->comm.refcount++;
            SIMIX_comm_destroy(action);
          }
+         XBT_DEBUG("Don't destroy it since its refcount is %d",action->comm.refcount);
       }
       else {
         SIMIX_comm_destroy(action);
index c307cda..6a6a1e8 100644 (file)
@@ -70,6 +70,7 @@ void SIMIX_request_pre(smx_req_t req, int value)
           req->comm_send.src_buff,
           req->comm_send.src_buff_size,
           req->comm_send.match_fun,
+          NULL, /* no clean function since it's not detached */
           req->comm_send.data,
           0);
       SIMIX_pre_comm_wait(req, comm, req->comm_send.timeout, 0);
@@ -85,6 +86,7 @@ void SIMIX_request_pre(smx_req_t req, int value)
           req->comm_isend.src_buff,
           req->comm_isend.src_buff_size,
           req->comm_isend.match_fun,
+          req->comm_isend.clean_fun,
           req->comm_isend.data,
           req->comm_isend.detached);
       SIMIX_request_answer(req);
index e8b691f..09e26ea 100644 (file)
@@ -709,7 +709,7 @@ void SIMIX_req_comm_send(smx_rdv_t rdv, double task_size, double rate,
   if (MC_IS_ENABLED) {
     /* the model-checker wants two separate requests */
     smx_action_t comm = SIMIX_req_comm_isend(rdv, task_size, rate,
-        src_buff, src_buff_size, match_fun, data, 0);
+        src_buff, src_buff_size, match_fun, NULL, data, 0);
     SIMIX_req_comm_wait(comm, timeout);
   }
   else {
@@ -731,7 +731,9 @@ void SIMIX_req_comm_send(smx_rdv_t rdv, double task_size, double rate,
 
 smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
                               void *src_buff, size_t src_buff_size,
-                              int (*match_fun)(void *, void *), void *data,
+                              int (*match_fun)(void *, void *),
+                              void (*clean_fun)(void *),
+                              void *data,
                               int detached)
 {
   /* checking for infinite values */
@@ -749,6 +751,7 @@ smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
   req->comm_isend.src_buff = src_buff;
   req->comm_isend.src_buff_size = src_buff_size;
   req->comm_isend.match_fun = match_fun;
+  req->comm_isend.clean_fun = clean_fun;
   req->comm_isend.data = data;
   req->comm_isend.detached = detached;
 
index 0fc8226..51b0d9a 100644 (file)
@@ -84,6 +84,12 @@ MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
   return request;
 }
 
+static void myfree(void *d) {
+       xbt_backtrace_display_current();
+       XBT_INFO("myfree called on %p",d);
+       free(d);
+}
+
 void smpi_mpi_start(MPI_Request request)
 {
   smx_rdv_t mailbox;
@@ -107,13 +113,15 @@ void smpi_mpi_start(MPI_Request request)
        detached = 1;
        request->buf = malloc(request->size);
        memcpy(request->buf,oldbuf,request->size);
-       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
+       XBT_INFO("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
     } else {
        XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
     }
     request->action = 
                SIMIX_req_comm_isend(mailbox, request->size, -1.0,
-                                   request->buf, request->size, &match_send, request,  
+                                   request->buf, request->size,
+                                   &match_send,myfree, // cleanup using a simple free() FIXME: that may not be sufficient
+                                   request,
                                    // detach if msg size < eager/rdv switch limit
                                    detached);
 
index e81a17d..a0e07a7 100644 (file)
@@ -101,13 +101,6 @@ static double constant_bandwidth_constraint(double rate, double bound,
 /**********************/
 /*   SMPI callbacks   */
 /**********************/
-static double smpi_latency_factor(double size)
-{
-  /* 1 B <= size <= 1 KiB */
-  if (size <= 1024.0) {
-    return 1.0056;
-  }
-
 static double smpi_bandwidth_factor(double size)
 {
 
@@ -330,6 +323,7 @@ static int net_action_unref(surf_action_t action)
 
 static void net_action_cancel(surf_action_t action)
 {
+       XBT_DEBUG("cancel action %p",action);
   surf_network_model->action_state_set(action, SURF_ACTION_FAILED);
   if(network_update_mechanism == UM_LAZY){// remove action from the heap
     xbt_swag_remove(action, net_modified_set);
index 2fb0493..7005d3b 100644 (file)
@@ -452,7 +452,7 @@ double surf_solve(double max_date)
     }
   }
 
-  XBT_DEBUG("Min for resources (except NS3) : %f", min);
+  XBT_DEBUG("Min for resources (remember that NS3 dont update that value) : %f", min);
 
   XBT_DEBUG("Looking for next trace event");
 
@@ -477,7 +477,10 @@ double surf_solve(double max_date)
         min = model_next_action_end;
     }
 
-    if (next_event_date == -1.0) break;
+    if (next_event_date == -1.0) {
+       XBT_DEBUG("no next TRACE event. Stop searching for it");
+       break;
+    }
 
     if ((min != -1.0) && (next_event_date > NOW + min)) break;
 
@@ -502,11 +505,13 @@ double surf_solve(double max_date)
     }
   } while (1);
 
-  /* FIXME: Moved this test to here to avoid stoping simulation if there are actions running on cpus and all cpus are with availability = 0. 
+  /* FIXME: Moved this test to here to avoid stopping simulation if there are actions running on cpus and all cpus are with availability = 0.
    * This may cause an infinite loop if one cpu has a trace with periodicity = 0 and the other a trace with periodicity > 0.
    * The options are: all traces with same periodicity(0 or >0) or we need to change the way how the events are managed */
-  if (min < 0.0)
+  if (min < 0.0) {
+       XBT_DEBUG("No next event at all. Bail out now.");
     return -1.0;
+  }
 
   XBT_DEBUG("Duration set to %f", min);