Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
real isend action, w/o spawning. Still ongoing but committing now because my previous...
author mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 7 Dec 2010 09:51:47 +0000 (09:51 +0000)
committer mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 7 Dec 2010 09:51:47 +0000 (09:51 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9027 48e7efb5-ca39-0410-a469-dd3cf9ba447f

examples/msg/actions/actions.c

index f1f03dc..f962377 100644 (file)
@@ -16,11 +16,14 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
                              "Messages specific for this msg example");
 int communicator_size = 0;
 
                              "Messages specific for this msg example");
 int communicator_size = 0;
 
+static void action_Isend(xbt_dynar_t action);
+
 typedef struct  {
   int last_Irecv_sender_id;
   int bcast_counter;
   int reduce_counter;
   int allReduce_counter;
 typedef struct  {
   int last_Irecv_sender_id;
   int bcast_counter;
   int reduce_counter;
   int allReduce_counter;
+  xbt_dynar_t isends;
 } s_process_globals_t, *process_globals_t;
 
 /* Helper function */
 } s_process_globals_t, *process_globals_t;
 
 /* Helper function */
@@ -40,6 +43,18 @@ static int get_rank (const char *process_name)
   return atoi(&(process_name[1]));
 } 
 
   return atoi(&(process_name[1]));
 } 
 
+static void asynchronous_cleanup(void) { /* Reap the pending isends of the calling process; called at the end of the send, Isend, recv and Irecv actions. */
+  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self()); /* NOTE(review): dereferenced below without a NULL check -- presumably action_init has already attached the globals to this process; confirm */
+
+  /* Destroy every isend that corresponds to a completed communication */
+  int found; /* index into globals->isends returned by MSG_comm_testany; -1 ends the loop */
+  msg_comm_t comm; /* receives the element taken out of the dynar */
+  while ((found = MSG_comm_testany(globals->isends)) != -1) { /* the dynar is polled again on every iteration */
+    xbt_dynar_remove_at(globals->isends,found,&comm); /* drop the entry and copy it into comm */
+    MSG_comm_destroy(comm); /* release the communication handle just removed */
+  }
+}
+
 /* My actions */
 static int spawned_send(int argc, char *argv[])
 {
 /* My actions */
 static int spawned_send(int argc, char *argv[])
 {
@@ -74,22 +89,8 @@ static void action_send(xbt_dynar_t action)
 
   DEBUG2("Entering Send: %s (size: %lg)", name, size);
    if (size<65536) {
 
   DEBUG2("Entering Send: %s (size: %lg)", name, size);
    if (size<65536) {
-     char spawn_name[80];
-     char **myargv;
-     m_process_t comm_helper;
-     DEBUG1("Isend on %s: spawn process ",
-           MSG_process_get_name(MSG_process_self()));
-     myargv = (char **) calloc(3, sizeof(char *));
-     myargv[0] = xbt_strdup(to);
-     myargv[1] = xbt_strdup(size_str);
-     myargv[2] = NULL;
-     
-     sprintf(spawn_name, "%s_wait", to);
-     comm_helper =
-       MSG_process_create_with_arguments(spawn_name, spawned_send,
-                                        NULL, MSG_host_self(), 2, myargv);
-   }
-   else {
+     action_Isend(action);
+   } else {
      MSG_task_send(MSG_task_create(name, 0, size, NULL), to);
    }
    
      MSG_task_send(MSG_task_create(name, 0, size, NULL), to);
    }
    
@@ -100,35 +101,31 @@ static void action_send(xbt_dynar_t action)
 
 #ifdef HAVE_TRACING
   TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
 
 #ifdef HAVE_TRACING
   TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
-#endif  
+#endif
+
+  asynchronous_cleanup();
 }
 
 static void action_Isend(xbt_dynar_t action)
 {
 }
 
 static void action_Isend(xbt_dynar_t action)
 {
-  char spawn_name[80];
   char to[250];
   //  char *to = xbt_dynar_get_as(action, 2, char *);
   char *size = xbt_dynar_get_as(action, 3, char *);
   char to[250];
   //  char *to = xbt_dynar_get_as(action, 2, char *);
   char *size = xbt_dynar_get_as(action, 3, char *);
-  char **myargv;
-  m_process_t comm_helper;
   double clock = MSG_get_clock();
   double clock = MSG_get_clock();
-  DEBUG1("Isend on %s: spawn process ",
-         MSG_process_get_name(MSG_process_self()));
+  process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
+
 
   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
           xbt_dynar_get_as(action, 2, char *));
 
   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
           xbt_dynar_get_as(action, 2, char *));
-  myargv = (char **) calloc(3, sizeof(char *));
 
 
-  myargv[0] = xbt_strdup(to);
-  myargv[1] = xbt_strdup(size);
-  myargv[2] = NULL;
+  msg_comm_t comm =
+      MSG_task_isend( MSG_task_create(to,0,parse_double(size),NULL), to);
+  xbt_dynar_push(globals->isends,&comm);
 
 
-  //    sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
-  sprintf(spawn_name, "%s_wait", to);
-  comm_helper =
-      MSG_process_create_with_arguments(spawn_name, spawned_send,
-                                        NULL, MSG_host_self(), 2, myargv);
+  DEBUG1("Isend on %s", MSG_process_get_name(MSG_process_self()));
   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
+
+  asynchronous_cleanup();
 }
 
 
 }
 
 
@@ -165,6 +162,7 @@ static void action_recv(xbt_dynar_t action)
   TRACE_smpi_recv(rank, src_traced, rank);
 #endif
 
   TRACE_smpi_recv(rank, src_traced, rank);
 #endif
 
+  asynchronous_cleanup();
 }
 
 static int spawned_recv(int argc, char *argv[])
 }
 
 static int spawned_recv(int argc, char *argv[])
@@ -220,6 +218,7 @@ static void action_Irecv(xbt_dynar_t action)
   TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
 #endif
 
   TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
 #endif
 
+  asynchronous_cleanup();
 }
 
 
 }
 
 
@@ -576,7 +575,10 @@ static void action_init(xbt_dynar_t action)
   TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
 #endif
   DEBUG0("Initialize the counters");
   TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
 #endif
   DEBUG0("Initialize the counters");
-  counters = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
+  process_globals_t globals = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
+  globals->isends = xbt_dynar_new(sizeof(msg_comm_t),NULL);
+  MSG_process_set_data(MSG_process_self(),globals);
+
 }
 
 static void action_finalize(xbt_dynar_t action)
 }
 
 static void action_finalize(xbt_dynar_t action)