Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix tesh host_on_off_processes 4
[simgrid.git] / src / simix / smx_network.c
index ab44443..14e7e57 100644 (file)
@@ -8,6 +8,8 @@
 #include "xbt/log.h"
 #include "mc/mc.h"
 #include "xbt/dict.h"
+#include "smpi/private.h"
+
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
                                 "Logging specific to SIMIX (network)");
@@ -31,8 +33,6 @@ static void SIMIX_comm_start(smx_action_t action);
 void SIMIX_network_init(void)
 {
   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
-  if(MC_is_active())
-    MC_ignore_global_variable("smx_total_comms");
 }
 
 void SIMIX_network_exit(void)
@@ -84,7 +84,7 @@ void SIMIX_rdv_free(void *data)
   xbt_fifo_free(rdv->comm_fifo);
   xbt_fifo_free(rdv->done_comm_fifo);
 
-  xbt_free(rdv);  
+  xbt_free(rdv);
 }
 
 xbt_dict_t SIMIX_get_rdv_points()
@@ -290,9 +290,6 @@ smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
   return act;
 }
 
-void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
-  SIMIX_comm_destroy(action);
-}
 /**
  *  \brief Destroy a communicate action
  *  \param action The communicate action to be destroyed
@@ -356,25 +353,27 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action)
   }
 }
 
-void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
+void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
                                   double task_size, double rate,
                                   void *src_buff, size_t src_buff_size,
                                   int (*match_fun)(void *, void *,smx_action_t),
+                                  void (*copy_data_fun)(smx_action_t, void*, size_t),
                                  void *data, double timeout){
-  smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
-                                      src_buff, src_buff_size, match_fun, NULL,
+  smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
+                                      src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
                                       data, 0);
-  simcall->mc_value = 0;
+  SIMCALL_SET_MC_VALUE(simcall, 0);
   SIMIX_pre_comm_wait(simcall, comm, timeout);
 }
-smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
+smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
                                   double task_size, double rate,
                                   void *src_buff, size_t src_buff_size,
                                   int (*match_fun)(void *, void *,smx_action_t),
-                                  void (*clean_fun)(void *), 
+                                  void (*clean_fun)(void *),
+                                  void (*copy_data_fun)(smx_action_t, void*, size_t),
                                  void *data, int detached){
-  return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
-                         src_buff_size, match_fun, clean_fun, data, detached);
+  return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
+                         src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
 
 }
 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
@@ -382,6 +381,7 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                               void *src_buff, size_t src_buff_size,
                               int (*match_fun)(void *, void *,smx_action_t),
                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
+                              void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
                               void *data,
                               int detached)
 {
@@ -442,6 +442,8 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
   other_action->comm.src_data = data;
 
   other_action->comm.match_fun = match_fun;
+  other_action->comm.copy_data_fun = copy_data_fun;
+
 
   if (MC_is_active()) {
     other_action->state = SIMIX_RUNNING;
@@ -455,26 +457,29 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
                          void *dst_buff, size_t *dst_buff_size,
                          int (*match_fun)(void *, void *, smx_action_t),
+                         void (*copy_data_fun)(smx_action_t, void*, size_t),
                          void *data, double timeout, double rate)
 {
   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
-                                      dst_buff_size, match_fun, data, rate);
-  simcall->mc_value = 0;
+                                      dst_buff_size, match_fun, copy_data_fun, data, rate);
+  SIMCALL_SET_MC_VALUE(simcall, 0);
   SIMIX_pre_comm_wait(simcall, comm, timeout);
 }
 
 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
                                   void *dst_buff, size_t *dst_buff_size,
                                   int (*match_fun)(void *, void *, smx_action_t),
+                                  void (*copy_data_fun)(smx_action_t, void*, size_t),
                                  void *data, double rate)
 {
   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
-                         match_fun, data, rate);
+                         match_fun, copy_data_fun, data, rate);
 }
 
 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                               void *dst_buff, size_t *dst_buff_size,
                               int (*match_fun)(void *, void *, smx_action_t),
+                              void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
                               void *data, double rate)
 {
   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
@@ -546,6 +551,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
     other_action->comm.rate = rate;
 
   other_action->comm.match_fun = match_fun;
+  other_action->comm.copy_data_fun = copy_data_fun;
 
 
   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
@@ -607,7 +613,7 @@ void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double time
   simcall->issuer->waiting_action = action;
 
   if (MC_is_active()) {
-    int idx = simcall->mc_value;
+    int idx = SIMCALL_GET_MC_VALUE(simcall);
     if (idx == 0) {
       action->state = SIMIX_DONE;
     } else {
@@ -671,7 +677,7 @@ void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
   simcall_comm_testany__set__result(simcall, -1);
 
   if (MC_is_active()){
-    int idx = simcall->mc_value;
+    int idx = SIMCALL_GET_MC_VALUE(simcall);
     if(idx == -1){
       SIMIX_simcall_answer(simcall);
     }else{
@@ -701,7 +707,7 @@ void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
   unsigned int cursor = 0;
 
   if (MC_is_active()){
-    int idx = simcall->mc_value;
+    int idx = SIMCALL_GET_MC_VALUE(simcall);
     action = xbt_dynar_get_as(actions, idx, smx_action_t);
     xbt_fifo_push(action->simcalls, simcall);
     simcall_comm_waitany__set__result(simcall, idx);
@@ -799,6 +805,9 @@ void SIMIX_comm_finish(smx_action_t action)
        it from the other actions in the waitany list. Afterwards, get the
        position of the actual action in the waitany dynar and
        return it as the result of the simcall */
