Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
further reduce the amount of includes
[simgrid.git] / examples / msg / dht-pastry / dht-pastry.c
1 /* Copyright (c) 2013-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "simgrid/msg.h"
8 #include "xbt/fifo.h"
9 #include <math.h>
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry, "Messages specific for this msg example");
12
13 /* TODO:                               *
14  *  - handle node departure            *
15  *  - handle objects on the network    *
16  *  - handle neighborhood in the update */
17
18 #define COMM_SIZE 10
19 #define COMP_SIZE 0
20 #define MAILBOX_NAME_SIZE 10
21
22 #define DOMAIN_SIZE 4
23 #define LEVELS_COUNT 8 // sizeof(int)*8/DOMAIN_SIZE
24 #define LEVEL_SIZE 16 // 2^DOMAIN_SIZE
25 #define NEIGHBORHOOD_SIZE 6
26 #define NAMESPACE_SIZE 6
27 #define MAILBOX_NAME_SIZE 10
28
29 static int nb_bits = 16;
30 static int timeout = 50;
31 static int max_simulation_time = 1000;
32
33 typedef struct s_node {
34   int id;                                 //128bits generated random(2^128 -1)
35   int known_id;
36   char mailbox[MAILBOX_NAME_SIZE];        // my mailbox name (string representation of the id)
37   int namespace_set[NAMESPACE_SIZE];
38   int neighborhood_set[NEIGHBORHOOD_SIZE];
39   int routing_table[LEVELS_COUNT][LEVEL_SIZE];
40   int ready;
41   msg_comm_t comm_receive;                // current communication to receive
42   xbt_fifo_t pending_tasks;
43 } s_node_t, *node_t;
44
45 typedef struct s_state {
46   int id;
47   int namespace_set[NAMESPACE_SIZE];
48   int neighborhood_set[NEIGHBORHOOD_SIZE];
49   int routing_table[LEVELS_COUNT][LEVEL_SIZE];
50 } s_state_t, *state_t;
51
52 /** Types of tasks exchanged between nodes. */
53 typedef enum {
54   TASK_JOIN,
55   TASK_JOIN_REPLY,
56   TASK_JOIN_LAST_REPLY,
57   TASK_UPDATE
58 } e_task_type_t;
59
60 typedef struct s_task_data {
61   e_task_type_t type;                     // type of task
62   int sender_id;                          // id paramater (used by some types of tasks)
63   //int request_finger;                     // finger parameter (used by some types of tasks)
64   int answer_id;                          // answer (used by some types of tasks)
65   char answer_to[MAILBOX_NAME_SIZE];      // mailbox to send an answer to (if any)
66   //const char* issuer_host_name;           // used for logging
67   int steps;
68   state_t state;
69 } s_task_data_t, *task_data_t;
70
71 static void get_mailbox(int node_id, char* mailbox);
72 static int domain(unsigned int a, unsigned int level);
73 static int shl(int a, int b);
74 static int closest_in_namespace_set(node_t node, int dest);
75 static int routing_next(node_t node, int dest);
76
77 /**
78  * \brief Gets the mailbox name of a host given its chord id.
79  * \param node_id id of a node
80  * \param mailbox pointer to where the mailbox name should be written
81  * (there must be enough space)
82  */
83 static void get_mailbox(int node_id, char* mailbox)
84 {
85   snprintf(mailbox, MAILBOX_NAME_SIZE - 1, "%d", node_id);
86 }
87
88 /** Get the specific level of a node id */
89 unsigned int domain_mask = 0;
90 static int domain(unsigned int a, unsigned int level)
91 {
92   if (domain_mask == 0)
93     domain_mask = pow(2, DOMAIN_SIZE) - 1;
94   unsigned int shift = (LEVELS_COUNT-level-1)*DOMAIN_SIZE;
95   return (a >> shift) & domain_mask;
96 }
97
98 /* Get the shared domains between the two givens ids */
99 static int shl(int a, int b) {
100   int l = 0;
101   while(l<LEVELS_COUNT && domain(a,l) == domain(b,l))
102     l++;
103   return l;
104 }
105
106 /* Frees the memory used by a task and destroy it */
107 static void task_free(void* task)
108 {
109   // TODO add a parameter data_free_function to MSG_task_create?
110   if(task != NULL){
111     s_task_data_t* data = (s_task_data_t*)MSG_task_get_data(task);
112     xbt_free(data->state);
113     xbt_free(data);
114     MSG_task_destroy(task);
115   }
116 }
117
118 /* Get the closest id to the dest in the node namespace_set */
119 static int closest_in_namespace_set(node_t node, int dest) {
120   int res = -1;
121   if ((node->namespace_set[NAMESPACE_SIZE-1] <= dest) && (dest <= node->namespace_set[0])) {
122     int best_dist = abs(node->id - dest);
123     res = node->id;
124     for (int i=0; i<NAMESPACE_SIZE; i++) {
125       if (node->namespace_set[i]!=-1) {
126         int dist = abs(node->namespace_set[i] - dest);
127         if (dist<best_dist) {
128           best_dist = dist;
129           res = node->namespace_set[i];
130         }
131       }
132     }
133   }
134   return res;
135 }
136
137 /* Find the next node to forward a message to */
138 static int routing_next(node_t node, int dest) {
139   int closest = closest_in_namespace_set(node, dest);
140   int res = -1;
141   if (closest!=-1)
142     return closest;
143
144   int l = shl(node->id, dest);
145   res = node->routing_table[l][domain(dest, l)];
146   if (res!=-1)
147     return res;
148
149   //rare case
150   int dist = abs(node->id - dest);
151   int i;
152   for (i=l; i<LEVELS_COUNT; i++) {
153     for (int j=0; j<LEVEL_SIZE; j++) {
154       res = node->routing_table[i][j];
155       if (res!=-1 && abs(res - dest)<dist)
156         return res;
157     }
158   }
159
160   for (i=0; i<NEIGHBORHOOD_SIZE; i++) {
161     res = node->neighborhood_set[i];
162     if (res!=-1 && shl(res, dest)>=l && abs(res - dest)<dist)
163         return res;
164   }
165
166   for (i=0; i<NAMESPACE_SIZE; i++) {
167     res = node->namespace_set[i];
168     if (res!=-1 && shl(res, dest)>=l && abs(res - dest)<dist)
169         return res;
170   }
171
172   return node->id;
173 }
174
175 /* Get the corresponding state of a node */
176 static state_t node_get_state(node_t node) {
177   int i;
178   state_t state = xbt_new0(s_state_t,1);
179   state->id = node->id;
180   for (i=0; i<NEIGHBORHOOD_SIZE; i++)
181     state->neighborhood_set[i] = node->neighborhood_set[i];
182
183   for (i=0; i<LEVELS_COUNT; i++)
184     for (int j=0; j<LEVEL_SIZE; j++)
185       state->routing_table[i][j] = node->routing_table[i][j];
186
187   for (i=0; i<NAMESPACE_SIZE; i++)
188     state->namespace_set[i] = node->namespace_set[i];
189
190   return state;
191 }
192
193 /* Print the node id */
194 static void print_node_id(node_t node) {
195   XBT_INFO(" Id: %i '%08x' ", node->id, node->id);
196 }
197
198 /* * Print the node neighborhood set */
199 static void print_node_neighborood_set(node_t node) {
200   XBT_INFO(" Neighborhood:");
201   for (int i=0; i<NEIGHBORHOOD_SIZE; i++)
202     XBT_INFO("  %08x", node->neighborhood_set[i]);
203 }
204
205 /* Print the routing table */
206 static void print_node_routing_table(node_t node) {
207   XBT_INFO(" Routing table:");
208   for (int i=0; i<LEVELS_COUNT; i++){
209     for (int j=0; j<LEVEL_SIZE; j++)
210       XBT_INFO("  %08x ", node->routing_table[i][j]);
211   }
212 }
213
214 /* Print the node namespace set */
215 static void print_node_namespace_set(node_t node) {
216   XBT_INFO(" Namespace:");
217   for (int i=0; i<NAMESPACE_SIZE; i++)
218     XBT_INFO("  %08x", node->namespace_set[i]);
219 }
220
221 /* Print the node information */
222 static void print_node(node_t node) {
223   XBT_INFO("Node:");
224   print_node_id(node);
225   print_node_neighborood_set(node);
226   print_node_routing_table(node);
227   print_node_namespace_set(node);
228 }
229
230 /** Handle a given task */
231 static void handle_task(node_t node, msg_task_t task) {
232   XBT_DEBUG("Handling task %p", task);
233   char mailbox[MAILBOX_NAME_SIZE];
234   int i;
235   int j;
236   int min;
237   int max;
238   int d;
239   msg_task_t task_sent;
240   task_data_t req_data;
241   task_data_t task_data = (task_data_t) MSG_task_get_data(task);
242   e_task_type_t type = task_data->type;
243   // If the node is not ready keep the task for later
244   if (node->ready != 0 && !(type==TASK_JOIN_LAST_REPLY || type==TASK_JOIN_REPLY)) {
245     XBT_DEBUG("Task pending %i", type);
246     xbt_fifo_push(node->pending_tasks, task);
247     return;
248   }
249   switch (type) {
250     /* Try to join the ring */
251     case TASK_JOIN: {
252       int next = routing_next(node, task_data->answer_id);
253       XBT_DEBUG("Join request from %08x forwarding to %08x", task_data->answer_id, next);      
254       type = TASK_JOIN_LAST_REPLY;
255
256       req_data = xbt_new0(s_task_data_t,1);
257       req_data->answer_id = task_data->sender_id;
258       req_data->steps = task_data->steps + 1;
259       
260       // if next different from current node forward the join
261       if (next!=node->id) {
262         get_mailbox(next, mailbox);
263         task_data->sender_id = node->id;
264         task_data->steps++;
265         task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, task_data);
266         if (MSG_task_send_with_timeout(task_sent, mailbox, timeout)== MSG_TIMEOUT) {
267           XBT_DEBUG("Timeout expired when forwarding join to next %d", next);
268           task_free(task_sent);
269         }
270         type = TASK_JOIN_REPLY;
271       } 
272       
273       // send back the current node state to the joining node
274       req_data->type = type;
275       req_data->sender_id = node->id;
276       get_mailbox(node->id, req_data->answer_to);
277       req_data->state = node_get_state(node);
278       task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
279       if (MSG_task_send_with_timeout(task_sent, task_data->answer_to, timeout)== MSG_TIMEOUT) {
280         XBT_DEBUG("Timeout expired when sending back the current node state to the joining node to %d", node->id);
281         task_free(task_sent);
282       }
283       break;
284     }
285     /* Join reply from all the node touched by the join  */
286     case TASK_JOIN_LAST_REPLY:
287       // if last node touched reply, copy its namespace set
288       // TODO: it's work only if the two nodes are side to side (is it really the case ?)
289       j = (task_data->sender_id < node->id) ? -1 : 0;
290       for (i=0; i<NAMESPACE_SIZE/2; i++) {
291         node->namespace_set[i] = task_data->state->namespace_set[i-j];
292         node->namespace_set[NAMESPACE_SIZE-1-i] = task_data->state->namespace_set[NAMESPACE_SIZE-1-i-j-1];
293       }
294       node->namespace_set[NAMESPACE_SIZE/2+j] = task_data->sender_id;
295       node->ready += task_data->steps + 1;
296       /* no break */
297     case TASK_JOIN_REPLY:
298       XBT_DEBUG("Joining Reply");
299
300       // if first node touched reply, copy its neighborhood set
301       if (task_data->sender_id == node->known_id) {
302         node->neighborhood_set[0] = task_data->sender_id;
303         for (i=1; i<NEIGHBORHOOD_SIZE; i++)
304           node->neighborhood_set[i] = task_data->state->neighborhood_set[i-1];
305       }
306
307       // copy the corresponding routing table levels
308       min = (node->id==task_data->answer_id) ? 0 : shl(node->id, task_data->answer_id);
309       max = shl(node->id, task_data->sender_id)+1;
310       for (i=min;i<max;i++) {
311         d = domain(node->id, i); 
312         for (j=0; j<LEVEL_SIZE; j++)
313           if (d!=j)
314             node->routing_table[i][j] =  task_data->state->routing_table[i][j];
315           }
316
317       node->ready--;
318       // if the node is ready, do all the pending tasks and send update to known nodes
319       if (node->ready==0) {
320         XBT_DEBUG("Node %i is ready!!!", node->id);
321
322         while(xbt_fifo_size(node->pending_tasks))
323           handle_task(node, xbt_fifo_pop(node->pending_tasks));
324
325         for (i=0; i<NAMESPACE_SIZE; i++) {
326           j = node->namespace_set[i];
327           if (j!=-1) {
328             XBT_DEBUG("Send update to %i", j);
329             get_mailbox(j, mailbox);
330
331             req_data = xbt_new0(s_task_data_t,1);
332             req_data->answer_id = node->id;
333             req_data->steps = 0;
334             req_data->type = TASK_UPDATE;
335             req_data->sender_id = node->id;
336             get_mailbox(node->id, req_data->answer_to);
337             req_data->state = node_get_state(node);
338             task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
339             if (MSG_task_send_with_timeout(task_sent, mailbox, timeout)== MSG_TIMEOUT) {
340               XBT_DEBUG("Timeout expired when sending update to %d", j);
341               task_free(task_sent);
342             }
343           }
344         }
345         }
346       break;
347     /* Received an update of state */
348     case TASK_UPDATE:
349       XBT_DEBUG("Task update %i !!!", node->id);
350
351       /* Update namespace ses */
352       XBT_INFO("Task update from %i !!!", task_data->sender_id);
353       XBT_INFO("Node:");
354       print_node_id(node);
355       print_node_namespace_set(node);
356       int curr_namespace_set[NAMESPACE_SIZE];
357       int task_namespace_set[NAMESPACE_SIZE+1];
358       
359       // Copy the current namespace and the task state namespace with state->id in the middle
360       i=0;
361       for (; i<NAMESPACE_SIZE/2; i++){
362         curr_namespace_set[i] = node->namespace_set[i];
363         task_namespace_set[i] = task_data->state->namespace_set[i];
364       }
365       task_namespace_set[i] = task_data->state->id;
366       for (; i<NAMESPACE_SIZE; i++){
367         curr_namespace_set[i] = node->namespace_set[i];  
368         task_namespace_set[i+1] = task_data->state->namespace_set[i];
369       }
370
371       // get the index of values before and after node->id in task_namespace
372       min = -1;
373       max = -1;
374       for (i=0; i<=NAMESPACE_SIZE; i++) {
375         j = task_namespace_set[i];
376         if (j != -1 && j < node->id)
377           min = i;
378         if (j != -1 && max == -1 && j > node->id)
379           max = i;
380       }
381
382       // add lower elements
383       j = NAMESPACE_SIZE/2-1;
384       for (i=NAMESPACE_SIZE/2-1; i>=0; i--) {
385         if (min<0) {
386           node->namespace_set[i] = curr_namespace_set[j];
387           j--;
388         } else if (curr_namespace_set[j] == task_namespace_set[min]) {
389           node->namespace_set[i] = curr_namespace_set[j];
390           j--;
391           min--;
392         } else if (curr_namespace_set[j] > task_namespace_set[min]) {
393           node->namespace_set[i] = curr_namespace_set[j];
394           j--;
395         } else {
396           node->namespace_set[i] = task_namespace_set[min];
397           min--;
398         }
399       }
400
401       // add greater elements
402       j = NAMESPACE_SIZE/2;
403       for (i=NAMESPACE_SIZE/2; i<NAMESPACE_SIZE; i++) {
404         if (min<0 || max>=NAMESPACE_SIZE) {
405          node->namespace_set[i] = curr_namespace_set[j];
406          j++;
407         } else if (curr_namespace_set[j] == -1) {
408           node->namespace_set[i] = task_namespace_set[max];
409           max++;
410         } else if (curr_namespace_set[j] == task_namespace_set[max]) {
411           node->namespace_set[i] = curr_namespace_set[j];
412           j++;
413           max++;
414         } else if (curr_namespace_set[j] < task_namespace_set[max]) {
415           node->namespace_set[i] = curr_namespace_set[j];
416           j++;
417         } else {
418           node->namespace_set[i] = task_namespace_set[max];
419           max++;
420         }
421       }
422
423       /* Update routing table */
424       for (i=shl(node->id, task_data->state->id); i<LEVELS_COUNT; i++) {
425         for (j=0; j<LEVEL_SIZE; j++) {
426           if (node->routing_table[i][j]==-1 && task_data->state->routing_table[i][j]==-1)
427             node->routing_table[i][j] = task_data->state->routing_table[i][j];
428         }
429       }
430       break;
431     default:
432       THROW_IMPOSSIBLE;
433   }
434   task_free(task);
435 }
436
437 /** \brief Initializes the current node as the first one of the system.
438  *  \param node the current node
439  */
440 static void create(node_t node){
441   node->ready = 0;
442   XBT_DEBUG("Create a new Pastry ring...");
443 }
444
445 /* Join the ring */
446 static int join(node_t node){
447   task_data_t req_data = xbt_new0(s_task_data_t,1);
448   req_data->type = TASK_JOIN;
449   req_data->sender_id = node->id;
450   req_data->answer_id = node->id;
451   req_data->steps = 0;
452   get_mailbox(node->id, req_data->answer_to);
453
454   char mailbox[MAILBOX_NAME_SIZE];
455   get_mailbox(node->known_id, mailbox);
456
457   msg_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
458   XBT_DEBUG("Trying to join Pastry ring... (with node %s)", mailbox);
459   if (MSG_task_send_with_timeout(task_sent, mailbox, timeout)== MSG_TIMEOUT) {
460     XBT_DEBUG("Timeout expired when joining ring with node %d", node->known_id);
461     task_free(task_sent);
462   }
463
464   return 1;
465 }
466
467 /**
468  * \brief Node Function
469  * Arguments:
470  * - my id
471  * - the id of a guy I know in the system (except for the first node)
472  * - the time to sleep before I join (except for the first node)
473  * - the deadline time
474  */
475 static int node(int argc, char *argv[])
476 {
477   double init_time = MSG_get_clock();
478   msg_task_t task_received = NULL;  
479   int join_success = 0;  
480   double deadline;
481   xbt_assert(argc == 3 || argc == 5, "Wrong number of arguments for this node");
482   s_node_t node = {0};
483   node.id = xbt_str_parse_int(argv[1], "Invalid ID: %s");
484   node.known_id = -1;
485   node.ready = -1;
486   node.pending_tasks = xbt_fifo_new();
487   get_mailbox(node.id, node.mailbox);
488   XBT_DEBUG("New node with id %s (%08x)", node.mailbox, node.id);
489   
490   int i;
491   for (i=0; i<LEVELS_COUNT; i++){
492     int d = domain(node.id, i);
493     for (int j=0; j<LEVEL_SIZE; j++)
494       node.routing_table[i][j] = (d==j) ? node.id : -1;
495   }
496
497   for (i=0; i<NEIGHBORHOOD_SIZE; i++)
498     node.neighborhood_set[i] = -1;
499
500   for (i=0; i<NAMESPACE_SIZE; i++)
501     node.namespace_set[i] = -1;
502
503   if (argc == 3) { // first ring
504     XBT_DEBUG("Hey! Let's create the system.");
505     deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
506     create(&node);
507     join_success = 1;
508   }
509   else {
510     node.known_id = xbt_str_parse_int(argv[2], "Invalid known ID: %s");
511     double sleep_time = xbt_str_parse_double(argv[3], "Invalid sleep time: %s");
512     deadline = xbt_str_parse_double(argv[4], "Invalid deadline: %s");
513
514     // sleep before starting
515     XBT_DEBUG("Let's sleep during %f", sleep_time);
516     MSG_process_sleep(sleep_time);
517     XBT_DEBUG("Hey! Let's join the system.");
518
519     join_success = join(&node);
520   }
521
522   if (join_success) {
523     XBT_DEBUG("Waiting ….");
524
525     while (MSG_get_clock() < init_time + deadline
526 //      && MSG_get_clock() < node.last_change_date + 1000
527         && MSG_get_clock() < max_simulation_time) {
528       if (node.comm_receive == NULL) {
529         task_received = NULL;
530         node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
531         // FIXME: do not make MSG_task_irecv() calls from several functions
532       }
533       if (!MSG_comm_test(node.comm_receive)) {
534         MSG_process_sleep(5);
535       } else {
536         // a transfer has occurred
537
538         msg_error_t status = MSG_comm_get_status(node.comm_receive);
539
540         if (status != MSG_OK) {
541           XBT_DEBUG("Failed to receive a task. Nevermind.");
542           MSG_comm_destroy(node.comm_receive);
543           node.comm_receive = NULL;
544         }
545         else {
546           // the task was successfully received
547           MSG_comm_destroy(node.comm_receive);
548           node.comm_receive = NULL;
549           handle_task(&node, task_received);
550         }
551       }
552
553     }
554   //Cleanup the receiving communication.
555   if (node.comm_receive != NULL) {
556     if (MSG_comm_test(node.comm_receive) && MSG_comm_get_status(node.comm_receive) == MSG_OK) {
557       task_free(MSG_comm_get_task(node.comm_receive));
558     }
559     MSG_comm_destroy(node.comm_receive);
560   }
561
562   }
563   xbt_free(node.pending_tasks);
564   return 1;
565 }
566
567 /** \brief Main function. */
568 int main(int argc, char *argv[])
569 {
570   MSG_init(&argc, argv);
571   xbt_assert(argc > 2, 
572        "Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n"
573        "\tExample: %s ../msg_platform.xml pastry10.xml\n", 
574        argv[0], argv[0]);
575
576   char **options = &argv[1];
577   while (!strncmp(options[0], "-", 1)) {
578     int length = strlen("-nb_bits=");
579     if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
580       nb_bits = xbt_str_parse_int(options[0] + length, "Invalid nb_bits parameter: %s");
581       XBT_DEBUG("Set nb_bits to %d", nb_bits);
582     } else {
583       length = strlen("-timeout=");
584       if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
585         timeout = xbt_str_parse_int(options[0] + length, "Invalid timeout parameter: %s");
586         XBT_DEBUG("Set timeout to %d", timeout);
587       } else {
588         xbt_die("Invalid pastry option '%s'", options[0]);
589       }
590     }
591     options++;
592   }
593
594   MSG_create_environment(options[0]);
595
596   MSG_function_register("node", node);
597   MSG_launch_application(options[1]);
598
599   msg_error_t res = MSG_main();
600   XBT_INFO("Simulated time: %g", MSG_get_clock());
601
602   return res != MSG_OK;
603 }