void *data);
XBT_PUBLIC(void) simcall_comm_destroy(smx_action_t comm);
-
+XBT_PUBLIC(smx_action_t) simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag,
+ int (*match_fun)(void *, void *, smx_action_t), void *data);
XBT_PUBLIC(void) simcall_comm_cancel(smx_action_t comm);
/* FIXME: waitany is going to be a vararg function, and should take a timeout */
void *recvbuf, int *recvcounts,
int *recvdisps, MPI_Datatype recvtype,
MPI_Comm comm));
+MPI_CALL(XBT_PUBLIC(int), MPI_Iprobe,
+ (int source, int tag, MPI_Comm comm,
+ int* flag, MPI_Status* status));
+MPI_CALL(XBT_PUBLIC(int), MPI_Probe,
+ (int source, int tag, MPI_Comm comm,
+ MPI_Status* status));
//FIXME: these are not yet implemented
typedef void MPI_Handler_function(MPI_Comm*, int*, ...);
MPI_CALL(XBT_PUBLIC(int), MPI_Comm_remote_group, (MPI_Comm comm, MPI_Group* group));
MPI_CALL(XBT_PUBLIC(int), MPI_Comm_remote_size, (MPI_Comm comm, int* size));
MPI_CALL(XBT_PUBLIC(int), MPI_Issend, (void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request));
-MPI_CALL(XBT_PUBLIC(int), MPI_Probe, (int source, int tag, MPI_Comm comm, MPI_Status* status));
-MPI_CALL(XBT_PUBLIC(int), MPI_Attr_delete, (MPI_Comm comm, int keyval));
MPI_CALL(XBT_PUBLIC(int), MPI_Attr_get, (MPI_Comm comm, int keyval, void* attr_value, int* flag));
MPI_CALL(XBT_PUBLIC(int), MPI_Attr_put, (MPI_Comm comm, int keyval, void* attr_value));
MPI_CALL(XBT_PUBLIC(int), MPI_Rsend, (void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm));
MPI_CALL(XBT_PUBLIC(int), MPI_Testall, (int count, MPI_Request* requests, int* flag, MPI_Status* statuses));
MPI_CALL(XBT_PUBLIC(int), MPI_Get_elements, (MPI_Status* status, MPI_Datatype datatype, int* elements));
MPI_CALL(XBT_PUBLIC(int), MPI_Dims_create, (int nnodes, int ndims, int* dims));
-MPI_CALL(XBT_PUBLIC(int), MPI_Iprobe, (int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status));
-MPI_CALL(XBT_PUBLIC(int), MPI_Initialized, (int* flag));
//FIXME: End of all the not yet implemented stuff
// smpi functions
}
+/**
+ * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
+ * \param type The type of communication we are looking for (comm_send, comm_recv)
+ * \return The communication action if found, NULL otherwise
+ */
+smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
+ int (*match_fun)(void *, void *,smx_action_t),
+ void *this_user_data, smx_action_t my_action)
+{
+ smx_action_t action;
+ xbt_fifo_item_t item;
+ void* other_user_data = NULL;
+
+ xbt_fifo_foreach(fifo, item, action, smx_action_t) {
+ if (action->comm.type == SIMIX_COMM_SEND) {
+ other_user_data = action->comm.src_data;
+ } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
+ other_user_data = action->comm.dst_data;
+ }
+ if (action->comm.type == type &&
+ (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
+ (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
+ XBT_DEBUG("Found a matching communication action %p", action);
+ action->comm.refcount++;
+
+ return action;
+ }
+ XBT_DEBUG("Sorry, communication action %p does not match our needs:"
+ " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
+ action, (int)action->comm.type, (int)type);
+ }
+ XBT_DEBUG("No matching communication action found");
+ return NULL;
+}
/******************************************************************************/
/* Communication Actions */
/******************************************************************************/
other_action->state = SIMIX_DONE;
other_action->comm.type = SIMIX_COMM_DONE;
other_action->comm.rdv = NULL;
- SIMIX_comm_destroy(this_action);
- --smx_total_comms; // this creation was a pure waste
+ //SIMIX_comm_destroy(this_action);
+ //--smx_total_comms; // this creation was a pure waste
//already_received=1;
- other_action->comm.refcount--;
+ //other_action->comm.refcount--;
}/*else{
XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
}*/
- other_action->comm.refcount--;
+ // other_action->comm.refcount--;
SIMIX_comm_destroy(this_action);
--smx_total_comms; // this creation was a pure waste
}
--smx_total_comms; // this creation was a pure waste
other_action->state = SIMIX_READY;
other_action->comm.type = SIMIX_COMM_READY;
+ // other_action->comm.refcount--;
}
xbt_fifo_push(dst_proc->comms, other_action);
}
return other_action;
}
+
+smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
+ int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
+{
+ XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
+ smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
+
+ smx_action_t other_action;
+ if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
+ //find a match in the already received fifo
+ other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
+ }else{
+ other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
+ }
+ if(other_action)other_action->comm.refcount--;
+
+ SIMIX_comm_destroy(this_action);
+ --smx_total_comms;
+ return other_action;
+}
+
void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
{
void SIMIX_comm_destroy(smx_action_t action);
void SIMIX_comm_destroy_internal_actions(smx_action_t action);
void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx);
+smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
+ int tag, int (*match_fun)(void *, void *, smx_action_t), void *data);
void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx);
void SIMIX_post_comm(smx_action_t action);
void SIMIX_pre_comm_test(smx_simcall_t simcall);
SIMIX_simcall_answer(simcall);
break;
+ case SIMCALL_COMM_IPROBE:
+ simcall->comm_iprobe.result = SIMIX_comm_iprobe(
+ simcall->issuer,
+ simcall->comm_iprobe.rdv,
+ simcall->comm_iprobe.src,
+ simcall->comm_iprobe.tag,
+ simcall->comm_iprobe.match_fun,
+ simcall->comm_iprobe.data);
+ SIMIX_simcall_answer(simcall);
+ break;
+
case SIMCALL_COMM_DESTROY:
SIMIX_comm_destroy(simcall->comm_destroy.comm);
SIMIX_simcall_answer(simcall);
SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_WAIT),\
SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_TEST),\
SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_TESTANY),\
+SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_IPROBE),\
SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_GET_REMAINS),\
SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_GET_STATE),\
SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_GET_SRC_DATA),\
smx_action_t result;
} comm_irecv;
+ struct {
+ smx_rdv_t rdv;
+ int src;
+ int tag;
+ int (*match_fun)(void *, void *, smx_action_t);
+ void *data;
+ smx_action_t result;
+ } comm_iprobe;
+
struct {
smx_action_t comm;
} comm_destroy;
SIMIX_simcall_push(simcall->issuer);
return simcall->comm_irecv.result;
}
+
+
+/**
+ * \ingroup simix_comm_management
+ */
+smx_action_t simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag,
+ int (*match_fun)(void *, void *, smx_action_t), void *data)
+{
+ xbt_assert(rdv, "No rendez-vous point defined for iprobe");
+
+ smx_simcall_t simcall = SIMIX_simcall_mine();
+
+ simcall->call = SIMCALL_COMM_IPROBE;
+ simcall->comm_iprobe.rdv = rdv;
+ simcall->comm_iprobe.src = src;
+ simcall->comm_iprobe.match_fun = match_fun;
+ simcall->comm_iprobe.data = data;
+ if(MC_IS_ENABLED) /* Initialize result to NULL for snapshot comparison done during simcall */
+ simcall->comm_iprobe.result = NULL;
+ SIMIX_simcall_push(simcall->issuer);
+ return simcall->comm_iprobe.result;
+}
+
+
void simcall_comm_destroy(smx_action_t comm)
{
xbt_assert(comm, "Invalid parameter");
int smpi_mpi_test(MPI_Request * request, MPI_Status * status);
int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
MPI_Status * status);
+void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status);
+MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag,
+ MPI_Status* status);
int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype);
void smpi_mpi_wait(MPI_Request * request, MPI_Status * status);
int smpi_mpi_waitany(int count, MPI_Request requests[],
if (oldbuf)
memcpy(request->buf,oldbuf,request->size);
XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
- }else{
- XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
- mailbox = smpi_process_remote_mailbox(
- smpi_group_index(smpi_comm_group(request->comm), request->dst));
}
request->action =
static void finish_wait(MPI_Request * request, MPI_Status * status)
{
MPI_Request req = *request;
+ // if we have a sender, we should use its data, and not the data from the receive
+ if((req->action)&&
+ (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
+ req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
if(status != MPI_STATUS_IGNORE) {
status->MPI_SOURCE = req->src;
// right?
status->count = req->size;
}
+ req = *request;
+
print_request("Finishing", req);
if(req->flags & NON_PERSISTENT) {
smpi_mpi_request_free(request);
return flag;
}
+void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
+ int flag=0;
+ //FIXME find another wait to avoid busy waiting ?
+ // the issue here is that we have to wait on a nonexistent comm
+ MPI_Request request;
+ while(flag==0){
+ request = smpi_mpi_iprobe(source, tag, comm, &flag, status);
+ XBT_DEBUG("Busy Waiting on probing : %d", flag);
+ if(!flag) {
+ smpi_mpi_request_free(&request);
+ simcall_process_sleep(0.0001);
+ }
+ }
+}
+
+MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
+ MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
+ comm, NON_PERSISTENT | RECV);
+ // behave like a receive, but don't do it
+ smx_rdv_t mailbox;
+
+ print_request("New iprobe", request);
+ // We have to test both mailboxes as we don't know if we will receive one one or another
+ if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
+ mailbox = smpi_process_mailbox_small();
+ request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
+
+ }
+ if (request->action==NULL){
+ mailbox = smpi_process_mailbox();
+ request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
+ }
+
+ if(request->action){
+ MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
+ *flag=true;
+ if(status != MPI_STATUS_IGNORE) {
+ status->MPI_SOURCE = req->src;
+ status->MPI_TAG = req->tag;
+ status->MPI_ERROR = MPI_SUCCESS;
+ status->count = req->size;
+ }
+ }
+ else *flag=false;
+
+ return request;
+}
+
void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
{
print_request("Waiting", *request);
return retval;
}
+
+
+int PMPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status) {
+ int retval;
+ smpi_bench_end();
+
+ if (status == NULL) {
+ retval = MPI_ERR_ARG;
+ }else if (comm == MPI_COMM_NULL) {
+ retval = MPI_ERR_COMM;
+ } else {
+ smpi_mpi_probe(source, tag, comm, status);
+ retval = MPI_SUCCESS;
+ }
+ smpi_bench_begin();
+ return retval;
+}
+
+
+int PMPI_Iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status) {
+ int retval;
+ smpi_bench_end();
+
+ if (flag == NULL) {
+ retval = MPI_ERR_ARG;
+ }else if (status == NULL) {
+ retval = MPI_ERR_ARG;
+ }else if (comm == MPI_COMM_NULL) {
+ retval = MPI_ERR_COMM;
+ } else {
+ smpi_mpi_iprobe(source, tag, comm, flag, status);
+ retval = MPI_SUCCESS;
+ }
+ smpi_bench_begin();
+ return retval;
+}
+
int PMPI_Wait(MPI_Request * request, MPI_Status * status)
{
int retval;
return not_yet_implemented();
}
-int PMPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status) {
- return not_yet_implemented();
-}
+
int PMPI_Attr_delete(MPI_Comm comm, int keyval) {
return not_yet_implemented();
return not_yet_implemented();
}
-int PMPI_Iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status) {
- return not_yet_implemented();
-}
-
int PMPI_Initialized(int* flag) {
return not_yet_implemented();
}