Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Simix: clean unfinished comms when terminating a process
author: Christophe Thiéry <christopho128@gmail.com>
Fri, 14 Oct 2011 13:38:20 +0000 (15:38 +0200)
committer: Christophe Thiéry <christopho128@gmail.com>
Fri, 14 Oct 2011 13:38:20 +0000 (15:38 +0200)
src/simix/process_private.h
src/simix/smx_network.c
src/simix/smx_process.c
src/simix/smx_user.c

index c832f6d..cfc9501 100644 (file)
@@ -26,7 +26,8 @@ typedef struct s_smx_process {
   int blocked:1;
   int suspended:1;
   smx_host_t new_host;          /* if not null, the host on which the process must migrate to */
   int blocked:1;
   int suspended:1;
   smx_host_t new_host;          /* if not null, the host on which the process must migrate to */
-  smx_action_t waiting_action;
+  smx_action_t waiting_action;  /* the current blocking action if any */
+  xbt_fifo_t comms;       /* the current non-blocking communication actions */
   xbt_dict_t properties;
   s_smx_req_t request;
   void *data;                   /* kept for compatibility, it should be replaced with moddata */
   xbt_dict_t properties;
   s_smx_req_t request;
   void *data;                   /* kept for compatibility, it should be replaced with moddata */
index 1325aac..3c4440e 100644 (file)
@@ -63,8 +63,7 @@ void SIMIX_rdv_destroy(smx_rdv_t rdv)
 void SIMIX_rdv_free(void *data)
 {
   smx_rdv_t rdv = (smx_rdv_t) data;
 void SIMIX_rdv_free(void *data)
 {
   smx_rdv_t rdv = (smx_rdv_t) data;
-  if (rdv->name)
-    xbt_free(rdv->name);
+  xbt_free(rdv->name);
   xbt_fifo_free(rdv->comm_fifo);
   xbt_free(rdv);  
 }
   xbt_fifo_free(rdv->comm_fifo);
   xbt_free(rdv);  
 }
@@ -310,9 +309,10 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
     action->state = SIMIX_READY;
     action->comm.type = SIMIX_COMM_READY;
   }
     action->state = SIMIX_READY;
     action->comm.type = SIMIX_COMM_READY;
   }
+  xbt_fifo_push(src_proc->comms, action);
 
 
-  /* If the communication action is detached then decrease the refcount
-   * by one, so it will be eliminated by the receivers destroy call */
+  /* if the communication action is detached then decrease the refcount
+   * by one, so it will be eliminated by the receiver's destroy call */
   if (detached) {
     action->comm.detached = 1;
     action->comm.refcount--;
   if (detached) {
     action->comm.detached = 1;
     action->comm.refcount--;
@@ -353,6 +353,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
     action->state = SIMIX_READY;
     action->comm.type = SIMIX_COMM_READY;
   }
     action->state = SIMIX_READY;
     action->comm.type = SIMIX_COMM_READY;
   }
+  xbt_fifo_push(dst_proc->comms, action);
 
   /* Setup communication request */
   action->comm.dst_proc = dst_proc;
 
   /* Setup communication request */
   action->comm.dst_proc = dst_proc;
