Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fixed iterator Initial version of chain generator (doesn't work) I have to change...
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Mon, 1 Oct 2012 15:24:36 +0000 (17:24 +0200)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Wed, 28 Nov 2012 10:49:25 +0000 (11:49 +0100)
examples/msg/kadeploy/kadeploy.c

index 4b27de3..8199bc0 100644 (file)
@@ -23,6 +23,8 @@
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
                              "Messages specific for kadeploy");
 
+#define MESSAGE_SIZE 1
+
 /*
  Data structures
  */
@@ -31,18 +33,42 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
 typedef struct xbt_dynar_iterator_struct {
   xbt_dynar_t list;
   xbt_dynar_t indices_list;
-  unsigned int current;
+  int current;
   unsigned long length;
-  unsigned int (*criteria_fn)(void* it);
+  int (*criteria_fn)(void* it);
 } *xbt_dynar_iterator_t;
 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
 
-
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, unsigned int (*criteria_fn)(void*));
+/* Messages enum */
+typedef enum {
+  MESSAGE_BUILD_CHAIN = 0,
+  MESSAGE_SEND_DATA
+} e_message_type;
+
+/* Message struct */
+typedef struct s_message {
+  e_message_type type;
+  const char *issuer_hostname;
+  const char *mailbox;
+  const char *prev_hostname;
+  const char *next_hostname;
+  const char *data_block;
+  unsigned int data_length;
+} s_message_t, *message_t;
+
+/* Iterator methods */
+xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
-unsigned int xbt_dynar_iterator_forward_criteria(void *p);
+int xbt_dynar_iterator_forward_criteria(void *p);
+
+/* Message methods */
+msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
+msg_task_t task_message_chain_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
+msg_task_t task_message_data_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
+void task_message_delete(void *);
 
+/* Tasks */
 int broadcaster(int argc, char *argv[]);
 int peer(int argc, char *argv[]);
 
@@ -61,13 +87,13 @@ double task_comm_size_bw = 10e8;
 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
    criteria_fn: given an iterator, it must update the iterator and give the next element's index, 
    less than 0 otherwise*/
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, unsigned int (*criteria_fn)(void*))
+xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
 {
   xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
   
   it->list = list;
   it->length = xbt_dynar_length(list);
-  it->indices_list = xbt_dynar_new(sizeof(unsigned int), NULL);
+  it->indices_list = xbt_dynar_new(sizeof(int), NULL);
   it->criteria_fn = criteria_fn;
   it->current = -1;
 }
@@ -75,11 +101,12 @@ xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, unsigned int (*cri
 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
 {
-  unsigned int next = it->criteria_fn((xbt_dynar_iterator_t)it);
+  int next = it->criteria_fn((xbt_dynar_iterator_t)it);
   XBT_INFO("%d current\n", next);
-  if (next < 0)
+  if (next < 0) {
+    XBT_INFO("Nothing to return!\n");
     return NULL;
-  else {
+  else {
     xbt_dynar_push(it->indices_list, &next);
     return xbt_dynar_get_ptr(it->list, next);
   }
@@ -91,10 +118,10 @@ void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
   xbt_free_ref(&it);
 }
 
-unsigned int xbt_dynar_iterator_forward_criteria(void *p)
+int xbt_dynar_iterator_forward_criteria(void *p)
 {
   xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
-  unsigned int r = -1;
+  int r = -1;
   if (it->current == -1) {
     /* iterator initialization */
     it->current = 0;
@@ -107,6 +134,45 @@ unsigned int xbt_dynar_iterator_forward_criteria(void *p)
   return r;
 }
 
+msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
+{
+  message_t msg = xbt_new(s_message_t, 1);
+  msg->type = type;
+  msg->issuer_hostname = issuer_hostname;
+  msg->mailbox = mailbox;
+  msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
+
+  return task;
+}
+
+msg_task_t task_message_chain_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
+{
+  msg_task_t task = task_message_new(type, issuer_hostname, mailbox);
+  message_t msg = MSG_task_get_data(task);
+  msg->prev_hostname = prev;
+  msg->next_hostname = next;
+
+  return task;
+}
+
+msg_task_t task_message_data_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
+{
+  msg_task_t task = task_message_new(type, issuer_hostname, mailbox);
+  message_t msg = MSG_task_get_data(task);
+  msg->data_block = block;
+  msg->data_length = len;
+
+  return task;
+}
+
+void task_message_delete(void *task)
+{
+  message_t msg = MSG_task_get_data(task);
+  xbt_free(msg);
+  MSG_task_destroy(task);
+}
+
+
 xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
 {
   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
@@ -134,26 +200,24 @@ void delete_hostlist(xbt_dynar_t h)
 void build_chain(xbt_dynar_t host_list)
 {
   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
+  const char *current_host = NULL;
+  const char *prev = NULL;
+  const char *next = NULL;
+  const char *me = MSG_host_get_name(MSG_host_self());
   char **cur = NULL;
 
   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
-    XBT_INFO("iterating host = %s", *cur);
+    current_host = *cur;
+    XBT_INFO("Building chain broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
+    
+    msg_task_t msg = task_message_chain_new(MESSAGE_BUILD_CHAIN, me, current_host, prev, next);
+    MSG_task_send(msg, current_host);
+    task_message_delete(msg);
   }
-}
 
-/*void setup_chain_criteria(chain_criteria_t c, char *(*fn)(void))
-{
-  
+  xbt_dynar_iterator_delete(it);
 }
 
-void build_chain(const int hostcount, char **hostlist)
-{
-  int i;
-  for (i = 0; i < hostcount; i++) {
-    
-  }
-}*/
-
 /** Emitter function  */
 int broadcaster(int argc, char *argv[])
 {
@@ -171,6 +235,8 @@ int broadcaster(int argc, char *argv[])
   
   build_chain(host_list);
 
+  delete_hostlist(host_list);
+
   /* Latency */
   /*time = MSG_get_clock();
   sprintf(sprintf_buffer_la, "latency task");
@@ -196,6 +262,15 @@ int broadcaster(int argc, char *argv[])
 
 int peer_wait_for_init()
 {
+  msg_task_t msg = NULL;
+  const char *me = MSG_host_get_name(MSG_host_self());
+
+  int a = MSG_task_receive(&msg, me);
+
+  if (a == MSG_OK) {
+    XBT_INFO("Peer %s got message\n", me);
+  }
+
   return MSG_OK;
 }