X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/dff9e15c44ab6340d27215957c56fa72fad246a2..c7abe4a06a869040289677126a8c3e1b9a92216f:/src/smpi/smpi_receiver.c diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c index 365dad5edf..999f5612fe 100644 --- a/src/smpi/smpi_receiver.c +++ b/src/smpi/smpi_receiver.c @@ -3,17 +3,14 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi, "Logging specific to SMPI (receiver)"); -int smpi_receiver(int argc, char **argv) +int smpi_receiver(int argc, char *argv[]) { + smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self()); smx_process_t self; - int index; + int index = mydata->index; xbt_fifo_t request_queue; - smx_mutex_t request_queue_mutex; xbt_fifo_t message_queue; - smx_mutex_t message_queue_mutex; - - int running_hosts_count; smpi_mpi_request_t request; smpi_received_message_t message; @@ -23,59 +20,38 @@ int smpi_receiver(int argc, char **argv) self = SIMIX_process_self(); - // make sure root is done before own initialization - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - while (!smpi_global->root_ready) { - SIMIX_cond_wait(smpi_global->start_stop_cond, - smpi_global->start_stop_mutex); - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - index = smpi_host_index(); - - request_queue = smpi_global->pending_recv_request_queues[index]; - request_queue_mutex = - smpi_global->pending_recv_request_queues_mutexes[index]; - message_queue = smpi_global->received_message_queues[index]; - message_queue_mutex = smpi_global->received_message_queues_mutexes[index]; + request_queue = mydata->pending_recv_request_queue; + message_queue = mydata->received_message_queue; - smpi_global->receiver_processes[index] = self; + DEBUG0("Up and running"); - // wait for all nodes to signal initializatin complete - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - smpi_global->ready_process_count++; - if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) { - SIMIX_cond_broadcast(smpi_global->start_stop_cond); - } - while (smpi_global->ready_process_count < 3 * smpi_global->host_count) { - SIMIX_cond_wait(smpi_global->start_stop_cond, - smpi_global->start_stop_mutex); - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - do { + while (1) { // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? - // FIXME: not the best way to request multiple locks... - SIMIX_mutex_lock(request_queue_mutex); - SIMIX_mutex_lock(message_queue_mutex); - for (request_item = xbt_fifo_get_first_item(request_queue); - NULL != request_item; - request_item = xbt_fifo_get_next_item(request_item)) { - request = xbt_fifo_get_item_content(request_item); - for (message_item = xbt_fifo_get_first_item(message_queue); - NULL != message_item; - message_item = xbt_fifo_get_next_item(message_item)) { - message = xbt_fifo_get_item_content(message_item); + DEBUG0("Look for matching"); + xbt_fifo_foreach(request_queue, request_item, request, smpi_mpi_request_t) { + xbt_fifo_foreach(message_queue, message_item, message, + smpi_received_message_t) { + +//#define DEBUG_MATCH +#ifdef DEBUG_MATCH + printf("[%s] try match (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n", + __FILE__,request->src,message->src,request->tag, message->tag); +#endif if (request->comm == message->comm && - (MPI_ANY_SOURCE == request->src || request->src == message->src) - && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { - xbt_fifo_remove_item(request_queue, request_item); - xbt_fifo_free_item(request_item); - xbt_fifo_remove_item(message_queue, message_item); - xbt_fifo_free_item(message_item); - goto stopsearch; + (MPI_ANY_SOURCE == request->src || request->src == message->src) + && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { + xbt_fifo_remove_item(request_queue, request_item); + xbt_fifo_free_item(request_item); + xbt_fifo_remove_item(message_queue, message_item); + xbt_fifo_free_item(message_item); + DEBUG5("found matching request %p: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", + request,request->src,message->src,request->tag, message->tag); + goto stopsearch; + } else { + DEBUG5("fail to match request %p: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", + request,request->src,message->src,request->tag, message->tag); } } } @@ -84,12 +60,9 @@ int smpi_receiver(int argc, char **argv) message = NULL; stopsearch: - SIMIX_mutex_unlock(message_queue_mutex); - SIMIX_mutex_unlock(request_queue_mutex); - - if (NULL == request || NULL == message) { - SIMIX_process_suspend(self); - } else { + if (NULL != request) { + if (NULL == message) + DIE_IMPOSSIBLE; SIMIX_mutex_lock(request->mutex); memcpy(request->buf, message->buf, @@ -112,13 +85,18 @@ int smpi_receiver(int argc, char **argv) xbt_free(message->buf); xbt_mallocator_release(smpi_global->message_mallocator, message); + } else if (mydata->finalize > 0) { /* main wants me to die and nothing to do */ + // FIXME: display the list of remaining requests and messages (user code synchronization faulty?) + DEBUG0("Main wants me to die and I'm done. Bye, guys."); + mydata->finalize--; + SIMIX_cond_signal(mydata->cond); + return 0; + } else { + DEBUG0("Nothing to do. Let's get a nap"); + SIMIX_process_suspend(self); + DEBUG0("=== Uh? Someone called me? ==="); } - - SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); - running_hosts_count = smpi_global->running_hosts_count; - SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); - - } while (0 < running_hosts_count); + } return 0; }