Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Avoid using simcalls here, as by descheduling the process, we could misplace some...
[simgrid.git] / src / smpi / smpi_base.c
index 035c5ef..57ae7e0 100644 (file)
@@ -17,7 +17,7 @@
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
 
 
-static int match_recv(void* a, void* b, smx_action_t ignored) {
+static int match_recv(void* a, void* b, smx_synchro_t ignored) {
    MPI_Request ref = (MPI_Request)a;
    MPI_Request req = (MPI_Request)b;
    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
@@ -39,7 +39,7 @@ static int match_recv(void* a, void* b, smx_action_t ignored) {
   }else return 0;
 }
 
-static int match_send(void* a, void* b,smx_action_t ignored) {
+static int match_send(void* a, void* b,smx_synchro_t ignored) {
    MPI_Request ref = (MPI_Request)a;
    MPI_Request req = (MPI_Request)b;
    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
@@ -348,15 +348,15 @@ void smpi_mpi_start(MPI_Request request)
     if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")){
     //We have to check both mailboxes (because SSEND messages are sent to the large mbox). begin with the more appropriate one : the small one.
       mailbox = smpi_process_mailbox_small();
-      XBT_DEBUG("Is there a corresponding send already posted the small mailbox %p (in case of SSEND)?", mailbox);
-      smx_action_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
+      XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
+      smx_synchro_t action = SIMIX_comm_iprobe(SIMIX_process_self(), mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
     
       if(action ==NULL){
         mailbox = smpi_process_mailbox();
         XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
-        action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
+        action = SIMIX_comm_iprobe(SIMIX_process_self(), mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
         if(action ==NULL){
-          XBT_DEBUG("Still notching, switch back to the small mailbox : %p", mailbox);
+          XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
           mailbox = smpi_process_mailbox_small();
           }
       }else{
@@ -365,7 +365,7 @@ void smpi_mpi_start(MPI_Request request)
     }else{
       mailbox = smpi_process_mailbox_small();
       XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
-    smx_action_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
+    smx_synchro_t action = SIMIX_comm_iprobe(SIMIX_process_self(), mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
     
       if(action ==NULL){
         XBT_DEBUG("No, nothing in the permanent receive mailbox");
@@ -388,7 +388,8 @@ void smpi_mpi_start(MPI_Request request)
     smpi_comm_use(request->comm);
     request->action = simcall_comm_irecv(mailbox, request->buf,
                                          &request->real_size, &match_recv,
-                                         &smpi_comm_copy_buffer_callback,
+                                         !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+                                         : &smpi_comm_null_copy_buffer_callback,
                                          request, -1.0);
         XBT_DEBUG("recv simcall posted");
 
@@ -421,7 +422,7 @@ void smpi_mpi_start(MPI_Request request)
     if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
       mailbox = smpi_process_remote_mailbox(receiver);
       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
-      smx_action_t action = simcall_comm_iprobe(mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
+      smx_synchro_t action = SIMIX_comm_iprobe(SIMIX_process_self(), mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
       if(action ==NULL){
        if (! (request->flags & SSEND)){
          mailbox = smpi_process_remote_mailbox_small(receiver);
@@ -429,7 +430,7 @@ void smpi_mpi_start(MPI_Request request)
        } else{
          mailbox = smpi_process_remote_mailbox_small(receiver);
          XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
-         action = simcall_comm_iprobe(mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
+         action = SIMIX_comm_iprobe(SIMIX_process_self(), mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
          if(action ==NULL){
            XBT_DEBUG("No, we are first, send to large mailbox");
            mailbox = smpi_process_remote_mailbox(receiver);
@@ -451,7 +452,7 @@ void smpi_mpi_start(MPI_Request request)
       request->refcount++;
       if(request->old_type->has_subtype == 0){
         oldbuf = request->buf;
-        if (!_xbt_replay_is_active() && oldbuf && request->size!=0){
+        if (!smpi_process_get_replaying() && oldbuf && request->size!=0){
           if((smpi_privatize_global_variables)
              && ((char*)request->buf >= start_data_exe)
              && ((char*)request->buf < start_data_exe + size_data_exe )){
@@ -474,7 +475,8 @@ void smpi_mpi_start(MPI_Request request)
                          buf, request->real_size,
                          &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
-                         &smpi_comm_copy_buffer_callback,
+                         !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+                         : &smpi_comm_null_copy_buffer_callback,
                          request,
                          // detach if msg size < eager/rdv switch limit
                          request->detached);
@@ -690,7 +692,7 @@ static void finish_wait(MPI_Request * request, MPI_Status * status)
     MPI_Datatype datatype = req->old_type;
 
     if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){
-      if (!_xbt_replay_is_active()){
+      if (!smpi_process_get_replaying()){
         if( smpi_privatize_global_variables
             && ((char*)req->old_buf >= start_data_exe)
             && ((char*)req->old_buf < start_data_exe + size_data_exe )
@@ -774,7 +776,7 @@ int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
 
   *index = MPI_UNDEFINED;
   flag = 0;
-  comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
+  comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
   map = xbt_new(int, count);
   size = 0;
   for(i = 0; i < count; i++) {
@@ -903,10 +905,9 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
 
   if ((*request)->action != NULL) { // this is not a detached send
     simcall_comm_wait((*request)->action, -1.0);
-#ifdef HAVE_MC
-  if(MC_is_active() && (*request)->action)
+
+  if((MC_is_active() || MC_record_replay_is_active()) && (*request)->action)
     (*request)->action->comm.dst_data = NULL; // dangling pointer : dst_data is freed with a wait, need to set it to NULL for system state comparison
-#endif
   }
 
   finish_wait(request, status);
@@ -925,7 +926,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[],
   index = MPI_UNDEFINED;
   if(count > 0) {
     // Wait for a request to complete
-    comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
+    comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
     map = xbt_new(int, count);
     size = 0;
     XBT_DEBUG("Wait for one of %d", count);
@@ -991,7 +992,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[],
   }
   for(c = 0; c < count; c++) {
 
-    if (MC_is_active()) {
+    if (MC_is_active() || MC_record_replay_is_active()) {
       smpi_mpi_wait(&requests[c], pstat);
       index = c;
     } else {
@@ -1372,7 +1373,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
 
   char* sendtmpbuf = (char*) sendbuf;
   if( sendbuf == MPI_IN_PLACE ) {
-    sendtmpbuf = (char *)xbt_malloc(count*smpi_datatype_get_extent(datatype));
+    sendtmpbuf = (char *)smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype));
     smpi_datatype_copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
   }
 
@@ -1403,7 +1404,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
       if(src != root) {
         // FIXME: possibly overkill we we have contiguous/noncontiguous data
         //  mapping...
-           if (!_xbt_replay_is_active())
+           if (!smpi_process_get_replaying())
           tmpbufs[index] = xbt_malloc(count * dataext);
            else
              tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
@@ -1433,7 +1434,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
     xbt_free(requests);
 
     if( sendbuf == MPI_IN_PLACE ) {
-      xbt_free(sendtmpbuf);
+      smpi_free_tmp_buffer(sendtmpbuf);
     }
   }
 }
@@ -1470,7 +1471,7 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
   for(other = 0; other < rank; other++) {
     // FIXME: possibly overkill we we have contiguous/noncontiguous data
     // mapping...
-    tmpbufs[index] = xbt_malloc(count * dataext);
+    tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
     requests[index] =
       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
                       comm);
@@ -1505,7 +1506,7 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
     }
   }
   for(index = 0; index < rank; index++) {
-    xbt_free(tmpbufs[index]);
+    smpi_free_tmp_buffer(tmpbufs[index]);
   }
   for(index = 0; index < size-1; index++) {
     smpi_mpi_request_free(&requests[index]);
@@ -1536,7 +1537,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count,
   for(other = 0; other < rank; other++) {
     // FIXME: possibly overkill we we have contiguous/noncontiguous data
     // mapping...
-    tmpbufs[index] = xbt_malloc(count * dataext);
+    tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
     requests[index] =
       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
                       comm);
@@ -1577,7 +1578,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count,
     }
   }
   for(index = 0; index < rank; index++) {
-    xbt_free(tmpbufs[index]);
+    smpi_free_tmp_buffer(tmpbufs[index]);
   }
   for(index = 0; index < size-1; index++) {
     smpi_mpi_request_free(&requests[index]);