Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SMPI: before terminating a process, wait for its asynchronous comms
authorChristophe Thiéry <christopho128@gmail.com>
Thu, 5 Jan 2012 17:10:46 +0000 (18:10 +0100)
committerChristophe Thiéry <christopho128@gmail.com>
Thu, 5 Jan 2012 17:10:46 +0000 (18:10 +0100)
include/simix/simix.h
src/simix/smx_process.c
src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_global.c
src/smpi/smpi_pmpi.c

index f25bf3d..96230b5 100644 (file)
@@ -67,6 +67,7 @@ XBT_PUBLIC(void) SIMIX_process_self_set_data(smx_process_t self, void *data);
 XBT_PUBLIC(void*) SIMIX_process_self_get_data(smx_process_t self);
 XBT_PUBLIC(smx_context_t) SIMIX_process_get_context(smx_process_t);
 XBT_PUBLIC(void) SIMIX_process_set_context(smx_process_t p,smx_context_t c);
+XBT_PUBLIC(int) SIMIX_process_has_pending_comms(smx_process_t process);
 
 /****************************** Communication *********************************/
 XBT_PUBLIC(void) SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t));
index e57176a..bd34137 100644 (file)
@@ -30,7 +30,16 @@ XBT_INLINE smx_process_t SIMIX_process_self(void)
 }
 
 /**
- * \brief Move a process to the list of processes to destroy.
+ * \brief Returns whether a process has pending asynchronous communications.
+ * \return true if there are asynchronous communications in this process
+ */
+int SIMIX_process_has_pending_comms(smx_process_t process) {
+
+  return xbt_fifo_size(process->comms) > 0;
+}
+
+/**
+ * \brief Moves a process to the list of processes to destroy.
  */
 void SIMIX_process_cleanup(smx_process_t process)
 {
index d457c76..6105faa 100644 (file)
@@ -40,6 +40,7 @@ typedef struct s_smpi_mpi_request {
 
 void smpi_process_init(int *argc, char ***argv);
 void smpi_process_destroy(void);
+void smpi_process_finalize(void);
 
 smpi_process_data_t smpi_process_data(void);
 smpi_process_data_t smpi_process_remote_data(int index);
index 8a02def..076cd42 100644 (file)
@@ -294,8 +294,10 @@ int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
 {
   print_request("Waiting", *request);
-  if ((*request)->action != NULL ) 
-        SIMIX_req_comm_wait((*request)->action, -1.0);
+  if ((*request)->action != NULL) {
+
+      SIMIX_req_comm_wait((*request)->action, -1.0);
+  }
   finish_wait(request, status);
 }
 
index 6fa73d2..470be9c 100644 (file)
@@ -70,6 +70,17 @@ void smpi_process_destroy(void)
   XBT_DEBUG("<%d> Process left the game", index);
 }
 
+/**
+ * @brief Prepares the current process for termination.
+ */
+void smpi_process_finalize(void)
+{
+  // wait for all pending asynchronous comms to finish
+  while (SIMIX_process_has_pending_comms(SIMIX_process_self())) {
+    SIMIX_req_process_sleep(1);
+  }
+}
+
 int smpi_process_argc(void) {
   smpi_process_data_t data = smpi_process_data();
 
index ec3da34..ba07a45 100644 (file)
@@ -36,6 +36,7 @@ int PMPI_Init(int *argc, char ***argv)
 
 int PMPI_Finalize(void)
 {
+  smpi_process_finalize();
   smpi_bench_end();
 #ifdef HAVE_TRACING
   TRACE_smpi_finalize(smpi_process_index());