smx_cond_t cond;
void *data;
+ int forward;
} s_smpi_mpi_request_t;
void *buf;
void *data;
+ int forward;
} s_smpi_received_message_t;
typedef struct smpi_received_message_t *smpi_received_message_t;
retval = MPI_ERR_INTERN;
} else {
SIMIX_mutex_lock(request->mutex);
- if (!request->completed) {
+ while (!request->completed) {
SIMIX_cond_wait(request->cond, request->mutex);
}
if (NULL != status) {
{
smpi_mpi_request_t request = xbt_new(s_smpi_mpi_request_t, 1);
+ request->buf = NULL;
request->completed = 0;
request->mutex = SIMIX_mutex_init();
request->cond = SIMIX_cond_init();
+ request->data = NULL;
+ request->forward = 0;
return request;
}
smpi_mpi_request_t request = pointer;
- if (NULL != request) {
- SIMIX_cond_destroy(request->cond);
- SIMIX_mutex_destroy(request->mutex);
- xbt_free(request);
- }
+ SIMIX_cond_destroy(request->cond);
+ SIMIX_mutex_destroy(request->mutex);
+ xbt_free(request);
return;
}
{
smpi_mpi_request_t request = pointer;
+ request->buf = NULL;
request->completed = 0;
+ request->data = NULL;
+ request->forward = 0;
return;
}
void *smpi_message_new()
{
- return xbt_new(s_smpi_received_message_t, 1);
+ smpi_received_message_t message = xbt_new(s_smpi_received_message_t, 1);
+ message->buf = NULL;
+ return message;
}
void smpi_message_free(void *pointer);
void smpi_message_free(void *pointer)
{
- if (NULL != pointer) {
- xbt_free(pointer);
- }
-
+ xbt_free(pointer);
return;
}
void smpi_message_reset(void *pointer)
{
+ smpi_received_message_t message = pointer;
+ message->buf = NULL;
return;
}
int retval = MPI_SUCCESS;
int rank;
+ smpi_mpi_request_t request;
smpi_bench_end();
rank = smpi_mpi_comm_rank(comm);
if (rank == root) {
- int i;
- smpi_mpi_request_t *requests = xbt_new(smpi_mpi_request_t, comm->size - 1);
- for (i = 1; i < comm->size; i++) {
- retval = smpi_create_request(buf, count, datatype, root, (root + i) % comm->size, 0, comm, requests + i - 1);
- smpi_mpi_isend(requests[i - 1]);
- }
- for (i = 0; i < comm->size - 1; i++) {
- smpi_mpi_wait(requests[i], MPI_STATUS_IGNORE);
- xbt_mallocator_release(smpi_global->request_mallocator, requests[i]);
- }
- xbt_free(requests);
+ retval = smpi_create_request(buf, count, datatype, root, (root + 1) % comm->size, 0, comm, &request);
+ request->forward = comm->size - 1;
+ smpi_mpi_isend(request);
+ smpi_mpi_wait(request, MPI_STATUS_IGNORE);
+ xbt_mallocator_release(smpi_global->request_mallocator, request);
} else {
- smpi_mpi_request_t request;
- retval = smpi_create_request(buf, count, datatype, root, rank, 0, comm, &request);
+ retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank, 0, comm, &request);
smpi_mpi_irecv(request);
smpi_mpi_wait(request, MPI_STATUS_IGNORE);
xbt_mallocator_release(smpi_global->request_mallocator, request);
SIMIX_mutex_lock(request->mutex);
memcpy(request->buf, message->buf, request->datatype->size * request->count);
request->src = message->src;
- request->completed = 1;
request->data = message->data;
- SIMIX_cond_broadcast(request->cond);
+ request->forward = message->forward;
+
+ if (0 == request->forward) {
+ request->completed = 1;
+ SIMIX_cond_broadcast(request->cond);
+ } else {
+ request->src = smpi_mpi_comm_rank(request->comm);
+ request->dst = (request->src + 1) % request->comm->size;
+ smpi_mpi_isend(request);
+ }
+
SIMIX_mutex_unlock(request->mutex);
xbt_free(message->buf);
SIMIX_process_suspend(self);
} else {
- message = xbt_mallocator_get(smpi_global->message_mallocator);
+ message = xbt_mallocator_get(smpi_global->message_mallocator);
SIMIX_mutex_lock(request->mutex);
- message->comm = request->comm;
- message->src = smpi_mpi_comm_rank(request->comm);
- message->tag = request->tag;
- message->buf = xbt_malloc(request->datatype->size * request->count);
+ message->comm = request->comm;
+ message->src = smpi_mpi_comm_rank(request->comm);
+ message->tag = request->tag;
+ message->data = request->data;
+ message->buf = xbt_malloc(request->datatype->size * request->count);
memcpy(message->buf, request->buf, request->datatype->size * request->count);
- message->data = request->data;
dindex = request->comm->rank_to_index_map[request->dst];
dhost = smpi_global->hosts[dindex];
+ message->forward = (request->forward - 1) / 2;
+ request->forward = request->forward / 2;
+
SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]);
xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[dindex]);
- request->completed = 1;
+ if (0 < request->forward) {
+ request->dst = (request->dst + message->forward + 1) % request->comm->size;
+ SIMIX_mutex_lock(request_queue_mutex);
+ xbt_fifo_push(request_queue, request);
+ SIMIX_mutex_unlock(request_queue_mutex);
+ } else {
+ request->completed = 1;
+ }
action = SIMIX_action_communicate(shost, dhost, communication, request->datatype->size * request->count, -1.0);
SIMIX_mutex_unlock(request->mutex);
- //SIMIX_action_destroy(action);
-
// wake up receiver if necessary
receiver_process = smpi_global->receiver_processes[dindex];
if (SIMIX_process_is_suspended(receiver_process)) {