@@ -513,6 +514,7 @@ XBT_INLINE void SIMIX_comm_start(smx_action_t action)
 {
   /* If both the sender and the receiver are already there, start the communication */
   if (action->state == SIMIX_READY) {
 {
   /* If both the sender and the receiver are already there, start the communication */
   if (action->state == SIMIX_READY) {
+
     smx_host_t sender = action->comm.src_proc->smx_host;
     smx_host_t receiver = action->comm.dst_proc->smx_host;
 
     smx_host_t sender = action->comm.src_proc->smx_host;
     smx_host_t receiver = action->comm.dst_proc->smx_host;
 
@@ -673,13 +675,21 @@ void SIMIX_post_comm(smx_action_t action)
   else
     action->state = SIMIX_DONE;
 
   else
     action->state = SIMIX_DONE;
 
-  XBT_DEBUG("SIMIX_post_comm: action state = %d", action->state);
+  XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
+      action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
+
+  /* remove the action from pending communications of both processes (if they still exist) */
+  if (action->comm.src_proc) {
+    xbt_fifo_remove(action->comm.src_proc->comms, action);
+  }
+  if (action->comm.dst_proc) {
+    xbt_fifo_remove(action->comm.dst_proc->comms, action);
+  }
 
 
-  /* After this point the surf actions associated with the simix communicate
-     action are no longer needed, thus we delete them. */
+  /* destroy the surf actions associated with the Simix communication */
   SIMIX_comm_destroy_internal_actions(action);
 
   SIMIX_comm_destroy_internal_actions(action);
 
-  /* If there are requests associated with the action, then answer them */
+  /* if there are requests associated with the action, then answer them */
   if (xbt_fifo_size(action->request_list))
     SIMIX_comm_finish(action);
 }
   if (xbt_fifo_size(action->request_list))
     SIMIX_comm_finish(action);
 }
index 7a169dd..7138375 100644 (file)
@@ -35,7 +35,38 @@ XBT_INLINE smx_process_t SIMIX_process_self(void)
  */
 void SIMIX_process_cleanup(smx_process_t process)
 {
  */
 void SIMIX_process_cleanup(smx_process_t process)
 {
-  XBT_DEBUG("Cleanup process %s", process->name);
+  XBT_DEBUG("Cleanup process %s (%p), waiting action %p",
+      process->name, process, process->waiting_action);
+
+  /* cancel non-blocking communications */
+  smx_action_t action;
+  while ((action = xbt_fifo_pop(process->comms))) {
+
+    /* make sure no one will finish the comm after this process is destroyed */
+    SIMIX_comm_cancel(action);
+
+    if (action->comm.src_proc == process) {
+      XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d",
+          action, action->comm.detached, action->state);
+      action->comm.src_proc = NULL;
+
+      if (action->comm.detached) {
+        /* the receiver was supposed to destroy the comm after completion,
+         * but the comm will actually never finish */
+        action->comm.refcount++;
+      }
+    }
+    else if (action->comm.dst_proc == process){
+      XBT_DEBUG("Found an unfinished recv comm %p, state %d", action, action->state);
+      action->comm.dst_proc = NULL;
+    }
+    else {
+      THROW_IMPOSSIBLE;
+    }
+
+    SIMIX_comm_destroy(action);
+  }
+
   /*xbt_swag_remove(process, simix_global->process_to_run);*/
   xbt_swag_remove(process, simix_global->process_list);
   xbt_swag_remove(process, process->smx_host->process_list);
   /*xbt_swag_remove(process, simix_global->process_to_run);*/
   xbt_swag_remove(process, simix_global->process_list);
   xbt_swag_remove(process, process->smx_host->process_list);
@@ -61,6 +92,8 @@ void SIMIX_process_empty_trash(void)
     if (process->properties)
       xbt_dict_free(&process->properties);
 
     if (process->properties)
       xbt_dict_free(&process->properties);
 
+    xbt_fifo_free(process->comms);
+
     free(process->name);
     process->name = NULL;
     free(process);
     free(process->name);
     process->name = NULL;
     free(process);
@@ -143,6 +176,7 @@ void SIMIX_process_create(smx_process_t *process,
     (*process)->name = xbt_strdup(name);
     (*process)->smx_host = host;
     (*process)->data = data;
     (*process)->name = xbt_strdup(name);
     (*process)->smx_host = host;
     (*process)->data = data;
+    (*process)->comms = xbt_fifo_new();
 
     XBT_VERB("Create context %s", (*process)->name);
     (*process)->context = SIMIX_context_new(code, argc, argv,
 
     XBT_VERB("Create context %s", (*process)->name);
     (*process)->context = SIMIX_context_new(code, argc, argv,
@@ -200,9 +234,8 @@ void SIMIX_process_kill(smx_process_t process) {
   process->blocked = 0;
   process->suspended = 0;
   /* FIXME: set doexception to 0 also? */
   process->blocked = 0;
   process->suspended = 0;
   /* FIXME: set doexception to 0 also? */
-  /* FIXME: asynchronous communication actions are not destroyed:
-   * another process may find a matching rdv with me after I'm freed */
 
 
+  /* destroy the blocking action if any */
   if (process->waiting_action) {
 
     switch (process->waiting_action->type) {
   if (process->waiting_action) {
 
     switch (process->waiting_action->type) {
index 243adf3..464fc34 100644 (file)
@@ -807,8 +807,7 @@ void SIMIX_req_comm_destroy(smx_action_t comm)
 {
   xbt_assert(comm, "Invalid parameter");
 
 {
   xbt_assert(comm, "Invalid parameter");
 
-  /* FIXME remove this request type: comms are auto-destroyed now,
-   * but what happens with unfinished comms? */
+  /* FIXME remove this request type: comms are auto-destroyed now */
 
   /*
   smx_req_t req = SIMIX_req_mine();
 
   /*
   smx_req_t req = SIMIX_req_mine();