Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
chainsend: remove end of data message, send that information at chain initialization
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Fri, 1 Feb 2013 08:59:39 +0000 (09:59 +0100)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Fri, 1 Feb 2013 08:59:39 +0000 (09:59 +0100)
examples/msg/chainsend/broadcaster.c
examples/msg/chainsend/messages.c
examples/msg/chainsend/messages.h
examples/msg/chainsend/peer.c
examples/msg/chainsend/peer.h

index 8ab4f44..fc68dab 100644 (file)
@@ -48,8 +48,7 @@ int broadcaster_build_chain(broadcaster_t bc)
       XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
     
       /* Send message to current peer */
-      task = task_message_chain_new(me, current_host, prev, next);
-      //MSG_task_set_category(task, current_host);
+      task = task_message_chain_new(me, current_host, prev, next, bc->piece_count);
       MSG_task_send(task, current_host);
 
       last = current_host;
@@ -77,27 +76,6 @@ int broadcaster_send_file(broadcaster_t bc)
   return MSG_OK;
 }
 
-int broadcaster_finish(broadcaster_t bc)
-{
-  msg_task_t task = NULL;
-  const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
-  const char *current_host = NULL;
-  char **cur = NULL;
-
-  xbt_dynar_iterator_seek(bc->it, 0);
-
-  /* Send goodbye message to every peer in the order generated by iterator it */
-  for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
-    /* Send message to current peer */
-    current_host = *cur;
-    task = task_message_end_data_new(me, current_host);
-    //MSG_task_set_category(task, current_host);
-    MSG_task_send(task, current_host);
-  }
-
-  return MSG_OK;
-}
-
 broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
 {
   int status;
@@ -148,7 +126,6 @@ int broadcaster(int argc, char *argv[])
 
   /* TODO: Error checking */
   status = broadcaster_send_file(bc);
-  status = broadcaster_finish(bc);
 
   broadcaster_destroy(bc);
 
index 0bc23cd..c91243c 100644 (file)
@@ -11,12 +11,13 @@ msg_task_t task_message_new(e_message_type type, unsigned int len, const char *i
   return task;
 }
 
-msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next, const unsigned int num_pieces)
 {
   msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, MESSAGE_BUILD_CHAIN_SIZE, issuer_hostname, mailbox);
   message_t msg = MSG_task_get_data(task);
   msg->prev_hostname = prev;
   msg->next_hostname = next;
+  msg->num_pieces = num_pieces;
 
   return task;
 }
@@ -24,8 +25,6 @@ msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailb
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
 {
   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, MESSAGE_SEND_DATA_HEADER_SIZE + len, issuer_hostname, mailbox);
-  //if (strcmp(mailbox, "host4") == 0) 
-  //MSG_task_set_category(task, mailbox);
   message_t msg = MSG_task_get_data(task);
   msg->data_block = block;
   msg->data_length = len;
@@ -33,11 +32,6 @@ msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbo
   return task;
 }
 
-msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
-{
-  return task_message_new(MESSAGE_END_DATA, MESSAGE_END_DATA_SIZE, issuer_hostname, mailbox);
-}
-
 void task_message_delete(void *task)
 {
   message_t msg = MSG_task_get_data(task);
index 52ba874..7118d29 100644 (file)
@@ -11,8 +11,7 @@
 /* Messages enum */
 typedef enum {
   MESSAGE_BUILD_CHAIN = 0,
-  MESSAGE_SEND_DATA,
-  MESSAGE_END_DATA
+  MESSAGE_SEND_DATA
 } e_message_type;
 
 /* Message struct */
@@ -24,13 +23,13 @@ typedef struct s_message {
   const char *next_hostname;
   const char *data_block;
   unsigned int data_length;
+  unsigned int num_pieces;
 } s_message_t, *message_t;
 
 /* Message methods */
 msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox);
-msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next, const unsigned int num_pieces);
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
-msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
 void task_message_delete(void *);
 
 #endif /* KADEPLOY_MESSAGES_H */
index 43c055d..1827df8 100644 (file)
@@ -11,6 +11,7 @@ void peer_init_chain(peer_t peer, message_t msg)
 {
   peer->prev = msg->prev_hostname;
   peer->next = msg->next_hostname;
+  peer->total_pieces = msg->num_pieces;
   peer->init = 1;
 }
 
@@ -39,11 +40,10 @@ int peer_execute_task(peer_t peer, msg_task_t task)
         peer_forward_msg(peer, msg);
       peer->pieces++;
       peer->bytes += msg->data_length;
-      break;
-    case MESSAGE_END_DATA:
-      xbt_assert(peer->init, "peer_execute_task() failed: got msg_type %d before initialization", msg->type);
-      done = 1;
-      XBT_DEBUG("%d pieces receieved", peer->pieces);
+      if (peer->pieces >= peer->total_pieces) {
+        XBT_DEBUG("%d pieces receieved", peer->pieces);
+        done = 1;
+      }
       break;
   }
 
@@ -91,7 +91,6 @@ void peer_init(peer_t p, int argc, char *argv[])
   p->next = NULL;
   p->pieces = 0;
   p->bytes = 0;
-  p->close_asap = 0;
   p->pending_recvs = xbt_dynar_new(sizeof(msg_comm_t), NULL);
   p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
   p->me = xbt_new(char, HOSTNAME_LENGTH);
index 8727e15..fd20eca 100644 (file)
@@ -19,7 +19,7 @@ typedef struct s_peer {
   unsigned long long bytes;
   xbt_dynar_t pending_recvs;
   xbt_dynar_t pending_sends;
-  int close_asap; /* TODO: unused */
+  unsigned int total_pieces;
 } s_peer_t, *peer_t;
 
 /* Peer: helper functions */