/* Log category declaration for the SMPI sender code in this file.
 * Names the category `smpi_sender`, lists `smpi` as the second argument, and
 * attaches the human-readable description string given below.
 * NOTE(review): going by the macro name, `smpi` is presumably the parent
 * category and `smpi_sender` becomes the default category for log statements
 * in this file -- confirm against the XBT logging macros. */
3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
4 "Logging specific to SMPI (sender)");
6 int smpi_sender(int argc,char*argv[]) {
7 smpi_host_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
13 xbt_fifo_t request_queue;
15 int running_hosts_count;
17 smpi_mpi_request_t request;
23 e_surf_action_state_t state;
25 smpi_received_message_t message;
29 smx_process_t receiver_process;
31 self = SIMIX_process_self();
32 shost = SIMIX_host_self();
34 index = mydata->index;
36 request_queue = smpi_global->pending_send_request_queues[index];
40 request = xbt_fifo_shift(request_queue);
42 if (NULL == request) {
43 SIMIX_process_suspend(self);
46 message = xbt_mallocator_get(smpi_global->message_mallocator);
48 SIMIX_mutex_lock(request->mutex);
50 message->comm = request->comm;
51 message->src = request->comm->index_to_rank_map[index];
52 message->tag = request->tag;
53 message->data = request->data;
54 message->buf = xbt_malloc(request->datatype->size * request->count);
55 memcpy(message->buf, request->buf,
56 request->datatype->size * request->count);
58 dindex = request->comm->rank_to_index_map[request->dst];
59 dhost = smpi_global->hosts[dindex];
61 message->forward = (request->forward - 1) / 2;
62 request->forward = request->forward / 2;
64 if (0 < request->forward) {
66 (request->dst + message->forward + 1) % request->comm->size;
67 xbt_fifo_push(request_queue, request);
69 request->completed = 1;
73 SIMIX_action_communicate(shost, dhost, "communication",
74 request->datatype->size * request->count,
77 SIMIX_register_action_to_condition(action, request->cond);
79 for (state = SIMIX_action_get_state(action);
80 state == SURF_ACTION_READY ||
81 state == SURF_ACTION_RUNNING;
82 state = SIMIX_action_get_state(action)
84 SIMIX_cond_wait(request->cond, request->mutex);
87 xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
89 SIMIX_unregister_action_to_condition(action, request->cond);
90 SIMIX_action_destroy(action);
92 SIMIX_mutex_unlock(request->mutex);
94 // wake up receiver if necessary
95 smpi_host_data_t remote_host = SIMIX_host_get_data(SIMIX_process_get_host(smpi_global->main_processes[dindex]));
96 receiver_process = remote_host->receiver;
97 if (SIMIX_process_is_suspended(receiver_process))
98 SIMIX_process_resume(receiver_process);
101 running_hosts_count = smpi_global->running_hosts_count;
103 } while (0 < running_hosts_count);