typedef smpi_mpi_op_t MPI_Op;
// MPI_Status
-// FIXME: status is kind of an odd duck, is this required by the standard?
struct smpi_mpi_status_t {
int MPI_SOURCE;
int MPI_TAG;
extern smpi_mpi_global_t smpi_mpi_global; // global MPI state; allocated by the first host in smpi_mpi_init
#define MPI_COMM_WORLD (smpi_mpi_global->mpi_comm_world) // world communicator; reads smpi_mpi_global on every use
+#define MPI_COMM_NULL NULL // null communicator handle, represented as a NULL pointer
#define MPI_STATUS_IGNORE NULL // status argument value, also a NULL pointer
// smpi mpi communicator: size, barrier state, and the rank <-> host-index translation tables
typedef struct smpi_mpi_communicator_t {
int size; // number of ranks; smpi_mpi_init sets it to the host count for the world communicator
- smx_host_t *hosts;
- smx_process_t *processes;
int barrier_count; // initialised to 0 in smpi_mpi_init; presumably counts processes waiting at the barrier -- TODO confirm
smx_mutex_t barrier_mutex; // created with SIMIX_mutex_init, destroyed with SIMIX_mutex_destroy
smx_cond_t barrier_cond; // created with SIMIX_cond_init, destroyed with SIMIX_cond_destroy
+
+ int *rank_to_index_map; // rank in this communicator -> index into smpi_global->hosts
+ int *index_to_rank_map; // index into smpi_global->hosts -> rank in this communicator (used to fill message->src)
+
} s_smpi_mpi_communicator_t;
// smpi mpi datatype
int tag;
void *buf;
- smpi_mpi_datatype_t datatype;
int count;
+ smpi_mpi_datatype_t datatype;
short int completed :1;
} s_smpi_mpi_request_t;
// smpi mpi op: wraps the function that implements a reduction operation
+// FIXME: type should be (void *a, void *b, int *length, MPI_Datatype *datatype)
+// and the operation should be b[i] = a[i] op b[i]
typedef struct smpi_mpi_op_t {
void (*func)(void *x, void *y, void *z); // three untyped pointers; the one implementation body visible nearby writes *z from *x and *y as ints
} s_smpi_mpi_op_t;
typedef struct smpi_received_message_t { // a message copied out of a send request and queued for the destination host
smpi_mpi_communicator_t comm; // communicator of the originating request
int src; // sender's rank in comm, looked up through comm->index_to_rank_map
- int dst;
int tag; // tag copied from the originating request
+
void *buf; // xbt_malloc'ed copy of the payload (datatype->size * count bytes); NOTE(review): the code that frees it is not visible here
} s_smpi_received_message_t;
typedef struct smpi_received_message_t *smpi_received_message_t; // handle type: pointer to the struct above
smx_mutex_t start_stop_mutex;
smx_cond_t start_stop_cond;
+ smx_host_t *hosts;
+ int host_count;
xbt_mallocator_t request_mallocator;
xbt_mallocator_t message_mallocator;
// function prototypes
void smpi_mpi_init(void);
void smpi_mpi_finalize(void);
-int smpi_mpi_comm_size(smpi_mpi_communicator_t comm);
-int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm, smx_host_t host);
-int smpi_mpi_comm_rank_self(smpi_mpi_communicator_t comm);
-int smpi_mpi_comm_world_rank_self(void);
+int smpi_mpi_comm_size(smpi_mpi_communicator_t comm, int *size); // returns an MPI status code; the size is written through *size
+int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm, int *rank); // returns an MPI status code; the calling host's rank is written through *rank
int smpi_mpi_barrier(smpi_mpi_communicator_t comm);
int smpi_mpi_isend(smpi_mpi_request_t request); // pushes request on the calling host's pending-send queue; MPI_ERR_INTERN if request is NULL
int smpi_mpi_irecv(smpi_mpi_request_t request); // pushes request on the calling host's pending-recv queue; MPI_ERR_INTERN if request is NULL
void smpi_global_init(void);
void smpi_global_destroy(void);
+int smpi_host_index(void); // index of the calling host within smpi_global->hosts
int smpi_run_simulation(int argc, char **argv);
int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
int src, int dst, int tag, smpi_mpi_communicator_t comm, smpi_mpi_request_t *request); // new request is returned through *request
*(int *)z = *(int *)x + *(int *)y;
}
-int inline smpi_mpi_comm_size(smpi_mpi_communicator_t comm)
+int inline smpi_mpi_comm_size(smpi_mpi_communicator_t comm, int *size) // stores comm->size in *size and returns an MPI status code
{
- return comm->size;
+ int retval = MPI_SUCCESS;
+ if (NULL == size) { // no output location supplied
+ retval = MPI_ERR_ARG;
+ } else {
+ *size = comm->size; // NOTE(review): comm is dereferenced without a NULL check; callers must pass a valid communicator
+ }
+ return retval; // MPI_SUCCESS, or MPI_ERR_ARG when size is NULL
}
// FIXME: smarter algorithm? (linear scan over the communicator's ranks on every call)
-int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm, smx_host_t host)
+int smpi_mpi_comm_rank(smpi_mpi_communicator_t comm, int *rank) // stores the calling host's rank within comm in *rank and returns an MPI status code
{
int i;
+ int retval = MPI_SUCCESS;
- for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
+ if (NULL == rank) { // no output location supplied
+ retval = MPI_ERR_ARG;
+ } else {
+ smx_host_t host = SIMIX_host_self(); // the rank is always computed for the host running the caller
- return i;
-}
+ for (i = 0; i < comm->size && host != smpi_global->hosts[comm->rank_to_index_map[i]]; i++); // empty body: stop at the first rank whose mapped host is this host
-int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t comm)
-{
- return smpi_mpi_comm_rank(comm, SIMIX_host_self());
+ *rank = i; // NOTE(review): if no rank maps to this host, i == comm->size here and retval is still MPI_SUCCESS
+ }
+
+ return retval; // MPI_SUCCESS, or MPI_ERR_ARG when rank is NULL
}
void smpi_mpi_init()
{
- smx_process_t process;
smx_host_t host;
smx_host_t *hosts;
- int size;
+ int host_count;
+ int i;
SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
smpi_global->running_hosts_count++;
SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
// initialize some local variables
- process = SIMIX_process_self();
- host = SIMIX_host_self();
- hosts = SIMIX_host_get_table();
- size = SIMIX_host_get_number();
+ host = SIMIX_host_self();
+ hosts = SIMIX_host_get_table();
+ host_count = SIMIX_host_get_number();
// node 0 sets the globals
if (host == hosts[0]) {
- smpi_mpi_global = xbt_new(s_smpi_mpi_global_t, 1);
+ smpi_global->hosts = hosts;
+ smpi_global->host_count = host_count;
+
+ smpi_mpi_global = xbt_new(s_smpi_mpi_global_t, 1);
// global communicator
- smpi_mpi_global->mpi_comm_world = xbt_new(s_smpi_mpi_communicator_t, 1);
- smpi_mpi_global->mpi_comm_world->size = size;
- smpi_mpi_global->mpi_comm_world->hosts = hosts;
- smpi_mpi_global->mpi_comm_world->processes = xbt_new(smx_process_t, size);
- smpi_mpi_global->mpi_comm_world->processes[0] = process;
- smpi_mpi_global->mpi_comm_world->barrier_count = 0;
- smpi_mpi_global->mpi_comm_world->barrier_mutex = SIMIX_mutex_init();
- smpi_mpi_global->mpi_comm_world->barrier_cond = SIMIX_cond_init();
+ smpi_mpi_global->mpi_comm_world = xbt_new(s_smpi_mpi_communicator_t, 1);
+ smpi_mpi_global->mpi_comm_world->size = host_count;
+ smpi_mpi_global->mpi_comm_world->barrier_count = 0;
+ smpi_mpi_global->mpi_comm_world->barrier_mutex = SIMIX_mutex_init();
+ smpi_mpi_global->mpi_comm_world->barrier_cond = SIMIX_cond_init();
+ smpi_mpi_global->mpi_comm_world->rank_to_index_map = xbt_new(int, host_count);
+ smpi_mpi_global->mpi_comm_world->index_to_rank_map = xbt_new(int, host_count);
+ for (i = 0; i < host_count; i++) {
+ smpi_mpi_global->mpi_comm_world->rank_to_index_map[i] = i;
+ smpi_mpi_global->mpi_comm_world->index_to_rank_map[i] = i;
+ }
// mpi datatypes
- smpi_mpi_global->mpi_byte = xbt_new(s_smpi_mpi_datatype_t, 1);
- smpi_mpi_global->mpi_byte->size = (size_t)1;
- smpi_mpi_global->mpi_int = xbt_new(s_smpi_mpi_datatype_t, 1);
- smpi_mpi_global->mpi_int->size = sizeof(int);
- smpi_mpi_global->mpi_double = xbt_new(s_smpi_mpi_datatype_t, 1);
- smpi_mpi_global->mpi_double->size = sizeof(double);
+ smpi_mpi_global->mpi_byte = xbt_new(s_smpi_mpi_datatype_t, 1);
+ smpi_mpi_global->mpi_byte->size = (size_t)1;
+ smpi_mpi_global->mpi_int = xbt_new(s_smpi_mpi_datatype_t, 1);
+ smpi_mpi_global->mpi_int->size = sizeof(int);
+ smpi_mpi_global->mpi_double = xbt_new(s_smpi_mpi_datatype_t, 1);
+ smpi_mpi_global->mpi_double->size = sizeof(double);
// mpi operations
- smpi_mpi_global->mpi_land = xbt_new(s_smpi_mpi_op_t, 1);
- smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
- smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
- smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
+ smpi_mpi_global->mpi_land = xbt_new(s_smpi_mpi_op_t, 1);
+ smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
+ smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
+ smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
// signal all nodes to perform initialization
SIMIX_mutex_lock(smpi_global->start_stop_mutex);
}
SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
- smpi_mpi_global->mpi_comm_world->processes[smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world)] = process;
}
// wait for all nodes to signal initialization complete
SIMIX_mutex_lock(smpi_global->start_stop_mutex);
smpi_global->ready_process_count++;
- if (smpi_global->ready_process_count < 3 * size) {
+ if (smpi_global->ready_process_count < 3 * host_count) {
SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
} else {
SIMIX_cond_broadcast(smpi_global->start_stop_cond);
if (0 >= i) {
// wake up senders/receivers
- for (i = 0; i < smpi_mpi_global->mpi_comm_world->size; i++) {
+ for (i = 0; i < smpi_global->host_count; i++) {
if (SIMIX_process_is_suspended(smpi_global->sender_processes[i])) {
SIMIX_process_resume(smpi_global->sender_processes[i]);
}
SIMIX_mutex_destroy(smpi_mpi_global->mpi_comm_world->barrier_mutex);
SIMIX_cond_destroy(smpi_mpi_global->mpi_comm_world->barrier_cond);
- xbt_free(smpi_mpi_global->mpi_comm_world->processes);
xbt_free(smpi_mpi_global->mpi_comm_world);
xbt_free(smpi_mpi_global->mpi_byte);
int smpi_mpi_isend(smpi_mpi_request_t request)
{
int retval = MPI_SUCCESS;
- int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
+ int index = smpi_host_index();
if (NULL == request) {
retval = MPI_ERR_INTERN;
} else {
- SIMIX_mutex_lock(smpi_global->pending_send_request_queues_mutexes[rank]);
- xbt_fifo_push(smpi_global->pending_send_request_queues[rank], request);
- SIMIX_mutex_unlock(smpi_global->pending_send_request_queues_mutexes[rank]);
+ SIMIX_mutex_lock(smpi_global->pending_send_request_queues_mutexes[index]);
+ xbt_fifo_push(smpi_global->pending_send_request_queues[index], request);
+ SIMIX_mutex_unlock(smpi_global->pending_send_request_queues_mutexes[index]);
- if (SIMIX_process_is_suspended(smpi_global->sender_processes[rank])) {
- SIMIX_process_resume(smpi_global->sender_processes[rank]);
+ if (SIMIX_process_is_suspended(smpi_global->sender_processes[index])) {
+ SIMIX_process_resume(smpi_global->sender_processes[index]);
}
}
int smpi_mpi_irecv(smpi_mpi_request_t request)
{
int retval = MPI_SUCCESS;
- int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
+ int index = smpi_host_index();
if (NULL == request) {
retval = MPI_ERR_INTERN;
} else {
- SIMIX_mutex_lock(smpi_global->pending_recv_request_queues_mutexes[rank]);
- xbt_fifo_push(smpi_global->pending_recv_request_queues[rank], request);
- SIMIX_mutex_unlock(smpi_global->pending_recv_request_queues_mutexes[rank]);
+ SIMIX_mutex_lock(smpi_global->pending_recv_request_queues_mutexes[index]);
+ xbt_fifo_push(smpi_global->pending_recv_request_queues[index], request);
+ SIMIX_mutex_unlock(smpi_global->pending_recv_request_queues_mutexes[index]);
- if (SIMIX_process_is_suspended(smpi_global->receiver_processes[rank])) {
- SIMIX_process_resume(smpi_global->receiver_processes[rank]);
+ if (SIMIX_process_is_suspended(smpi_global->receiver_processes[index])) {
+ SIMIX_process_resume(smpi_global->receiver_processes[index]);
}
}
void smpi_bench_begin() // locks the calling host's timer mutex and starts its timer
{
- int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
+ int index = smpi_host_index(); // per-host arrays are indexed by host index, not by communicator rank
- SIMIX_mutex_lock(smpi_global->timers_mutexes[rank]);
+ SIMIX_mutex_lock(smpi_global->timers_mutexes[index]); // still held on return; smpi_bench_end unlocks it after stopping the timer
- xbt_os_timer_start(smpi_global->timers[rank]);
+ xbt_os_timer_start(smpi_global->timers[index]);
return;
}
void smpi_bench_end()
{
- int rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
+ int index = smpi_host_index();
double duration;
smx_host_t host;
char computation[] = "computation";
smx_mutex_t mutex;
smx_cond_t cond;
- xbt_os_timer_stop(smpi_global->timers[rank]);
+ xbt_os_timer_stop(smpi_global->timers[index]);
- duration = xbt_os_timer_elapsed(smpi_global->timers[rank]);
+ duration = xbt_os_timer_elapsed(smpi_global->timers[index]);
- SIMIX_mutex_unlock(smpi_global->timers_mutexes[rank]);
+ SIMIX_mutex_unlock(smpi_global->timers_mutexes[index]);
- host = smpi_mpi_global->mpi_comm_world->hosts[rank];
+ host = smpi_global->hosts[index];
action = SIMIX_action_execute(host, computation, duration * SMPI_DEFAULT_SPEED);
mutex = SIMIX_mutex_init();
cond = SIMIX_cond_init();
smpi_global->start_stop_mutex = SIMIX_mutex_init();
smpi_global->start_stop_cond = SIMIX_cond_init();
- // processes
- smpi_global->sender_processes = xbt_new(smx_process_t, size);
- smpi_global->receiver_processes = xbt_new(smx_process_t, size);
+ // host info blank until sim starts
+ // FIXME: is this okay?
+ smpi_global->hosts = NULL;
+ smpi_global->host_count = 0;
// running hosts
smpi_global->running_hosts_count_mutex = SIMIX_mutex_init();
smpi_global->received_message_queues = xbt_new(xbt_fifo_t, size);
smpi_global->received_message_queues_mutexes = xbt_new(smx_mutex_t, size);
+ // sender/receiver processes
+ smpi_global->sender_processes = xbt_new(smx_process_t, size);
+ smpi_global->receiver_processes = xbt_new(smx_process_t, size);
+
// timers
smpi_global->timers = xbt_new(xbt_os_timer_t, size);
smpi_global->timers_mutexes = xbt_new(smx_mutex_t, size);
smpi_global = NULL;
}
+// FIXME: smarter algorithm? (linear scan over smpi_global->hosts on every call)
+int smpi_host_index() // returns the index of the calling host within smpi_global->hosts
+{
+ int i;
+ smx_host_t host = SIMIX_host_self();
+
+ for(i = smpi_global->host_count - 1; i > 0 && host != smpi_global->hosts[i]; i--); // empty body: scan downward; hosts[0] is never compared
+
+ return i; // NOTE(review): 0 is also returned when no host matched, and -1 when host_count is 0
+}
+
int smpi_run_simulation(int argc, char **argv)
{
xbt_fifo_item_t cond_item = NULL;
} else if (NULL == rank) {
retval = MPI_ERR_ARG;
} else {
- *rank = smpi_mpi_comm_rank_self(comm);
+ retval = smpi_mpi_comm_rank(comm, rank);
}
smpi_bench_begin();
int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
{
int retval = MPI_SUCCESS;
- int dst;
+ int dst = 0;
smpi_bench_end();
- dst = smpi_mpi_comm_rank_self(comm);
+ //dst = smpi_mpi_comm_rank(comm);
if (NULL == request) {
retval = MPI_ERR_ARG;
} else {
int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status)
{
int retval = MPI_SUCCESS;
- int dst;
+ int dst = 0;
smpi_mpi_request_t request;
smpi_bench_end();
- dst = smpi_mpi_comm_rank_self(comm);
+ // FIXME: necessary?
+ //dst = smpi_mpi_comm_rank(comm);
retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &request);
if (NULL != request && MPI_SUCCESS == retval) {
int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm, MPI_Request *request)
{
int retval = MPI_SUCCESS;
- int src;
+ int src = 0;
smpi_bench_end();
- src = smpi_mpi_comm_rank_self(comm);
+ //src = smpi_mpi_comm_rank(comm);
if (NULL == request) {
retval = MPI_ERR_ARG;
} else {
int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
int retval = MPI_SUCCESS;
- int src;
+ int src = 0;
smpi_mpi_request_t request;
smpi_bench_end();
- src = smpi_mpi_comm_rank_self(comm);
+ //src = smpi_mpi_comm_rank(comm);
retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &request);
if (NULL != request && MPI_SUCCESS == retval) {
retval = smpi_mpi_isend(request);
int smpi_receiver(int argc, char **argv)
{
smx_process_t self;
- int rank;
+ int index;
xbt_fifo_t request_queue;
smx_mutex_t request_queue_mutex;
xbt_fifo_t message_queue;
smx_mutex_t message_queue_mutex;
+ // FIXME: remove? also sender
int size;
int running_hosts_count;
}
SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
- rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
- size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+ index = smpi_host_index();
+ size = smpi_global->host_count;
- request_queue = smpi_global->pending_recv_request_queues[rank];
- request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
- message_queue = smpi_global->received_message_queues[rank];
- message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
+ request_queue = smpi_global->pending_recv_request_queues[index];
+ request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[index];
+ message_queue = smpi_global->received_message_queues[index];
+ message_queue_mutex = smpi_global->received_message_queues_mutexes[index];
- smpi_global->receiver_processes[rank] = self;
+ smpi_global->receiver_processes[index] = self;
// wait for all nodes to signal initialization complete
SIMIX_mutex_lock(smpi_global->start_stop_mutex);
{
smx_process_t self;
smx_host_t shost;
- int rank;
+ int index;
xbt_fifo_t request_queue;
smx_mutex_t request_queue_mutex;
smpi_received_message_t message;
- int drank;
+ int dindex;
smx_process_t receiver_process;
}
SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
- rank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost);
- size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+ index = smpi_host_index();
+ size = smpi_global->host_count;
- request_queue = smpi_global->pending_send_request_queues[rank];
- request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank];
+ request_queue = smpi_global->pending_send_request_queues[index];
+ request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[index];
- smpi_global->sender_processes[rank] = self;
+ smpi_global->sender_processes[index] = self;
// wait for all nodes to signal initialization complete
SIMIX_mutex_lock(smpi_global->start_stop_mutex);
SIMIX_mutex_lock(request->mutex);
message->comm = request->comm;
- message->src = request->src;
- message->dst = request->dst;
+ // FIXME: maybe we don't need this map
+ message->src = request->comm->index_to_rank_map[index];
message->tag = request->tag;
message->buf = xbt_malloc(request->datatype->size * request->count);
memcpy(message->buf, request->buf, request->datatype->size * request->count);
- dhost = request->comm->hosts[request->dst];
- drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost);
+ dindex = request->comm->rank_to_index_map[request->dst];
+ dhost = smpi_global->hosts[dindex];
- SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]);
- xbt_fifo_push(smpi_global->received_message_queues[drank], message);
- SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]);
+ SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]);
+ xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
+ SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[dindex]);
request->completed = 1;
//SIMIX_action_destroy(action);
// wake up receiver if necessary
- receiver_process = smpi_global->receiver_processes[drank];
+ receiver_process = smpi_global->receiver_processes[dindex];
if (SIMIX_process_is_suspended(receiver_process)) {
SIMIX_process_resume(receiver_process);
}