git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9063 48e7efb5-ca39-0410-a469-dd3cf9ba447f
XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
double rate, void *src_buff,
size_t src_buff_size,
XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size,
double rate, void *src_buff,
size_t src_buff_size,
+ int (*match_fun)(void *, void *),
void *data);
XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
void *data);
XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff,
- size_t * dst_buff_size);
+ size_t * dst_buff_size,
+ int (*match_fun)(void *, void *),
+ void *data);
XBT_PUBLIC(void) SIMIX_req_comm_destroy(smx_action_t comm);
XBT_PUBLIC(void) SIMIX_req_comm_destroy(smx_action_t comm);
sock_data->comm_recv =
SIMIX_req_comm_irecv(gras_socket_im_the_server(sock) ?
sock_data->rdv_server : sock_data->rdv_client,
sock_data->comm_recv =
SIMIX_req_comm_irecv(gras_socket_im_the_server(sock) ?
sock_data->rdv_server : sock_data->rdv_client,
- comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), msg);
+ comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), NULL, msg);
SIMIX_req_comm_wait(comm, -1);
VERB0("Message sent (and received)");
SIMIX_req_comm_wait(comm, -1);
VERB0("Message sent (and received)");
/* initialize synchronization stuff on the socket */
data->rdv_server = pr->rdv;
data->rdv_client = SIMIX_req_rdv_create(NULL);
/* initialize synchronization stuff on the socket */
data->rdv_server = pr->rdv;
data->rdv_client = SIMIX_req_rdv_create(NULL);
- data->comm_recv = SIMIX_req_comm_irecv(data->rdv_client, NULL, 0);
+ data->comm_recv = SIMIX_req_comm_irecv(data->rdv_client, NULL, 0, NULL, NULL);
/* connect that simulation data to the socket */
sock->data = data;
/* connect that simulation data to the socket */
sock->data = data;
data->client = NULL;
data->rdv_server = pr->rdv;
data->rdv_client = NULL;
data->client = NULL;
data->rdv_server = pr->rdv;
data->rdv_client = NULL;
- data->comm_recv = SIMIX_req_comm_irecv(pr->rdv, NULL, 0);
+ data->comm_recv = SIMIX_req_comm_irecv(pr->rdv, NULL, 0, NULL, NULL);
/* Send it by calling SIMIX network layer */
return SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
/* Send it by calling SIMIX network layer */
return SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
- t_simdata->rate, task, sizeof(void *),
+ t_simdata->rate, task, sizeof(void *), NULL,
("MSG_task_get() was asked to write in a non empty task struct.");
/* Try to receive it by calling SIMIX network layer */
("MSG_task_get() was asked to write in a non empty task struct.");
/* Try to receive it by calling SIMIX network layer */
- return SIMIX_req_comm_irecv(rdv, task, NULL);
+ return SIMIX_req_comm_irecv(rdv, task, NULL, NULL, NULL);
}
/** \ingroup msg_gos_functions
}
/** \ingroup msg_gos_functions
/* Try to receive it by calling SIMIX network layer */
TRY {
/* Try to receive it by calling SIMIX network layer */
TRY {
- comm = SIMIX_req_comm_irecv(mailbox, task, NULL);
+ comm = SIMIX_req_comm_irecv(mailbox, task, NULL, NULL, NULL);
SIMIX_req_comm_wait(comm, timeout);
SIMIX_req_comm_destroy(comm);
DEBUG2("Got task %s from %p",(*task)->name,mailbox);
SIMIX_req_comm_wait(comm, timeout);
SIMIX_req_comm_destroy(comm);
DEBUG2("Got task %s from %p",(*task)->name,mailbox);
/* Try to send it by calling SIMIX network layer */
TRY {
t_simdata->comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
/* Try to send it by calling SIMIX network layer */
TRY {
t_simdata->comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
- t_simdata->rate, task, sizeof(void *), task);
+ t_simdata->rate, task, sizeof(void *), NULL, task);
#ifdef HAVE_TRACING
SIMIX_req_set_category(t_simdata->comm, task->category);
#endif
#ifdef HAVE_TRACING
SIMIX_req_set_category(t_simdata->comm, task->category);
#endif
smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv);
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
double task_size, double rate,
smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv);
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
double task_size, double rate,
- void *src_buff, size_t src_buff_size, void *data);
+ void *src_buff, size_t src_buff_size,
+ int (*)(void *, void *), void *data);
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size);
+ void *dst_buff, size_t *dst_buff_size,
+ int (*)(void *, void *), void *data);
void SIMIX_comm_destroy(smx_action_t action);
void SIMIX_comm_destroy_internal_actions(smx_action_t action);
void SIMIX_pre_comm_wait(smx_req_t req);
void SIMIX_comm_destroy(smx_action_t action);
void SIMIX_comm_destroy_internal_actions(smx_action_t action);
void SIMIX_pre_comm_wait(smx_req_t req);
double rate;
void *src_buff;
size_t src_buff_size;
double rate;
void *src_buff;
size_t src_buff_size;
+ int (*match_fun)(void *, void *);
} comm_isend;
struct {
smx_rdv_t rdv;
void *dst_buff;
size_t *dst_buff_size;
} comm_isend;
struct {
smx_rdv_t rdv;
void *dst_buff;
size_t *dst_buff_size;
+ int (*match_fun)(void *, void *);
+ void *data;
smx_action_t result;
} comm_irecv;
smx_action_t result;
} comm_irecv;
double timeout);
static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
double timeout);
static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
-static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
+static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *), void *);
static void SIMIX_rdv_free(void *data);
void SIMIX_network_init(void)
static void SIMIX_rdv_free(void *data);
void SIMIX_network_init(void)
* \param type The type of communication we are looking for (comm_send, comm_recv)
* \return The communication request if found, NULL otherwise
*/
* \param type The type of communication we are looking for (comm_send, comm_recv)
* \return The communication request if found, NULL otherwise
*/
-smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
-{
- smx_action_t comm = (smx_action_t)
- xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
-
- if (comm && comm->comm.type == type) {
- DEBUG0("Communication request found!");
- xbt_fifo_shift(rdv->comm_fifo);
- comm->comm.refcount++;
- comm->comm.rdv = NULL;
- return comm;
+smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *), void *data)
+{
+ smx_action_t req;
+ xbt_fifo_item_t item;
+
+ xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){
+ if(req->comm.type == type && (!match_fun || match_fun(data, req->comm.data))){
+ xbt_fifo_remove_item(rdv->comm_fifo, item);
+ req->comm.refcount++;
+ req->comm.rdv = NULL;
+ return req;
+ }
}
DEBUG0("Communication request not found");
}
DEBUG0("Communication request not found");
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
double task_size, double rate,
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
double task_size, double rate,
- void *src_buff, size_t src_buff_size, void *data)
+ void *src_buff, size_t src_buff_size,
+ int (*match_fun)(void *, void *), void *data)
{
smx_action_t action;
/* Look for communication request matching our needs.
If it is not found then create it and push it into the rendez-vous point */
{
smx_action_t action;
/* Look for communication request matching our needs.
If it is not found then create it and push it into the rendez-vous point */
- action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
+ action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
if (!action) {
action = SIMIX_comm_new(SIMIX_COMM_SEND);
if (!action) {
action = SIMIX_comm_new(SIMIX_COMM_SEND);
}
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
}
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
- void *dst_buff, size_t *dst_buff_size)
+ void *dst_buff, size_t *dst_buff_size,
+ int (*match_fun)(void *, void *), void *data)
{
smx_action_t action;
/* Look for communication request matching our needs.
* If it is not found then create it and push it into the rendez-vous point
*/
{
smx_action_t action;
/* Look for communication request matching our needs.
* If it is not found then create it and push it into the rendez-vous point
*/
- action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
+ action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
if (!action) {
action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
if (!action) {
action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
req->comm_isend.rate,
req->comm_isend.src_buff,
req->comm_isend.src_buff_size,
req->comm_isend.rate,
req->comm_isend.src_buff,
req->comm_isend.src_buff_size,
+ req->comm_isend.match_fun,
req->comm_isend.data);
SIMIX_request_answer(req);
break;
req->comm_isend.data);
SIMIX_request_answer(req);
break;
req->issuer,
req->comm_irecv.rdv,
req->comm_irecv.dst_buff,
req->issuer,
req->comm_irecv.rdv,
req->comm_irecv.dst_buff,
- req->comm_irecv.dst_buff_size);
+ req->comm_irecv.dst_buff_size,
+ req->comm_irecv.match_fun,
+ req->comm_irecv.data);
SIMIX_request_answer(req);
break;
SIMIX_request_answer(req);
break;
}
smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
}
smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate,
- void *src_buff, size_t src_buff_size, void *data)
+ void *src_buff, size_t src_buff_size,
+ int (*match_fun)(void *, void *), void *data)
req.comm_isend.rate = rate;
req.comm_isend.src_buff = src_buff;
req.comm_isend.src_buff_size = src_buff_size;
req.comm_isend.rate = rate;
req.comm_isend.src_buff = src_buff;
req.comm_isend.src_buff_size = src_buff_size;
+ req.comm_isend.match_fun = match_fun;
req.comm_isend.data = data;
SIMIX_request_push(&req);
return req.comm_isend.result;
}
req.comm_isend.data = data;
SIMIX_request_push(&req);
return req.comm_isend.result;
}
-smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size)
+smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size,
+ int (*match_fun)(void *, void *), void *data)
req.comm_irecv.rdv = rdv;
req.comm_irecv.dst_buff = dst_buff;
req.comm_irecv.dst_buff_size = dst_buff_size;
req.comm_irecv.rdv = rdv;
req.comm_irecv.dst_buff = dst_buff;
req.comm_irecv.dst_buff_size = dst_buff_size;
+ req.comm_irecv.match_fun = match_fun;
+ req.comm_irecv.data = data;
SIMIX_request_push(&req);
return req.comm_irecv.result;
SIMIX_request_push(&req);
return req.comm_irecv.result;
smpi_process_post_recv(request);
print_request("New recv", request);
request->pair =
smpi_process_post_recv(request);
print_request("New recv", request);
request->pair =
- SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size);
+ SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size, NULL, NULL);
} else {
smpi_process_post_send(request->comm, request); // FIXME
print_request("New send", request);
request->pair =
SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
} else {
smpi_process_post_send(request->comm, request); // FIXME
print_request("New send", request);
request->pair =
SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
- request->buf, request->size, NULL);
+ request->buf, request->size, NULL, NULL);
#ifdef HAVE_TRACING
SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category());
#endif
#ifdef HAVE_TRACING
SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category());
#endif