SMPI_SRC= \
smpi/smpi_base.c \
- smpi/smpi_mpi.c
+ smpi/smpi_mpi.c \
+ smpi/sender.c \
+ smpi/receiver.c
MSG_SRC= msg/msg_config.c \
msg/task.c msg/host.c msg/m_process.c msg/gos.c \
xbt/config.c xbt/cunit.c xbt/graphxml_parse.c surf/maxmin.c \
surf/fair_bottleneck.c surf/lagrange.c surf/trace_mgr.c \
surf/surf.c surf/surfxml_parse.c surf/cpu.c surf/network.c \
- surf/workstation.c surf/surf_timer.c surf/network_dassf.c \
+ surf/workstation.c surf/surf_timer.c \
surf/workstation_KCCFLN05.c surf/workstation_ptask_L07.c \
xbt/xbt_os_thread.c xbt/xbt_os_thread_stubs.c \
surf/gtnets/gtnets_simulator.cc surf/gtnets/gtnets_topology.cc \
@CONTEXT_THREADS_FALSE@am__objects_7 = xbt_os_thread_stubs.lo
am__objects_8 = maxmin.lo fair_bottleneck.lo lagrange.lo trace_mgr.lo \
surf.lo surfxml_parse.lo cpu.lo network.lo workstation.lo \
- surf_timer.lo network_dassf.lo workstation_KCCFLN05.lo \
- workstation_ptask_L07.lo $(am__objects_6) $(am__objects_7)
+ surf_timer.lo workstation_KCCFLN05.lo workstation_ptask_L07.lo \
+ $(am__objects_6) $(am__objects_7)
am__objects_9 = gtnets_simulator.lo gtnets_topology.lo \
gtnets_interface.lo network_gtnets.lo
@HAVE_GTNETS_TRUE@am__objects_10 = $(am__objects_9)
xbt/config.c xbt/cunit.c xbt/graphxml_parse.c surf/maxmin.c \
surf/fair_bottleneck.c surf/lagrange.c surf/trace_mgr.c \
surf/surf.c surf/surfxml_parse.c surf/cpu.c surf/network.c \
- surf/workstation.c surf/surf_timer.c surf/network_dassf.c \
+ surf/workstation.c surf/surf_timer.c \
surf/workstation_KCCFLN05.c surf/workstation_ptask_L07.c \
xbt/xbt_os_thread.c xbt/xbt_os_thread_stubs.c \
surf/gtnets/gtnets_simulator.cc surf/gtnets/gtnets_topology.cc \
$(CXXFLAGS) $(libsimgrid4java_la_LDFLAGS) $(LDFLAGS) -o $@
@HAVE_JAVA_TRUE@am_libsimgrid4java_la_rpath = -rpath $(libdir)
libsmpi_la_DEPENDENCIES = libsimgrid.la
-am__objects_20 = smpi_base.lo smpi_mpi.lo
+am__objects_20 = smpi_base.lo smpi_mpi.lo sender.lo receiver.lo
am_libsmpi_la_OBJECTS = $(am__objects_20)
libsmpi_la_OBJECTS = $(am_libsmpi_la_OBJECTS)
libsmpi_la_LINK = $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) \
PATH_SEPARATOR = @PATH_SEPARATOR@
PTH_STACK_GROWTH = @PTH_STACK_GROWTH@
RANLIB = @RANLIB@
+SED = @SED@
SET_MAKE = @SET_MAKE@
SHELL = @SHELL@
SIMGRID_DEP = @SIMGRID_DEP@
surf/cpu_private.h surf/workstation_private.h \
surf/surf_timer_private.h surf/surfxml_parse.c surf/surfxml.l \
surf/surfxml.c surf/surfxml.dtd surf/network_private.h \
- surf/network_dassf_private.h \
- surf/workstation_KCCFLN05_private.h include/surf/maxmin.h \
- include/surf/trace_mgr.h include/surf/surf.h \
- include/surf/surfxml_parse_private.h include/xbt/xbt_os_time.h \
- include/xbt/xbt_os_thread.h include/xbt/context.h \
- msg/private.h simdag/private.h gras/DataDesc/ddt_parse.yy.l \
- gras/Virtu/virtu_interface.h \
+ network_gtnets_private.h surf/workstation_KCCFLN05_private.h \
+ include/surf/maxmin.h include/surf/trace_mgr.h \
+ include/surf/surf.h include/surf/surfxml_parse_private.h \
+ include/xbt/xbt_os_time.h include/xbt/xbt_os_thread.h \
+ include/xbt/context.h msg/private.h simdag/private.h \
+ gras/DataDesc/ddt_parse.yy.l gras/Virtu/virtu_interface.h \
amok/Bandwidth/bandwidth_private.h amok/amok_modinter.h \
$(am__append_3) $(am__append_4) $(am__append_6) \
$(am__append_7)
SURF_SRC = surf/maxmin.c surf/fair_bottleneck.c surf/lagrange.c \
surf/trace_mgr.c surf/surf.c surf/surfxml_parse.c surf/cpu.c \
surf/network.c surf/workstation.c surf/surf_timer.c \
- surf/network_dassf.c surf/workstation_KCCFLN05.c \
- surf/workstation_ptask_L07.c $(am__append_1) $(am__append_2)
+ surf/workstation_KCCFLN05.c surf/workstation_ptask_L07.c \
+ $(am__append_1) $(am__append_2)
GTNETS_SRC = \
surf/gtnets/gtnets_simulator.cc \
surf/gtnets/gtnets_topology.cc \
SMPI_SRC = \
smpi/smpi_base.c \
- smpi/smpi_mpi.c
+ smpi/smpi_mpi.c \
+ smpi/sender.c \
+ smpi/receiver.c
MSG_SRC = msg/msg_config.c \
msg/task.c msg/host.c msg/m_process.c msg/gos.c \
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/maxmin.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/msg_config.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/network.Plo@am__quote@
-@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/network_dassf.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/network_gtnets.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/peermanagement.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/process.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/receiver.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rl_dns.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rl_emul.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rl_msg.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sd_task.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sd_workstation.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sdp.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sender.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/set.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/set_unit.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sg_dns.Plo@am__quote@
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o surf_timer.lo `test -f 'surf/surf_timer.c' || echo '$(srcdir)/'`surf/surf_timer.c
-network_dassf.lo: surf/network_dassf.c
-@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT network_dassf.lo -MD -MP -MF $(DEPDIR)/network_dassf.Tpo -c -o network_dassf.lo `test -f 'surf/network_dassf.c' || echo '$(srcdir)/'`surf/network_dassf.c
-@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/network_dassf.Tpo $(DEPDIR)/network_dassf.Plo
-@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='surf/network_dassf.c' object='network_dassf.lo' libtool=yes @AMDEPBACKSLASH@
-@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o network_dassf.lo `test -f 'surf/network_dassf.c' || echo '$(srcdir)/'`surf/network_dassf.c
-
workstation_KCCFLN05.lo: surf/workstation_KCCFLN05.c
@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT workstation_KCCFLN05.lo -MD -MP -MF $(DEPDIR)/workstation_KCCFLN05.Tpo -c -o workstation_KCCFLN05.lo `test -f 'surf/workstation_KCCFLN05.c' || echo '$(srcdir)/'`surf/workstation_KCCFLN05.c
@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/workstation_KCCFLN05.Tpo $(DEPDIR)/workstation_KCCFLN05.Plo
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o smpi_mpi.lo `test -f 'smpi/smpi_mpi.c' || echo '$(srcdir)/'`smpi/smpi_mpi.c
+sender.lo: smpi/sender.c
+@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT sender.lo -MD -MP -MF $(DEPDIR)/sender.Tpo -c -o sender.lo `test -f 'smpi/sender.c' || echo '$(srcdir)/'`smpi/sender.c
+@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/sender.Tpo $(DEPDIR)/sender.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='smpi/sender.c' object='sender.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o sender.lo `test -f 'smpi/sender.c' || echo '$(srcdir)/'`smpi/sender.c
+
+receiver.lo: smpi/receiver.c
+@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT receiver.lo -MD -MP -MF $(DEPDIR)/receiver.Tpo -c -o receiver.lo `test -f 'smpi/receiver.c' || echo '$(srcdir)/'`smpi/receiver.c
+@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/receiver.Tpo $(DEPDIR)/receiver.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='smpi/receiver.c' object='receiver.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o receiver.lo `test -f 'smpi/receiver.c' || echo '$(srcdir)/'`smpi/receiver.c
+
cunit_unit.o: @builddir@/cunit_unit.c
@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT cunit_unit.o -MD -MP -MF $(DEPDIR)/cunit_unit.Tpo -c -o cunit_unit.o `test -f '@builddir@/cunit_unit.c' || echo '$(srcdir)/'`@builddir@/cunit_unit.c
@am__fastdepCC_TRUE@ mv -f $(DEPDIR)/cunit_unit.Tpo $(DEPDIR)/cunit_unit.Po
include/simix/simix.h include/simix/datatypes.h \
simix/msg_simix_private.h \
\
- smpi/private.h smpi/smpi.h
+ smpi/private.h
@HAVE_JAVA_TRUE@clean-local:
@HAVE_JAVA_TRUE@ -rm -rf .classes
--- /dev/null
+#include "private.h"
+
+// smpi_receiver: process body that pairs pending receive requests of the
+// calling rank with messages already queued for that rank.
+//
+// Parameters:
+//   argc, argv - process arguments; not used by this function.
+//
+// Returns 0 once the main loop observes running_hosts_count <= 0.
+//
+// Each loop iteration looks for the first (request, message) pair whose
+// communicator, source (or MPI_ANY_SOURCE on the request) and tag agree.
+// On a match the message payload is copied into the request buffer, the
+// request is flagged completed and its waiters are woken; otherwise the
+// process suspends itself until somebody resumes it.
+int smpi_receiver(int argc, char **argv)
+{
+  smx_process_t self;
+  int rank;
+
+  xbt_fifo_t request_queue;
+  smx_mutex_t request_queue_mutex;
+  xbt_fifo_t message_queue;
+  smx_mutex_t message_queue_mutex;
+  int size;
+
+  int running_hosts_count;
+
+  // Only ever non-NULL when a real match was found and unlinked.
+  smpi_mpi_request_t *request;
+  smpi_received_message_t *message;
+
+  // Scratch pointers used while scanning the queues.
+  smpi_mpi_request_t *candidate_request;
+  smpi_received_message_t *candidate_message;
+
+  xbt_fifo_item_t request_item;
+  xbt_fifo_item_t message_item;
+
+  self = SIMIX_process_self();
+  rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
+
+  // make sure root is done before own initialization
+  SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+  if (!smpi_global->root_ready) {
+    SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+  }
+  SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+  request_queue = smpi_global->pending_recv_request_queues[rank];
+  request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
+  message_queue = smpi_global->received_message_queues[rank];
+  message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
+  size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+
+  // Publish ourselves so that senders can resume us.
+  smpi_global->receiver_processes[rank] = self;
+
+  // wait for all nodes to signal initialization complete
+  // NOTE(review): 3 * size presumably counts three processes per rank
+  // (main, sender, receiver) -- confirm against the launcher.
+  SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+  smpi_global->ready_process_count++;
+  if (smpi_global->ready_process_count < 3 * size) {
+    SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+  } else {
+    SIMIX_cond_broadcast(smpi_global->start_stop_cond);
+  }
+  SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+  do {
+    request = NULL;
+    message = NULL;
+
+    // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
+
+    // FIXME: not the best way to request multiple locks...
+    SIMIX_mutex_lock(request_queue_mutex);
+    SIMIX_mutex_lock(message_queue_mutex);
+    for (request_item = xbt_fifo_get_first_item(request_queue);
+         NULL != request_item;
+         request_item = xbt_fifo_get_next_item(request_item)) {
+      candidate_request = xbt_fifo_get_item_content(request_item);
+      for (message_item = xbt_fifo_get_first_item(message_queue);
+           NULL != message_item;
+           message_item = xbt_fifo_get_next_item(message_item)) {
+        candidate_message = xbt_fifo_get_item_content(message_item);
+        if (candidate_request->comm == candidate_message->comm &&
+            (MPI_ANY_SOURCE == candidate_request->src ||
+             candidate_request->src == candidate_message->src) &&
+            candidate_request->tag == candidate_message->tag) {
+          xbt_fifo_remove_item(request_queue, request_item);
+          xbt_fifo_remove_item(message_queue, message_item);
+          // Commit the pair only now: if the scan ends without a match,
+          // request/message must stay NULL. Otherwise the code below would
+          // complete an unrelated request and free a message that is still
+          // linked in the queue.
+          request = candidate_request;
+          message = candidate_message;
+          // Leave both loops at once; the iterators refer to removed items.
+          goto stopsearch;
+        }
+      }
+    }
+stopsearch:
+    SIMIX_mutex_unlock(message_queue_mutex);
+    SIMIX_mutex_unlock(request_queue_mutex);
+
+    if (NULL == request || NULL == message) {
+      // Nothing to deliver: sleep until another process resumes us.
+      SIMIX_process_suspend(self);
+    } else {
+      SIMIX_mutex_lock(request->simdata->mutex);
+
+      // Deliver the payload and wake whoever waits on this request.
+      memcpy(request->buf, message->buf, request->datatype->size * request->count);
+      request->src = message->src;
+      request->completed = 1;
+      SIMIX_cond_broadcast(request->simdata->cond);
+
+      SIMIX_mutex_unlock(request->simdata->mutex);
+
+      // The message was unlinked above, so it is ours to dispose of.
+      xbt_free(message->buf);
+      xbt_mallocator_release(smpi_global->message_mallocator, message);
+    }
+
+    SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
+    running_hosts_count = smpi_global->running_hosts_count;
+    SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
+
+  } while (0 < running_hosts_count);
+
+  // Shutdown handshake: the last process to leave wakes the waiters.
+  SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+  smpi_global->ready_process_count--;
+  if (smpi_global->ready_process_count == 0) {
+    SIMIX_cond_broadcast(smpi_global->start_stop_cond);
+  } else if (smpi_global->ready_process_count < 0) {
+    // FIXME: can't happen, abort!
+  }
+  SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+  return 0;
+}
--- /dev/null
+#include "private.h"
+
+// smpi_sender: process body that drains the pending send requests of the
+// calling host's rank.
+//
+// Parameters:
+//   argc, argv - process arguments; not used by this function.
+//
+// Returns 0 once the main loop observes running_hosts_count <= 0.
+//
+// For every request taken from the queue, a message is allocated, the
+// request payload is copied into it and it is appended to the destination
+// rank's received-message queue. A communicate action between the two hosts
+// is then created and waited for, and the destination's receiver process is
+// resumed if it is suspended. With an empty queue the process suspends itself.
+int smpi_sender(int argc, char **argv)
+{
+ smx_process_t self;
+ smx_host_t shost;
+ int rank;
+
+ xbt_fifo_t request_queue;
+ smx_mutex_t request_queue_mutex;
+ int size;
+
+ int running_hosts_count;
+
+ smpi_mpi_request_t *request;
+
+ smx_host_t dhost;
+
+ smx_action_t action;
+
+ smpi_received_message_t *message;
+
+ int drank;
+
+ smx_process_t receiver_process;
+
+ self = SIMIX_process_self();
+ shost = SIMIX_host_self();
+ rank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost);
+
+ // make sure root is done before own initialization
+ SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+ if (!smpi_global->root_ready) {
+ SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+ }
+ SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+ request_queue = smpi_global->pending_send_request_queues[rank];
+ request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank];
+ size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+
+ // Publish this process in the per-rank sender table.
+ smpi_global->sender_processes[rank] = self;
+
+ // wait for all nodes to signal initialization complete
+ // NOTE(review): 3 * size presumably counts three processes per rank
+ // (main, sender, receiver) -- confirm against the launcher.
+ SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+ smpi_global->ready_process_count++;
+ if (smpi_global->ready_process_count < 3 * size) {
+ SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+ } else {
+ SIMIX_cond_broadcast(smpi_global->start_stop_cond);
+ }
+ SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+ do {
+
+ // Take the oldest pending send request, if any.
+ SIMIX_mutex_lock(request_queue_mutex);
+ request = xbt_fifo_shift(request_queue);
+ SIMIX_mutex_unlock(request_queue_mutex);
+
+ if (NULL == request) {
+ // Nothing to send: sleep until another process resumes us.
+ SIMIX_process_suspend(self);
+ } else {
+
+ message = xbt_mallocator_get(smpi_global->message_mallocator);
+
+ SIMIX_mutex_lock(request->simdata->mutex);
+
+ // Snapshot the request envelope and payload into the message.
+ message->comm = request->comm;
+ message->src = request->src;
+ message->dst = request->dst;
+ message->tag = request->tag;
+ message->buf = xbt_malloc(request->datatype->size * request->count);
+ memcpy(message->buf, request->buf, request->datatype->size * request->count);
+
+ // request->dst indexes the request's own communicator; drank is the
+ // destination host's rank in mpi_comm_world, used for the global queues.
+ dhost = request->comm->simdata->hosts[request->dst];
+ drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost);
+
+ SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]);
+ xbt_fifo_push(smpi_global->received_message_queues[drank], message);
+ SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]);
+
+ // NOTE(review): the request is flagged completed before the
+ // communicate action below is created and waited for -- confirm that
+ // this ordering is intended.
+ request->completed = 1;
+
+ // Create the transfer for the payload size in bytes, converted to
+ // double by the "* 1.0".
+ // NOTE(review): the trailing -1.0 is presumably "no rate limit" -- confirm.
+ action = SIMIX_action_communicate(shost, dhost, NULL, request->datatype->size * request->count * 1.0, -1.0);
+
+ SIMIX_register_action_to_condition(action, request->simdata->cond);
+
+ // Wait on the request's condition, to which the action was just registered.
+ SIMIX_cond_wait(request->simdata->cond, request->simdata->mutex);
+
+ SIMIX_mutex_unlock(request->simdata->mutex);
+
+ // wake up receiver if necessary
+ receiver_process = smpi_global->receiver_processes[drank];
+ if (SIMIX_process_is_suspended(receiver_process)) {
+ SIMIX_process_resume(receiver_process);
+ }
+
+ }
+
+ SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
+ running_hosts_count = smpi_global->running_hosts_count;
+ SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
+
+ } while (0 < running_hosts_count);
+
+ // Shutdown handshake: the last process to leave wakes the waiters.
+ SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+ smpi_global->ready_process_count--;
+ if (smpi_global->ready_process_count == 0) {
+ SIMIX_cond_broadcast(smpi_global->start_stop_cond);
+ } else if (smpi_global->ready_process_count < 0) {
+ // FIXME: can't happen! abort!
+ }
+ SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+ return 0;
+}
+
return smpi_mpi_comm_rank(comm, SIMIX_host_self());
}
-int smpi_sender(int argc, char **argv)
-{
- smx_process_t self;
- smx_host_t shost;
- int rank;
-
- xbt_fifo_t request_queue;
- smx_mutex_t request_queue_mutex;
- int size;
-
- int running_hosts_count;
-
- smpi_mpi_request_t *request;
-
- smx_host_t dhost;
-
- smx_action_t action;
-
- smpi_received_message_t *message;
-
- int drank;
-
- smx_process_t receiver_process;
-
- self = SIMIX_process_self();
- shost = SIMIX_host_self();
- rank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost);
-
- // make sure root is done before own initialization
- SIMIX_mutex_lock(smpi_global->start_stop_mutex);
- if (!smpi_global->root_ready) {
- SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
- }
- SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
-
- request_queue = smpi_global->pending_send_request_queues[rank];
- request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank];
- size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
-
- smpi_global->sender_processes[rank] = self;
-
- // wait for all nodes to signal initializatin complete
- SIMIX_mutex_lock(smpi_global->start_stop_mutex);
- smpi_global->ready_process_count++;
- if (smpi_global->ready_process_count < 3 * size) {
- SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
- } else {
- SIMIX_cond_broadcast(smpi_global->start_stop_cond);
- }
- SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
-
- do {
-
- SIMIX_mutex_lock(request_queue_mutex);
- request = xbt_fifo_shift(request_queue);
- SIMIX_mutex_unlock(request_queue_mutex);
-
- if (NULL == request) {
- SIMIX_process_suspend(self);
- } else {
-
- message = xbt_mallocator_get(smpi_global->message_mallocator);
-
- SIMIX_mutex_lock(request->simdata->mutex);
-
- message->comm = request->comm;
- message->src = request->src;
- message->dst = request->dst;
- message->tag = request->tag;
- message->buf = xbt_malloc(request->datatype->size * request->count);
- memcpy(message->buf, request->buf, request->datatype->size * request->count);
-
- dhost = request->comm->simdata->hosts[request->dst];
- drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost);
-
- SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]);
- xbt_fifo_push(smpi_global->received_message_queues[drank], message);
- SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]);
-
- request->completed = 1;
-
- action = SIMIX_action_communicate(shost, dhost, NULL, request->datatype->size * request->count * 1.0, -1.0);
-
- SIMIX_register_action_to_condition(action, request->simdata->cond);
-
- SIMIX_cond_wait(request->simdata->cond, request->simdata->mutex);
-
- SIMIX_mutex_unlock(request->simdata->mutex);
-
- // wake up receiver if necessary
- receiver_process = smpi_global->receiver_processes[drank];
- if (SIMIX_process_is_suspended(receiver_process)) {
- SIMIX_process_resume(receiver_process);
- }
-
- }
-
- SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
- running_hosts_count = smpi_global->running_hosts_count;
- SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
-
- } while (0 < running_hosts_count);
-
- SIMIX_mutex_lock(smpi_global->start_stop_mutex);
- smpi_global->ready_process_count--;
- if (smpi_global->ready_process_count == 0) {
- SIMIX_cond_broadcast(smpi_global->start_stop_cond);
- } else if (smpi_global->ready_process_count < 0) {
- // FIXME: can't happen! abort!
- }
- SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
-
- return 0;
-}
-
-int smpi_receiver(int argc, char **argv)
-{
- smx_process_t self;
- int rank;
-
- xbt_fifo_t request_queue;
- smx_mutex_t request_queue_mutex;
- xbt_fifo_t message_queue;
- smx_mutex_t message_queue_mutex;
- int size;
-
- int running_hosts_count;
-
- smpi_mpi_request_t *request;
- smpi_received_message_t *message;
-
- xbt_fifo_item_t request_item;
- xbt_fifo_item_t message_item;
-
- self = SIMIX_process_self();
- rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
-
- // make sure root is done before own initialization
- SIMIX_mutex_lock(smpi_global->start_stop_mutex);
- if (!smpi_global->root_ready) {
- SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
- }
- SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
-
- request_queue = smpi_global->pending_recv_request_queues[rank];
- request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
- message_queue = smpi_global->received_message_queues[rank];
- message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
- size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
-
- smpi_global->receiver_processes[rank] = self;
-
- // wait for all nodes to signal initializatin complete
- SIMIX_mutex_lock(smpi_global->start_stop_mutex);
- smpi_global->ready_process_count++;
- if (smpi_global->ready_process_count < 3 * size) {
- SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
- } else {
- SIMIX_cond_broadcast(smpi_global->start_stop_cond);
- }
- SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
-
- do {
- request = NULL;
- message = NULL;
-
- // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
-
- // FIXME: not the best way to request multiple locks...
- SIMIX_mutex_lock(request_queue_mutex);
- SIMIX_mutex_lock(message_queue_mutex);
- for (request_item = xbt_fifo_get_first_item(request_queue);
- NULL != request_item;
- request_item = xbt_fifo_get_next_item(request_item)) {
- request = xbt_fifo_get_item_content(request_item);
- for (message_item = xbt_fifo_get_first_item(message_queue);
- NULL != message_item;
- message_item = xbt_fifo_get_next_item(message_item)) {
- message = xbt_fifo_get_item_content(message_item);
- if (request->comm == message->comm &&
- (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
- request->tag == message->tag) {
- xbt_fifo_remove_item(request_queue, request_item);
- xbt_fifo_remove_item(message_queue, message_item);
- goto stopsearch;
- }
- }
- }
-stopsearch:
- SIMIX_mutex_unlock(message_queue_mutex);
- SIMIX_mutex_unlock(request_queue_mutex);
-
- if (NULL == request || NULL == message) {
- SIMIX_process_suspend(self);
- } else {
- SIMIX_mutex_lock(request->simdata->mutex);
-
- memcpy(request->buf, message->buf, request->datatype->size * request->count);
- request->src = message->src;
- request->completed = 1;
- SIMIX_cond_broadcast(request->simdata->cond);
-
- SIMIX_mutex_unlock(request->simdata->mutex);
-
- xbt_free(message->buf);
- xbt_mallocator_release(smpi_global->message_mallocator, message);
- }
-
- SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
- running_hosts_count = smpi_global->running_hosts_count;
- SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
-
- } while (0 < running_hosts_count);
-
- SIMIX_mutex_lock(smpi_global->start_stop_mutex);
- smpi_global->ready_process_count--;
- if (smpi_global->ready_process_count == 0) {
- SIMIX_cond_broadcast(smpi_global->start_stop_cond);
- } else if (smpi_global->ready_process_count < 0) {
- // FIXME: can't happen, abort!
- }
- SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
-
- return 0;
-}
-
void *smpi_request_new()
{
smpi_mpi_request_t *request = xbt_new(smpi_mpi_request_t, 1);