-void smpi_mpi_init() {
- int i;
- int size, rank;
- smx_host_t *hosts;
- smx_host_t host;
- double duration;
- m_task_t mtask;
-
- // will eventually need mutex
- smpi_running_hosts++;
-
- // initialize some local variables
- size = SIMIX_host_get_number();
- host = SIMIX_host_self();
- hosts = SIMIX_host_get_table();
- for(i = 0; i < size && host != hosts[i]; i++);
- rank = i;
-
- // node 0 sets the globals
- if (0 == rank) {
-
- // global communicator
- smpi_mpi_comm_world.id = 0;
- smpi_mpi_comm_world.size = size;
- smpi_mpi_comm_world.barrier = 0;
- smpi_mpi_comm_world.hosts = hosts;
- smpi_mpi_comm_world.processes = xbt_malloc(sizeof(m_process_t) * size);
- smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
-
- // mpi datatypes
- smpi_mpi_byte.size = (size_t)1;
- smpi_mpi_int.size = sizeof(int);
- smpi_mpi_double.size = sizeof(double);
-
- // mpi operations
- smpi_mpi_land.func = &smpi_mpi_land_func;
- smpi_mpi_sum.func = &smpi_mpi_sum_func;
-
- // smpi globals
- smpi_pending_send_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
- smpi_last_pending_send_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
- smpi_pending_recv_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
- smpi_last_pending_recv_requests = xbt_malloc(sizeof(smpi_mpi_request_t*) * size);
- smpi_received = xbt_malloc(sizeof(smpi_received_t*) * size);
- smpi_last_received = xbt_malloc(sizeof(smpi_received_t*) * size);
- smpi_sender_processes = xbt_malloc(sizeof(m_process_t) * size);
- smpi_receiver_processes = xbt_malloc(sizeof(m_process_t) * size);
- for(i = 0; i < size; i++) {
- smpi_pending_send_requests[i] = NULL;
- smpi_last_pending_send_requests[i] = NULL;
- smpi_pending_recv_requests[i] = NULL;
- smpi_last_pending_recv_requests[i] = NULL;
- smpi_received[i] = NULL;
- smpi_last_received[i] = NULL;
- }
- smpi_timer = xbt_os_timer_new();
- smpi_reference = DEFAULT_POWER;
- smpi_benchmarking = 0;
-
- // tell send/recv nodes to begin
- for(i = 0; i < size; i++) {
- mtask = MSG_task_create("READY", 0, 0, NULL);
- MSG_task_put(mtask, hosts[i], SEND_SYNC_PORT);
- mtask = (m_task_t)0;
- MSG_task_get_from_host(&mtask, SEND_SYNC_PORT, hosts[i]);
- MSG_task_destroy(mtask);
- mtask = MSG_task_create("READY", 0, 0, NULL);
- MSG_task_put(mtask, hosts[i], RECV_SYNC_PORT);
- mtask = (m_task_t)0;
- MSG_task_get_from_host(&mtask, RECV_SYNC_PORT, hosts[i]);
- MSG_task_destroy(mtask);
- }
-
- // now everyone else
- for(i = 1; i < size; i++) {
- mtask = MSG_task_create("READY", 0, 0, NULL);
- MSG_task_put(mtask, hosts[i], MPI_PORT);
- }
-
- } else {
- // everyone needs to wait for node 0 to finish
- mtask = (m_task_t)0;
- MSG_task_get(&mtask, MPI_PORT);
- MSG_task_destroy(mtask);
- smpi_mpi_comm_world.processes[rank] = SIMIX_process_self();
- }
-
- // now that mpi_comm_world_processes is set, it's safe to set a barrier
- smpi_barrier(&smpi_mpi_comm_world);
+int smpi_receiver(int argc, char **argv)
+{
+ smx_process_t self;
+ int rank;
+
+ xbt_fifo_t request_queue;
+ smx_mutex_t request_queue_mutex;
+ xbt_fifo_t message_queue;
+ smx_mutex_t message_queue_mutex;
+ int size;
+
+ int running_hosts_count;
+
+ smpi_mpi_request_t *request;
+ smpi_received_message_t *message;
+
+ xbt_fifo_item_t request_item;
+ xbt_fifo_item_t message_item;
+
+ smx_process_t waitproc;
+
+ self = SIMIX_process_self();
+ rank = smpi_mpi_comm_world_rank_self();
+
+ // make sure root is done before own initialization
+ SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+ if (!smpi_global->root_ready) {
+ SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+ }
+ SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+ request_queue = smpi_global->pending_recv_request_queues[rank];
+ request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
+ message_queue = smpi_global->received_message_queues[rank];
+ message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
+ size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+
+ smpi_global->receiver_processes[rank] = self;
+
+  // wait for all nodes to signal initialization complete
+ SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+ smpi_global->ready_process_count++;
+ if (smpi_global->ready_process_count < 3 * size) {
+ SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+ } else {
+ SIMIX_cond_broadcast(smpi_global->start_stop_cond);
+ }
+ SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+ do {
+ request = NULL;
+ message = NULL;
+
+ // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
+
+ // FIXME: not the best way to request multiple locks...
+ SIMIX_mutex_lock(request_queue_mutex);
+ SIMIX_mutex_lock(message_queue_mutex);
+ for (request_item = xbt_fifo_get_first_item(request_queue);
+ NULL != request_item;
+ request_item = xbt_fifo_get_next_item(request_item)) {
+ request = xbt_fifo_get_item_content(request_item);
+ for (message_item = xbt_fifo_get_first_item(message_queue);
+ NULL != message_item;
+ message_item = xbt_fifo_get_next_item(message_item)) {
+ message = xbt_fifo_get_item_content(message_item);
+ if (request->comm == message->comm &&
+ (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
+ request->tag == message->tag) {
+ xbt_fifo_remove_item(request_queue, request_item);
+ xbt_fifo_remove_item(message_queue, message_item);
+ goto stopsearch;
+ }
+ }
+ }
+stopsearch:
+ SIMIX_mutex_unlock(message_queue_mutex);
+ SIMIX_mutex_unlock(request_queue_mutex);
+
+ if (NULL == request || NULL == message) {
+ SIMIX_process_suspend(self);
+ } else {
+ SIMIX_mutex_lock(request->mutex);
+
+ memcpy(request->buf, message->buf, request->datatype->size * request->count);
+ request->src = message->src;
+ request->completed = 1;
+ SIMIX_cond_broadcast(request->cond);
+
+ SIMIX_mutex_unlock(request->mutex);
+
+ xbt_free(message->buf);
+ xbt_mallocator_release(smpi_global->message_mallocator, message);
+ }
+
+ SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
+ running_hosts_count = smpi_global->running_hosts_count;
+ SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
+
+ } while (0 < running_hosts_count);
+
+ SIMIX_mutex_lock(smpi_global->start_stop_mutex);
+ smpi_global->ready_process_count--;
+ if (smpi_global->ready_process_count == 0) {
+ SIMIX_cond_broadcast(smpi_global->start_stop_cond);
+ } else if (smpi_global->ready_process_count < 0) {
+ // FIXME: can't happen, abort!
+ }
+ SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
+
+ return 0;