+ * \param comm The communication request
+ */
+static XBT_INLINE void SIMIX_communication_start(smx_comm_t comm)
+{
+  /* The transfer can only begin once both endpoints are known; until then
+   * there is nothing to do. */
+  if (!comm->src_proc || !comm->dst_proc)
+    return;
+
+  DEBUG1("Starting communication %p", comm);
+
+  /* Create the network action between the hosts of the two peers. */
+  comm->act = SIMIX_action_communicate(comm->src_proc->smx_host,
+                                       comm->dst_proc->smx_host, NULL,
+                                       comm->task_size, comm->rate);
+#ifdef HAVE_TRACING
+  TRACE_smx_action_communicate(comm->act, comm->src_proc);
+  TRACE_surf_action(comm->act->surf_action, comm->act->category);
+#endif
+
+  /* The action is always created, but it is suspended right away when
+   * either peer is currently suspended. */
+  if (SIMIX_process_is_suspended(comm->src_proc) ||
+      SIMIX_process_is_suspended(comm->dst_proc)) {
+    SIMIX_action_suspend(comm->act);
+  }
+
+  /* Let the action point back to its communication. */
+  comm->act->data = comm;
+
+  /* Tie the action to the communication's semaphore. The semaphore is
+   * signaled only once; the first peer to wake up releases it forever
+   * (see SIMIX_communication_cleanup), which unblocks every other peer. */
+  SIMIX_register_action_to_semaphore(comm->act, comm->sem);
+}
+
+/**
+ * \brief Performs error checking and cleanup
+ * \param comm The communication
+ *
+ * Releases the communication's semaphore forever, then either reports a
+ * failure (destroying the communication before throwing) or copies the
+ * network data and destroys the communication.
+ *
+ * Throws:
+ *  - host_error if the host of the calling process is down
+ *  - network_error if the communication action failed or if the host of
+ *    either peer is down
+ */
+static XBT_INLINE void SIMIX_communication_cleanup(smx_comm_t comm)
+{
+  DEBUG1("Checking errors and cleaning communication %p", comm);
+
+  /* Wake up everybody sleeping on the semaphore and ensure nobody can
+   * block on it anymore. */
+  SIMIX_sem_release_forever(comm->sem);
+
+  /* Look for failures (timeouts are not handled here). */
+  if (!SIMIX_host_get_state(SIMIX_host_self())) {
+    /* The local host is down: detach from the rendez-vous point, if any,
+     * before destroying the communication. */
+    if (comm->rdv) {
+      SIMIX_rdv_remove(comm->rdv, comm);
+    }
+    SIMIX_communication_destroy(comm);
+    THROW0(host_error, 0, "Host failed");
+  } else if (SURF_ACTION_FAILED == SIMIX_action_get_state(comm->act)) {
+    SIMIX_communication_destroy(comm);
+    THROW0(network_error, 0, "Link failure");
+  } else if (!(SIMIX_host_get_state(SIMIX_process_get_host(comm->dst_proc))
+               && SIMIX_host_get_state(SIMIX_process_get_host(comm->src_proc)))) {
+    /* Both ends are tested because we do not know which one we are; the
+     * local host was already checked above, so in practice this detects a
+     * failed remote peer. The check is needed because, when the remote
+     * peer fails, comm->act is neither done nor failed.
+     * NOTE(review): the previous comment attributed the wake-up in that
+     * case to actions created by a SIMIX_sem_acquire(comm->sem) "at the
+     * beginning of this function"; no such call exists here (the wait is
+     * done in SIMIX_communication_wait_for_completion) -- confirm. */
+    SIMIX_communication_destroy(comm);
+    THROW0(network_error, 0, "Remote peer failed");
+  }
+
+  /* No failure detected: transfer the payload, then release the
+   * communication. */
+  SIMIX_network_copy_data(comm);
+
+  SIMIX_communication_destroy(comm);
+}
+
+/**
+ * \brief Waits for communication completion
+ * \param comm The communication
+ * \param timeout The max amount of time to wait for the communication to
+ *        finish; when it is not >= 0 the wait has no time limit
+ *
+ * Throws:
+ *  - host_error if local peer failed
+ *  - timeout_error if communication reached the timeout specified (either because of local peer or remote peer)
+ *  - network_error if network failed or remote peer failed
+ */
+static XBT_INLINE void SIMIX_communication_wait_for_completion(smx_comm_t comm,
+                                                                double timeout)
+{
+  int src_timeout = 0;
+  int dst_timeout = 0;
+
+  DEBUG1("Waiting for the completion of communication %p", comm);
+
+  if (timeout >= 0) {
+    /* Bounded wait: a sleep action on the local host plays the role of the
+     * timer, and is recorded on the side (source or destination) of the
+     * calling process. */
+    smx_action_t act_sleep = SIMIX_action_sleep(SIMIX_host_self(), timeout);
+
+    if (SIMIX_process_self() == comm->src_proc) {
+      comm->src_timeout = act_sleep;
+    } else {
+      comm->dst_timeout = act_sleep;
+    }
+
+    SIMIX_action_set_name(act_sleep,
+                          bprintf
+                          ("Timeout for comm %p and wait on semaphore %p (max_duration:%f)",
+                           comm, comm->sem, timeout));
+
+    /* Block on the semaphore with the timer attached to it; the timer is
+     * detached again as soon as we are woken up. */
+    SIMIX_register_action_to_semaphore(act_sleep, comm->sem);
+    SIMIX_process_self()->waiting_action = act_sleep;
+    SIMIX_sem_block_onto(comm->sem);
+    SIMIX_process_self()->waiting_action = NULL;
+    SIMIX_unregister_action_to_semaphore(act_sleep, comm->sem);
+  } else {
+    /* Unbounded wait. */
+    SIMIX_sem_acquire(comm->sem);
+  }
+
+  /* Check for timeouts: a side timed out when its timer exists and is done.
+   * The destination timer is only examined when the source did not time
+   * out, so at most one of the two flags is ever set. */
+  src_timeout = (comm->src_timeout != NULL)
+      && (SIMIX_action_get_state(comm->src_timeout) == SURF_ACTION_DONE);
+  if (!src_timeout) {
+    dst_timeout = (comm->dst_timeout != NULL)
+        && (SIMIX_action_get_state(comm->dst_timeout) == SURF_ACTION_DONE);
+  }
+
+  if (src_timeout || dst_timeout) {
+    /* Somebody did a timeout! */
+    if (src_timeout) {
+      DEBUG1("Communication timeout from the src! %p", comm);
+    }
+    if (dst_timeout) {
+      DEBUG1("Communication timeout from the dst! %p", comm);
+    }
+
+    /* A transfer that is still running gets cancelled; otherwise the
+     * communication is withdrawn from its rendez-vous point, if any. */
+    if (comm->act
+        && SIMIX_action_get_state(comm->act) == SURF_ACTION_RUNNING) {
+      SIMIX_communication_cancel(comm);
+    } else if (comm->rdv) {
+      SIMIX_rdv_remove(comm->rdv, comm);
+    }
+
+    /* Wake up everybody sleeping on the semaphore and ensure nobody can
+     * block on it anymore, then drop the communication before throwing. */
+    SIMIX_sem_release_forever(comm->sem);
+    SIMIX_communication_destroy(comm);
+
+    THROW1(timeout_error, 0, "Communication timeouted because of %s",
+           src_timeout ? "the source" : "the destination");
+  }
+
+  DEBUG1("Communication %p complete!", comm);
+  SIMIX_communication_cleanup(comm);
+}
+
+/**
+ * \brief Cancels a communication
+ * \param comm The communication to cancel
+ *
+ * Does nothing when the communication has no action attached.
+ */
+XBT_INLINE void SIMIX_communication_cancel(smx_comm_t comm)
+{
+  /* No action attached: nothing to cancel. */
+  if (!comm->act)
+    return;
+
+  SIMIX_action_cancel(comm->act);
+}
+
+/**
+ * \brief get the amount remaining from the communication
+ * \param comm The communication
+ * \return the value reported by SIMIX_action_get_remains() for the
+ *         communication's action
+ */
+XBT_INLINE double SIMIX_communication_get_remains(smx_comm_t comm)
+{
+  double remains;
+
+  /* Trace the query before delegating to the action layer. */
+  DEBUG1("calling SIMIX_action_get_remains(%p)", comm->act);
+  remains = SIMIX_action_get_remains(comm->act);
+
+  return remains;
+}
+
+#ifdef HAVE_LATENCY_BOUND_TRACKING
+/**
+ * \brief verify if communication is latency bounded
+ * \param comm The communication