/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-#include "private.h"
+#include "smx_private.h"
#include "xbt/log.h"
#include "mc/mc.h"
#include "xbt/dict.h"
-/* Pimple to get an histogram of message sizes in the simulation */
-xbt_dict_t msg_sizes = NULL;
-#ifdef HAVE_LATENCY_BOUND_TRACKING
-xbt_dict_t latency_limited_dict = NULL;
-#endif
-
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
"Logging specific to SIMIX (network)");
+static xbt_dict_t rdv_points = NULL;
+unsigned long int smx_total_comms = 0;
+
+static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
+static void SIMIX_comm_copy_data(smx_action_t comm);
+static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
+static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
+static smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *), void *);
+static void SIMIX_rdv_free(void *data);
+
+/**
+ * \brief Creates the dictionary that holds the named rendez-vous points
+ *
+ * SIMIX_rdv_free is handed to the dictionary constructor, presumably as the
+ * function used to release the stored rendez-vous points.
+ */
+void SIMIX_network_init(void)
+{
+ rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
+}
+
+/**
+ * \brief Frees the dictionary of named rendez-vous points
+ */
+void SIMIX_network_exit(void)
+{
+ xbt_dict_free(&rdv_points);
+}
+
/******************************************************************************/
/* Rendez-Vous Points */
-/******************************************************************************/
+/******************************************************************************/
-/**
- * \brief Creates a new rendez-vous point
- * \param name The name of the rendez-vous point
- * \return The created rendez-vous point
- */
+/**
+ * \brief Returns the rendez-vous point of the given name, creating it if needed
+ * \param name The name of the rendez-vous point, or NULL for an anonymous one
+ * \return The rendez-vous point already registered under \a name, or a new one
+ *
+ * Only named rendez-vous points are stored in rdv_points. An anonymous one
+ * (NULL name) is always freshly allocated and is not registered.
+ */
smx_rdv_t SIMIX_rdv_create(const char *name)
{
- smx_rdv_t rdv = xbt_new0(s_smx_rvpoint_t, 1);
- rdv->name = name ? xbt_strdup(name) : NULL;
- rdv->read = SIMIX_mutex_init();
- rdv->write = SIMIX_mutex_init();
- rdv->comm_fifo = xbt_fifo_new();
+ /* two processes may have pushed the same rdv_create simcall at the same time */
+ smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
+ if (!rdv) {
+ rdv = xbt_new0(s_smx_rvpoint_t, 1);
+ rdv->name = name ? xbt_strdup(name) : NULL;
+ rdv->comm_fifo = xbt_fifo_new();
+
+ /* only named rendez-vous points can be looked up later */
+ if (rdv->name)
+ xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
+ }
return rdv;
}
-/**
- * \brief Destroy a rendez-vous point
- * \param name The rendez-vous point to destroy
- */
+/**
+ * \brief Destroys a rendez-vous point
+ * \param rdv The rendez-vous point to destroy
+ *
+ * A named rendez-vous point is removed from rdv_points; the dictionary was
+ * created with SIMIX_rdv_free, which presumably releases it on removal.
+ * An anonymous rendez-vous point (NULL name) was never registered in the
+ * dictionary, so it is released here directly to avoid leaking it.
+ */
void SIMIX_rdv_destroy(smx_rdv_t rdv)
{
- if(rdv->name)
- xbt_free(rdv->name);
- SIMIX_mutex_destroy(rdv->read);
- SIMIX_mutex_destroy(rdv->write);
+  if (rdv->name)
+    xbt_dict_remove(rdv_points, rdv->name);
+  else
+    SIMIX_rdv_free(rdv);
+}
+
+/**
+ * \brief Releases the memory used by a rendez-vous point
+ * \param data The rendez-vous point, received as an untyped pointer
+ *
+ * Frees the name, the communication fifo and the structure itself.
+ * This is the function given to the rdv_points dictionary at creation.
+ */
+void SIMIX_rdv_free(void *data)
+{
+ smx_rdv_t rdv = (smx_rdv_t) data;
+ xbt_free(rdv->name);
xbt_fifo_free(rdv->comm_fifo);
- xbt_free(rdv);
+ xbt_free(rdv);
+}
+
+/**
+ * \brief Gives access to the dictionary of named rendez-vous points
+ * \return The internal rdv_points dictionary itself (not a copy)
+ */
+xbt_dict_t SIMIX_get_rdv_points()
+{
+ return rdv_points;
+}
+
+/**
+ * \brief Looks up a rendez-vous point by name
+ * \param name The name to look for in rdv_points
+ * \return The rendez-vous point registered under \a name, or NULL if there is none
+ */
+smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
+{
+ return xbt_dict_get_or_null(rdv_points, name);
+}
+
+/**
+ * \brief Counts the communication actions queued in a rendez-vous point
+ *        whose sender is located on a given host
+ * \param rdv The rendez-vous point
+ * \param host The host to be counted
+ * \return The number of queued actions whose source process runs on \a host
+ */
+int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
+{
+  smx_action_t comm = NULL;
+  xbt_fifo_item_t item = NULL;
+  int count = 0;
+
+  xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
+    /* the fifo also holds pending receives, which have no sender yet:
+     * skip them instead of dereferencing a NULL src_proc */
+    if (comm->comm.src_proc && comm->comm.src_proc->smx_host == host)
+      count++;
+  }
+
+  return count;
+}
+
+/**
+ * \brief Returns the communication action at the head of a rendez-vous point
+ * \param rdv The rendez-vous point
+ * \return The content of the first item of the fifo; the item is not removed
+ *         (presumably NULL when the fifo is empty -- confirm against xbt_fifo)
+ */
+smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
+{
+ return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
}
/**
- * \brief Push a communication request into a rendez-vous point
+ * \brief Pushes a communication action into a rendez-vous point
* \param rdv The rendez-vous point
- * \param comm The communication request
+ * \param comm The communication action
*/
-static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_comm_t comm)
+static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
{
xbt_fifo_push(rdv->comm_fifo, comm);
- comm->rdv = rdv;
+ comm->comm.rdv = rdv;
}
/**
- * \brief Remove a communication request from a rendez-vous point
+ * \brief Removes a communication action from a rendez-vous point
* \param rdv The rendez-vous point
- * \param comm The communication request
+ * \param comm The communication action
*/
-static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_comm_t comm)
+XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
{
xbt_fifo_remove(rdv->comm_fifo, comm);
- comm->rdv = NULL;
+ comm->comm.rdv = NULL;
}
-
+
/**
- * \brief Checks if there is a communication request queued in a rendez-vous matching our needs
- * \param type The type of communication we are looking for (comm_send, comm_recv)
- * \return The communication request if found, NULL otherwise.
+ * \brief Wrapper to SIMIX_rdv_get_comm
*/
-smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type) {
- smx_comm_t comm = (smx_comm_t)xbt_fifo_get_item_content(
- xbt_fifo_get_first_item(rdv->comm_fifo));
+smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
+ return SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
+}
- if(comm && comm->type == type){
- DEBUG0("Communication request found!");
- xbt_fifo_shift(rdv->comm_fifo);
- SIMIX_communication_use(comm);
- comm->rdv = NULL;
- return comm;
+/**
+ * \brief Looks for a communication action queued in a rendez-vous point that
+ *        matches our needs and, if found, takes it out of the queue
+ * \param rdv The rendez-vous point to search
+ * \param type The type of communication we are looking for (SIMIX_COMM_SEND, SIMIX_COMM_RECEIVE)
+ * \param match_fun Optional filter, called as match_fun(data, queued_data) where
+ *        queued_data is the src_data of a queued send or the dst_data of a
+ *        queued receive; when NULL, any action of the right type matches
+ * \param data The first argument given to match_fun
+ * \return The communication action if found, NULL otherwise
+ *
+ * A matching action is removed from the fifo, its refcount is incremented
+ * and its rdv field is reset to NULL before it is returned.
+ */
+smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *), void *data)
+{
+ // FIXME rewrite this function by using SIMIX_rdv_has_send/recv_match
+ smx_action_t action;
+ xbt_fifo_item_t item;
+ void* comm_data = NULL;
+
+ xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
+ /* pick the user data of the side that queued this action */
+ if (action->comm.type == SIMIX_COMM_SEND) {
+ comm_data = action->comm.src_data;
+ } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
+ comm_data = action->comm.dst_data;
+ }
+ if (action->comm.type == type && (!match_fun || match_fun(data, comm_data))) {
+ XBT_DEBUG("Found a matching communication action %p", action);
+ xbt_fifo_remove_item(rdv->comm_fifo, item);
+ xbt_fifo_free_item(item);
+ action->comm.refcount++;
+ action->comm.rdv = NULL;
+ return action;
+ }
+ XBT_DEBUG("Sorry, communication action %p does not match our needs:"
+ " its type is %d but we are looking for a comm of type %d",
+ action, (int)action->comm.type, (int)type);
}
-
- DEBUG0("Communication request not found");
+ XBT_DEBUG("No matching communication action found");
return NULL;
}
/**
- * \brief counts the number of communication requests of a given host pending
- * on a rendez-vous point
- * \param rdv The rendez-vous point
- * \param host The host to be counted
- * \return The number of comm request pending in the rdv
+ * \brief Checks if there is a send communication action
+ * queued in a rendez-vous matching our needs.
+ * \return 1 if found, 0 otherwise
*/
-int
-SIMIX_rdv_get_count_waiting_comm(smx_rdv_t rdv, smx_host_t host)
-{
- smx_comm_t comm = NULL;
- xbt_fifo_item_t item = NULL;
- int count = 0;
+int SIMIX_comm_has_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
- xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_comm_t) {
- if (comm->src_proc->smx_host == host)
- count++;
- }
+ smx_action_t action;
+ xbt_fifo_item_t item;
- return count;
+ xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
+ if (action->comm.type == SIMIX_COMM_SEND
+ && (!match_fun || match_fun(data, action->comm.src_data))) {
+ XBT_DEBUG("Found a matching communication action %p", action);
+ return 1;
+ }
+ }
+ XBT_DEBUG("No matching communication action found");
+ return 0;
}
/**
- * \brief returns the communication at the head of the rendez-vous
- * \param rdv The rendez-vous point
- * \return The communication or NULL if empty
+ * \brief Checks if there is a recv communication action
+ * queued in a rendez-vous matching our needs.
+ * \return 1 if found, 0 otherwise
*/
-XBT_INLINE smx_comm_t SIMIX_rdv_get_head(smx_rdv_t rdv)
-{
- return (smx_comm_t)xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
-}
+int SIMIX_comm_has_recv_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
-/** @brief adds some API-related data to the rendez-vous point */
-XBT_INLINE void SIMIX_rdv_set_data(smx_rdv_t rdv,void *data) {
- rdv->data=data;
-}
-/** @brief gets API-related data from the rendez-vous point */
-XBT_INLINE void *SIMIX_rdv_get_data(smx_rdv_t rdv) {
- return rdv->data;
+ smx_action_t action;
+ xbt_fifo_item_t item;
+
+ xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
+ if (action->comm.type == SIMIX_COMM_RECEIVE
+ && (!match_fun || match_fun(data, action->comm.dst_data))) {
+ XBT_DEBUG("Found a matching communication action %p", action);
+ return 1;
+ }
+ }
+ XBT_DEBUG("No matching communication action found");
+ return 0;
}
/******************************************************************************/
-/* Communication Requests */
-/******************************************************************************/
+/* Communication Actions */
+/******************************************************************************/
/**
- * \brief Creates a new communication request
- * \param type The type of communication (comm_send, comm_recv)
- * \return The new communication request
- */
-smx_comm_t SIMIX_communication_new(smx_comm_type_t type)
+ * \brief Creates a new communicate action
+ * \param type The direction of communication (comm_send, comm_recv)
+ * \return The new communicate action
+ */
+smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
{
+ smx_action_t act;
+
/* alloc structures */
- smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
- comm->type = type;
- comm->sem = SIMIX_sem_init(0);
- comm->refcount = 1;
-
- return comm;
+ act = xbt_mallocator_get(simix_global->action_mallocator);
+
+ act->type = SIMIX_ACTION_COMMUNICATE;
+ act->state = SIMIX_WAITING;
+
+ /* set communication */
+ act->comm.type = type;
+ act->comm.refcount = 1;
+
+#ifdef HAVE_LATENCY_BOUND_TRACKING
+ //initialize with unknown value
+ act->latency_limited = -1;
+#endif
+
+#ifdef HAVE_TRACING
+ act->category = NULL;
+#endif
+
+ XBT_DEBUG("Create communicate action %p", act);
+ ++smx_total_comms;
+
+ return act;
}
/**
- * \brief Destroy a communication request
- * \param comm The request to be destroyed
+ * \brief Destroy a communicate action
+ * \param action The communicate action to be destroyed
*/
-void SIMIX_communication_destroy(smx_comm_t comm)
+void SIMIX_comm_destroy(smx_action_t action)
{
- VERB2("Destroy communication %p; refcount initially %d",comm,comm->refcount);
+ XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
+ action, action->comm.refcount, (int)action->state);
-#ifdef HAVE_LATENCY_BOUND_TRACKING
- //save is latency limited flag to use afterwards
- if (latency_limited_dict == NULL) {
- latency_limited_dict = xbt_dict_new();
- }
- if (comm->act){
- DEBUG2("adding key %p with latency limited value %d to the dict", comm, SIMIX_action_is_latency_bounded(comm->act));
- xbt_dicti_set(latency_limited_dict, (uintptr_t)comm, SIMIX_action_is_latency_bounded(comm->act));
+ if (action->comm.refcount <= 0) {
+ xbt_backtrace_display_current();
+ xbt_die("the refcount of comm %p is already 0 before decreasing it. "
+ "That's a bug!", action);
}
+ action->comm.refcount--;
+ if (action->comm.refcount > 0)
+ return;
+ XBT_DEBUG("Really free communication %p; refcount is now %d", action,
+ action->comm.refcount);
+
+#ifdef HAVE_LATENCY_BOUND_TRACKING
+ action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
#endif
- comm->refcount--;
- if(comm->refcount > 0)
- return;
+ xbt_free(action->name);
+ SIMIX_comm_destroy_internal_actions(action);
- if(comm->sem){
- SIMIX_sem_destroy(comm->sem);
- comm->sem = NULL;
+ if (action->comm.detached && action->state != SIMIX_DONE) {
+ /* the communication has failed and was detached:
+ * we have to free the buffer */
+ if (action->comm.clean_fun) {
+ action->comm.clean_fun(action->comm.src_buff);
+ }
+ action->comm.src_buff = NULL;
}
-
- if(comm->act){
- SIMIX_action_destroy(comm->act);
- comm->act = NULL;
+
+ xbt_mallocator_release(simix_global->action_mallocator, action);
+}
+
+/**
+ * \brief Releases the surf actions attached to a communication action
+ * \param action The communication action
+ *
+ * Unreferences the surf communication and both timeout actions when they
+ * are set, and resets each pointer to NULL, so calling this function again
+ * on the same action does nothing.
+ */
+void SIMIX_comm_destroy_internal_actions(smx_action_t action)
+{
+ if (action->comm.surf_comm){
+#ifdef HAVE_LATENCY_BOUND_TRACKING
+ /* save the flag before the surf action goes away */
+ action->latency_limited = SIMIX_comm_is_latency_bounded(action);
+#endif
+ action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
+ action->comm.surf_comm = NULL;
}
- if(comm->src_timeout){
- SIMIX_action_destroy(comm->src_timeout);
- comm->src_timeout = NULL;
+ if (action->comm.src_timeout){
+ action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
+ action->comm.src_timeout = NULL;
}
- if(comm->dst_timeout){
- SIMIX_action_destroy(comm->dst_timeout);
- comm->dst_timeout = NULL;
- }
+ if (action->comm.dst_timeout){
+ action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
+ action->comm.dst_timeout = NULL;
+ }
+}
+
+/**
+ * \brief Posts an asynchronous send on a rendez-vous point
+ * \param src_proc The sending process
+ * \param rdv The rendez-vous point
+ * \param task_size Stored in the action and later given to the surf communication
+ * \param rate Stored in the action and later given to the surf communication
+ * \param src_buff The buffer to send
+ * \param src_buff_size The size of \a src_buff
+ * \param match_fun Optional filter used to select a queued receive (see SIMIX_rdv_get_comm)
+ * \param clean_fun Only kept for a detached send (see the parameter comment below)
+ * \param data The sender's user data, given to \a match_fun and stored as src_data
+ * \param detached Non-zero if the sender will not wait for this communication
+ * \return The communication action, or NULL for a detached send
+ *         (when MC_IS_ENABLED, the action is returned even if detached)
+ *
+ * If a matching receive is already queued, it is taken out of the rendez-vous
+ * point and becomes SIMIX_READY; otherwise a new send action is queued.
+ * When MC_IS_ENABLED, the action is marked SIMIX_RUNNING and
+ * SIMIX_comm_start is not called.
+ */
+smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
+ double task_size, double rate,
+ void *src_buff, size_t src_buff_size,
+ int (*match_fun)(void *, void *),
+ void (*clean_fun)(void *), // called on src_buff by SIMIX_comm_destroy when a detached send did not complete
+ void *data,
+ int detached)
+{
+ smx_action_t action;
+
+ /* Look for communication action matching our needs.
+ If it is not found then create it and push it into the rendez-vous point */
+ action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
+ if (!action) {
+ action = SIMIX_comm_new(SIMIX_COMM_SEND);
+ SIMIX_rdv_push(rdv, action);
+ } else {
+ action->state = SIMIX_READY;
+ action->comm.type = SIMIX_COMM_READY;
+ }
+ xbt_fifo_push(src_proc->comms, action);
+
+ /* if the communication action is detached then decrease the refcount
+ * by one, so it will be eliminated by the receiver's destroy call */
+ if (detached) {
+ action->comm.detached = 1;
+ action->comm.refcount--;
+ action->comm.clean_fun = clean_fun;
+ } else {
+ action->comm.clean_fun = NULL;
+ }
+ /* Setup the communication action */
+ action->comm.src_proc = src_proc;
+ action->comm.task_size = task_size;
+ action->comm.rate = rate;
+ action->comm.src_buff = src_buff;
+ action->comm.src_buff_size = src_buff_size;
+ action->comm.src_data = data;
+
+ if (MC_IS_ENABLED) {
+ action->state = SIMIX_RUNNING;
+ return action;
+ }
- xbt_free(comm);
+ SIMIX_comm_start(action);
+ return (detached ? NULL : action);
}
-/**
- * \brief Increase the number of users of the communication.
- * \param comm The communication request
- * Each communication request can be used by more than one process, so it is
- * necessary to know number of them at destroy time, to avoid freeing stuff that
- * maybe is in use by others.
- * \
- */
-static XBT_INLINE void SIMIX_communication_use(smx_comm_t comm)
+/**
+ * \brief Posts an asynchronous receive on a rendez-vous point
+ * \param dst_proc The receiving process
+ * \param rdv The rendez-vous point
+ * \param dst_buff The buffer in which the data is to be received
+ * \param dst_buff_size Pointer to the size of \a dst_buff
+ * \param match_fun Optional filter used to select a queued send (see SIMIX_rdv_get_comm)
+ * \param data The receiver's user data, given to \a match_fun and stored as dst_data
+ * \return The communication action
+ *
+ * If a matching send is already queued, it is taken out of the rendez-vous
+ * point and becomes SIMIX_READY; otherwise a new receive action is queued.
+ * When MC_IS_ENABLED, the action is marked SIMIX_RUNNING and
+ * SIMIX_comm_start is not called.
+ */
+smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
+ void *dst_buff, size_t *dst_buff_size,
+ int (*match_fun)(void *, void *), void *data)
{
- comm->refcount++;
+ smx_action_t action;
+
+ /* Look for communication action matching our needs.
+ * If it is not found then create it and push it into the rendez-vous point
+ */
+ action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
+
+ if (!action) {
+ action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
+ SIMIX_rdv_push(rdv, action);
+ } else {
+ action->state = SIMIX_READY;
+ action->comm.type = SIMIX_COMM_READY;
+ }
+ xbt_fifo_push(dst_proc->comms, action);
+
+ /* Setup communication action */
+ action->comm.dst_proc = dst_proc;
+ action->comm.dst_buff = dst_buff;
+ action->comm.dst_buff_size = dst_buff_size;
+ action->comm.dst_data = data;
+
+ if (MC_IS_ENABLED) {
+ action->state = SIMIX_RUNNING;
+ return action;
+ }
+
+ SIMIX_comm_start(action);
+ return action;
}
-/**
- * \brief Start the simulation of a communication request
- * \param comm The communication request
- */
-static XBT_INLINE void SIMIX_communication_start(smx_comm_t comm)
+void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
{
- /* If both the sender and the receiver are already there, start the communication */
- if(comm->src_proc && comm->dst_proc){
- DEBUG1("Starting communication %p", comm);
- comm->act = SIMIX_action_communicate(comm->src_proc->smx_host,
- comm->dst_proc->smx_host, NULL,
- comm->task_size, comm->rate);
-#ifdef HAVE_TRACING
- TRACE_smx_action_communicate (comm->act, comm->src_proc);
-#endif
- /* If any of the process is suspend, create the action but stop its execution,
- it will be restarted when the sender process resume */
- if(SIMIX_process_is_suspended(comm->src_proc) ||
- SIMIX_process_is_suspended(comm->dst_proc)) {
- SIMIX_action_suspend(comm->act);
+ /* the simcall may be a wait, a send or a recv */
+ surf_action_t sleep;
+
+ /* Associate this simcall to the wait action */
+ xbt_fifo_push(action->simcalls, simcall);
+ simcall->issuer->waiting_action = action;
+
+ if (MC_IS_ENABLED) {
+ if (idx == 0) {
+ action->state = SIMIX_DONE;
+ } else {
+ /* If we reached this point, the wait simcall must have a timeout */
+ /* Otherwise it shouldn't be enabled and executed by the MC */
+ if (timeout == -1)
+ THROW_IMPOSSIBLE;
+
+ if (action->comm.src_proc == simcall->issuer)
+ action->state = SIMIX_SRC_TIMEOUT;
+ else
+ action->state = SIMIX_DST_TIMEOUT;
}
-
- /* Add the communication as user data of the action */
- comm->act->data = comm;
- /* The semaphore will only get signaled once, but since the first unlocked guy will
- * release_forever() the semaphore, that will unlock the second (and any other)
- * communication partner */
- SIMIX_register_action_to_semaphore(comm->act, comm->sem);
+ SIMIX_comm_finish(action);
+ return;
}
-}
-/**
- * \brief Performs error checking and cleanup
- * \param comm The communication
- */
-static XBT_INLINE void SIMIX_communication_cleanup(smx_comm_t comm)
-{
- DEBUG1("Checking errors and cleaning communication %p", comm);
-
- /* Make sure that everyone sleeping on that semaphore is awake, and that nobody will ever block on it */
- SIMIX_sem_release_forever(comm->sem);
-
- /* Check for errors other than timeouts */
- if (!SIMIX_host_get_state(SIMIX_host_self())){
- if(comm->rdv)
- SIMIX_rdv_remove(comm->rdv, comm);
- SIMIX_communication_destroy(comm);
- THROW0(host_error, 0, "Host failed");
- } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
- SIMIX_communication_destroy(comm);
- THROW0(network_error, 0, "Link failure");
- } else if (!SIMIX_host_get_state(SIMIX_process_get_host(comm->dst_proc)) ||
- !SIMIX_host_get_state(SIMIX_process_get_host(comm->src_proc))) {
- /* We test both src&dst because we dunno who we are today, and we already tested myself above.
- * So, at the end, we test the remote peer only
- * Moreover, we have to test it because if the remote peer fails, the action comm->act is not done nor failed.
- * In that case, we got awaken by the little endless actions created in the SIMIX_sem_acquire(comm->sem)
- * at the beginning of this function. */
- SIMIX_communication_destroy(comm);
- THROW0(network_error, 0, "Remote peer failed");
-
- }
- /* Copy network data */
- SIMIX_network_copy_data(comm);
-
- SIMIX_communication_destroy(comm);
+ /* If the action has already finished, perform the error handling; */
+ /* otherwise set up a waiting timeout on the right side */
+ if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
+ SIMIX_comm_finish(action);
+ } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
+ sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
+ surf_workstation_model->action_data_set(sleep, action);
+
+ if (simcall->issuer == action->comm.src_proc)
+ action->comm.src_timeout = sleep;
+ else
+ action->comm.dst_timeout = sleep;
+ }
}
-/**
- * \brief Waits for communication completion
- * \param comm The communication
- * \param timeout The max amount of time to wait for the communication to finish
- *
- * Throws:
- * - host_error if local peer failed
- * - timeout_error if communication reached the timeout specified (either because of local peer or remote peer)
- * - network_error if network failed or remote peer failed
- */
-static XBT_INLINE void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout)
-{
- smx_action_t act_sleep = NULL;
- int src_timeout = 0;
- int dst_timeout = 0;
-
- DEBUG1("Waiting for the completion of communication %p", comm);
-
- if (timeout >= 0) {
- act_sleep = SIMIX_action_sleep(SIMIX_host_self(), timeout);
- if(SIMIX_process_self()==comm->src_proc)
- comm->src_timeout = act_sleep;
- else
- comm->dst_timeout = act_sleep;
- SIMIX_action_set_name(act_sleep,bprintf("Timeout for comm %p and wait on semaphore %p (max_duration:%f)", comm, comm->sem,timeout));
- SIMIX_register_action_to_semaphore(act_sleep, comm->sem);
- SIMIX_process_self()->waiting_action = act_sleep;
- SIMIX_sem_block_onto(comm->sem);
- SIMIX_process_self()->waiting_action = NULL;
- SIMIX_unregister_action_to_semaphore(act_sleep, comm->sem);
+/**
+ * \brief Handles a comm_test simcall
+ * \param simcall The simcall; the tested communication is simcall->comm_test.comm
+ *
+ * Stores in simcall->comm_test.result whether the communication is over.
+ * If it is, the simcall is attached to the action and SIMIX_comm_finish is
+ * called on it; otherwise the simcall is answered right away.
+ */
+void SIMIX_pre_comm_test(smx_simcall_t simcall)
+{
+ smx_action_t action = simcall->comm_test.comm;
+
+ if(MC_IS_ENABLED){
+ /* with the MC, a communication is over as soon as both peers are known */
+ simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
+ if(simcall->comm_test.result){
+ action->state = SIMIX_DONE;
+ xbt_fifo_push(action->simcalls, simcall);
+ SIMIX_comm_finish(action);
+ }else{
+ SIMIX_simcall_answer(simcall);
+ }
+ return;
+ }
+
+ /* any state other than waiting or running means the communication is over */
+ simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
+ if (simcall->comm_test.result) {
+ xbt_fifo_push(action->simcalls, simcall);
+ SIMIX_comm_finish(action);
} else {
- SIMIX_sem_acquire(comm->sem);
+ SIMIX_simcall_answer(simcall);
+ }
+}
+
+/**
+ * \brief Handles a comm_testany simcall
+ * \param simcall The simcall; the tested communications are simcall->comm_testany.comms
+ * \param idx Only read when MC_IS_ENABLED: the position of the communication
+ *        to consider as done, or -1 for none
+ *
+ * simcall->comm_testany.result is set to the position of the first
+ * communication that is over, or left at -1 when there is none. In the first
+ * case the simcall is attached to that action and SIMIX_comm_finish is called
+ * on it; in the second the simcall is answered right away.
+ */
+void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
+{
+ unsigned int cursor;
+ smx_action_t action;
+ xbt_dynar_t actions = simcall->comm_testany.comms;
+ simcall->comm_testany.result = -1;
+
+ if (MC_IS_ENABLED){
+ if(idx == -1){
+ SIMIX_simcall_answer(simcall);
+ }else{
+ action = xbt_dynar_get_as(actions, idx, smx_action_t);
+ simcall->comm_testany.result = idx;
+ xbt_fifo_push(action->simcalls, simcall);
+ action->state = SIMIX_DONE;
+ SIMIX_comm_finish(action);
+ }
+ return;
}
- /* Check for timeouts */
- if ((src_timeout = ((comm->src_timeout) && (SIMIX_action_get_state(comm->src_timeout) == SURF_ACTION_DONE))) ||
- (dst_timeout = ((comm->dst_timeout) && (SIMIX_action_get_state(comm->dst_timeout) == SURF_ACTION_DONE))) ) {
- /* Somebody did a timeout! */
- if (src_timeout) DEBUG1("Communication timeout from the src! %p", comm);
- if (dst_timeout) DEBUG1("Communication timeout from the dst! %p", comm);
+ /* report the first communication that is neither waiting nor running */
+ xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
+ if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
+ simcall->comm_testany.result = cursor;
+ xbt_fifo_push(action->simcalls, simcall);
+ SIMIX_comm_finish(action);
+ return;
+ }
+ }
+ SIMIX_simcall_answer(simcall);
+}
- if(comm->act && SIMIX_action_get_state(comm->act) == SURF_ACTION_RUNNING)
- SIMIX_communication_cancel(comm);
- else if (comm->rdv)
- SIMIX_rdv_remove(comm->rdv, comm);
+/**
+ * \brief Handles a comm_waitany simcall
+ * \param simcall The simcall; the awaited communications are simcall->comm_waitany.comms
+ * \param idx Only read when MC_IS_ENABLED: the position of the communication
+ *        to consider as done
+ *
+ * Without the MC, the simcall is attached to the actions in turn, stopping
+ * at the first one that is already over, on which SIMIX_comm_finish is called.
+ * If none is over yet, the simcall stays attached to all of them.
+ */
+void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
+{
+ smx_action_t action;
+ unsigned int cursor = 0;
+ xbt_dynar_t actions = simcall->comm_waitany.comms;
+
+ if (MC_IS_ENABLED){
+ action = xbt_dynar_get_as(actions, idx, smx_action_t);
+ xbt_fifo_push(action->simcalls, simcall);
+ simcall->comm_waitany.result = idx;
+ action->state = SIMIX_DONE;
+ SIMIX_comm_finish(action);
+ return;
+ }
- /* Make sure that everyone sleeping on that semaphore is awake, and that nobody will ever block on it */
- SIMIX_sem_release_forever(comm->sem);
- SIMIX_communication_destroy(comm);
+ xbt_dynar_foreach(actions, cursor, action){
+ /* associate this simcall to the action */
+ xbt_fifo_push(action->simcalls, simcall);
- THROW1(timeout_error, 0, "Communication timeouted because of %s",src_timeout?"the source":"the destination");
+ /* see if the action is already finished */
+ if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
+ SIMIX_comm_finish(action);
+ break;
+ }
}
+}
+
+/**
+ * \brief Detaches a waitany simcall from all the communications it waits for
+ * \param simcall The waitany simcall; it is removed from the simcalls fifo of
+ *        every action listed in simcall->comm_waitany.comms
+ */
+void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
+{
+ smx_action_t action;
+ unsigned int cursor = 0;
+ xbt_dynar_t actions = simcall->comm_waitany.comms;
- DEBUG1("Communication %p complete!", comm);
- SIMIX_communication_cleanup(comm);
+ xbt_dynar_foreach(actions, cursor, action) {
+ xbt_fifo_remove(action->simcalls, simcall);
+ }
}
/**
- * \brief Cancels a communication
- * \brief comm The communication to cancel
+ * \brief Starts the simulation of a communication action.
+ * \param action the communication action
+ *
+ * Nothing is done unless the action is SIMIX_READY. Otherwise the surf
+ * communication is created between the hosts of the sender and the receiver
+ * and the action becomes SIMIX_RUNNING, or SIMIX_LINK_FAILURE if surf reports
+ * the new communication as failed right away; in that case the surf action
+ * is released immediately.
*/
-XBT_INLINE void SIMIX_communication_cancel(smx_comm_t comm)
+XBT_INLINE void SIMIX_comm_start(smx_action_t action)
{
- if (comm->act)
- SIMIX_action_cancel(comm->act);
+  /* If both the sender and the receiver are already there, start the communication */
+  if (action->state == SIMIX_READY) {
+
+    smx_host_t sender = action->comm.src_proc->smx_host;
+    smx_host_t receiver = action->comm.dst_proc->smx_host;
+
+    XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
+              SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
+
+    action->comm.surf_comm = surf_workstation_model->extension.workstation.
+      communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
+
+    surf_workstation_model->action_data_set(action->comm.surf_comm, action);
+
+    action->state = SIMIX_RUNNING;
+
+    /* If a link is failed, detect it immediately */
+    if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+      XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
+                SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
+      action->state = SIMIX_LINK_FAILURE;
+      SIMIX_comm_destroy_internal_actions(action);
+    }
+
+    /* If any of the processes is suspended, create the action but stop its
+       execution; it will be restarted when the sender process resumes.
+       After a link failure the surf action was released just above and
+       surf_comm is NULL: there is nothing left to suspend. */
+    if (action->comm.surf_comm &&
+        (SIMIX_process_is_suspended(action->comm.src_proc) ||
+         SIMIX_process_is_suspended(action->comm.dst_proc))) {
+      /* FIXME: check what should happen with the action state */
+      surf_workstation_model->suspend(action->comm.surf_comm);
+    }
+  }
}
/**
- * \brief get the amount remaining from the communication
- * \param comm The communication
+ * \brief Answers the SIMIX simcalls associated to a communication action.
+ * \param action a finished communication action
*/
-XBT_INLINE double SIMIX_communication_get_remains(smx_comm_t comm)
+void SIMIX_comm_finish(smx_action_t action)
{
- DEBUG1("calling SIMIX_action_get_remains(%p)", comm->act);
- return SIMIX_action_get_remains(comm->act);
-}
+ unsigned int destroy_count = 0;
+ smx_simcall_t simcall;
+
+ while ((simcall = xbt_fifo_shift(action->simcalls))) {
+
+ /* If a waitany simcall is waiting for this action to finish, then remove
+ it from the other actions in the waitany list. Afterwards, get the
+ position of the actual action in the waitany dynar and
+ return it as the result of the simcall */
+ if (simcall->call == SIMCALL_COMM_WAITANY) {
+ SIMIX_waitany_remove_simcall_from_actions(simcall);
+ if (!MC_IS_ENABLED)
+ simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
+ }
+
+ /* If the action is still in a rendez-vous point then remove from it */
+ if (action->comm.rdv)
+ SIMIX_rdv_remove(action->comm.rdv, action);
+
+ XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
+
+ /* Check out for errors */
+ switch (action->state) {
+
+ case SIMIX_DONE:
+ XBT_DEBUG("Communication %p complete!", action);
+ SIMIX_comm_copy_data(action);
+ break;
+
+ case SIMIX_SRC_TIMEOUT:
+ SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
+ "Communication timeouted because of sender");
+ break;
+
+ case SIMIX_DST_TIMEOUT:
+ SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
+ "Communication timeouted because of receiver");
+ break;
+
+ case SIMIX_SRC_HOST_FAILURE:
+ if (simcall->issuer == action->comm.src_proc)
+ SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
+ else
+ SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
+ break;
+
+ case SIMIX_DST_HOST_FAILURE:
+ if (simcall->issuer == action->comm.dst_proc)
+ SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
+ else
+ SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
+ break;
+
+ case SIMIX_LINK_FAILURE:
+ XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
+ action,
+ action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
+ action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
+ simcall->issuer->name, simcall->issuer, action->comm.detached);
+ if (action->comm.src_proc == simcall->issuer) {
+ XBT_DEBUG("I'm source");
+ } else if (action->comm.dst_proc == simcall->issuer) {
+ XBT_DEBUG("I'm dest");
+ } else {
+ XBT_DEBUG("I'm neither source nor dest");
+ }
+ SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
+ break;
+
+ case SIMIX_CANCELED:
+ if (simcall->issuer == action->comm.dst_proc)
+ SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
+ "Communication canceled by the sender");
+ else
+ SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
+ "Communication canceled by the receiver");
+ break;
+
+ default:
+ xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
+ }
+
+ /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
+ if (simcall->issuer->doexception) {
+ if (simcall->call == SIMCALL_COMM_WAITANY) {
+ simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
+ }
+ else if (simcall->call == SIMCALL_COMM_TESTANY) {
+ simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
+ }
+ }
+
+ simcall->issuer->waiting_action = NULL;
+ xbt_fifo_remove(simcall->issuer->comms, action);
+ SIMIX_simcall_answer(simcall);
+ destroy_count++;
+ }
+
+ while (destroy_count-- > 0)
+ SIMIX_comm_destroy(action);
+}
-#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
- * \brief verify if communication is latency bounded
- * \param comm The communication
+ * \brief This function is called when a Surf communication action is finished.
+ * \param action the corresponding Simix communication
+ *
+ * Sets the final Simix state of the communication from the states of its
+ * Surf actions, destroys those Surf actions, removes the communication from
+ * the pending list of each endpoint process that still exists, and finally
+ * answers the simcalls attached to the communication, if there are any.
*/
-XBT_INLINE int SIMIX_communication_is_latency_bounded(smx_comm_t comm)
+void SIMIX_post_comm(smx_action_t action)
{
- //try to find comm on the list of finished flows
- uintptr_t key = 0;
- uintptr_t data = 0;
- xbt_dict_cursor_t cursor;
- xbt_dict_foreach(latency_limited_dict,cursor,key,data) {
- DEBUG2("comparing key=%p with comm=%p", (void*)key, (void*)comm);
- if((void*)comm == (void*)key){
- DEBUG2("key %p found, return value latency limited value %d", (void*)key, (int)data);
- return (int)data;
- }
+ /* Update action state. The tests are ordered and the first one that holds
+ * wins: a timeout action that is DONE is reported as a source/destination
+ * timeout, a timeout action that FAILED is reported as a failure of the
+ * host on that side, and a FAILED surf_comm is reported as a link failure.
+ * When none of them holds the communication is marked as done. */
+ if (action->comm.src_timeout &&
+ surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
+ action->state = SIMIX_SRC_TIMEOUT;
+ else if (action->comm.dst_timeout &&
+ surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
+ action->state = SIMIX_DST_TIMEOUT;
+ else if (action->comm.src_timeout &&
+ surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
+ action->state = SIMIX_SRC_HOST_FAILURE;
+ else if (action->comm.dst_timeout &&
+ surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
+ action->state = SIMIX_DST_HOST_FAILURE;
+ else if (action->comm.surf_comm &&
+ surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
+ XBT_DEBUG("Puta madre. Surf says that the link broke");
+ action->state = SIMIX_LINK_FAILURE;
+ } else
+ action->state = SIMIX_DONE;
+
+ XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
+ action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
+
+ /* destroy the surf actions associated with the Simix communication */
+ SIMIX_comm_destroy_internal_actions(action);
+
+ /* remove the communication action from the list of pending communications
+ * of both processes (if they still exist) */
+ if (action->comm.src_proc) {
+ xbt_fifo_remove(action->comm.src_proc->comms, action);
+ }
+ if (action->comm.dst_proc) {
+ xbt_fifo_remove(action->comm.dst_proc->comms, action);
}
- DEBUG1("calling SIMIX_action_is_latency_bounded(%p)", comm->act);
- return SIMIX_action_is_latency_bounded(comm->act);
+ /* if there are simcalls associated with the action, then answer them */
+ if (xbt_fifo_size(action->simcalls)) {
+ SIMIX_comm_finish(action);
+ }
}
-#endif
-/******************************************************************************/
-/* SIMIX_network_copy_data callbacks */
-/******************************************************************************/
-static void (*SIMIX_network_copy_data_callback)(smx_comm_t, size_t) = &SIMIX_network_copy_pointer_callback;
+/**
+ * \brief Cancel a communication
+ * \param action The communication
+ *
+ * A communication in the SIMIX_WAITING state is removed from its rendez-vous
+ * point and marked SIMIX_CANCELED. A communication in the SIMIX_READY or
+ * SIMIX_RUNNING state has its Surf action canceled, unless the model checker
+ * is enabled. In any other state this function does nothing.
+ */
+void SIMIX_comm_cancel(smx_action_t action)
+{
+ /* if the action is in the waiting state, it is still queued in a rdv, */
+ /* so remove it from there and mark it as canceled */
+ if (action->state == SIMIX_WAITING) {
+ SIMIX_rdv_remove(action->comm.rdv, action);
+ action->state = SIMIX_CANCELED;
+ }
+ else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
+ && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
-void SIMIX_network_set_copy_data_callback(void (*callback)(smx_comm_t, size_t)) {
- SIMIX_network_copy_data_callback = callback;
+ surf_workstation_model->action_cancel(action->comm.surf_comm);
+ }
}
-void SIMIX_network_copy_pointer_callback(smx_comm_t comm, size_t buff_size) {
- xbt_assert1((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)",buff_size);
- *(void**)(comm->dst_buff) = comm->src_buff;
+/**
+ * \brief Suspend the Surf action carrying the communication
+ * \param action The communication
+ *
+ * Does nothing when the communication has no Surf communication action
+ * attached (surf_comm is NULL), instead of handing a NULL action to the
+ * workstation model.
+ */
+void SIMIX_comm_suspend(smx_action_t action)
+{
+  /*FIXME: shall we suspend also the timeout actions? */
+  if (action->comm.surf_comm)
+    surf_workstation_model->suspend(action->comm.surf_comm);
}
-void SIMIX_network_copy_buffer_callback(smx_comm_t comm, size_t buff_size) {
- memcpy(comm->dst_buff, comm->src_buff, buff_size);
+/**
+ * \brief Resume the Surf action carrying the communication
+ * \param action The communication
+ *
+ * Does nothing when the communication has no Surf communication action
+ * attached (surf_comm is NULL), instead of handing a NULL action to the
+ * workstation model.
+ */
+void SIMIX_comm_resume(smx_action_t action)
+{
+  /*FIXME: check what happen with the timeouts */
+  if (action->comm.surf_comm)
+    surf_workstation_model->resume(action->comm.surf_comm);
}
+
+/************* Action Getters **************/
+
/**
- * \brief Copy the communication data from the sender's buffer to the receiver's one
- * \param comm The communication
+ * \brief get the amount remaining from the communication
+ * \param action The communication
+ * \return the value given by the workstation model's get_remains for the
+ * Surf communication action while the state is SIMIX_RUNNING; 0 when
+ * \a action is NULL or when it is in any other state
*/
-void SIMIX_network_copy_data(smx_comm_t comm)
+double SIMIX_comm_get_remains(smx_action_t action)
{
- size_t buff_size = comm->src_buff_size;
- uintptr_t casted_size = 0;
- uintptr_t amount = 0;
- /* If there is no data to be copy then return */
- if(!comm->src_buff || !comm->dst_buff || comm->copied == 1)
- return;
+ double remains;
- DEBUG6("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
- comm,
- comm->src_proc->smx_host->name, comm->src_buff,
- comm->dst_proc->smx_host->name, comm->dst_buff,
- buff_size);
+ /* a NULL communication is tolerated and reported as having nothing left */
+ if(!action){
+ return 0;
+ }
- /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
- if (comm->dst_buff_size)
- buff_size = MIN(buff_size,*(comm->dst_buff_size));
-
- /* Update the receiver's buffer size to the copied amount */
- if (comm->dst_buff_size)
- *comm->dst_buff_size = buff_size;
+ switch (action->state) {
- if(buff_size == 0)
- return;
- (*SIMIX_network_copy_data_callback)(comm, buff_size);
+ /* only a running communication is asked to the workstation model */
+ case SIMIX_RUNNING:
+ remains = surf_workstation_model->get_remains(action->comm.surf_comm);
+ break;
- /* Set the copied flag so we copy data only once */
- /* (this function might be called from both communication ends)*/
- comm->copied = 1;
-
- /* pimple to display the message sizes */
- {
- if (msg_sizes == NULL)
- msg_sizes = xbt_dict_new();
- casted_size = comm->task_size;
- amount = xbt_dicti_get(msg_sizes, casted_size);
- amount++;
+ case SIMIX_WAITING:
+ case SIMIX_READY:
+ remains = 0; /*FIXME: check what should be returned */
+ break;
- xbt_dicti_set(msg_sizes,casted_size, amount);
+ default:
+ remains = 0; /*FIXME: is this correct? */
+ break;
}
+ return remains;
}
-#include "xbt.h"
-/* pimple to display the message sizes */
-void SIMIX_message_sizes_output(const char *filename) {
- uintptr_t key = 0;
- uintptr_t data = 0;
- xbt_dict_cursor_t cursor;
- FILE * out = NULL;
- out = fopen(filename,"w");
- xbt_assert1(out,"Cannot open file %s",filename);
- xbt_dict_foreach(msg_sizes,cursor,key,data) {
- fprintf(out,"%zu %zu\n",key,data);
- }
- fclose(out);
+/**
+ * \brief Return the current state of the communication
+ * \param action The communication (dereferenced without a NULL check)
+ * \return the value of the state field of \a action
+ */
+e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
+{
+ return action->state;
}
/**
- * \brief Return the user data associated to the communication
- * \param comm The communication
+ * \brief Return the user data associated to the sender of the communication
+ * \param action The communication
+ * \note \a action is dereferenced without a NULL check; the src_data field
+ * is returned as stored.
* \return the user data
*/
-XBT_INLINE void *SIMIX_communication_get_data(smx_comm_t comm)
+void* SIMIX_comm_get_src_data(smx_action_t action)
{
- return comm->data;
+ return action->comm.src_data;
}
-XBT_PUBLIC(void *) SIMIX_communication_get_src_buf(smx_comm_t comm)
-{
- return comm->src_buff;
-}
-XBT_PUBLIC(void *) SIMIX_communication_get_dst_buf(smx_comm_t comm)
-{
- return comm->dst_buff;
-}
-XBT_PUBLIC(size_t) SIMIX_communication_get_src_buf_size(smx_comm_t comm)
+/**
+ * \brief Return the user data associated to the receiver of the communication
+ * \param action The communication
+ * \note \a action is dereferenced without a NULL check; the dst_data field
+ * is returned as stored.
+ * \return the user data
+ */
+void* SIMIX_comm_get_dst_data(smx_action_t action)
{
- return comm->src_buff_size;
+ return action->comm.dst_data;
}
-XBT_PUBLIC(size_t) SIMIX_communication_get_dst_buf_size(smx_comm_t comm)
+
+/**
+ * \brief Return the sender process of the communication
+ * \param action The communication (dereferenced without a NULL check)
+ * \return the src_proc field of the communication; other code in this file
+ * tests this field against NULL before using it, so callers should be ready
+ * for a NULL result
+ */
+smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
{
- return *(comm->dst_buff_size);
+ return action->comm.src_proc;
}
-/******************************************************************************/
-/* Synchronous Communication */
-/******************************************************************************/
-/**
- * \brief Put a send communication request in a rendez-vous point and waits for
- * its completion (blocking)
- * \param rdv The rendez-vous point
- * \param task_size The size of the communication action (for surf simulation)
- * \param rate The rate of the communication action (for surf)
- * \param timeout The timeout used for the waiting the completion
- * \param src_buff The source buffer containing the message to be sent
- * \param src_buff_size The size of the source buffer
- * \param comm_ref The communication object used for the send (useful if someone else wants to cancel this communication afterward)
- * \param data User data associated to the communication object
- * Throws:
- * - host_error if peer failed
- * - timeout_error if communication reached the timeout specified
- * - network_error if network failed or peer issued a timeout
- */
-XBT_INLINE void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
- double timeout, void *src_buff, size_t src_buff_size,
- smx_comm_t *comm_ref, void *data)
+/**
+ * \brief Return the receiver process of the communication
+ * \param action The communication (dereferenced without a NULL check)
+ * \return the dst_proc field of the communication; other code in this file
+ * tests this field against NULL before using it, so callers should be ready
+ * for a NULL result
+ */
+smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
{
- *comm_ref = SIMIX_network_isend(rdv,task_size,rate,src_buff,src_buff_size,data);
- SIMIX_network_wait(*comm_ref,timeout);
+ return action->comm.dst_proc;
}
+#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
- * \brief Put a receive communication request in a rendez-vous point and waits
- * for its completion (blocking)
- * \param rdv The rendez-vous point
- * \param timeout The timeout used for the waiting the completion
- * \param dst_buff The destination buffer to copy the received message
- * \param src_buff_size The size of the destination buffer
- * \param comm_ref The communication object used for the send (useful if someone else wants to cancel this communication afterward)
- * Throws:
- * - host_error if peer failed
- * - timeout_error if communication reached the timeout specified
- * - network_error if network failed or peer issued a timeout
+ * \brief verify if communication is latency bounded
+ * \param action The communication
+ * \return 0 when \a action is NULL; otherwise the latency_limited field of
+ * the communication, refreshed from the workstation model first when a Surf
+ * communication action is still attached
*/
-XBT_INLINE void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *dst_buff,
- size_t *dst_buff_size, smx_comm_t *comm_ref)
+XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
- *comm_ref = (smx_comm_t) SIMIX_network_irecv(rdv,dst_buff,dst_buff_size);
- SIMIX_network_wait(*comm_ref,timeout);
+ if(!action){
+ return 0;
+ }
+ /* while the Surf action exists, ask it and cache the answer in the Simix
+ * action; without a Surf action the previously cached value is returned */
+ if (action->comm.surf_comm){
+ XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
+ action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
+ XBT_DEBUG("Action limited is %d", action->latency_limited);
+ }
+ return action->latency_limited;
}
+#endif
/******************************************************************************/
-/* Asynchronous Communication */
+/* SIMIX_comm_copy_data callbacks */
/******************************************************************************/
-smx_comm_t SIMIX_network_isend(smx_rdv_t rdv, double task_size, double rate,
- void *src_buff, size_t src_buff_size, void *data)
-{
- smx_comm_t comm;
-
- /*If running in model-checking mode then intercept the communication action*/
- #ifdef HAVE_MC
- if (_surf_do_model_check)
- MC_trans_intercept_isend(rdv);
- #endif
- /* Look for communication request matching our needs.
- If it is not found then create it and push it into the rendez-vous point */
- comm = SIMIX_rdv_get_request(rdv, comm_recv);
-
- if(!comm){
- comm = SIMIX_communication_new(comm_send);
- SIMIX_rdv_push(rdv, comm);
- }
+/* Function invoked by SIMIX_comm_copy_data() to perform the actual copy.
+ * Arguments: the communication, the source buffer and the number of bytes.
+ * Defaults to the pointer-copy callback. */
+static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
+ &SIMIX_comm_copy_pointer_callback;
- /* Setup the communication request */
- comm->src_proc = SIMIX_process_self();
- comm->task_size = task_size;
- comm->rate = rate;
- comm->src_buff = src_buff;
- comm->src_buff_size = src_buff_size;
- comm->data = data;
-
- SIMIX_communication_start(comm);
- return comm;
+/**
+ * \brief Replace the function used to copy the data of communications
+ * \param callback The new copy function; it is stored as is, without any
+ * check, and is used by every later call to SIMIX_comm_copy_data()
+ */
+void
+SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
+{
+ SIMIX_comm_copy_data_callback = callback;
}
-smx_comm_t SIMIX_network_irecv(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size)
+/**
+ * \brief Copy callback that transmits the address of the source buffer
+ * \param comm The communication
+ * \param buff The source buffer
+ * \param buff_size The size to copy; asserted to be sizeof(void*)
+ *
+ * The destination buffer is treated as a void* slot in which the value of
+ * \a buff itself is stored; the bytes pointed to by \a buff are not copied.
+ */
+void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
{
- smx_comm_t comm;
-
- /*If running in model-checking mode then intercept the communication action*/
-#ifdef HAVE_MC
- if (_surf_do_model_check)
- MC_trans_intercept_irecv(rdv);
-#endif
- /* Look for communication request matching our needs.
- * If it is not found then create it and push it into the rendez-vous point
- */
- comm = SIMIX_rdv_get_request(rdv, comm_send);
-
- if(!comm){
- comm = SIMIX_communication_new(comm_recv);
- SIMIX_rdv_push(rdv, comm);
- }
-
- /* Setup communication request */
- comm->dst_proc = SIMIX_process_self();
- comm->dst_buff = dst_buff;
- comm->dst_buff_size = dst_buff_size;
-
- SIMIX_communication_start(comm);
- return comm;
+ xbt_assert((buff_size == sizeof(void *)),
+ "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
+ *(void **) (comm->comm.dst_buff) = buff;
}
-/** @brief blocks until the communication terminates or the timeout occurs */
-XBT_INLINE void SIMIX_network_wait(smx_comm_t comm, double timeout)
+/**
+ * \brief Copy callback that duplicates the content of the source buffer
+ * \param comm The communication
+ * \param buff The source buffer
+ * \param buff_size The number of bytes copied with memcpy from \a buff into
+ * the destination buffer of the communication
+ */
+void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
{
- /*If running in model-checking mode then intercept the communication action*/
-#ifdef HAVE_MC
- if (_surf_do_model_check)
- MC_trans_intercept_wait(comm);
-#endif
- /* Wait for communication completion */
- SIMIX_communication_wait_for_completion(comm, timeout);
+ XBT_DEBUG("Copy the data over");
+ memcpy(comm->comm.dst_buff, buff, buff_size);
}
-/** @Returns whether the (asynchronous) communication is done yet or not */
-XBT_INLINE int SIMIX_network_test(smx_comm_t comm)
+/**
+ * \brief Copy callback for SMPI: copies the buffer content and releases the
+ * source buffer of detached sends
+ * \param comm The communication
+ * \param buff The source buffer; freed by this function when the
+ * communication is detached
+ * \param buff_size The number of bytes copied with memcpy into the
+ * destination buffer of the communication
+ */
+void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_size)
{
- /*If running in model-checking mode then intercept the communication action*/
-#ifdef HAVE_MC
- if (_surf_do_model_check)
- MC_trans_intercept_test(comm);
-#endif
-
- /* Copy data if the communication is done */
- if(comm->sem && !SIMIX_sem_would_block(comm->sem)){
- /* Copy network data */
- SIMIX_network_copy_data(comm);
- return TRUE;
+ XBT_DEBUG("Copy the data over");
+ memcpy(comm->comm.dst_buff, buff, buff_size);
+ if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
+ xbt_free(buff);
+ comm->comm.src_buff = NULL; /* forget the freed buffer so it is not reused */
}
- return FALSE;
}
-/** @brief wait for the completion of any communication of a set
- *
- * @Returns the rank in the dynar of communication which finished; destroy it after identifying which one it is
+/**
+ * \brief Copy the communication data from the sender's buffer to the receiver's one
+ * \param comm The communication
+ *
+ * Nothing happens when either buffer is missing or when the data was already
+ * copied. Otherwise the size to copy is the source buffer size, capped by the
+ * receiver's announced buffer size when there is one; that announced size is
+ * then overwritten with the size actually used. The registered copy callback
+ * is only invoked for a non-zero size, and the communication is flagged as
+ * copied in every case that reaches the end of the function.
*/
-unsigned int SIMIX_network_waitany(xbt_dynar_t comms)
+void SIMIX_comm_copy_data(smx_action_t comm)
{
- xbt_dynar_t sems = xbt_dynar_new(sizeof(smx_sem_t),NULL);
- unsigned int cursor, found_comm=-1;
- smx_comm_t comm,comm_finished=NULL;
+ size_t buff_size = comm->comm.src_buff_size;
+ /* If there is no data to be copy then return */
+ if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
+ return;
- /*If running in model-checking mode then intercept the communication action*/
-#ifdef HAVE_MC
- if (_surf_do_model_check)
- MC_trans_intercept_waitany(comms);
-#endif
- xbt_dynar_foreach(comms,cursor,comm)
- xbt_dynar_push(sems,&(comm->sem));
+ /* either endpoint process may be NULL here, hence the fallback label */
+ XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
+ comm,
+ comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
+ comm->comm.src_buff,
+ comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
+ comm->comm.dst_buff, buff_size);
- DEBUG1("Waiting for the completion of communication set %p", comms);
+ /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
+ if (comm->comm.dst_buff_size)
+ buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
- found_comm = SIMIX_sem_acquire_any(sems);
- xbt_dynar_free_container(&sems);
- xbt_assert0(found_comm!=-1,"Cannot find which communication finished");
- xbt_dynar_get_cpy(comms,found_comm,&comm_finished);
+ /* Update the receiver's buffer size to the copied amount */
+ if (comm->comm.dst_buff_size)
+ *comm->comm.dst_buff_size = buff_size;
- DEBUG2("Communication %p of communication set %p finished", comm_finished, comms);
+ if (buff_size > 0)
+ SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
- /* let the regular code deal with the communication end (errors checking and cleanup).
- * A bit of useless work will be done, but that's good for source factorization */
- SIMIX_sem_release_forever(comm_finished->sem);
- SIMIX_communication_wait_for_completion(comm_finished, -1);
- return found_comm;
+ /* Set the copied flag so we copy data only once */
+ /* (this function might be called from both communication ends) */
+ comm->comm.copied = 1;
}