a position to do unit testing...
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3704 48e7efb5-ca39-0410-a469-dd3cf9ba447f
#include "simix/private.h"
#include "smpi.h"
#include "simix/private.h"
#include "smpi.h"
+xbt_mallocator_t smpi_request_mallocator = NULL;
xbt_fifo_t *smpi_pending_send_requests = NULL;
xbt_fifo_t *smpi_pending_recv_requests = NULL;
xbt_fifo_t *smpi_received_messages = NULL;
xbt_fifo_t *smpi_pending_send_requests = NULL;
xbt_fifo_t *smpi_pending_recv_requests = NULL;
xbt_fifo_t *smpi_received_messages = NULL;
int smpi_sender(int argc, char **argv)
{
int smpi_sender(int argc, char **argv)
{
- int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
int size;
int running_hosts = 0;
smpi_mpi_request_t *request;
smx_process_t self;
smx_host_t shost, dhost;
smx_action_t communicate_action;
int size;
int running_hosts = 0;
smpi_mpi_request_t *request;
smx_process_t self;
smx_host_t shost, dhost;
smx_action_t communicate_action;
+ smpi_mpi_request_t *scratch;
+ int drank;
self = SIMIX_process_self();
shost = SIMIX_host_self();
self = SIMIX_process_self();
shost = SIMIX_host_self();
+ rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
+
// make sure root is done before own initialization
SIMIX_mutex_lock(init_mutex);
if (!smpi_root_ready) {
// make sure root is done before own initialization
SIMIX_mutex_lock(init_mutex);
if (!smpi_root_ready) {
SIMIX_cond_wait(request->cond, request->mutex);
SIMIX_cond_wait(request->cond, request->mutex);
- // fixme, create new request, copy over to
- // should be malloc and memcpy
+ // copy request to appropriate received queue
+ scratch = xbt_mallocator_get(smpi_request_mallocator);
+ memcpy(scratch, request, sizeof smpi_mpi_request_t);
+ drank = smpi_mpi_comm_rank(MPI_COMM_WORLD, dhost);
+ xbt_fifo_push(smpi_received_messages[drank], scratch);
+
+ request->completed = 1;
SIMIX_mutex_unlock(request->mutex);
}
SIMIX_mutex_unlock(request->mutex);
}
int smpi_receiver(int argc, char **argv)
{
int smpi_receiver(int argc, char **argv)
{
- int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
+ smx_host_t dhost;
+ int rank;
+ smx_process_t self;
+ int running_hosts;
+
+ dhost = SIMIX_host_self();
+ rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
+ self = SIMIX_process_self();
// make sure root is done before own initialization
SIMIX_mutex_lock(init_mutex);
// make sure root is done before own initialization
SIMIX_mutex_lock(init_mutex);
}
SIMIX_mutex_unlock(init_mutex);
}
SIMIX_mutex_unlock(init_mutex);
+ SIMIX_mutex_lock(smpi_running_hosts_mutex);
+ running_hosts = smpi_running_hosts;
+ SIMIX_mutex_unlock(smpi_running_hosts_mutex);
+
+ while (0 < running_hosts) {
+
+ request = xbt_fifo_shift(smpi_pending_send_requests[rank]);
+
+ if (NULL == request) {
+ SIMIX_process_suspend(self);
+ } else {
+ SIMIX_mutex_lock(request->mutex);
+ }
+
+ SIMIX_mutex_lock(smpi_running_hosts_mutex);
+ running_hosts = smpi_running_hosts;
+ SIMIX_mutex_unlock(smpi_running_hosts_mutex);
+ }
+
*(int *)z = *(int *)x + *(int *)y;
}
*(int *)z = *(int *)x + *(int *)y;
}
+smpi_mpi_request_t *smpi_new_request() /* Allocates one smpi_mpi_request_t; passed as the second argument to xbt_mallocator_new() where smpi_request_mallocator is created. */
+{
+ return xbt_new(smpi_mpi_request_t, 1); /* NOTE(review): xbt_new rather than xbt_new0 -- presumably the object is not zero-initialized; confirm callers set every field. */
+}
+
void smpi_mpi_init()
{
int i;
void smpi_mpi_init()
{
int i;
if (host == hosts[0]) {
// processes
if (host == hosts[0]) {
// processes
- smpi_sender_processes = xbt_new0(smx_process_t, size);
- smpi_receiver_processes = xbt_new0(smx_process_t, size);
+ smpi_sender_processes = xbt_new(smx_process_t, size);
+ smpi_receiver_processes = xbt_new(smx_process_t, size);
// running hosts
smpi_running_hosts_mutex = SIMIX_mutex_init();
// running hosts
smpi_running_hosts_mutex = SIMIX_mutex_init();
smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
smpi_mpi_comm_world.hosts = hosts;
smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init();
smpi_mpi_comm_world.hosts = hosts;
- smpi_mpi_comm_world.processes = xbt_new0(smx_process_t, size);
+ smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size);
smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// mpi datatypes
smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// mpi datatypes
smpi_mpi_sum.func = &smpi_mpi_sum_func;
// smpi globals
smpi_mpi_sum.func = &smpi_mpi_sum_func;
// smpi globals
- smpi_pending_send_requests = xbt_new0(xbt_fifo_t, size);
- smpi_pending_recv_requests = xbt_new0(xbt_fifo_t, size);
- smpi_received_messages = xbt_new0(xbt_fifo_t, size);
+ smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
+ smpi_pending_send_requests = xbt_new(xbt_fifo_t, size);
+ smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size);
+ smpi_received_messages = xbt_new(xbt_fifo_t, size);
for(i = 0; i < size; i++) {
smpi_pending_send_requests[i] = xbt_fifo_new();
for(i = 0; i < size; i++) {
smpi_pending_send_requests[i] = xbt_fifo_new();
xbt_fifo_free(smpi_received_messages[i]);
}
xbt_fifo_free(smpi_received_messages[i]);
}
+ xbt_mallocator_free(smpi_request_mallocator);
xbt_free(smpi_pending_send_requests);
xbt_free(smpi_pending_recv_requests);
xbt_free(smpi_received_messages);
xbt_free(smpi_pending_send_requests);
xbt_free(smpi_pending_recv_requests);
xbt_free(smpi_received_messages);
} else if (0 > tag) {
retval = MPI_ERR_TAG;
} else {
} else if (0 > tag) {
retval = MPI_ERR_TAG;
} else {
- *request = xbt_new0(smpi_mpi_request_t, 1);
+ *request = xbt_mallocator_get(smpi_request_mallocator);
(*request)->buf = buf;
(*request)->count = count;
(*request)->datatype = datatype;
(*request)->buf = buf;
(*request)->count = count;
(*request)->datatype = datatype;