// state vars
- smx_host_t *hosts;
+ smx_host_t *hosts; //FIXME:killme
int host_count;
xbt_mallocator_t request_mallocator;
xbt_mallocator_t message_mallocator;
- // FIXME: request queues should be moved to host data...
- xbt_fifo_t *received_message_queues;
-
smx_process_t *main_processes;
xbt_os_timer_t timer;
smx_process_t sender;
smx_process_t receiver;
- int finalize; /* for main stopping sender&receiver */
+ int finalize; /* so that main process stops its sender&receiver */
xbt_fifo_t pending_recv_request_queue;
xbt_fifo_t pending_send_request_queue;
+ xbt_fifo_t received_message_queue;
} s_smpi_host_data_t;
typedef struct smpi_host_data_t *smpi_host_data_t;
hdata->pending_recv_request_queue = xbt_fifo_new();
hdata->pending_send_request_queue = xbt_fifo_new();
+ hdata->received_message_queue = xbt_fifo_new();
hdata->main = SIMIX_process_self();
hdata->sender = SIMIX_process_create("smpi_sender",
SIMIX_cond_destroy(hdata->cond);
xbt_fifo_free(hdata->pending_recv_request_queue);
xbt_fifo_free(hdata->pending_send_request_queue);
+ xbt_fifo_free(hdata->received_message_queue);
}
int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_message_new,
smpi_message_free, smpi_message_reset);
- // queues
- smpi_global->received_message_queues = xbt_new(xbt_fifo_t, size);
-
// sender/receiver processes
smpi_global->main_processes = xbt_new(smx_process_t, size);
smpi_global->do_once_duration = NULL;
smpi_global->do_once_mutex = SIMIX_mutex_init();
- for (i = 0; i < size; i++) {
- smpi_global->received_message_queues[i] = xbt_fifo_new();
- }
-
smpi_global->hosts = SIMIX_host_get_table();
smpi_global->host_count = SIMIX_host_get_number();
void smpi_global_destroy()
{
- int i;
-
- int size = SIMIX_host_get_number();
-
smpi_do_once_duration_node_t curr, next;
// processes
SIMIX_mutex_destroy(smpi_global->do_once_mutex);
- for (i = 0; i < size; i++) {
- xbt_fifo_free(smpi_global->received_message_queues[i]);
- }
-
- xbt_free(smpi_global->received_message_queues);
-
xbt_free(smpi_global);
smpi_global = NULL;
self = SIMIX_process_self();
request_queue = mydata->pending_recv_request_queue;
- message_queue = smpi_global->received_message_queues[index];
+ message_queue = mydata->received_message_queue;
while (1) {
// FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
request->datatype->size * request->count);
dindex = request->comm->rank_to_index_map[request->dst];
- dhost = smpi_global->hosts[dindex];
+ dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]);
+ smpi_host_data_t remote_host = SIMIX_host_get_data(dhost);
message->forward = (request->forward - 1) / 2;
request->forward = request->forward / 2;
SIMIX_cond_wait(request->cond, request->mutex);
}
- xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
+ xbt_fifo_push(remote_host->received_message_queue, message);
SIMIX_unregister_action_to_condition(action, request->cond);
SIMIX_action_destroy(action);
SIMIX_mutex_unlock(request->mutex);
// wake up receiver if necessary
- smpi_host_data_t remote_host = SIMIX_host_get_data(SIMIX_process_get_host(smpi_global->main_processes[dindex]));
SIMIX_process_resume(remote_host->receiver);
} else if (mydata->finalize>0) { /* main wants me to die and nothing to do */