3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
4 "Logging specific to SMPI (sender)");
6 int smpi_sender(int argc,char*argv[]) {
7 smpi_host_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
13 xbt_fifo_t request_queue;
15 smpi_mpi_request_t request;
21 e_surf_action_state_t state;
23 smpi_received_message_t message;
27 self = SIMIX_process_self();
28 shost = SIMIX_host_self();
30 index = mydata->index;
32 request_queue = mydata->pending_send_request_queue;
35 request = xbt_fifo_shift(request_queue);
37 if (NULL != request) {
38 message = xbt_mallocator_get(smpi_global->message_mallocator);
40 SIMIX_mutex_lock(request->mutex);
42 message->comm = request->comm;
43 message->src = request->comm->index_to_rank_map[index];
44 message->tag = request->tag;
45 message->data = request->data;
46 message->buf = xbt_malloc(request->datatype->size * request->count);
47 memcpy(message->buf, request->buf,
48 request->datatype->size * request->count);
50 dindex = request->comm->rank_to_index_map[request->dst];
51 dhost = smpi_global->hosts[dindex];
53 message->forward = (request->forward - 1) / 2;
54 request->forward = request->forward / 2;
56 if (0 < request->forward) {
58 (request->dst + message->forward + 1) % request->comm->size;
59 xbt_fifo_push(request_queue, request);
61 request->completed = 1;
65 SIMIX_action_communicate(shost, dhost, "communication",
66 request->datatype->size * request->count,
69 SIMIX_register_action_to_condition(action, request->cond);
71 for (state = SIMIX_action_get_state(action);
72 state == SURF_ACTION_READY ||
73 state == SURF_ACTION_RUNNING;
74 state = SIMIX_action_get_state(action)
76 SIMIX_cond_wait(request->cond, request->mutex);
79 xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
81 SIMIX_unregister_action_to_condition(action, request->cond);
82 SIMIX_action_destroy(action);
84 SIMIX_mutex_unlock(request->mutex);
86 // wake up receiver if necessary
87 smpi_host_data_t remote_host = SIMIX_host_get_data(SIMIX_process_get_host(smpi_global->main_processes[dindex]));
88 SIMIX_process_resume(remote_host->receiver);
90 } else if (mydata->finalize>0) { /* main wants me to die and nothing to do */
92 SIMIX_cond_signal(mydata->cond);
95 SIMIX_process_suspend(self);