Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
chainsend: change message sizes to more realistic values, avoid passing too many...
[simgrid.git] / examples / msg / chainsend / broadcaster.c
index 2d0b1c0..e311175 100644 (file)
@@ -7,52 +7,21 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
 {
   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
   char *hostname = NULL;
-  msg_host_t h = NULL;
   int i = 1;
   
   for (; i < hostcount+1; i++) {
     hostname = xbt_new(char, HOSTNAME_LENGTH);
     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
-    //XBT_INFO("%s", hostname);
-    /*h = MSG_get_host_by_name(hostname);
-    if (h == NULL) {
-      XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
-      abort();
-    } else {*/
-      xbt_dynar_push(host_list, &hostname);
-    /*}*/
+    XBT_DEBUG("%s", hostname);
+    xbt_dynar_push(host_list, &hostname);
   }
   return host_list;
 }
 
-/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
-{
-  xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
-  msg_host_t h = NULL;
-  int i = 1;
-  
-  for (; i < argc; i++) {
-    XBT_INFO("host%d = %s", i, argv[i]);
-    h = MSG_get_host_by_name(argv[i]);
-    if (h == NULL) {
-      XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
-      abort();
-    } else {
-      xbt_dynar_push(host_list, &(argv[i]));
-    }
-  }
-  return host_list;
-}*/
-
-void delete_hostlist(xbt_dynar_t h)
-{
-  xbt_dynar_free(&h);
-}
-
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it)
+int broadcaster_build_chain(broadcaster_t bc)
 {
   msg_task_t task = NULL;
-  char **cur = (char**)xbt_dynar_iterator_next(it);
+  char **cur = (char**)xbt_dynar_iterator_next(bc->it);
   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   const char *prev = NULL;
@@ -63,20 +32,20 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar
   if (cur != NULL) {
     /* init: prev=NULL, host=current cur, next=next cur */
     next = *cur;
-    *first = next;
+    bc->first = next;
 
     /* This iterator iterates one step ahead: cur is current iterated element, 
        but it's actually the next one in the chain */
     do {
       /* following steps: prev=last, host=next, next=cur */
-      cur = (char**)xbt_dynar_iterator_next(it);
+      cur = (char**)xbt_dynar_iterator_next(bc->it);
       prev = last;
       current_host = next;
       if (cur != NULL)
         next = *cur;
       else
         next = NULL;
-      //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
+      XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
     
       /* Send message to current peer */
       task = task_message_chain_new(me, current_host, prev, next);
@@ -90,38 +59,42 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar
   return MSG_OK;
 }
 
-int broadcaster_send_file(const char *first)
+int broadcaster_send_file(broadcaster_t bc)
 {
-  const char *me = MSG_host_get_name(MSG_host_self());
-  msg_task_t task = NULL;
+  const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   msg_comm_t comm = NULL;
+  msg_task_t task = NULL;
   int status;
 
-  int piece_count = PIECE_COUNT;
-  int cur = 0;
+  bc->current_piece = 0;
 
-  for (; cur < piece_count; cur++) {
-    task = task_message_data_new(me, first, NULL, 0);
-    XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
-    status = MSG_task_send(task, first);
-   
-    xbt_assert(status == MSG_OK, "broadcaster_send_file() failed");
+  while (bc->current_piece < bc->piece_count) {
+    if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) {
+      task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
+      XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %d)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends));
+      comm = MSG_task_isend(task, bc->first);
+      queue_pending_connection(comm, bc->pending_sends);
+      bc->current_piece++;
+    } else {
+      MSG_process_sleep(0.01);
+    }
+    process_pending_connections(bc->pending_sends);
   }
 
   return MSG_OK;
 }
 
-int broadcaster_finish(xbt_dynar_iterator_t it)
+int broadcaster_finish(broadcaster_t bc)
 {
   msg_task_t task = NULL;
   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   char **cur = NULL;
 
-  xbt_dynar_iterator_seek(it, 0);
+  xbt_dynar_iterator_seek(bc->it, 0);
 
   /* Send goodbye message to every peer in the order generated by iterator it */
-  for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
+  for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
     /* Send message to current peer */
     current_host = *cur;
     task = task_message_end_data_new(me, current_host);
@@ -132,31 +105,52 @@ int broadcaster_finish(xbt_dynar_iterator_t it)
   return MSG_OK;
 }
 
+broadcaster_t broadcaster_init(xbt_dynar_t host_list)
+{
+  int status;
+  broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
+
+  bc->piece_count = PIECE_COUNT;
+  bc->current_piece = 0;
+  bc->host_list = host_list;
+  bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
+  bc->max_pending_sends = MAX_PENDING_SENDS;
+  bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+
+  status = broadcaster_build_chain(bc);
+  xbt_assert(status == MSG_OK, "Chain initialization failed");
+
+  return bc;
+}
+
+static void broadcaster_destroy(broadcaster_t bc)
+{
+  /* Destroy iterator and hostlist */
+  xbt_dynar_iterator_delete(bc->it);
+  xbt_dynar_free(&bc->pending_sends);
+  xbt_dynar_free(&bc->host_list);
+}
 
 /** Emitter function  */
 int broadcaster(int argc, char *argv[])
 {
+  broadcaster_t bc = NULL;
   xbt_dynar_t host_list = NULL;
   const char *first = NULL;
-  int status = !MSG_OK;
+  int status;
 
   XBT_INFO("broadcaster");
 
   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
-  /*host_list = build_hostlist_from_argv(argc, argv);*/
   
-  /* Initialize iterator */
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+  bc = broadcaster_init(host_list);
 
   /* TODO: Error checking */
-  status = broadcaster_build_chain(&first, host_list, it);
-  status = broadcaster_send_file(first);
-  status = broadcaster_finish(it);
+  status = broadcaster_send_file(bc);
+  status = broadcaster_finish(bc);
 
-  /* Destroy iterator and hostlist */
-  xbt_dynar_iterator_delete(it);
-  delete_hostlist(host_list);
+  broadcaster_destroy(bc);
 
   return status;
 }