+
+    if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
+      continue; // if process handling comm is killed
     if (simcall->call == SIMCALL_COMM_WAITANY) {
       SIMIX_waitany_remove_simcall_from_actions(simcall);
       if (!MC_is_active())
@@ -891,33 +900,13 @@ void SIMIX_comm_finish(smx_action_t action)
     simcall->issuer->waiting_action = NULL;
     xbt_fifo_remove(simcall->issuer->comms, action);
     if(action->comm.detached){
-      smx_process_t proc;
-      int still_alive = 0;
-
       if(simcall->issuer == action->comm.src_proc){
-        if(action->comm.dst_proc){
-            xbt_swag_foreach(proc, simix_global->process_list)
-            {
-               if(proc==action->comm.dst_proc){
-                   still_alive=1;
-                   break;
-               }
-            }
-        }
-        if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
+        if(action->comm.dst_proc)
+          xbt_fifo_remove(action->comm.dst_proc->comms, action);
       }
       if(simcall->issuer == action->comm.dst_proc){
         if(action->comm.src_proc)
-          if(action->comm.dst_proc){
-            xbt_swag_foreach(proc, simix_global->process_list)
-            {
-              if(proc==action->comm.src_proc){
-                  still_alive=1;
-                  break;
-              }
-            }
-          }
-          if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
+          xbt_fifo_remove(action->comm.src_proc->comms, action);
       }
     }
     SIMIX_simcall_answer(simcall);
@@ -1105,14 +1094,14 @@ int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action
  *  \brief verify if communication is latency bounded
  *  \param comm The communication
  */
-XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
+int SIMIX_comm_is_latency_bounded(smx_action_t action)
 {
   if(!action){
     return 0;
   }
   if (action->comm.surf_comm){
     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
-    action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
+    action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
     XBT_DEBUG("Action limited is %d", action->latency_limited);
   }
   return action->latency_limited;
@@ -1175,8 +1164,13 @@ void SIMIX_comm_copy_data(smx_action_t comm)
   if (comm->comm.dst_buff_size)
     *comm->comm.dst_buff_size = buff_size;
 
-  if (buff_size > 0)
-    SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+  if (buff_size > 0){
+      if(comm->comm.copy_data_fun)
+        comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
+      else
+        SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
+  }
+
 
   /* Set the copied flag so we copy data only once */
   /* (this function might be called from both communication ends) */