X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/b110f915460bff6576e1fc6d3a4f391992852714..1372e35c79dba117ae1b592394821a3f716a3f13:/src/smpi/smpi_receiver.c diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c index 579791161a..bd0d13fd84 100644 --- a/src/smpi/smpi_receiver.c +++ b/src/smpi/smpi_receiver.c @@ -3,16 +3,15 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi, "Logging specific to SMPI (receiver)"); -int smpi_receiver(int argc, char **argv) +int smpi_receiver(int argc, char*argv[]) { + smpi_host_data_t mydata = SIMIX_process_get_data(SIMIX_process_self()); smx_process_t self; - int index; + int index = mydata->index; xbt_fifo_t request_queue; xbt_fifo_t message_queue; - int running_hosts_count; - smpi_mpi_request_t request; smpi_received_message_t message; @@ -21,25 +20,15 @@ int smpi_receiver(int argc, char **argv) self = SIMIX_process_self(); - index = smpi_host_index(); - - request_queue = smpi_global->pending_recv_request_queues[index]; + request_queue = mydata->pending_recv_request_queue; message_queue = smpi_global->received_message_queues[index]; - smpi_global->receiver_processes[index] = self; - - do { - + while (1) { // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? - for (request_item = xbt_fifo_get_first_item(request_queue); - NULL != request_item; - request_item = xbt_fifo_get_next_item(request_item)) { - request = xbt_fifo_get_item_content(request_item); - for (message_item = xbt_fifo_get_first_item(message_queue); - NULL != message_item; - message_item = xbt_fifo_get_next_item(message_item)) { - message = xbt_fifo_get_item_content(message_item); + xbt_fifo_foreach(request_queue,request_item,request,smpi_mpi_request_t){ + xbt_fifo_foreach(message_queue,message_item,message, smpi_received_message_t) { + if (request->comm == message->comm && (MPI_ANY_SOURCE == request->src || request->src == message->src) && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { @@ -56,38 +45,40 @@ int smpi_receiver(int argc, char **argv) message = NULL; stopsearch: - if (NULL == request || NULL == message) { - SIMIX_process_suspend(self); + if (NULL != request) { + if (NULL == message) + DIE_IMPOSSIBLE; + + SIMIX_mutex_lock(request->mutex); + memcpy(request->buf, message->buf, + request->datatype->size * request->count); + request->src = message->src; + request->data = message->data; + request->forward = message->forward; + + if (0 == request->forward) { + request->completed = 1; + SIMIX_cond_broadcast(request->cond); + } else { + request->src = request->comm->index_to_rank_map[index]; + request->dst = (request->src + 1) % request->comm->size; + smpi_mpi_isend(request); + } + + SIMIX_mutex_unlock(request->mutex); + + xbt_free(message->buf); + xbt_mallocator_release(smpi_global->message_mallocator, message); + + } else if (mydata->finalize>0) { /* main wants me to die and nothing to do */ + // FIXME: display the list of remaining requests and messages (user code synchronization faulty?) + mydata->finalize--; + SIMIX_cond_signal(mydata->cond); + return 0; } else { - - SIMIX_mutex_lock(request->mutex); - memcpy(request->buf, message->buf, - request->datatype->size * request->count); - request->src = message->src; - request->data = message->data; - request->forward = message->forward; - - if (0 == request->forward) { - request->completed = 1; - SIMIX_cond_broadcast(request->cond); - } else { - request->src = request->comm->index_to_rank_map[index]; - request->dst = (request->src + 1) % request->comm->size; - smpi_mpi_isend(request); - } - - SIMIX_mutex_unlock(request->mutex); - - xbt_free(message->buf); - xbt_mallocator_release(smpi_global->message_mallocator, message); - + SIMIX_process_suspend(self); } - - SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); - running_hosts_count = smpi_global->running_hosts_count; - SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); - - } while (0 < running_hosts_count); + } return 0; }