Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
messages implementation: it dies when trying to forward data from one peer to another...
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Fri, 5 Oct 2012 15:57:17 +0000 (17:57 +0200)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Wed, 28 Nov 2012 10:49:25 +0000 (11:49 +0100)
examples/msg/kadeploy/kadeploy.c

index cc8b0a3..d779acd 100644 (file)
@@ -59,6 +59,14 @@ typedef struct s_message {
   unsigned int data_length;
 } s_message_t, *message_t;
 
+/* Peer struct */
+typedef struct s_peer {
+  int init;
+  const char *prev;
+  const char *next;
+  const char *me;
+} s_peer_t, *peer_t;
+
 /* Iterator methods */
 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
@@ -80,12 +88,14 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
 
 /* Broadcaster: helper functions */
-int broadcaster_build_chain(xbt_dynar_t host_list);
-int broadcaster_send_file(xbt_dynar_t host_list);
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
+int broadcaster_send_file(const char *first);
 int broadcaster_finish(xbt_dynar_t host_list);
 
 /* Peer: helper functions */
-int peer_wait_for_init();
+msg_error_t peer_wait_for_message();
+int peer_execute_task(peer_t peer, msg_task_t task);
+void peer_init_chain(peer_t peer, message_t msg);
 
 /* Initialization stuff */
 msg_error_t test_all(const char *platform_file,
@@ -231,21 +241,22 @@ void delete_hostlist(xbt_dynar_t h)
   xbt_dynar_free(&h);
 }
 
-int broadcaster_build_chain(xbt_dynar_t host_list)
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
 {
   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
   msg_task_t task = NULL;
   char **cur = (char**)xbt_dynar_iterator_next(it);
+  const char *me = MSG_host_get_name(MSG_host_self());
   const char *current_host = NULL;
   const char *prev = NULL;
   const char *next = NULL;
-  const char *me = MSG_host_get_name(MSG_host_self());
   const char *last = NULL;
 
   /* Build the chain if there's at least one peer */
   if (cur != NULL) {
     /* init: prev=NULL, host=current cur, next=next cur */
     next = *cur;
+    *first = next;
 
     /* This iterator iterates one step ahead: cur is current iterated element, 
        but it's actually the next one in the chain */
@@ -272,9 +283,26 @@ int broadcaster_build_chain(xbt_dynar_t host_list)
   return MSG_OK;
 }
 
-int broadcaster_send_file(xbt_dynar_t host_list)
+int broadcaster_send_file(const char *first)
 {
-  /* ... */
+  const char *me = MSG_host_get_name(MSG_host_self());
+  msg_task_t task = NULL;
+  msg_comm_t comm = NULL;
+  int status;
+
+  int piece_count = 10;
+  int cur = 0;
+
+  for (; cur < piece_count; cur++) {
+    /* TODO: stub */
+    task = task_message_data_new(me, first, NULL, 0);
+    XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
+    comm = MSG_task_isend(task, first);
+   
+    status = MSG_comm_wait(comm, -1);
+    xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+    MSG_comm_destroy(comm);
+  }
 
   return MSG_OK;
 }
@@ -289,10 +317,10 @@ int broadcaster_finish(xbt_dynar_t host_list)
 
   /* Send goodbye message to every peer */
   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
-      /* Send message to current peer */
-      current_host = *cur;
-      task = task_message_end_data_new(me, current_host);
-      MSG_task_send(task, current_host);
+    /* Send message to current peer */
+    current_host = *cur;
+    task = task_message_end_data_new(me, current_host);
+    MSG_task_send(task, current_host);
   }
 
   return MSG_OK;
@@ -314,100 +342,111 @@ int broadcaster(int argc, char *argv[])
   /*host_list = build_hostlist_from_argv(argc, argv);*/
   
   /* TODO: Error checking */
-  status = broadcaster_build_chain(host_list);
-  status = broadcaster_send_file(host_list);
+  status = broadcaster_build_chain(&first, host_list);
+  status = broadcaster_send_file(first);
   status = broadcaster_finish(host_list);
 
   delete_hostlist(host_list);
 
-  /* Latency */
-  /*time = MSG_get_clock();
-  sprintf(sprintf_buffer_la, "latency task");
-  task_la =
-      MSG_task_create(sprintf_buffer_la, 0.0, task_comm_size_lat, NULL);
-  task_la->data = xbt_new(double, 1);
-  *(double *) task_la->data = time;
-  XBT_INFO("task_la->data = %le", *((double *) task_la->data));
-  MSG_task_send(task_la, argv[1]);*/
-
-  /* Bandwidth */
-  /*time = MSG_get_clock();
-  sprintf(sprintf_buffer_bw, "bandwidth task");
-  task_bw =
-      MSG_task_create(sprintf_buffer_bw, 0.0, task_comm_size_bw, NULL);
-  task_bw->data = xbt_new(double, 1);
-  *(double *) task_bw->data = time;
-  XBT_INFO("task_bw->data = %le", *((double *) task_bw->data));
-  MSG_task_send(task_bw, argv[1]);
-  */
   return status;
 }
 
