From b72bab7544ab12487f83b3134c07399986471217 Mon Sep 17 00:00:00 2001 From: markls Date: Tue, 28 Aug 2007 09:35:32 +0000 Subject: [PATCH] refactoring smpi into multiple source files. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@4123 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/Makefile.am | 4 +- src/Makefile.in | 58 ++++++----- src/smpi/receiver.c | 112 +++++++++++++++++++++ src/smpi/sender.c | 117 ++++++++++++++++++++++ src/smpi/smpi_base.c | 226 ------------------------------------------- 5 files changed, 266 insertions(+), 251 deletions(-) create mode 100644 src/smpi/receiver.c create mode 100644 src/smpi/sender.c diff --git a/src/Makefile.am b/src/Makefile.am index 6ba70ce725..8407f6fc77 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -186,7 +186,9 @@ SIMIX_SRC= \ SMPI_SRC= \ smpi/smpi_base.c \ - smpi/smpi_mpi.c + smpi/smpi_mpi.c \ + smpi/sender.c \ + smpi/receiver.c MSG_SRC= msg/msg_config.c \ msg/task.c msg/host.c msg/m_process.c msg/gos.c \ diff --git a/src/Makefile.in b/src/Makefile.in index 9e8b64bcbe..4e0f298809 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -120,7 +120,7 @@ am__libsimgrid_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c xbt/ex.c \ xbt/config.c xbt/cunit.c xbt/graphxml_parse.c surf/maxmin.c \ surf/fair_bottleneck.c surf/lagrange.c surf/trace_mgr.c \ surf/surf.c surf/surfxml_parse.c surf/cpu.c surf/network.c \ - surf/workstation.c surf/surf_timer.c surf/network_dassf.c \ + surf/workstation.c surf/surf_timer.c \ surf/workstation_KCCFLN05.c surf/workstation_ptask_L07.c \ xbt/xbt_os_thread.c xbt/xbt_os_thread_stubs.c \ surf/gtnets/gtnets_simulator.cc surf/gtnets/gtnets_topology.cc \ @@ -153,8 +153,8 @@ am__libsimgrid_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c xbt/ex.c \ @CONTEXT_THREADS_FALSE@am__objects_7 = xbt_os_thread_stubs.lo am__objects_8 = maxmin.lo fair_bottleneck.lo lagrange.lo trace_mgr.lo \ surf.lo surfxml_parse.lo cpu.lo network.lo workstation.lo \ - surf_timer.lo network_dassf.lo workstation_KCCFLN05.lo \ - workstation_ptask_L07.lo $(am__objects_6) $(am__objects_7) + surf_timer.lo workstation_KCCFLN05.lo workstation_ptask_L07.lo \ + $(am__objects_6) $(am__objects_7) am__objects_9 = gtnets_simulator.lo gtnets_topology.lo \ gtnets_interface.lo network_gtnets.lo @HAVE_GTNETS_TRUE@am__objects_10 = $(am__objects_9) @@ -189,7 +189,7 @@ am__libsimgrid4java_la_SOURCES_DIST = xbt/snprintf.c xbt/xbt_str.c \ xbt/config.c xbt/cunit.c xbt/graphxml_parse.c surf/maxmin.c \ surf/fair_bottleneck.c surf/lagrange.c surf/trace_mgr.c \ surf/surf.c surf/surfxml_parse.c surf/cpu.c surf/network.c \ - surf/workstation.c surf/surf_timer.c surf/network_dassf.c \ + surf/workstation.c surf/surf_timer.c \ surf/workstation_KCCFLN05.c surf/workstation_ptask_L07.c \ xbt/xbt_os_thread.c xbt/xbt_os_thread_stubs.c \ surf/gtnets/gtnets_simulator.cc surf/gtnets/gtnets_topology.cc \ @@ -238,7 +238,7 @@ libsimgrid4java_la_LINK = $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) \ $(CXXFLAGS) $(libsimgrid4java_la_LDFLAGS) $(LDFLAGS) -o $@ @HAVE_JAVA_TRUE@am_libsimgrid4java_la_rpath = -rpath $(libdir) libsmpi_la_DEPENDENCIES = libsimgrid.la -am__objects_20 = smpi_base.lo smpi_mpi.lo +am__objects_20 = smpi_base.lo smpi_mpi.lo sender.lo receiver.lo am_libsmpi_la_OBJECTS = $(am__objects_20) libsmpi_la_OBJECTS = $(am_libsmpi_la_OBJECTS) libsmpi_la_LINK = $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) \ @@ -363,6 +363,7 @@ PACKAGE_VERSION = @PACKAGE_VERSION@ PATH_SEPARATOR = @PATH_SEPARATOR@ PTH_STACK_GROWTH = @PTH_STACK_GROWTH@ RANLIB = @RANLIB@ +SED = @SED@ SET_MAKE = @SET_MAKE@ SHELL = @SHELL@ SIMGRID_DEP = @SIMGRID_DEP@ @@ -440,13 +441,12 @@ EXTRA_DIST = portable.h xbt/mallocator_private.h xbt/dynar_private.h \ surf/cpu_private.h surf/workstation_private.h \ surf/surf_timer_private.h surf/surfxml_parse.c surf/surfxml.l \ surf/surfxml.c surf/surfxml.dtd surf/network_private.h \ - surf/network_dassf_private.h \ - surf/workstation_KCCFLN05_private.h include/surf/maxmin.h \ - include/surf/trace_mgr.h include/surf/surf.h \ - include/surf/surfxml_parse_private.h include/xbt/xbt_os_time.h \ - include/xbt/xbt_os_thread.h include/xbt/context.h \ - msg/private.h simdag/private.h gras/DataDesc/ddt_parse.yy.l \ - gras/Virtu/virtu_interface.h \ + network_gtnets_private.h surf/workstation_KCCFLN05_private.h \ + include/surf/maxmin.h include/surf/trace_mgr.h \ + include/surf/surf.h include/surf/surfxml_parse_private.h \ + include/xbt/xbt_os_time.h include/xbt/xbt_os_thread.h \ + include/xbt/context.h msg/private.h simdag/private.h \ + gras/DataDesc/ddt_parse.yy.l gras/Virtu/virtu_interface.h \ amok/Bandwidth/bandwidth_private.h amok/amok_modinter.h \ $(am__append_3) $(am__append_4) $(am__append_6) \ $(am__append_7) @@ -535,8 +535,8 @@ XBT_SG_SRC = \ SURF_SRC = surf/maxmin.c surf/fair_bottleneck.c surf/lagrange.c \ surf/trace_mgr.c surf/surf.c surf/surfxml_parse.c surf/cpu.c \ surf/network.c surf/workstation.c surf/surf_timer.c \ - surf/network_dassf.c surf/workstation_KCCFLN05.c \ - surf/workstation_ptask_L07.c $(am__append_1) $(am__append_2) + surf/workstation_KCCFLN05.c surf/workstation_ptask_L07.c \ + $(am__append_1) $(am__append_2) GTNETS_SRC = \ surf/gtnets/gtnets_simulator.cc \ surf/gtnets/gtnets_topology.cc \ @@ -559,7 +559,9 @@ SIMIX_SRC = \ SMPI_SRC = \ smpi/smpi_base.c \ - smpi/smpi_mpi.c + smpi/smpi_mpi.c \ + smpi/sender.c \ + smpi/receiver.c MSG_SRC = msg/msg_config.c \ msg/task.c msg/host.c msg/m_process.c msg/gos.c \ @@ -860,10 +862,10 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/maxmin.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/msg_config.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/network.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/network_dassf.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/network_gtnets.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/peermanagement.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/process.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/receiver.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rl_dns.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rl_emul.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rl_msg.Plo@am__quote@ @@ -877,6 +879,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sd_task.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sd_workstation.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sdp.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sender.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/set.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/set_unit.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sg_dns.Plo@am__quote@ @@ -1440,13 +1443,6 @@ surf_timer.lo: surf/surf_timer.c @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o surf_timer.lo `test -f 'surf/surf_timer.c' || echo '$(srcdir)/'`surf/surf_timer.c -network_dassf.lo: surf/network_dassf.c -@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT network_dassf.lo -MD -MP -MF $(DEPDIR)/network_dassf.Tpo -c -o network_dassf.lo `test -f 'surf/network_dassf.c' || echo '$(srcdir)/'`surf/network_dassf.c -@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/network_dassf.Tpo $(DEPDIR)/network_dassf.Plo -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='surf/network_dassf.c' object='network_dassf.lo' libtool=yes @AMDEPBACKSLASH@ -@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o network_dassf.lo `test -f 'surf/network_dassf.c' || echo '$(srcdir)/'`surf/network_dassf.c - workstation_KCCFLN05.lo: surf/workstation_KCCFLN05.c @am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT workstation_KCCFLN05.lo -MD -MP -MF $(DEPDIR)/workstation_KCCFLN05.Tpo -c -o workstation_KCCFLN05.lo `test -f 'surf/workstation_KCCFLN05.c' || echo '$(srcdir)/'`surf/workstation_KCCFLN05.c @am__fastdepCC_TRUE@ mv -f $(DEPDIR)/workstation_KCCFLN05.Tpo $(DEPDIR)/workstation_KCCFLN05.Plo @@ -1755,6 +1751,20 @@ smpi_mpi.lo: smpi/smpi_mpi.c @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o smpi_mpi.lo `test -f 'smpi/smpi_mpi.c' || echo '$(srcdir)/'`smpi/smpi_mpi.c +sender.lo: smpi/sender.c +@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT sender.lo -MD -MP -MF $(DEPDIR)/sender.Tpo -c -o sender.lo `test -f 'smpi/sender.c' || echo '$(srcdir)/'`smpi/sender.c +@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/sender.Tpo $(DEPDIR)/sender.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='smpi/sender.c' object='sender.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o sender.lo `test -f 'smpi/sender.c' || echo '$(srcdir)/'`smpi/sender.c + +receiver.lo: smpi/receiver.c +@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT receiver.lo -MD -MP -MF $(DEPDIR)/receiver.Tpo -c -o receiver.lo `test -f 'smpi/receiver.c' || echo '$(srcdir)/'`smpi/receiver.c +@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/receiver.Tpo $(DEPDIR)/receiver.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='smpi/receiver.c' object='receiver.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o receiver.lo `test -f 'smpi/receiver.c' || echo '$(srcdir)/'`smpi/receiver.c + cunit_unit.o: @builddir@/cunit_unit.c @am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT cunit_unit.o -MD -MP -MF $(DEPDIR)/cunit_unit.Tpo -c -o cunit_unit.o `test -f '@builddir@/cunit_unit.c' || echo '$(srcdir)/'`@builddir@/cunit_unit.c @am__fastdepCC_TRUE@ mv -f $(DEPDIR)/cunit_unit.Tpo $(DEPDIR)/cunit_unit.Po @@ -2215,7 +2225,7 @@ uninstall-am: uninstall-binSCRIPTS uninstall-jarDATA \ include/simix/simix.h include/simix/datatypes.h \ simix/msg_simix_private.h \ \ - smpi/private.h smpi/smpi.h + smpi/private.h @HAVE_JAVA_TRUE@clean-local: @HAVE_JAVA_TRUE@ -rm -rf .classes diff --git a/src/smpi/receiver.c b/src/smpi/receiver.c new file mode 100644 index 0000000000..f842dd69fd --- /dev/null +++ b/src/smpi/receiver.c @@ -0,0 +1,112 @@ +#include "private.h" + +int smpi_receiver(int argc, char **argv) +{ + smx_process_t self; + int rank; + + xbt_fifo_t request_queue; + smx_mutex_t request_queue_mutex; + xbt_fifo_t message_queue; + smx_mutex_t message_queue_mutex; + int size; + + int running_hosts_count; + + smpi_mpi_request_t *request; + smpi_received_message_t *message; + + xbt_fifo_item_t request_item; + xbt_fifo_item_t message_item; + + self = SIMIX_process_self(); + rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world); + + // make sure root is done before own initialization + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + if (!smpi_global->root_ready) { + SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); + } + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); + + request_queue = smpi_global->pending_recv_request_queues[rank]; + request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank]; + message_queue = smpi_global->received_message_queues[rank]; + message_queue_mutex = smpi_global->received_message_queues_mutexes[rank]; + size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world); + + smpi_global->receiver_processes[rank] = self; + + // wait for all nodes to signal initializatin complete + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + smpi_global->ready_process_count++; + if (smpi_global->ready_process_count < 3 * size) { + SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); + } else { + SIMIX_cond_broadcast(smpi_global->start_stop_cond); + } + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); + + do { + request = NULL; + message = NULL; + + // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? + + // FIXME: not the best way to request multiple locks... + SIMIX_mutex_lock(request_queue_mutex); + SIMIX_mutex_lock(message_queue_mutex); + for (request_item = xbt_fifo_get_first_item(request_queue); + NULL != request_item; + request_item = xbt_fifo_get_next_item(request_item)) { + request = xbt_fifo_get_item_content(request_item); + for (message_item = xbt_fifo_get_first_item(message_queue); + NULL != message_item; + message_item = xbt_fifo_get_next_item(message_item)) { + message = xbt_fifo_get_item_content(message_item); + if (request->comm == message->comm && + (MPI_ANY_SOURCE == request->src || request->src == message->src) && + request->tag == message->tag) { + xbt_fifo_remove_item(request_queue, request_item); + xbt_fifo_remove_item(message_queue, message_item); + goto stopsearch; + } + } + } +stopsearch: + SIMIX_mutex_unlock(message_queue_mutex); + SIMIX_mutex_unlock(request_queue_mutex); + + if (NULL == request || NULL == message) { + SIMIX_process_suspend(self); + } else { + SIMIX_mutex_lock(request->simdata->mutex); + + memcpy(request->buf, message->buf, request->datatype->size * request->count); + request->src = message->src; + request->completed = 1; + SIMIX_cond_broadcast(request->simdata->cond); + + SIMIX_mutex_unlock(request->simdata->mutex); + + xbt_free(message->buf); + xbt_mallocator_release(smpi_global->message_mallocator, message); + } + + SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); + running_hosts_count = smpi_global->running_hosts_count; + SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); + + } while (0 < running_hosts_count); + + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + smpi_global->ready_process_count--; + if (smpi_global->ready_process_count == 0) { + SIMIX_cond_broadcast(smpi_global->start_stop_cond); + } else if (smpi_global->ready_process_count < 0) { + // FIXME: can't happen, abort! + } + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); + + return 0; +} diff --git a/src/smpi/sender.c b/src/smpi/sender.c new file mode 100644 index 0000000000..0aec631f54 --- /dev/null +++ b/src/smpi/sender.c @@ -0,0 +1,117 @@ +#include "private.h" + +int smpi_sender(int argc, char **argv) +{ + smx_process_t self; + smx_host_t shost; + int rank; + + xbt_fifo_t request_queue; + smx_mutex_t request_queue_mutex; + int size; + + int running_hosts_count; + + smpi_mpi_request_t *request; + + smx_host_t dhost; + + smx_action_t action; + + smpi_received_message_t *message; + + int drank; + + smx_process_t receiver_process; + + self = SIMIX_process_self(); + shost = SIMIX_host_self(); + rank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost); + + // make sure root is done before own initialization + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + if (!smpi_global->root_ready) { + SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); + } + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); + + request_queue = smpi_global->pending_send_request_queues[rank]; + request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank]; + size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world); + + smpi_global->sender_processes[rank] = self; + + // wait for all nodes to signal initializatin complete + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + smpi_global->ready_process_count++; + if (smpi_global->ready_process_count < 3 * size) { + SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); + } else { + SIMIX_cond_broadcast(smpi_global->start_stop_cond); + } + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); + + do { + + SIMIX_mutex_lock(request_queue_mutex); + request = xbt_fifo_shift(request_queue); + SIMIX_mutex_unlock(request_queue_mutex); + + if (NULL == request) { + SIMIX_process_suspend(self); + } else { + + message = xbt_mallocator_get(smpi_global->message_mallocator); + + SIMIX_mutex_lock(request->simdata->mutex); + + message->comm = request->comm; + message->src = request->src; + message->dst = request->dst; + message->tag = request->tag; + message->buf = xbt_malloc(request->datatype->size * request->count); + memcpy(message->buf, request->buf, request->datatype->size * request->count); + + dhost = request->comm->simdata->hosts[request->dst]; + drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost); + + SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]); + xbt_fifo_push(smpi_global->received_message_queues[drank], message); + SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]); + + request->completed = 1; + + action = SIMIX_action_communicate(shost, dhost, NULL, request->datatype->size * request->count * 1.0, -1.0); + + SIMIX_register_action_to_condition(action, request->simdata->cond); + + SIMIX_cond_wait(request->simdata->cond, request->simdata->mutex); + + SIMIX_mutex_unlock(request->simdata->mutex); + + // wake up receiver if necessary + receiver_process = smpi_global->receiver_processes[drank]; + if (SIMIX_process_is_suspended(receiver_process)) { + SIMIX_process_resume(receiver_process); + } + + } + + SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); + running_hosts_count = smpi_global->running_hosts_count; + SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); + + } while (0 < running_hosts_count); + + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + smpi_global->ready_process_count--; + if (smpi_global->ready_process_count == 0) { + SIMIX_cond_broadcast(smpi_global->start_stop_cond); + } else if (smpi_global->ready_process_count < 0) { + // FIXME: can't happen! abort! + } + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); + + return 0; +} + diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 7f64d5a7f1..c6590723e5 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -30,232 +30,6 @@ int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm) return smpi_mpi_comm_rank(comm, SIMIX_host_self()); } -int smpi_sender(int argc, char **argv) -{ - smx_process_t self; - smx_host_t shost; - int rank; - - xbt_fifo_t request_queue; - smx_mutex_t request_queue_mutex; - int size; - - int running_hosts_count; - - smpi_mpi_request_t *request; - - smx_host_t dhost; - - smx_action_t action; - - smpi_received_message_t *message; - - int drank; - - smx_process_t receiver_process; - - self = SIMIX_process_self(); - shost = SIMIX_host_self(); - rank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost); - - // make sure root is done before own initialization - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - if (!smpi_global->root_ready) { - SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - request_queue = smpi_global->pending_send_request_queues[rank]; - request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank]; - size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world); - - smpi_global->sender_processes[rank] = self; - - // wait for all nodes to signal initializatin complete - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - smpi_global->ready_process_count++; - if (smpi_global->ready_process_count < 3 * size) { - SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); - } else { - SIMIX_cond_broadcast(smpi_global->start_stop_cond); - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - do { - - SIMIX_mutex_lock(request_queue_mutex); - request = xbt_fifo_shift(request_queue); - SIMIX_mutex_unlock(request_queue_mutex); - - if (NULL == request) { - SIMIX_process_suspend(self); - } else { - - message = xbt_mallocator_get(smpi_global->message_mallocator); - - SIMIX_mutex_lock(request->simdata->mutex); - - message->comm = request->comm; - message->src = request->src; - message->dst = request->dst; - message->tag = request->tag; - message->buf = xbt_malloc(request->datatype->size * request->count); - memcpy(message->buf, request->buf, request->datatype->size * request->count); - - dhost = request->comm->simdata->hosts[request->dst]; - drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost); - - SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]); - xbt_fifo_push(smpi_global->received_message_queues[drank], message); - SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]); - - request->completed = 1; - - action = SIMIX_action_communicate(shost, dhost, NULL, request->datatype->size * request->count * 1.0, -1.0); - - SIMIX_register_action_to_condition(action, request->simdata->cond); - - SIMIX_cond_wait(request->simdata->cond, request->simdata->mutex); - - SIMIX_mutex_unlock(request->simdata->mutex); - - // wake up receiver if necessary - receiver_process = smpi_global->receiver_processes[drank]; - if (SIMIX_process_is_suspended(receiver_process)) { - SIMIX_process_resume(receiver_process); - } - - } - - SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); - running_hosts_count = smpi_global->running_hosts_count; - SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); - - } while (0 < running_hosts_count); - - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - smpi_global->ready_process_count--; - if (smpi_global->ready_process_count == 0) { - SIMIX_cond_broadcast(smpi_global->start_stop_cond); - } else if (smpi_global->ready_process_count < 0) { - // FIXME: can't happen! abort! - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - return 0; -} - -int smpi_receiver(int argc, char **argv) -{ - smx_process_t self; - int rank; - - xbt_fifo_t request_queue; - smx_mutex_t request_queue_mutex; - xbt_fifo_t message_queue; - smx_mutex_t message_queue_mutex; - int size; - - int running_hosts_count; - - smpi_mpi_request_t *request; - smpi_received_message_t *message; - - xbt_fifo_item_t request_item; - xbt_fifo_item_t message_item; - - self = SIMIX_process_self(); - rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world); - - // make sure root is done before own initialization - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - if (!smpi_global->root_ready) { - SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - request_queue = smpi_global->pending_recv_request_queues[rank]; - request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank]; - message_queue = smpi_global->received_message_queues[rank]; - message_queue_mutex = smpi_global->received_message_queues_mutexes[rank]; - size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world); - - smpi_global->receiver_processes[rank] = self; - - // wait for all nodes to signal initializatin complete - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - smpi_global->ready_process_count++; - if (smpi_global->ready_process_count < 3 * size) { - SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); - } else { - SIMIX_cond_broadcast(smpi_global->start_stop_cond); - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - do { - request = NULL; - message = NULL; - - // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? - - // FIXME: not the best way to request multiple locks... - SIMIX_mutex_lock(request_queue_mutex); - SIMIX_mutex_lock(message_queue_mutex); - for (request_item = xbt_fifo_get_first_item(request_queue); - NULL != request_item; - request_item = xbt_fifo_get_next_item(request_item)) { - request = xbt_fifo_get_item_content(request_item); - for (message_item = xbt_fifo_get_first_item(message_queue); - NULL != message_item; - message_item = xbt_fifo_get_next_item(message_item)) { - message = xbt_fifo_get_item_content(message_item); - if (request->comm == message->comm && - (MPI_ANY_SOURCE == request->src || request->src == message->src) && - request->tag == message->tag) { - xbt_fifo_remove_item(request_queue, request_item); - xbt_fifo_remove_item(message_queue, message_item); - goto stopsearch; - } - } - } -stopsearch: - SIMIX_mutex_unlock(message_queue_mutex); - SIMIX_mutex_unlock(request_queue_mutex); - - if (NULL == request || NULL == message) { - SIMIX_process_suspend(self); - } else { - SIMIX_mutex_lock(request->simdata->mutex); - - memcpy(request->buf, message->buf, request->datatype->size * request->count); - request->src = message->src; - request->completed = 1; - SIMIX_cond_broadcast(request->simdata->cond); - - SIMIX_mutex_unlock(request->simdata->mutex); - - xbt_free(message->buf); - xbt_mallocator_release(smpi_global->message_mallocator, message); - } - - SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); - running_hosts_count = smpi_global->running_hosts_count; - SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); - - } while (0 < running_hosts_count); - - SIMIX_mutex_lock(smpi_global->start_stop_mutex); - smpi_global->ready_process_count--; - if (smpi_global->ready_process_count == 0) { - SIMIX_cond_broadcast(smpi_global->start_stop_cond); - } else if (smpi_global->ready_process_count < 0) { - // FIXME: can't happen, abort! - } - SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - - return 0; -} - void *smpi_request_new() { smpi_mpi_request_t *request = xbt_new(smpi_mpi_request_t, 1); -- 2.20.1