3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
4 "Logging specific to SMPI (sender)");
6 int smpi_sender(int argc, char *argv[])
8 smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
14 xbt_fifo_t request_queue;
16 smpi_mpi_request_t request;
22 e_surf_action_state_t state;
24 smpi_received_message_t message;
28 self = SIMIX_process_self();
29 shost = SIMIX_host_self();
31 index = mydata->index;
33 DEBUG0("Up and running");
35 request_queue = mydata->pending_send_request_queue;
38 request = xbt_fifo_shift(request_queue);
40 if (NULL != request) {
41 message = xbt_mallocator_get(smpi_global->message_mallocator);
43 SIMIX_mutex_lock(request->mutex);
45 message->comm = request->comm;
46 message->src = request->comm->index_to_rank_map[index];
47 message->tag = request->tag;
48 message->data = request->data;
49 message->buf = xbt_malloc(request->datatype->size * request->count);
50 memcpy(message->buf, request->buf,
51 request->datatype->size * request->count);
53 dindex = request->comm->rank_to_index_map[request->dst];
54 smpi_process_data_t remote_process =
55 SIMIX_process_get_data(smpi_global->main_processes[dindex]);
56 dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]);
58 DEBUG4("handle send request %p to %s (req_dst=%d,req_tag=%d)",
59 request,SIMIX_host_get_name(dhost),request->dst,message->tag);
60 message->forward = (request->forward - 1) / 2;
61 request->forward = request->forward / 2;
63 if (0 < request->forward) {
65 (request->dst + message->forward + 1) % request->comm->size;
66 xbt_fifo_push(request_queue, request);
68 DEBUG4("DONE Handling send request %p to %s (req_dst=%d,req_tag=%d)",
69 request, SIMIX_host_get_name(dhost),request->dst,message->tag);
70 request->completed = 1;
74 SIMIX_action_communicate(shost, dhost, "communication",
75 request->datatype->size * request->count,
78 SIMIX_register_action_to_condition(action, request->cond);
80 for (state = SIMIX_action_get_state(action);
81 state == SURF_ACTION_READY ||
82 state == SURF_ACTION_RUNNING;
83 state = SIMIX_action_get_state(action)
85 SIMIX_cond_wait(request->cond, request->mutex);
88 xbt_fifo_push(remote_process->received_message_queue, message);
90 SIMIX_unregister_action_to_condition(action, request->cond);
91 SIMIX_action_destroy(action);
93 SIMIX_mutex_unlock(request->mutex);
95 // wake up receiver if necessary
96 SIMIX_process_resume(remote_process->receiver);
98 } else if (mydata->finalize > 0) { /* main wants me to die and nothing to do */
99 DEBUG0("===Main wants me to die and I'm done. Bye, guys.===");
101 SIMIX_cond_signal(mydata->cond);
104 DEBUG0("Nothing to do. Let's get a nap");
105 SIMIX_process_suspend(self);
106 DEBUG0("===Uh? Someone called me?===");