Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix tesh host_on_off_processes 4
[simgrid.git] / src / simix / smx_network.c
index b6c5e60..14e7e57 100644 (file)
@@ -8,6 +8,8 @@
 #include "xbt/log.h"
 #include "mc/mc.h"
 #include "xbt/dict.h"
+#include "smpi/private.h"
+
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
                                 "Logging specific to SIMIX (network)");
@@ -82,7 +84,7 @@ void SIMIX_rdv_free(void *data)
   xbt_fifo_free(rdv->comm_fifo);
   xbt_fifo_free(rdv->done_comm_fifo);
 
-  xbt_free(rdv);  
+  xbt_free(rdv);
 }
 
 xbt_dict_t SIMIX_get_rdv_points()
@@ -351,25 +353,27 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
   }
 }
 
-void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
+void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv, // src: sending process, now passed explicitly instead of being read from simcall->issuer
                                   double task_size, double rate,
                                   void *src_buff, size_t src_buff_size,
                                   int (*match_fun)(void *, void *,smx_action_t),
+                                  void (*copy_data_fun)(smx_action_t, void*, size_t), // data-copy callback, passed through unchanged to SIMIX_comm_isend
                                   void *data, double timeout){
-  smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
-                                      src_buff, src_buff_size, match_fun, NULL,
+  smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate, // start the send on behalf of src; the wait on it is issued below
+                                      src_buff, src_buff_size, match_fun, NULL, copy_data_fun, // NULL is the clean_fun argument; detached is 0 on the next line
                                       data, 0);
   SIMCALL_SET_MC_VALUE(simcall, 0);
   SIMIX_pre_comm_wait(simcall, comm, timeout);
 }
-smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
+smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv, // src: sending process, now passed explicitly instead of being read from simcall->issuer
                                   double task_size, double rate,
                                   void *src_buff, size_t src_buff_size,
                                   int (*match_fun)(void *, void *,smx_action_t),
-                                  void (*clean_fun)(void *), 
+                                  void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+                                  void (*copy_data_fun)(smx_action_t, void*, size_t), // data-copy callback, passed through unchanged to SIMIX_comm_isend
                                   void *data, int detached){
-  return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
-                         src_buff_size, match_fun, clean_fun, data, detached);
+  return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff, // thin wrapper: the created action is returned to the caller as-is
+                         src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached); // copy_data_fun sits between clean_fun and data
 
 }
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
@@ -377,6 +381,7 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               void *src_buff, size_t src_buff_size,
                               int (*match_fun)(void *, void *,smx_action_t),
                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+                              void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
                               void *data,
                               int detached)
 {
@@ -437,6 +442,8 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
   other_action->comm.src_data = data;
 
   other_action->comm.match_fun = match_fun;
+  other_action->comm.copy_data_fun = copy_data_fun;
+
 
   if (MC_is_active()) {
     other_action->state = SIMIX_RUNNING;
@@ -450,10 +457,11 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
                          void *dst_buff, size_t *dst_buff_size,
                          int (*match_fun)(void *, void *, smx_action_t),
+                         void (*copy_data_fun)(smx_action_t, void*, size_t), // data-copy callback, passed through unchanged to SIMIX_comm_irecv
                          void *data, double timeout, double rate)
 {
   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
-                                      dst_buff_size, match_fun, data, rate);
+                                      dst_buff_size, match_fun, copy_data_fun, data, rate); // receiver is still simcall->issuer; only copy_data_fun is new
   SIMCALL_SET_MC_VALUE(simcall, 0);
   SIMIX_pre_comm_wait(simcall, comm, timeout);
 }
@@ -461,15 +469,17 @@ void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
                                   void *dst_buff, size_t *dst_buff_size,
                                   int (*match_fun)(void *, void *, smx_action_t),
+                                  void (*copy_data_fun)(smx_action_t, void*, size_t), // data-copy callback, passed through unchanged to SIMIX_comm_irecv
                                   void *data, double rate)
 {
   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
-                         match_fun, data, rate);
+                         match_fun, copy_data_fun, data, rate); // thin wrapper: receiver is simcall->issuer, the created action is returned as-is
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                               void *dst_buff, size_t *dst_buff_size,
                               int (*match_fun)(void *, void *, smx_action_t),
+                              void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
                               void *data, double rate)
 {
   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
@@ -541,6 +551,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
     other_action->comm.rate = rate;
 
   other_action->comm.match_fun = match_fun;
+  other_action->comm.copy_data_fun = copy_data_fun;
 
 
   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
@@ -794,6 +805,9 @@ void SIMIX_comm_finish(smx_action_t action)
        it from the other actions in the waitany list. Afterwards, get the
        position of the actual action in the waitany dynar and
        return it as the result of the simcall */
+
+    if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
+      continue; // if process handling comm is killed
     if (simcall->call == SIMCALL_COMM_WAITANY) {
       SIMIX_waitany_remove_simcall_from_actions(simcall);
       if (!MC_is_active())
@@ -886,33 +900,13 @@ void SIMIX_comm_finish(smx_action_t action)
     simcall->issuer->waiting_action = NULL;
     xbt_fifo_remove(simcall->issuer->comms, action);
     if(action->comm.detached){
-      smx_process_t proc;
-      int still_alive = 0;
-
       if(simcall->issuer == action->comm.src_proc){
-        if(action->comm.dst_proc){
-            xbt_swag_foreach(proc, simix_global->process_list)
-            {
-               if(proc==action->comm.dst_proc){
-                   still_alive=1;
-                   break;
-               }
-            }
-        }
-        if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
+        if(action->comm.dst_proc)
+          xbt_fifo_remove(action->comm.dst_proc->comms, action);
       }
       if(simcall->issuer == action->comm.dst_proc){
         if(action->comm.src_proc)
-          if(action->comm.dst_proc){
-            xbt_swag_foreach(proc, simix_global->process_list)
-            {
-              if(proc==action->comm.src_proc){
-                  still_alive=1;
-                  break;
-              }
-            }
-          }
-          if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
+          xbt_fifo_remove(action->comm.src_proc->comms, action);
       }
     }
     SIMIX_simcall_answer(simcall);
@@ -1170,8 +1164,13 @@ void SIMIX_comm_copy_data(smx_action_t comm)
   if (comm->comm.dst_buff_size)
     *comm->comm.dst_buff_size = buff_size;
 
-  if (buff_size > 0)
-    SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+  if (buff_size > 0){
+      if(comm->comm.copy_data_fun)
+        comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
+      else
+        SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+  }
+
 
   /* Set the copied flag so we copy data only once */
   /* (this function might be called from both communication ends) */