3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
4 "Logging specific to SMPI (sender)");
6 int smpi_sender(int argc, char *argv[])
8 smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
14 xbt_fifo_t request_queue;
16 smpi_mpi_request_t request;
22 e_surf_action_state_t state;
24 smpi_received_message_t message;
28 self = SIMIX_process_self();
29 shost = SIMIX_host_self();
31 index = mydata->index;
33 request_queue = mydata->pending_send_request_queue;
36 request = xbt_fifo_shift(request_queue);
38 if (NULL != request) {
39 message = xbt_mallocator_get(smpi_global->message_mallocator);
41 SIMIX_mutex_lock(request->mutex);
43 message->comm = request->comm;
44 message->src = request->comm->index_to_rank_map[index];
45 message->tag = request->tag;
46 message->data = request->data;
47 message->buf = xbt_malloc(request->datatype->size * request->count);
48 memcpy(message->buf, request->buf,
49 request->datatype->size * request->count);
51 dindex = request->comm->rank_to_index_map[request->dst];
52 smpi_process_data_t remote_process =
53 SIMIX_process_get_data(smpi_global->main_processes[dindex]);
54 dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]);
56 message->forward = (request->forward - 1) / 2;
57 request->forward = request->forward / 2;
59 if (0 < request->forward) {
61 (request->dst + message->forward + 1) % request->comm->size;
62 xbt_fifo_push(request_queue, request);
64 request->completed = 1;
68 SIMIX_action_communicate(shost, dhost, "communication",
69 request->datatype->size * request->count,
72 SIMIX_register_action_to_condition(action, request->cond);
74 for (state = SIMIX_action_get_state(action);
75 state == SURF_ACTION_READY ||
76 state == SURF_ACTION_RUNNING;
77 state = SIMIX_action_get_state(action)
79 SIMIX_cond_wait(request->cond, request->mutex);
82 xbt_fifo_push(remote_process->received_message_queue, message);
84 SIMIX_unregister_action_to_condition(action, request->cond);
85 SIMIX_action_destroy(action);
87 SIMIX_mutex_unlock(request->mutex);
89 // wake up receiver if necessary
90 SIMIX_process_resume(remote_process->receiver);
92 } else if (mydata->finalize > 0) { /* main wants me to die and nothing to do */
94 SIMIX_cond_signal(mydata->cond);
97 SIMIX_process_suspend(self);