/* Logging category setup for this file: names the category "smpi_sender",
 * gives "smpi" as its parent argument, and attaches the description string
 * below.  NOTE(review): exact semantics are those of the XBT logging macro,
 * which is defined outside this excerpt. */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
"Logging specific to SMPI (sender)");
/*
 * smpi_sender -- body of the SMPI sender process.
 *
 * This text is a diff excerpt: lines starting with '-' belong to the old
 * version, lines starting with '+' to the new one, unprefixed lines are
 * unchanged context.
 * NOTE(review): some unchanged lines appear to be missing from this excerpt
 * (e.g. the declarations of `message`, `dhost` and `action`, and the start
 * of the call whose tail is "request->datatype->size * request->count);").
 *
 * New version, as far as visible here:
 *   - fetches its data block with SIMIX_process_get_data(SIMIX_process_self())
 *     and takes `index` and the pending-send queue from it;
 *   - loops forever, shifting one request at a time off that queue;
 *   - when the queue is empty and mydata->finalize > 0, decrements
 *     mydata->finalize, signals mydata->cond and returns 0;
 *   - when the queue is empty otherwise, suspends itself.
 *
 * Old version: looped while smpi_global->running_hosts_count was positive,
 * and wrapped the queue operations in request_queue_mutex lock/unlock pairs.
 *
 * argc/argv are not referenced in the visible lines.
 */
-int smpi_sender(int argc, char **argv)
-{
+int smpi_sender(int argc,char*argv[]) {
+ smpi_host_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
smx_process_t self;
smx_host_t shost;
int index;
xbt_fifo_t request_queue;
- smx_mutex_t request_queue_mutex;
-
- int running_hosts_count;
smpi_mpi_request_t request;
int dindex;
- smx_process_t receiver_process;
-
self = SIMIX_process_self();
shost = SIMIX_host_self();
- index = smpi_host_index();
-
- request_queue = smpi_global->pending_send_request_queues[index];
- request_queue_mutex =
- smpi_global->pending_send_request_queues_mutexes[index];
/* New version: index and queue are read from mydata instead of being looked
 * up in the smpi_global arrays; the sender is also no longer registered in
 * smpi_global->sender_processes. */
+ index = mydata->index;
- smpi_global->sender_processes[index] = self;
+ request_queue = mydata->pending_send_request_queue;
- do {
-
- SIMIX_mutex_lock(request_queue_mutex);
+ while (1) {
request = xbt_fifo_shift(request_queue);
- SIMIX_mutex_unlock(request_queue_mutex);
-
- if (NULL == request) {
- SIMIX_process_suspend(self);
- } else {
+ if (NULL != request) {
message = xbt_mallocator_get(smpi_global->message_mallocator);
SIMIX_mutex_lock(request->mutex);
request->datatype->size * request->count);
/* Translate the destination rank into an index via the communicator's map. */
dindex = request->comm->rank_to_index_map[request->dst];
- dhost = smpi_global->hosts[dindex];
+ dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]);
+ smpi_host_data_t remote_host = SIMIX_host_get_data(dhost);
/* The message gets (forward - 1) / 2 and the request keeps forward / 2.
 * If the request still has a positive forward count, its destination is
 * advanced (modulo the communicator size) and it is pushed back onto the
 * queue; otherwise it is marked completed.
 * NOTE(review): presumably this splits remaining forwarding work between the
 * message and the re-queued request -- confirm against the caller. */
message->forward = (request->forward - 1) / 2;
request->forward = request->forward / 2;
if (0 < request->forward) {
request->dst =
(request->dst + message->forward + 1) % request->comm->size;
- SIMIX_mutex_lock(request_queue_mutex);
xbt_fifo_push(request_queue, request);
- SIMIX_mutex_unlock(request_queue_mutex);
} else {
request->completed = 1;
}
SIMIX_cond_wait(request->cond, request->mutex);
}
/* Old version pushed onto a smpi_global queue under its mutex; the new one
 * pushes straight onto the destination's received_message_queue. */
- SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]);
- xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
- SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes
- [dindex]);
+ xbt_fifo_push(remote_host->received_message_queue, message);
SIMIX_unregister_action_to_condition(action, request->cond);
SIMIX_action_destroy(action);
SIMIX_mutex_unlock(request->mutex);
// wake up receiver if necessary
/* New version calls resume unconditionally; the old code first checked
 * SIMIX_process_is_suspended(). */
- receiver_process = smpi_global->receiver_processes[dindex];
- if (SIMIX_process_is_suspended(receiver_process)) {
- SIMIX_process_resume(receiver_process);
- }
+ SIMIX_process_resume(remote_host->receiver);
+ } else if (mydata->finalize>0) { /* main wants me to die and nothing to do */
+ mydata->finalize--;
+ SIMIX_cond_signal(mydata->cond);
+ return 0;
+ } else {
+ SIMIX_process_suspend(self);
}
-
- SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
- running_hosts_count = smpi_global->running_hosts_count;
- SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
-
- } while (0 < running_hosts_count);
-
+ }
return 0;
}