-int peer_wait_for_init()
+/*******************************************************
+ *                     Peer                            *
+ *******************************************************/
+
+void peer_init_chain(peer_t peer, message_t msg)
+{
+  peer->prev = msg->prev_hostname;
+  peer->next = msg->next_hostname;
+}
+
+/* TODO: error checking */
+void peer_forward_msg(peer_t peer, message_t msg)
+{
+  int status;
+  msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
+  msg_comm_t comm = NULL;
+  XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
+  comm = MSG_task_isend(task, peer->next);
+   
+  status = MSG_comm_wait(comm, -1);
+  xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed");
+  MSG_comm_destroy(comm);
+}
+
+int peer_execute_task(peer_t peer, msg_task_t task)
+{
+  int done = 0, init = 0;
+  message_t msg = MSG_task_get_data(task);
+  
+  XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
+  switch (msg->type) {
+    case MESSAGE_BUILD_CHAIN:
+      peer_init_chain(peer, msg);
+      peer->init = 1;
+      break;
+    case MESSAGE_SEND_DATA:
+      xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
+      if (peer->next != NULL)
+        peer_forward_msg(peer, msg);
+      break;
+    case MESSAGE_END_DATA:
+      xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
+      done = 1;
+      break;
+  }
+
+  MSG_task_execute(task);
+
+  return done;
+}
+
+msg_error_t peer_wait_for_message(peer_t peer)
 {
+  msg_error_t status;
+  msg_comm_t comm;
+  int done = 0;
+
   msg_task_t task = NULL;
-  const char *me = MSG_host_get_name(MSG_host_self());
 
-  int a = MSG_task_receive(&task, me);
+  /* TODO: Error checking is not correct */
+  while (!done) {
+    comm = MSG_task_irecv(&task, peer->me);
+    status = MSG_comm_wait(comm, -1);
+    xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
+    MSG_comm_destroy(comm);
 
-  if (a == MSG_OK) {
-    XBT_INFO("Peer %s got message\n", me);
+    done = peer_execute_task(peer, task);
+    task_message_delete(task);
+    task = NULL;
   }
 
-  task_message_delete(task);
+  return status;
+}
 
-  return MSG_OK;
+void peer_init(peer_t p)
+{
+  p->init = 0;
+  p->prev = NULL;
+  p->next = NULL;
+  p->me = MSG_host_get_name(MSG_host_self());
 }
 
 /** Peer function  */
 int peer(int argc, char *argv[])
 {
-  double time, time1, sender_time;
-  msg_task_t task_la = NULL;
-  msg_task_t task_bw = NULL;
-  int a;
-  double communication_time = 0;
+  peer_t p = xbt_new(s_peer_t, 1);
+  msg_error_t status;
 
   XBT_INFO("peer");
 
-  time = MSG_get_clock();
-
-  a = peer_wait_for_init();
-  /* Get Latency */
-  /*a = MSG_task_receive(&task_la,MSG_host_get_name(MSG_host_self()));
-  if (a == MSG_OK) {
-    time1 = MSG_get_clock();
-    sender_time = *((double *) (task_la->data));
-    time = sender_time;
-    communication_time = time1 - time;
-    XBT_INFO("Task received : %s", task_la->name);
-    xbt_free(task_la->data);
-    MSG_task_destroy(task_la);
-    XBT_INFO("Communic. time %le", communication_time);
-    XBT_INFO("--- la %f ----", communication_time);
-  } else {
-    xbt_die("Unexpected behavior");
-  }*/
+  peer_init(p);
+  status = peer_wait_for_message(p);
 
+  xbt_free(p);
 
-  /* Get Bandwidth */
-  /*a = MSG_task_receive(&task_bw,MSG_host_get_name(MSG_host_self()));
-  if (a == MSG_OK) {
-    time1 = MSG_get_clock();
-    sender_time = *((double *) (task_bw->data));
-    time = sender_time;
-    communication_time = time1 - time;
-    XBT_INFO("Task received : %s", task_bw->name);
-    xbt_free(task_bw->data);
-    MSG_task_destroy(task_bw);
-    XBT_INFO("Communic. time %le", communication_time);
-    XBT_INFO("--- bw %f ----", task_comm_size_bw / communication_time);
-  } else {
-    xbt_die("Unexpected behavior");
-  }*/
-
-
-  return 0;
+  return MSG_OK;
 }                               /* end_of_receiver */