+ smx_process_t self;
+ smx_host_t shost;
+ int rank;
+ xbt_fifo_t request_queue;
+ smx_mutex_t request_queue_mutex;
+ int size;
+ int running_hosts = 0;
+ smpi_mpi_request_t *request;
+ smx_host_t dhost;
+ smx_action_t communicate_action;
+ smpi_received_message_t *scratch;
+ int drank;
+ smx_process_t waitproc;
+
+ self = SIMIX_process_self();
+ shost = SIMIX_host_self();
+ rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
+
+ // make sure root is done before own initialization
+ SIMIX_mutex_lock(init_mutex);
+ if (!smpi_root_ready) {
+ SIMIX_cond_wait(init_cond, init_mutex);
+ }
+ SIMIX_mutex_unlock(init_mutex);
+
+ request_queue = smpi_pending_send_requests[rank];
+ request_queue_mutex = smpi_pending_send_requests_mutex[rank];
+
+ size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
+
+ smpi_sender_processes[rank] = self;
+
+ // wait for all nodes to signal initializatin complete
+ SIMIX_mutex_lock(init_mutex);
+ smpi_ready_count++;
+ if (smpi_ready_count < 3 * size) {
+ SIMIX_cond_wait(init_cond, init_mutex);
+ } else {
+ SIMIX_cond_broadcast(init_cond);
+ }
+ SIMIX_mutex_unlock(init_mutex);
+
+ SIMIX_mutex_lock(smpi_running_hosts_mutex);
+ running_hosts = smpi_running_hosts;
+ SIMIX_mutex_unlock(smpi_running_hosts_mutex);
+
+ while (0 < running_hosts) {
+
+ SIMIX_mutex_lock(request_queue_mutex);
+ request = xbt_fifo_shift(request_queue);
+ SIMIX_mutex_unlock(request_queue_mutex);
+
+ if (NULL == request) {
+ SIMIX_process_suspend(self);
+ } else {
+ SIMIX_mutex_lock(request->mutex);
+
+ dhost = request->comm->hosts[request->dst];
+
+ // FIXME: not at all sure I can assume magic just happens here....
+ communicate_action = SIMIX_action_communicate(shost, dhost,
+ "communication", request->datatype->size * request->count * 1.0, -1.0);
+
+ SIMIX_register_condition_to_action(communicate_action, request->cond);
+ SIMIX_register_action_to_condition(communicate_action, request->cond);
+
+ SIMIX_cond_wait(request->cond, request->mutex);
+
+ // copy request to appropriate received queue
+ scratch = xbt_mallocator_get(smpi_message_mallocator);
+ scratch->comm = request->comm;
+ scratch->src = request->src;
+ scratch->dst = request->dst;
+ scratch->tag = request->tag;
+ scratch->buf = request->buf;
+ drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
+ SIMIX_mutex_lock(smpi_received_messages_mutex[drank]);
+ xbt_fifo_push(smpi_received_messages[drank], scratch);
+ SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]);
+
+ request->completed = 1;
+
+ // wake up receiver, then any waiting sender
+ waitproc = smpi_receiver_processes[drank];
+
+ do {
+ if (SIMIX_process_is_suspended(waitproc)) {
+ SIMIX_process_resume(waitproc);
+ }
+ } while(waitproc = xbt_fifo_shift(request->waitlist));
+
+ SIMIX_mutex_unlock(request->mutex);
+ }
+
+ SIMIX_mutex_lock(smpi_running_hosts_mutex);
+ running_hosts = smpi_running_hosts;
+ SIMIX_mutex_unlock(smpi_running_hosts_mutex);
+ }
+