From: mquinson Date: Wed, 24 Jun 2009 19:54:24 +0000 (+0000) Subject: SMPI: Change the way senders and receivers are stopped: main process kills its friend... X-Git-Tag: SVN~1298 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/1372e35c79dba117ae1b592394821a3f716a3f13?hp=cac63707a2f233d0a979a67403336292eb2a3038 SMPI: Change the way senders and receivers are stopped: main process kills its friends when leaving instead of a big armageddon at the end of simulation (one less use of the big process table) git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6342 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/simix/smx_process.c b/src/simix/smx_process.c index 76643e82dc..d715eab1f8 100644 --- a/src/simix/smx_process.c +++ b/src/simix/smx_process.c @@ -385,7 +385,7 @@ void SIMIX_process_resume(smx_process_t process) simdata = process->simdata; if (simdata->mutex) { DEBUG0("Resume process blocked on a mutex"); - simdata->suspended = 0; /* He'll wake up by itself */ + simdata->suspended = 0; /* It'll wake up by itself when mutex releases */ return; } else if (simdata->cond) { /* temporaries variables */ diff --git a/src/smpi/private.h b/src/smpi/private.h index 2db330e2ca..ff6bcb623f 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -95,7 +95,7 @@ typedef struct smpi_global_t { smx_process_t *main_processes; - int running_hosts_count; + int running_hosts_count; //FIXME: killme xbt_os_timer_t timer; smx_mutex_t timer_mutex; @@ -119,6 +119,8 @@ typedef struct smpi_host_data_t { smx_process_t sender; smx_process_t receiver; + int finalize; /* for main stopping sender&receiver */ + xbt_fifo_t pending_recv_request_queue; } s_smpi_host_data_t; typedef struct smpi_host_data_t *smpi_host_data_t; diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index d8fd995a56..eb9436f598 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -67,6 +67,7 @@ void smpi_process_init() hdata->index = i; 
hdata->mutex = SIMIX_mutex_init(); hdata->cond = SIMIX_cond_init(); + hdata->finalize = 0; hdata->pending_recv_request_queue = xbt_fifo_new(); @@ -91,10 +92,18 @@ void smpi_process_finalize() i = --smpi_global->running_hosts_count; - SIMIX_mutex_destroy(smpi_host_mutex()); - SIMIX_cond_destroy(smpi_host_cond()); + hdata->finalize = 2; /* Tell sender and receiver to quit */ + SIMIX_process_resume(hdata->sender); + SIMIX_process_resume(hdata->receiver); + while (hdata->finalize>0) { /* wait until it's done */ + SIMIX_cond_wait(hdata->cond,hdata->mutex); + } + + SIMIX_mutex_destroy(hdata->mutex); + SIMIX_cond_destroy(hdata->cond); xbt_fifo_free(hdata->pending_recv_request_queue); + if (0 >= i) { // wake up senders/receivers diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c index b81c997cc4..bd0d13fd84 100644 --- a/src/smpi/smpi_receiver.c +++ b/src/smpi/smpi_receiver.c @@ -12,8 +12,6 @@ int smpi_receiver(int argc, char*argv[]) xbt_fifo_t request_queue; xbt_fifo_t message_queue; - int running_hosts_count; - smpi_mpi_request_t request; smpi_received_message_t message; @@ -25,8 +23,7 @@ int smpi_receiver(int argc, char*argv[]) request_queue = mydata->pending_recv_request_queue; message_queue = smpi_global->received_message_queues[index]; - do { - + while (1) { // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? 
xbt_fifo_foreach(request_queue,request_item,request,smpi_mpi_request_t){ @@ -48,36 +45,40 @@ int smpi_receiver(int argc, char*argv[]) message = NULL; stopsearch: - if (NULL == request || NULL == message) { - SIMIX_process_suspend(self); + if (NULL != request) { + if (NULL == message) + DIE_IMPOSSIBLE; + + SIMIX_mutex_lock(request->mutex); + memcpy(request->buf, message->buf, + request->datatype->size * request->count); + request->src = message->src; + request->data = message->data; + request->forward = message->forward; + + if (0 == request->forward) { + request->completed = 1; + SIMIX_cond_broadcast(request->cond); + } else { + request->src = request->comm->index_to_rank_map[index]; + request->dst = (request->src + 1) % request->comm->size; + smpi_mpi_isend(request); + } + + SIMIX_mutex_unlock(request->mutex); + + xbt_free(message->buf); + xbt_mallocator_release(smpi_global->message_mallocator, message); + + } else if (mydata->finalize>0) { /* main wants me to die and nothing to do */ + // FIXME: display the list of remaining requests and messages (user code synchronization faulty?) 
+ mydata->finalize--; + SIMIX_cond_signal(mydata->cond); + return 0; } else { - - SIMIX_mutex_lock(request->mutex); - memcpy(request->buf, message->buf, - request->datatype->size * request->count); - request->src = message->src; - request->data = message->data; - request->forward = message->forward; - - if (0 == request->forward) { - request->completed = 1; - SIMIX_cond_broadcast(request->cond); - } else { - request->src = request->comm->index_to_rank_map[index]; - request->dst = (request->src + 1) % request->comm->size; - smpi_mpi_isend(request); - } - - SIMIX_mutex_unlock(request->mutex); - - xbt_free(message->buf); - xbt_mallocator_release(smpi_global->message_mallocator, message); - + SIMIX_process_suspend(self); } - - running_hosts_count = smpi_global->running_hosts_count; - - } while (0 < running_hosts_count); + } return 0; } diff --git a/src/smpi/smpi_sender.c b/src/smpi/smpi_sender.c index 732ed78242..9098aa2c94 100644 --- a/src/smpi/smpi_sender.c +++ b/src/smpi/smpi_sender.c @@ -12,8 +12,6 @@ int smpi_sender(int argc,char*argv[]) { xbt_fifo_t request_queue; - int running_hosts_count; - smpi_mpi_request_t request; smx_host_t dhost; @@ -26,8 +24,6 @@ int smpi_sender(int argc,char*argv[]) { int dindex; - smx_process_t receiver_process; - self = SIMIX_process_self(); shost = SIMIX_host_self(); @@ -35,14 +31,10 @@ int smpi_sender(int argc,char*argv[]) { request_queue = smpi_global->pending_send_request_queues[index]; - do { - + while (1) { request = xbt_fifo_shift(request_queue); - if (NULL == request) { - SIMIX_process_suspend(self); - } else { - + if (NULL != request) { message = xbt_mallocator_get(smpi_global->message_mallocator); SIMIX_mutex_lock(request->mutex); @@ -93,14 +85,15 @@ int smpi_sender(int argc,char*argv[]) { // wake up receiver if necessary smpi_host_data_t remote_host = SIMIX_host_get_data(SIMIX_process_get_host(smpi_global->main_processes[dindex])); - receiver_process = remote_host->receiver; - if 
(SIMIX_process_is_suspended(receiver_process)) - SIMIX_process_resume(receiver_process); - } - - running_hosts_count = smpi_global->running_hosts_count; - - } while (0 < running_hosts_count); + SIMIX_process_resume(remote_host->receiver); + } else if (mydata->finalize>0) { /* main wants me to die and nothing to do */ + mydata->finalize--; + SIMIX_cond_signal(mydata->cond); + return 0; + } else { + SIMIX_process_suspend(self); + } + } return 0; }