git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3893 48e7efb5-ca39-0410-a469-dd3cf9ba447f
smx_mutex_t smpi_running_hosts_mutex = NULL;
smx_mutex_t smpi_benchmarking_mutex = NULL;
smx_mutex_t init_mutex = NULL;
smx_mutex_t smpi_running_hosts_mutex = NULL;
smx_mutex_t smpi_benchmarking_mutex = NULL;
smx_mutex_t init_mutex = NULL;
-smx_cond_t init_cond = NULL;
+smx_cond_t init_cond = NULL;
-int smpi_root_ready = 0;
+int smpi_root_ready = 0;
int smpi_ready_count = 0;
XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
int smpi_ready_count = 0;
XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
smx_process_t self;
smx_host_t shost;
int rank;
smx_process_t self;
smx_host_t shost;
int rank;
xbt_fifo_t request_queue;
smx_mutex_t request_queue_mutex;
int size;
xbt_fifo_t request_queue;
smx_mutex_t request_queue_mutex;
int size;
smpi_mpi_request_t *request;
smpi_mpi_request_t *request;
smx_action_t communicate_action;
smx_action_t communicate_action;
- smpi_received_message_t *scratch;
+
+ smpi_received_message_t *message;
+
smx_process_t waitproc;
self = SIMIX_process_self();
smx_process_t waitproc;
self = SIMIX_process_self();
request_queue = smpi_pending_send_requests[rank];
request_queue_mutex = smpi_pending_send_requests_mutex[rank];
request_queue = smpi_pending_send_requests[rank];
request_queue_mutex = smpi_pending_send_requests_mutex[rank];
-
- size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
+ size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
smpi_sender_processes[rank] = self;
smpi_sender_processes[rank] = self;
SIMIX_cond_wait(request->cond, request->mutex);
// copy request to appropriate received queue
SIMIX_cond_wait(request->cond, request->mutex);
// copy request to appropriate received queue
- scratch = xbt_mallocator_get(smpi_message_mallocator);
- scratch->comm = request->comm;
- scratch->src = request->src;
- scratch->dst = request->dst;
- scratch->tag = request->tag;
- scratch->buf = xbt_malloc(request->datatype->size * request->count);
- memcpy(scratch->buf, request->buf, request->datatype->size * request->count);
+ message = xbt_mallocator_get(smpi_message_mallocator);
+ message->comm = request->comm;
+ message->src = request->src;
+ message->dst = request->dst;
+ message->tag = request->tag;
+ message->buf = xbt_malloc(request->datatype->size * request->count);
+ memcpy(message->buf, request->buf, request->datatype->size * request->count);
+
drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
- xbt_fifo_push(smpi_received_messages[drank], scratch);
+ xbt_fifo_push(smpi_received_messages[drank], message);
SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
request->completed = 1;
SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
request->completed = 1;
{
smx_process_t self;
int rank;
{
smx_process_t self;
int rank;
xbt_fifo_t request_queue;
smx_mutex_t request_queue_mutex;
xbt_fifo_t message_queue;
smx_mutex_t message_queue_mutex;
int size;
xbt_fifo_t request_queue;
smx_mutex_t request_queue_mutex;
xbt_fifo_t message_queue;
smx_mutex_t message_queue_mutex;
int size;
- xbt_fifo_item_t request_item, message_item;
smpi_mpi_request_t *request;
smpi_received_message_t *message;
smpi_mpi_request_t *request;
smpi_received_message_t *message;
+
+ xbt_fifo_item_t request_item;
+ xbt_fifo_item_t message_item;
+
smx_process_t waitproc;
self = SIMIX_process_self();
smx_process_t waitproc;
self = SIMIX_process_self();
request_queue = smpi_pending_recv_requests[rank];
request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
request_queue = smpi_pending_recv_requests[rank];
request_queue_mutex = smpi_pending_recv_requests_mutex[rank];
message_queue = smpi_received_messages[rank];
message_queue_mutex = smpi_received_messages_mutex[rank];
message_queue = smpi_received_messages[rank];
message_queue_mutex = smpi_received_messages_mutex[rank];
+ size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
- size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
smpi_receiver_processes[rank] = self;
// wait for all nodes to signal initialization complete
smpi_receiver_processes[rank] = self;
// wait for all nodes to signal initialization complete
SIMIX_process_suspend(self);
} else {
SIMIX_mutex_lock(request->mutex);
SIMIX_process_suspend(self);
} else {
SIMIX_mutex_lock(request->mutex);
memcpy(request->buf, message->buf, request->count * request->datatype->size);
request->src = message->src;
request->completed = 1;
memcpy(request->buf, message->buf, request->count * request->datatype->size);
request->src = message->src;
request->completed = 1;
SIMIX_process_resume(waitproc);
}
}
SIMIX_process_resume(waitproc);
}
}
SIMIX_mutex_unlock(request->mutex);
SIMIX_mutex_unlock(request->mutex);
+ xbt_free(message->buf);
xbt_mallocator_release(smpi_message_mallocator, message);
}
xbt_mallocator_release(smpi_message_mallocator, message);
}
SIMIX_global_init(&argc, argv);
SIMIX_global_init(&argc, argv);
init_mutex = SIMIX_mutex_init();
init_cond = SIMIX_cond_init();
SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
SIMIX_function_register("smpi_sender", smpi_sender);
SIMIX_function_register("smpi_receiver", smpi_receiver);
init_mutex = SIMIX_mutex_init();
init_cond = SIMIX_cond_init();
SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
SIMIX_function_register("smpi_sender", smpi_sender);
SIMIX_function_register("smpi_receiver", smpi_receiver);
+
+ // FIXME: ought to verify these files...
SIMIX_create_environment(argv[1]);
SIMIX_launch_application(argv[2]);
/* Prepare to display some more info when dying on Ctrl-C pressing */
SIMIX_create_environment(argv[1]);
SIMIX_launch_application(argv[2]);
/* Prepare to display some more info when dying on Ctrl-C pressing */
//signal(SIGINT, inthandler);
/* Clean IO before the run */
//signal(SIGINT, inthandler);
/* Clean IO before the run */
SIMIX_action_destroy(action);
}
}
SIMIX_action_destroy(action);
}
}
xbt_fifo_free(actions_failed);
xbt_fifo_free(actions_done);
xbt_fifo_free(actions_failed);
xbt_fifo_free(actions_done);
INFO1("simulation time %g", SIMIX_get_clock());
INFO1("simulation time %g", SIMIX_get_clock());
void smpi_free_request(void *pointer) {
smpi_mpi_request_t *request = pointer;
void smpi_free_request(void *pointer) {
smpi_mpi_request_t *request = pointer;
+ SIMIX_mutex_destroy(request->mutex);
+ SIMIX_cond_destroy(request->cond);
xbt_fifo_free(request->waitlist);
xbt_free(request);
}
xbt_fifo_free(request->waitlist);
xbt_free(request);
}
+void smpi_reset_request(void *pointer) { // reset hook: (re)creates the synchronization members of a request
+ smpi_mpi_request_t *request = pointer; // pointer is treated as a smpi_mpi_request_t
+
+ if (NULL != request) { // a NULL pointer is ignored
+ request->mutex = SIMIX_mutex_init(); // the three fields are overwritten; nothing is released in this function
+ request->cond = SIMIX_cond_init();
+ request->waitlist = xbt_fifo_new();
+ // FIXME: clear waitlist
+ }
+
+ return;
+}
+
+
void *smpi_new_message()
{
return xbt_new(smpi_received_message_t, 1);
void *smpi_new_message()
{
return xbt_new(smpi_received_message_t, 1);
smpi_mpi_sum.func = &smpi_mpi_sum_func;
// smpi globals
smpi_mpi_sum.func = &smpi_mpi_sum_func;
// smpi globals
- smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, smpi_free_request, smpi_do_nothing);
- smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, smpi_do_nothing);
+ smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, smpi_free_request, smpi_reset_request);
+ smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, smpi_free_message, smpi_do_nothing);
smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size);
smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size);
smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
SIMIX_mutex_unlock(init_mutex);
SIMIX_mutex_destroy(init_mutex);
SIMIX_mutex_unlock(init_mutex);
SIMIX_mutex_destroy(init_mutex);
+ SIMIX_cond_destroy(init_cond);
SIMIX_mutex_destroy(smpi_running_hosts_mutex);
for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
SIMIX_mutex_destroy(smpi_running_hosts_mutex);
for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
void smpi_bench_begin()
{
void smpi_bench_begin()
{
+ SIMIX_mutex_lock(smpi_benchmarking_mutex);
xbt_assert0(!smpi_benchmarking, "Already benchmarking");
smpi_benchmarking = 1;
xbt_assert0(!smpi_benchmarking, "Already benchmarking");
smpi_benchmarking = 1;
+ SIMIX_mutex_unlock(smpi_benchmarking_mutex);
+
xbt_os_timer_start(smpi_timer);
xbt_os_timer_start(smpi_timer);
smx_mutex_t mutex;
smx_cond_t cond;
smx_mutex_t mutex;
smx_cond_t cond;
+ SIMIX_mutex_lock(smpi_benchmarking_mutex);
xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
smpi_benchmarking = 0;
xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
smpi_benchmarking = 0;
+ SIMIX_mutex_unlock(smpi_benchmarking_mutex); // release the mutex taken above; locking it again here would leave it held
+
xbt_os_timer_stop(smpi_timer);
xbt_os_timer_stop(smpi_timer);
- duration = xbt_os_timer_elapsed(smpi_timer);
+
+ duration = xbt_os_timer_elapsed(smpi_timer);
host = SIMIX_host_self();
compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
mutex = SIMIX_mutex_init();
cond = SIMIX_cond_init();
host = SIMIX_host_self();
compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
mutex = SIMIX_mutex_init();
cond = SIMIX_cond_init();
SIMIX_mutex_lock(mutex);
SIMIX_register_condition_to_action(compute_action, cond);
SIMIX_register_action_to_condition(compute_action, cond);
SIMIX_mutex_lock(mutex);
SIMIX_register_condition_to_action(compute_action, cond);
SIMIX_register_action_to_condition(compute_action, cond);
SIMIX_mutex_unlock(mutex);
SIMIX_mutex_destroy(mutex);
SIMIX_cond_destroy(cond);
SIMIX_mutex_unlock(mutex);
SIMIX_mutex_destroy(mutex);
SIMIX_cond_destroy(cond);
// FIXME: check for success/failure?
// FIXME: check for success/failure?
return;
}
void smpi_barrier(smpi_mpi_communicator_t *comm) {
int i;
return;
}
void smpi_barrier(smpi_mpi_communicator_t *comm) {
int i;
SIMIX_mutex_lock(comm->barrier_mutex);
comm->barrier++;
if(i < comm->size) {
SIMIX_mutex_lock(comm->barrier_mutex);
comm->barrier++;
if(i < comm->size) {
SIMIX_cond_broadcast(comm->barrier_cond);
}
SIMIX_mutex_unlock(comm->barrier_mutex);
SIMIX_cond_broadcast(comm->barrier_cond);
}
SIMIX_mutex_unlock(comm->barrier_mutex);
+// FIXME: smarter algorithm...
int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
{
int i;
int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
{
int i;
int smpi_isend(smpi_mpi_request_t *request)
{
int smpi_isend(smpi_mpi_request_t *request)
{
- int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
+ int retval = MPI_SUCCESS;
+ int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
- SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
- xbt_fifo_push(smpi_pending_send_requests[rank], request);
- SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
+ if (NULL != request) {
+ SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]);
+ xbt_fifo_push(smpi_pending_send_requests[rank], request);
+ SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]);
+ }
if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
SIMIX_process_resume(smpi_sender_processes[rank]);
}
if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
SIMIX_process_resume(smpi_sender_processes[rank]);
}
}
int smpi_irecv(smpi_mpi_request_t *request)
{
}
int smpi_irecv(smpi_mpi_request_t *request)
{
+ int retval = MPI_SUCCESS;
int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
- SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
- xbt_fifo_push(smpi_pending_recv_requests[rank], request);
- SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
+ if (NULL != request) {
+ SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]);
+ xbt_fifo_push(smpi_pending_recv_requests[rank], request);
+ SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]);
+ }
if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
SIMIX_process_resume(smpi_receiver_processes[rank]);
}
if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
SIMIX_process_resume(smpi_receiver_processes[rank]);
}
}
void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
{
}
void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
{
+ smx_process_t self = SIMIX_process_self();
- self = SIMIX_process_self();
if (NULL != request) {
SIMIX_mutex_lock(request->mutex);
if (NULL != request) {
SIMIX_mutex_lock(request->mutex);