/***** Communication Requests *****/
-/* Constructors and Destructor */
+XBT_PUBLIC(void) SIMIX_req_comm_send(smx_rdv_t rdv, double task_size, /* send-and-wait variant of SIMIX_req_comm_isend: returns nothing instead of an action */
+ double rate, void *src_buff,
+ size_t src_buff_size,
+ int (*match_fun)(void *, void *),
+ void *data, double timeout); /* `timeout` takes the place of isend's `detached` flag and is the value given to the wait step */
+
XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
- double rate, void *src_buff,
- size_t src_buff_size,
- int (*match_fun)(void *, void *),
- void *data, int detached);
+ double rate, void *src_buff,
+ size_t src_buff_size,
+ int (*match_fun)(void *, void *),
+ void *data, int detached);
+
+XBT_PUBLIC(void) SIMIX_req_comm_recv(smx_rdv_t rdv, void *dst_buff, /* receive-and-wait variant of SIMIX_req_comm_irecv: returns nothing instead of an action */
+ size_t * dst_buff_size,
+ int (*match_fun)(void *, void *),
+ void *data, double timeout); /* same arguments as irecv plus `timeout`, the value given to the wait step */
XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
- size_t * dst_buff_size,
- int (*match_fun)(void *, void *),
- void *data);
+ size_t * dst_buff_size,
+ int (*match_fun)(void *, void *),
+ void *data);
XBT_PUBLIC(void) SIMIX_req_comm_destroy(smx_action_t comm);
-/* Communication handling */
XBT_INLINE XBT_PUBLIC(void) SIMIX_req_comm_cancel(smx_action_t comm);
/* FIXME: waitany is going to be a vararg function, and should take a timeout */
{
xbt_ex_t e;
MSG_error_t ret = MSG_OK;
- volatile smx_action_t comm = NULL;
/* We no longer support getting a task from a specific host */
if (host)
THROW_UNIMPLEMENTED;
/* Try to receive it by calling SIMIX network layer */
TRY {
- comm = SIMIX_req_comm_irecv(mailbox, task, NULL, NULL, NULL);
- SIMIX_req_comm_wait(comm, timeout);
- (*task)->simdata->comm = comm;
+ SIMIX_req_comm_recv(mailbox, task, NULL, NULL, NULL, timeout);
DEBUG2("Got task %s from %p",(*task)->name,mailbox);
(*task)->simdata->isused=0;
}
}
xbt_ex_free(e);
}
- if (comm != NULL) {
- //SIMIX_req_comm_destroy(comm);
- }
#ifdef HAVE_TRACING
if (ret != MSG_HOST_FAILURE &&
MSG_error_t ret = MSG_OK;
simdata_task_t t_simdata = NULL;
m_process_t process = MSG_process_self();
- volatile smx_action_t comm = NULL;
#ifdef HAVE_TRACING
+ volatile smx_action_t comm = NULL;
int call_end = 0;
#endif
CHECK_HOST();
/* Try to send it by calling SIMIX network layer */
TRY {
+#ifdef HAVE_TRACING
comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
- t_simdata->rate, task, sizeof(void *), NULL, NULL, 0);
+ t_simdata->rate, task, sizeof(void *), NULL, NULL, 0);
t_simdata->comm = comm;
-#ifdef HAVE_TRACING
SIMIX_req_set_category(comm, task->category);
-#endif
SIMIX_req_comm_wait(comm, timeout);
+#else
+ SIMIX_req_comm_send(mailbox, t_simdata->message_size,
+ t_simdata->rate, task, sizeof(void*), NULL, NULL, timeout);
+#endif
}
CATCH(e) {
xbt_ex_free(e);
/* If the send failed, it is not used anymore */
- t_simdata->isused=0;
- }
-
- if (comm != NULL) {
- //SIMIX_req_comm_destroy(comm);
+ t_simdata->isused = 0;
}
process->simdata->waiting_task = NULL;
smx_rdv_t SIMIX_rdv_get_by_name(const char *name);
int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host);
smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv);
+void SIMIX_comm_send(smx_process_t src_proc, smx_rdv_t rdv, /* same parameters as SIMIX_comm_isend below, with `timeout` in place of `detached` and no returned action */
+ double task_size, double rate,
+ void *src_buff, size_t src_buff_size,
+ int (*)(void *, void *), void *data,
+ double timeout); /* NOTE(review): no definition or caller of this function is visible in this patch; the REQ_COMM_SEND handler calls SIMIX_comm_isend + SIMIX_pre_comm_wait instead -- confirm the prototype is needed */
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*)(void *, void *), void *data,
int detached);
+void SIMIX_comm_recv(smx_process_t dst_proc, smx_rdv_t rdv, /* same parameters as SIMIX_comm_irecv below, plus `timeout`, and no returned action */
+ void *dst_buff, size_t *dst_buff_size,
+ int (*)(void *, void *), void *data,
+ double timeout); /* NOTE(review): no definition or caller of this function is visible in this patch; the REQ_COMM_RECV handler calls SIMIX_comm_irecv + SIMIX_pre_comm_wait instead -- confirm the prototype is needed */
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
void *dst_buff, size_t *dst_buff_size,
int (*)(void *, void *), void *data);
void SIMIX_comm_destroy(smx_action_t action);
void SIMIX_comm_destroy_internal_actions(smx_action_t action);
-void SIMIX_pre_comm_wait(smx_req_t req, int idx);
+void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int idx);
void SIMIX_pre_comm_waitany(smx_req_t req, int idx);
void SIMIX_post_comm(smx_action_t action);
void SIMIX_pre_comm_test(smx_req_t req);
SIMIX_REQ_ENUM_ELEMENT(REQ_RDV_GEY_BY_NAME),\
SIMIX_REQ_ENUM_ELEMENT(REQ_RDV_COMM_COUNT_BY_HOST),\
SIMIX_REQ_ENUM_ELEMENT(REQ_RDV_GET_HEAD),\
+SIMIX_REQ_ENUM_ELEMENT(REQ_COMM_SEND),\
SIMIX_REQ_ENUM_ELEMENT(REQ_COMM_ISEND),\
+SIMIX_REQ_ENUM_ELEMENT(REQ_COMM_RECV),\
SIMIX_REQ_ENUM_ELEMENT(REQ_COMM_IRECV),\
SIMIX_REQ_ENUM_ELEMENT(REQ_COMM_DESTROY),\
SIMIX_REQ_ENUM_ELEMENT(REQ_COMM_CANCEL),\
smx_action_t result;
} rdv_get_head;
+ struct { /* arguments of a REQ_COMM_SEND request, filled in by SIMIX_req_comm_send */
+ smx_rdv_t rdv;
+ double task_size;
+ double rate;
+ void *src_buff;
+ size_t src_buff_size;
+ int (*match_fun)(void *, void *);
+ void *data;
+ double timeout; /* passed to SIMIX_pre_comm_wait by the request handler */
+ } comm_send; /* no `result` member: the user-side call returns void */
+
struct {
smx_rdv_t rdv;
double task_size;
void *dst_buff;
size_t *dst_buff_size;
int (*match_fun)(void *, void *);
- void *data;
+ void *data;
+ double timeout;
+ } comm_recv;
+
+ struct {
+ smx_rdv_t rdv;
+ void *dst_buff;
+ size_t *dst_buff_size;
+ int (*match_fun)(void *, void *);
+ void *data;
smx_action_t result;
} comm_irecv;
return action;
}
-void SIMIX_pre_comm_wait(smx_req_t req, int idx)
+void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int idx)
{
- smx_action_t action = req->comm_wait.comm;
- double timeout = req->comm_wait.timeout;
+ /* the request may be a wait, a send or a recv */
surf_action_t sleep;
/* Associate this request to the action */
xbt_fifo_push(action->request_list, req);
req->issuer->waiting_action = action;
- if (MC_IS_ENABLED){
- if(idx == 0){
+ if (MC_IS_ENABLED) {
+ if (idx == 0) {
action->state = SIMIX_DONE;
- }else{
+ } else {
/* If we reached this point, the wait request must have a timeout */
/* Otherwise it shouldn't be enabled and executed by the MC */
- if(timeout == -1)
+ if (timeout == -1)
THROW_IMPOSSIBLE;
- if(action->comm.src_proc == req->issuer)
+ if (action->comm.src_proc == req->issuer)
action->state = SIMIX_SRC_TIMEOUT;
else
action->state = SIMIX_DST_TIMEOUT;
return it as the result of the call */
if (req->call == REQ_COMM_WAITANY) {
SIMIX_waitany_req_remove_from_actions(req);
- if(!MC_IS_ENABLED)
+ if (!MC_IS_ENABLED)
req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
}
destroy_count++;
}
- while(destroy_count-- > 0)
+ while (destroy_count-- > 0)
SIMIX_comm_destroy(action);
}
break;
case REQ_COMM_WAIT:
- SIMIX_pre_comm_wait(req, value);
+ SIMIX_pre_comm_wait(req,
+ req->comm_wait.comm,
+ req->comm_wait.timeout,
+ value);
break;
case REQ_COMM_WAITANY:
SIMIX_pre_comm_waitany(req, value);
break;
+ case REQ_COMM_SEND: /* combined send + wait: create the communication, then register the request as waiting on it */
+ {
+ smx_action_t comm = SIMIX_comm_isend(
+ req->issuer,
+ req->comm_send.rdv,
+ req->comm_send.task_size,
+ req->comm_send.rate,
+ req->comm_send.src_buff,
+ req->comm_send.src_buff_size,
+ req->comm_send.match_fun,
+ req->comm_send.data,
+ 0); /* detached = 0 */
+ SIMIX_pre_comm_wait(req, comm, req->comm_send.timeout, 0); /* idx 0: in the visible part of SIMIX_pre_comm_wait idx is only read when MC_IS_ENABLED, and SIMIX_req_comm_send only issues this request when it is not */
+ break; /* no SIMIX_request_answer() here; presumably the request is answered once the wait completes -- confirm */
+ }
+
case REQ_COMM_ISEND:
req->comm_isend.result = SIMIX_comm_isend(
req->issuer,
SIMIX_request_answer(req);
break;
+ case REQ_COMM_RECV: /* combined receive + wait: create the communication, then register the request as waiting on it */
+ {
+ smx_action_t comm = SIMIX_comm_irecv(
+ req->issuer,
+ req->comm_recv.rdv,
+ req->comm_recv.dst_buff,
+ req->comm_recv.dst_buff_size,
+ req->comm_recv.match_fun,
+ req->comm_recv.data);
+ SIMIX_pre_comm_wait(req, comm, req->comm_recv.timeout, 0); /* idx 0: in the visible part of SIMIX_pre_comm_wait idx is only read when MC_IS_ENABLED, and SIMIX_req_comm_recv only issues this request when it is not */
+ break; /* no SIMIX_request_answer() here; presumably the request is answered once the wait completes -- confirm */
+ }
+
case REQ_COMM_IRECV:
req->comm_irecv.result = SIMIX_comm_irecv(
req->issuer,
#include "private.h"
+#include "mc/mc.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix);
return req->rdv_get_head.result;
}
+void SIMIX_req_comm_send(smx_rdv_t rdv, double task_size, double rate,
+ void *src_buff, size_t src_buff_size,
+ int (*match_fun)(void *, void *), void *data,
+ double timeout)
+{ /* Send on `rdv` and wait for the communication within one user-level call.
+   * With MC_IS_ENABLED this is an isend request followed by a wait request;
+   * otherwise a single REQ_COMM_SEND request carrying all arguments is pushed.
+   * `rdv` must be non-NULL (checked by the xbt_assert0 below); `timeout` is
+   * forwarded unchanged to the wait step in both paths. Returns nothing.
+   */
+ xbt_assert0(rdv, "No rendez-vous point defined for send");
+
+ if (MC_IS_ENABLED) {
+ /* the model-checker wants two separate requests */
+ smx_action_t comm = SIMIX_req_comm_isend(rdv, task_size, rate,
+ src_buff, src_buff_size, match_fun, data, 0); /* last argument is `detached` = 0 */
+ SIMIX_req_comm_wait(comm, timeout);
+ }
+ else {
+ smx_req_t req = SIMIX_req_mine(); /* fill the request obtained for the caller, then push it */
+
+ req->call = REQ_COMM_SEND;
+ req->comm_send.rdv = rdv;
+ req->comm_send.task_size = task_size;
+ req->comm_send.rate = rate;
+ req->comm_send.src_buff = src_buff;
+ req->comm_send.src_buff_size = src_buff_size;
+ req->comm_send.match_fun = match_fun;
+ req->comm_send.data = data;
+ req->comm_send.timeout = timeout;
+
+ SIMIX_request_push(); /* nothing is read back afterwards: comm_send has no result field */
+ }
+}
+
smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
void *src_buff, size_t src_buff_size,
int (*match_fun)(void *, void *), void *data,
int detached)
{
- smx_req_t req = SIMIX_req_mine();
-
xbt_assert0(rdv, "No rendez-vous point defined for isend");
+ smx_req_t req = SIMIX_req_mine();
+
req->call = REQ_COMM_ISEND;
req->comm_isend.rdv = rdv;
req->comm_isend.task_size = task_size;
return req->comm_isend.result;
}
+void SIMIX_req_comm_recv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
+ int (*match_fun)(void *, void *), void *data, double timeout)
+{ /* Receive on `rdv` and wait for the communication within one user-level call.
+   * With MC_IS_ENABLED this is an irecv request followed by a wait request;
+   * otherwise a single REQ_COMM_RECV request carrying all arguments is pushed.
+   * `rdv` must be non-NULL (checked by the xbt_assert0 below); `timeout` is
+   * forwarded unchanged to the wait step in both paths. Returns nothing.
+   */
+ xbt_assert0(rdv, "No rendez-vous point defined for recv");
+
+ if (MC_IS_ENABLED) {
+ /* the model-checker wants two separate requests */
+ smx_action_t comm = SIMIX_req_comm_irecv(rdv, dst_buff, dst_buff_size,
+ match_fun, data);
+ SIMIX_req_comm_wait(comm, timeout);
+ }
+ else {
+ smx_req_t req = SIMIX_req_mine(); /* fill the request obtained for the caller, then push it */
+
+ req->call = REQ_COMM_RECV;
+ req->comm_recv.rdv = rdv;
+ req->comm_recv.dst_buff = dst_buff;
+ req->comm_recv.dst_buff_size = dst_buff_size;
+ req->comm_recv.match_fun = match_fun;
+ req->comm_recv.data = data;
+ req->comm_recv.timeout = timeout;
+
+ SIMIX_request_push(); /* nothing is read back afterwards */
+ }
+}
+
smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
- int (*match_fun)(void *, void *), void *data)
+ int (*match_fun)(void *, void *), void *data)
{
- smx_req_t req = SIMIX_req_mine();
+ xbt_assert0(rdv, "No rendez-vous point defined for irecv");
- xbt_assert0(rdv, "No rendez-vous point defined for isend");
+ smx_req_t req = SIMIX_req_mine();
req->call = REQ_COMM_IRECV;
req->comm_irecv.rdv = rdv;