#include <sys/time.h>
#include "xbt/xbt_portability.h"
#include "simix/simix.h"
+#include "simix/private.h"
#include "smpi.h"
xbt_fifo_t *smpi_pending_send_requests = NULL;
static int smpi_benchmarking;
static double smpi_reference_speed;
+// mutexes
+smx_mutex_t smpi_running_hosts_mutex = NULL;
+smx_mutex_t smpi_benchmarking_mutex = NULL;
+smx_mutex_t init_mutex = NULL;
+smx_cond_t init_cond = NULL;
+
+int rootready = 0;
+int readycount = 0;
+
XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
-void smpi_sender()
+int smpi_sender(int argc, char **argv)
{
- return;
+ return 0;
}
-void smpi_receiver()
+int smpi_receiver(int argc, char **argv)
{
- return;
+ return 0;
}
int smpi_run_simulation(int argc, char **argv)
srand(SMPI_RAND_SEED);
SIMIX_global_init(&argc, argv);
+
+ init_mutex = SIMIX_mutex_init();
+ init_cond = SIMIX_cond_init();
+
SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
SIMIX_create_environment(argv[1]);
SIMIX_launch_application(argv[2]);
{
int i;
int size;
+ smx_process_t process;
smx_host_t *hosts;
smx_host_t host;
double duration;
- // FIXME: mutex?
- smpi_running_hosts++;
-
// initialize some local variables
host = SIMIX_host_self();
hosts = SIMIX_host_get_table();
// node 0 sets the globals
if (host == hosts[0]) {
+ // running hosts
+ smpi_running_hosts_mutex = SIMIX_mutex_init();
+ smpi_running_hosts = size;
+
// global communicator
- smpi_mpi_comm_world.size = size;
- smpi_mpi_comm_world.barrier = 0;
- smpi_mpi_comm_world.hosts = hosts;
- smpi_mpi_comm_world.processes = xbt_new0(smx_process_t, size);
- smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
+ smpi_mpi_comm_world.size = size;
+ smpi_mpi_comm_world.barrier = 0;
+ smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
+ smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
+ smpi_mpi_comm_world.hosts = hosts;
+ smpi_mpi_comm_world.processes = xbt_new0(smx_process_t, size);
+ smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// mpi datatypes
- smpi_mpi_byte.size = (size_t)1;
- smpi_mpi_int.size = sizeof(int);
- smpi_mpi_double.size = sizeof(double);
+ smpi_mpi_byte.size = (size_t)1;
+ smpi_mpi_int.size = sizeof(int);
+ smpi_mpi_double.size = sizeof(double);
// mpi operations
- smpi_mpi_land.func = &smpi_mpi_land_func;
- smpi_mpi_sum.func = &smpi_mpi_sum_func;
+ smpi_mpi_land.func = &smpi_mpi_land_func;
+ smpi_mpi_sum.func = &smpi_mpi_sum_func;
// smpi globals
- smpi_pending_send_requests = xbt_new0(xbt_fifo_t, size);
- smpi_pending_recv_requests = xbt_new0(xbt_fifo_t, size);
- smpi_received_messages = xbt_new0(xbt_fifo_t, size);
+ smpi_pending_send_requests = xbt_new0(xbt_fifo_t, size);
+ smpi_pending_recv_requests = xbt_new0(xbt_fifo_t, size);
+ smpi_received_messages = xbt_new0(xbt_fifo_t, size);
for(i = 0; i < size; i++) {
smpi_pending_send_requests[i] = xbt_fifo_new();
smpi_timer = xbt_os_timer_new();
smpi_reference_speed = SMPI_DEFAULT_SPEED;
smpi_benchmarking = 0;
+ smpi_benchmarking_mutex = SIMIX_mutex_init();
- // FIXME: tell other nodes to initialize, and wait for all clear
-
- // FIXME: send everyone okay to begin
+ // signal all nodes to perform initialization
+ SIMIX_mutex_lock(init_mutex);
+ rootready = 1;
+ SIMIX_cond_broadcast(init_cond);
+ SIMIX_mutex_unlock(init_mutex);
} else {
- // FIMXE: wait for node 0
+
+ // make sure root is done before own initialization
+ SIMIX_mutex_lock(init_mutex);
+ if (!rootready) {
+ SIMIX_cond_wait(init_cond, init_mutex);
+ }
+ SIMIX_mutex_unlock(init_mutex);
+
smpi_mpi_comm_world.processes[smpi_mpi_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
- // FIXME: signal node 0
- // FIXME: wait for node 0
+
}
+
+ // wait for all nodes to signal initializatin complete
+ SIMIX_mutex_lock(init_mutex);
+ readycount++;
+ if (readycount < size) {
+ SIMIX_cond_wait(init_cond, init_mutex);
+ } else {
+ SIMIX_cond_broadcast(init_cond);
+ }
+ SIMIX_mutex_unlock(init_mutex);
+
}
void smpi_mpi_finalize()
{
int i;
- // FIXME: mutex?
- smpi_running_hosts--;
+ SIMIX_mutex_lock(smpi_running_hosts_mutex);
+ i = --smpi_running_hosts;
+ SIMIX_mutex_unlock(smpi_running_hosts_mutex);
+
+ if (0 >= i) {
+
+ SIMIX_mutex_destroy(smpi_running_hosts_mutex);
- if (0 >= smpi_running_hosts) {
for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
xbt_fifo_free(smpi_pending_send_requests[i]);
xbt_fifo_free(smpi_pending_recv_requests[i]);
xbt_fifo_free(smpi_received_messages[i]);
}
+
xbt_free(smpi_pending_send_requests);
xbt_free(smpi_pending_recv_requests);
xbt_free(smpi_received_messages);
+
+ SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
+ SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
xbt_free(smpi_mpi_comm_world.processes);
+
xbt_os_timer_free(smpi_timer);
}
+
}
void smpi_bench_begin()
void smpi_bench_end()
{
double duration;
+ smx_host_t host;
+ smx_action_t compute_action;
+ smx_mutex_t mutex;
+ smx_cond_t cond;
+
xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
smpi_benchmarking = 0;
xbt_os_timer_stop(smpi_timer);
duration = xbt_os_timer_elapsed(smpi_timer);
- // FIXME: add simix call to perform computation
+ host = SIMIX_host_self();
+ compute_action = SIMIX_action_sleep(host, "computation", duration * SMPI_DEFAULT_SPEED);
+ mutex = SIMIX_mutex_init();
+ cond = SIMIX_cond_init();
+ SIMIX_mutex_lock(mutex);
+ SIMIX_register_condition_to_action(compute_action, cond);
+ SIMIX_register_action_to_condition(compute_action, cond);
+ SIMIX_cond_wait(cond, mutex);
+ SIMIX_mutex_unlock(mutex);
+ SIMIX_mutex_destroy(mutex);
+ SIMIX_cond_destroy(cond);
+ // FIXME: check for success/failure?
return;
}
void smpi_barrier(smpi_mpi_communicator_t *comm) {
int i;
- // FIXME: mutex
+ SIMIX_mutex_lock(comm->barrier_mutex);
comm->barrier++;
- if(comm->barrier < comm->size) {
- SIMIX_process_suspend(SIMIX_process_self());
+ if(i < comm->size) {
+ SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
} else {
comm->barrier = 0;
- for(i = 0; i < comm->size; i++) {
- if (SIMIX_process_is_suspended(comm->processes[i])) {
- SIMIX_process_resume(comm->processes[i]);
- }
- }
+ SIMIX_cond_broadcast(comm->barrier_cond);
}
+ SIMIX_mutex_unlock(comm->barrier_mutex);
}
int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
unsigned int smpi_sleep(unsigned int seconds)
{
- smx_host_t self;
+ smx_mutex_t mutex;
+ smx_cond_t cond;
+ smx_host_t host;
smx_action_t sleep_action;
smpi_bench_end();
- // FIXME: simix sleep
- self = SIMIX_host_self();
- sleep_action = SIMIX_action_sleep(self, seconds);
+ host = SIMIX_host_self();
+ sleep_action = SIMIX_action_sleep(host, seconds);
+ mutex = SIMIX_mutex_init();
+ cond = SIMIX_cond_init();
+ SIMIX_mutex_lock(mutex);
+ SIMIX_register_condition_to_action(sleep_action, cond);
+ SIMIX_register_action_to_condition(sleep_action, cond);
+ SIMIX_cond_wait(cond, mutex);
+ SIMIX_mutex_unlock(mutex);
+ SIMIX_mutex_destroy(mutex);
+ SIMIX_cond_destroy(cond);
+ // FIXME: check for success/failure?
smpi_bench_begin();
return 0;
}
void smpi_exit(int status)
{
smpi_bench_end();
- // FIXME: mutex
+ SIMIX_mutex_lock(smpi_running_hosts_mutex);
smpi_running_hosts--;
+ SIMIX_mutex_unlock(smpi_running_hosts_mutex);
SIMIX_process_kill(SIMIX_process_self());
return;
}