* under the terms of the license (GNU LGPL) which comes with this package. */
#include "private.h"
-#include "xbt/time.h"
+#include "xbt/virtu.h"
#include "mc/mc.h"
#include "xbt/replay.h"
#include <errno.h>
+#include "simix/smx_private.h"
#include "surf/surf.h"
xbt_assert(ref, "Cannot match recv against null reference");
xbt_assert(req, "Cannot match recv against null request");
- return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
- && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
+ if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
+ && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag)){
+ //we match, we can transfer some values
+ // FIXME : move this to the copy function ?
+ if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
+ if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
+ if(ref->real_size < req->real_size) ref->truncated = 1;
+ else ref->truncated = 0;
+ return 1;
+ }else return 0;
}
static int match_send(void* a, void* b,smx_action_t ignored) {
XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
xbt_assert(ref, "Cannot match send against null reference");
xbt_assert(req, "Cannot match send against null request");
- return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
- && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
+
+ // Matching callback for sends: returns 1 when ref and req are compatible
+ // (same source and tag, or a wildcard on the req side), 0 otherwise.
+ // On a match, req is the request that may carry wildcards, so the actual
+ // source/tag and the truncation flag are all recorded in req.
+ if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
+ && (req->tag == MPI_ANY_TAG || req->tag == ref->tag))
+ {
+ if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
+ if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
+ if(req->real_size < ref->real_size) req->truncated = 1;
+ // reset the flag on the same request that the branch above sets,
+ // otherwise a stale truncated=1 survives when the request is matched again
+ else req->truncated = 0;
+ return 1;
+ } else return 0;
}
static MPI_Request build_request(void *buf, int count,
{
MPI_Request request;
- void *old_buf;
+ void *old_buf = NULL; // only assigned inside the has_subtype branch, but always copied into request->old_buf
request = xbt_new(s_smpi_mpi_request_t, 1);
s_smpi_subtype_t *subtype = datatype->substruct;
if(datatype->has_subtype == 1){
- // This part handles the problem of non-contignous memory
+ // This part handles the problem of non-contiguous memory
old_buf = buf;
buf = malloc(count*smpi_datatype_size(datatype));
if (flags & SEND) {
}
request->buf = buf;
- // This part handles the problem of non-contignous memory (for the
+ // This part handles the problem of non-contiguous memory (for the
// unserialisation at the reception)
request->old_buf = old_buf;
request->old_type = datatype;
request->comm = comm;
request->action = NULL;
request->flags = flags;
+ request->detached = 0; // a request starts non-detached; smpi_mpi_start sets this to 1 for small sends
#ifdef HAVE_TRACING
request->send = 0;
request->recv = 0;
#endif
+ if (flags & SEND) smpi_datatype_unuse(datatype); // NOTE(review): presumably releases a datatype reference taken on the send path - confirm against smpi_datatype_use callers
+
return request;
}
void smpi_mpi_start(MPI_Request request)
{
smx_rdv_t mailbox;
- int detached = 0;
xbt_assert(!request->action,
"Cannot (re)start a non-finished communication");
if(request->flags & RECV) {
print_request("New recv", request);
- if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
+ if (request->size < surf_cfg_get_int("smpi/async_small_thres"))
mailbox = smpi_process_mailbox_small();
else
mailbox = smpi_process_mailbox();
-
- // FIXME: SIMIX does not yet support non-contiguous datatypes
- request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
+ // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
+ request->real_size=request->size;
+ smpi_datatype_use(request->old_type);
+ request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
+ if (request->action)request->action->comm.refcount++;
} else {
+
+ int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
+/* if(receiver == MPI_UNDEFINED) {*/
+/* XBT_WARN("Trying to send a message to a wrong rank");*/
+/* return;*/
+/* }*/
print_request("New send", request);
- if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode
- mailbox = smpi_process_remote_mailbox_small(
- smpi_group_index(smpi_comm_group(request->comm), request->dst));
+ if (request->size < surf_cfg_get_int("smpi/async_small_thres")) { // eager mode
+ mailbox = smpi_process_remote_mailbox_small(receiver);
}else{
XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
- mailbox = smpi_process_remote_mailbox(
- smpi_group_index(smpi_comm_group(request->comm), request->dst));
+ mailbox = smpi_process_remote_mailbox(receiver);
}
if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
- void *oldbuf = request->buf;
- detached = 1;
- request->buf = malloc(request->size);
- if (oldbuf)
- memcpy(request->buf,oldbuf,request->size);
+ void *oldbuf = NULL;
+ if(request->old_type->has_subtype == 0){
+ oldbuf = request->buf;
+ request->detached = 1;
+ if (oldbuf){
+ request->buf = malloc(request->size);
+ memcpy(request->buf,oldbuf,request->size);
+ }
+ }
XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
}
+ // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
+ request->real_size=request->size;
+ smpi_datatype_use(request->old_type);
request->action =
simcall_comm_isend(mailbox, request->size, -1.0,
- request->buf, request->size,
+ request->buf, request->real_size,
&match_send,
&smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
request,
// detach if msg size < eager/rdv switch limit
- detached);
+ request->detached);
+ if (request->action)request->action->comm.refcount++;
#ifdef HAVE_TRACING
/* FIXME: detached sends are not traceable (request->action == NULL) */
void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
int tag, MPI_Comm comm)
{
- MPI_Request request;
+ MPI_Request request =
+ build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ comm, NON_PERSISTENT | SEND | RECV_DELETE); // RECV_DELETE marks a blocking send: finish_wait on the receiver frees this request when the send was detached
- request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
+ smpi_mpi_start(request);
smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
+
}
void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
static void finish_wait(MPI_Request * request, MPI_Status * status)
{
MPI_Request req = *request;
- // if we have a sender, we should use its data, and not the data from the receive
- if((req->action)&&
- (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
- req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
if(status != MPI_STATUS_IGNORE) {
- status->MPI_SOURCE = req->src;
- status->MPI_TAG = req->tag;
- status->MPI_ERROR = MPI_SUCCESS;
+ status->MPI_SOURCE = req->src == MPI_ANY_SOURCE ? req->real_src : req->src; // real_src is filled in by the match callbacks when a wildcard was used
+ status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag; // same for real_tag
+ if(req->truncated)
+ status->MPI_ERROR = MPI_ERR_TRUNCATE;
+ else status->MPI_ERROR = MPI_SUCCESS ;
+ // this handles the case where size in receive differs from size in send
// FIXME: really this should just contain the count of receive-type blocks,
// right?
- status->count = req->size;
+ status->count = req->real_size; // real_size is the copy handed to SIMIX in smpi_mpi_start, which may modify it
}
req = *request;
print_request("Finishing", req);
MPI_Datatype datatype = req->old_type;
+
if(datatype->has_subtype == 1){
// This part handles the problem of non-contignous memory
// the unserialization at the reception
s_smpi_subtype_t *subtype = datatype->substruct;
if(req->flags & RECV) {
- subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
+ subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
}
- //FIXME: I am not sure that if the send is detached we have to free
- //the sender buffer thus I do it only for the reciever
- if(req->flags & RECV) free(req->buf);
+ if(req->detached == 0) free(req->buf); // frees the temporary buffer malloc'ed in build_request for subtype datatypes
}
+ smpi_datatype_unuse(datatype); // NOTE(review): presumably pairs with the smpi_datatype_use() call in smpi_mpi_start - confirm
+
if(req->flags & NON_PERSISTENT) {
+ if(req->flags & RECV &&
+ req->action &&
+ (req->action->state == SIMIX_DONE))
+ {
+ MPI_Request sender_request = (MPI_Request)SIMIX_comm_get_src_data(req->action);
+ if((sender_request!=MPI_REQUEST_NULL) &&
+ ( sender_request->detached ) &&
+ ( sender_request->flags & RECV_DELETE))
+ {
+ //we are in a receiver's wait from a detached send
+ //we have to clean the sender's side request here.... but only if done by a send, not an isend
+ //the request lives senderside for an isend. As detached is currently for send + isend, we use RECV_DELETE to separate them
+ //FIXME : see if just removing detached status for isend is also good
+ smpi_mpi_request_free(&sender_request);
+ }
+ }
+
+
+ if(req->action){
+ //if we want to free our request, we have to invalidate it at the other end of the comm
+ if(req->flags & SEND){
+ req->action->comm.src_data=MPI_REQUEST_NULL;
+ }else{
+ req->action->comm.dst_data=MPI_REQUEST_NULL;
+ }
+
+ smx_action_t temp=req->action; // keep the handle: req->action may be cleared before the destroy call
+ if(req->action->comm.refcount == 1)req->action = NULL; // NOTE(review): looks like refcount == 1 means the destroy below drops the last reference - confirm
+ SIMIX_comm_destroy(temp);
+ }
+
+
+
smpi_mpi_request_free(request);
+
+
} else {
+ if(req->action)SIMIX_comm_destroy(req->action); // persistent request: drop the comm reference but keep the request itself
req->action = NULL;
}
}
print_request("New iprobe", request);
// We have to test both mailboxes as we don't know if we will receive one one or another
- if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
+ if (surf_cfg_get_int("smpi/async_small_thres")>0){
mailbox = smpi_process_mailbox_small();
XBT_DEBUG("trying to probe the perm recv mailbox");
request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
if(request->action){
MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
- *flag=true;
+ *flag = 1;
if(status != MPI_STATUS_IGNORE) {
status->MPI_SOURCE = req->src;
status->MPI_TAG = req->tag;
status->MPI_ERROR = MPI_SUCCESS;
- status->count = req->size;
+ status->count = req->real_size;
}
}
- else *flag=false;
+ else *flag = 0;
smpi_mpi_request_free(&request);
return;
simcall_comm_wait((*request)->action, -1.0);
finish_wait(request, status);
}
+
// FIXME for a detached send, finish_wait is not called:
}
return index;
}
-void smpi_mpi_waitall(int count, MPI_Request requests[],
+int smpi_mpi_waitall(int count, MPI_Request requests[],
MPI_Status status[])
{
int index, c;
MPI_Status stat;
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+ int retvalue=MPI_SUCCESS; // result returned to the caller; switched to MPI_ERR_IN_STATUS below on truncation
//tag invalid requests in the set
for(c = 0; c < count; c++) {
if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
}
for(c = 0; c < count; c++) {
- if(MC_IS_ENABLED) {
+ if(MC_is_active()) {
smpi_mpi_wait(&requests[c], pstat);
index = c;
} else {
}
if(status != MPI_STATUSES_IGNORE) {
memcpy(&status[index], pstat, sizeof(*pstat));
+ if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS; // only checked when the caller supplied a status array
}
}
}
+ return retvalue;
}
int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
count = 0;
+ count_dead = 0;
for(i = 0; i < incount; i++) {
if((requests[i] != MPI_REQUEST_NULL)) {
if(smpi_mpi_test(&requests[i], pstat)) {