Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add pastry msg example (First attempt)
[simgrid.git] / examples / msg / pastry / pastry.c
1 #include <stdio.h>
2 #include <math.h>
3 #include "msg/msg.h"
4 #include "xbt/log.h"
5 #include "xbt/asserts.h"
6
7  XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry,
8                              "Messages specific for this msg example");
9
10 /***************************************
11  * PASTRY                              *
12  *                                     *
13  * TODO:                               *
14  *  - handle node departure            *
15  *  - handle objects on the network    *
16  *  - handle neighborood in the update *
17  *                                     *
18  ***************************************/
19
20 #define COMM_SIZE 10
21 #define COMP_SIZE 0
22 #define MAILBOX_NAME_SIZE 10
23
24 #define DOMAIN_SIZE 4
25 #define LEVELS_COUNT 8 // sizeof(int)*8/DOMAIN_SIZE
26 #define LEVEL_SIZE 16 // 2^DOMAIN_SIZE
27 #define NEIGHBORHOOD_SIZE 6
28 #define NAMESPACE_SIZE 6
29 #define MAILBOX_NAME_SIZE 10
30
31 static int nb_bits = 16;
32 static int nb_keys = 0;
33 static int timeout = 50;
34 static int max_simulation_time = 1000;
35
36 extern long int smx_total_comms;
37
38
39
40 static int domain_mask = pow(2, DOMAIN_SIZE) - 1;
41
42 typedef struct s_node {
43   int id;                                 //128bits generated random(2^128 -1)
44   int known_id;
45   char mailbox[MAILBOX_NAME_SIZE];        // my mailbox name (string representation of the id)
46   int namespace_set[NAMESPACE_SIZE];
47   int neighborhood_set[NEIGHBORHOOD_SIZE];
48   int routing_table[LEVELS_COUNT][LEVEL_SIZE];
49   int ready;
50   msg_comm_t comm_receive;                // current communication to receive
51   xbt_fifo_t pending_tasks;
52 } s_node_t, *node_t;
53
54 typedef struct s_state {
55   int id;
56   int namespace_set[NAMESPACE_SIZE];
57   int neighborhood_set[NEIGHBORHOOD_SIZE];
58   int routing_table[LEVELS_COUNT][LEVEL_SIZE];
59 } s_state_t, *state_t;
60
61 /**
62  * Types of tasks exchanged between nodes.
63  */
64 typedef enum {
65   TASK_JOIN,
66   TASK_JOIN_REPLY,
67   TASK_JOIN_LAST_REPLY,
68   TASK_UPDATE
69 } e_task_type_t;
70
71 typedef struct s_task_data {
72   e_task_type_t type;                     // type of task
73   int sender_id;                          // id paramater (used by some types of tasks)
74   //int request_finger;                     // finger parameter (used by some types of tasks)
75   int answer_id;                          // answer (used by some types of tasks)
76   char answer_to[MAILBOX_NAME_SIZE];      // mailbox to send an answer to (if any)
77   //const char* issuer_host_name;           // used for logging
78   int steps;
79   state_t state;
80 } s_task_data_t, *task_data_t;
81
82 static void print_node(node_t node);
83 static void print_node_id(node_t node);
84 static void print_node_neighborood_set(node_t node);
85 static void print_node_routing_table(node_t node);
86 static void print_node_namespace_set(node_t node);
87 static state_t node_get_state(node_t node);
88 static void get_mailbox(int node_id, char* mailbox);
89 static int domain(int a, int level);
90 static int shl(int a, int b);
91 static int closest_in_namespace_set(node_t node, int dest);
92 static int routing_next(node_t node, int dest);
93 static void create(node_t node);
94 static int join(node_t node);
95
96
97 /**
98  * \brief Gets the mailbox name of a host given its chord id.
99  * \param node_id id of a node
100  * \param mailbox pointer to where the mailbox name should be written
101  * (there must be enough space)
102  */
103 static void get_mailbox(int node_id, char* mailbox)
104 {
105   snprintf(mailbox, MAILBOX_NAME_SIZE - 1, "%d", node_id);
106 }
107
108 /**
109  * Get the specific level of a node id
110  */
111 static int domain(int a, int level) {
112   int shift = (LEVELS_COUNT-level-1)*DOMAIN_SIZE;
113   return (a >> shift) & domain_mask;
114 }
115
116 /**
117  * Get the shared domains between the two givens ids
118  */
119 static int shl(int a, int b) {
120   int l = 0;
121   while(l<LEVELS_COUNT && domain(a,l) == domain(b,l))
122     l++;
123   return l;
124 }
125
126 /*
127  * Get the cloest id to the dest in the node namespace_set
128  */
129 static int closest_in_namespace_set(node_t node, int dest) {
130   int best_dist;
131   int res = -1;
132   if (node->namespace_set[NAMESPACE_SIZE-1] <= dest & dest <= node->namespace_set[0]) {
133     best_dist = abs(node->id - dest);
134     res = node->id;
135     int i, dist;
136     for (i=0; i<NAMESPACE_SIZE; i++) {
137       if (node->namespace_set[i]!=-1) {
138         dist = abs(node->namespace_set[i] - dest);
139         if (dist<best_dist) {
140           best_dist = dist;
141           res = node->namespace_set[i];   
142         }
143       }
144     }
145   }
146   return res;
147 }
148
149 /*
150  * Find the next node to forward a meassage to
151  */
152 static int routing_next(node_t node, int dest) {
153   int closest = closest_in_namespace_set(node, dest);
154   int res = -1;
155   if (closest!=-1)
156     return closest;
157
158   int l = shl(node->id, dest);
159   res = node->routing_table[l][domain(dest, l)];
160   if (res!=-1)
161     return res;
162
163   //rare case
164   int dist = abs(node->id - dest);
165   int i,j;
166   for (i=l; i<LEVELS_COUNT; i++) {
167     for (j=0; j<LEVEL_SIZE; j++) {
168       res = node->routing_table[i][j];
169       if (res!=-1 && abs(res - dest)<dist)
170         return res;
171     }
172   }
173
174   for (i=0; i<NEIGHBORHOOD_SIZE; i++) {
175     res = node->neighborhood_set[i];
176     if (res!=-1 && shl(res, dest)>=l && abs(res - dest)<dist)
177         return res;
178   }
179
180   for (i=0; i<NAMESPACE_SIZE; i++) {
181     res = node->namespace_set[i];
182     if (res!=-1 && shl(res, dest)>=l && abs(res - dest)<dist)
183         return res;
184   }
185
186   return node->id;
187 }
188
189 /*
190  * Handle a given task
191  */
192 static void handle_task(node_t node, msg_task_t task) {
193   XBT_DEBUG("Handling task %p", task);
194   char mailbox[MAILBOX_NAME_SIZE];
195   int i, j, min, max, d;
196   msg_task_t task_sent;
197   task_data_t req_data;
198   task_data_t task_data = (task_data_t) MSG_task_get_data(task);
199   e_task_type_t type = task_data->type;
200   // If the node is not ready keep the task for later
201   if (node->ready != 0 && !(type==TASK_JOIN_LAST_REPLY || type==TASK_JOIN_REPLY)) {
202     XBT_DEBUG("Task pending %i", type);
203     xbt_fifo_push(node->pending_tasks, task);
204     return;
205   }
206   switch (type) {
207     /*
208      * Try to join the ring
209      */
210     case TASK_JOIN:;
211       int next = routing_next(node, task_data->answer_id);
212       XBT_DEBUG("Join request from %08x forwarding to %08x", task_data->answer_id, next);      
213       type = TASK_JOIN_LAST_REPLY;
214
215       req_data = xbt_new0(s_task_data_t,1);
216       req_data->answer_id = task_data->sender_id;
217       req_data->steps = task_data->steps + 1;
218       
219       // if next different from current node forward the join
220       if (next!=node->id) {
221         get_mailbox(next, mailbox);
222         task_data->sender_id = node->id;
223         task_data->steps++;
224         task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, task_data);
225         MSG_task_send_with_timeout(task_sent, mailbox, timeout);
226         type = TASK_JOIN_REPLY;
227       } 
228       
229       // send back the current node state to the joining node
230       req_data->type = type;
231       req_data->sender_id = node->id;
232       get_mailbox(node->id, req_data->answer_to);
233       req_data->state = node_get_state(node);
234       task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
235       MSG_task_send_with_timeout(task_sent, task_data->answer_to, timeout);
236       break;
237
238     /*
239      * Join reply from all the node touched by the join
240      */
241     case TASK_JOIN_LAST_REPLY:
242       // if last node touched reply, copy its namespace set
243       // TODO: it's work only if the two nodes are side to side (is it really the case ?)
244       j = (task_data->sender_id < node->id) ? -1 : 0;
245       for (i=0; i<NAMESPACE_SIZE/2; i++) {
246         node->namespace_set[i] = task_data->state->namespace_set[i-j];
247         node->namespace_set[NAMESPACE_SIZE-1-i] = task_data->state->namespace_set[NAMESPACE_SIZE-1-i-j-1];
248       }
249       node->namespace_set[NAMESPACE_SIZE/2+j] = task_data->sender_id;
250       node->ready += task_data->steps + 1;
251     case TASK_JOIN_REPLY:
252       XBT_DEBUG("Joining Reply");
253
254       // if first node touched reply, copy its neighborood set
255       if (task_data->sender_id == node->known_id) {
256         node->neighborhood_set[0] = task_data->sender_id;
257         for (i=1; i<NEIGHBORHOOD_SIZE; i++)
258             node->neighborhood_set[i] = task_data->state->neighborhood_set[i-1]; 
259       }
260       
261       // copy the corresponding routing table levels
262       min = (node->id==task_data->answer_id) ? 0 : shl(node->id, task_data->answer_id);      
263       max = shl(node->id, task_data->sender_id)+1;
264       for (i=min;i<max;i++) {
265         d = domain(node->id, i); 
266         for (j=0; j<LEVEL_SIZE; j++)
267           if (d!=j)
268             node->routing_table[i][j] =  task_data->state->routing_table[i][j];
269       }
270
271       node->ready--;
272       // if the node is ready, do all the pending tasks and send update to known nodes
273       if (node->ready==0) {
274         XBT_DEBUG("Node %i is ready!!!", node->id);
275
276         while(xbt_fifo_size(node->pending_tasks))
277           handle_task(node, xbt_fifo_pop(node->pending_tasks));
278
279         for (i=0; i<NAMESPACE_SIZE; i++) {
280           j = node->namespace_set[i];
281           if (j!=-1) {
282             XBT_DEBUG("Send update to %i", j);
283             get_mailbox(j, mailbox);
284             
285             req_data = xbt_new0(s_task_data_t,1);
286             req_data->answer_id = node->id;
287             req_data->steps = 0;
288             req_data->type = TASK_UPDATE;
289             req_data->sender_id = node->id;
290             get_mailbox(node->id, req_data->answer_to);
291             req_data->state = node_get_state(node);
292             task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
293             MSG_task_send_with_timeout(task_sent, mailbox, timeout);
294           }
295         }
296       }
297       break;
298       
299     /*
300      * Recieved an update of state
301      */
302     case TASK_UPDATE:
303       XBT_DEBUG("Task update %i !!!", node->id);
304
305       /* Update namespace ses */
306       printf("Task update from %i !!!\n", task_data->sender_id);
307       print_node_id(node);
308       print_node_namespace_set(node);
309       int curr_namespace_set[NAMESPACE_SIZE];
310       int task_namespace_set[NAMESPACE_SIZE+1];
311       
312       // Copy the current namedspace
313       // and the task state namespace with state->id in the middle
314       i=0;
315       for (; i<NAMESPACE_SIZE/2; i++){
316         curr_namespace_set[i] = node->namespace_set[i];
317         task_namespace_set[i] = task_data->state->namespace_set[i];
318       }
319       task_namespace_set[i] = task_data->state->id;
320       for (; i<NAMESPACE_SIZE; i++){
321         curr_namespace_set[i] = node->namespace_set[i]; 
322         task_namespace_set[i+1] = task_data->state->namespace_set[i];   
323       }
324
325       // get the index of values before and after node->id in task_namespace
326       min = -1;
327       max = -1;
328       for (i=0; i<=NAMESPACE_SIZE; i++) {
329         j = task_namespace_set[i];
330         printf("%08x %08x | ", j, curr_namespace_set[i]);
331         if (j != -1 && j < node->id) min = i;
332         if (j != -1 && max == -1 && j > node->id) max = i;
333       }
334       printf("\n");
335
336       // add lower elements
337       j = NAMESPACE_SIZE/2-1;
338       for (i=NAMESPACE_SIZE/2-1; i>=0; i--) {
339         printf("i:%i, j:%i, min:%i, currj:%08x, taskmin:%08x\n", i, j, min, curr_namespace_set[j], task_namespace_set[min]);
340         if (min<0) {
341           node->namespace_set[i] = curr_namespace_set[j];
342           j--; 
343         } else if (curr_namespace_set[j] == task_namespace_set[min]) {
344           node->namespace_set[i] = curr_namespace_set[j];
345           j--; min--;
346         } else if (curr_namespace_set[j] > task_namespace_set[min]) {
347           node->namespace_set[i] = curr_namespace_set[j];
348           j--;
349         } else {
350           node->namespace_set[i] = task_namespace_set[min];
351           min--;
352         }
353       }
354
355       // add greater elements
356       j = NAMESPACE_SIZE/2;
357       for (i=NAMESPACE_SIZE/2; i<NAMESPACE_SIZE; i++) {
358         printf("i:%i, j:%i, max:%i, currj:%08x, taskmax:%08x\n", i, j, max, curr_namespace_set[j], task_namespace_set[max]);          
359         if (min<0 || max>=NAMESPACE_SIZE) {
360           node->namespace_set[i] = curr_namespace_set[j];
361           j++;
362         } else if (curr_namespace_set[j] == -1) {
363           node->namespace_set[i] = task_namespace_set[max];
364           max++;
365         } else if (curr_namespace_set[j] == task_namespace_set[max]) {
366           node->namespace_set[i] = curr_namespace_set[j];
367           j++; max++;
368         } else if (curr_namespace_set[j] < task_namespace_set[max]) {
369           node->namespace_set[i] = curr_namespace_set[j];
370           j++;
371         } else {
372           node->namespace_set[i] = task_namespace_set[max];
373           max++;
374         }
375       }
376       print_node_namespace_set(node);
377
378       /* Update routing table */
379       for (i=shl(node->id, task_data->state->id); i<LEVELS_COUNT; i++) {
380         for (j=0; j<LEVEL_SIZE; j++) {
381           if (node->routing_table[i][j]==-1 && task_data->state->routing_table[i][j]==-1)
382             node->routing_table[i][j] = task_data->state->routing_table[i][j];
383         }
384       }
385   }        
386 }
387
388 /**
389  * \brief Initializes the current node as the first one of the system.
390  * \param node the current node
391  */
392 static void create(node_t node){
393   node->ready = 0;
394   XBT_DEBUG("Create a new Pastry ring...");
395 }
396
397 /*
398  * Join the ring
399  */
400 static int join(node_t node){
401   task_data_t req_data = xbt_new0(s_task_data_t,1);
402   req_data->type = TASK_JOIN;
403   req_data->sender_id = node->id;
404   req_data->answer_id = node->id;
405   req_data->steps = 0;
406   get_mailbox(node->id, req_data->answer_to);
407
408   char mailbox[MAILBOX_NAME_SIZE];
409   get_mailbox(node->known_id, mailbox);
410
411   msg_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
412   XBT_DEBUG("Trying to join Pastry ring... (with node %s)", mailbox);
413   MSG_task_send_with_timeout(task_sent, mailbox, timeout);
414
415   return 1;
416 }
417
418 /*
419  * Print the node infomations
420  */
421 static void print_node(node_t node) {
422   printf("Node:\n");
423   print_node_id(node);
424   print_node_neighborood_set(node);
425   print_node_routing_table(node);
426   print_node_namespace_set(node);
427 }
428
429 /*
430  * Print the node id
431  */
432 static void print_node_id(node_t node) {
433   int i;        
434   printf(" id: %i '%08x' ", node->id, node->id);
435   for (i=0;i<LEVELS_COUNT;i++)
436     printf(" %x", domain(node->id, i));
437   printf("\n");
438 }
439
440 /*
441  * Print the node neighborood set
442  */
443 static void print_node_neighborood_set(node_t node) {
444   int i;        
445   printf(" neighborood:\n");
446   for (i=0; i<NEIGHBORHOOD_SIZE; i++)
447     printf("  %08x\n", node->neighborhood_set[i]);
448 }
449
450 /*
451  * Print the routing table
452  */
453 static void print_node_routing_table(node_t node) {
454   int i,j;      
455   printf(" routing table:\n");
456   for (i=0; i<LEVELS_COUNT; i++){
457     printf("  ");
458     for (j=0; j<LEVEL_SIZE; j++)
459       printf("%08x ", node->routing_table[i][j]);
460     printf("\n");
461   }
462 }
463
464 /*
465  * Print the node namespace set
466  */
467 static void print_node_namespace_set(node_t node) {
468   int i;
469   printf(" namespace:\n");
470   for (i=0; i<NAMESPACE_SIZE; i++)
471     printf("  %08x\n", node->namespace_set[i]);
472   printf("\n");
473
474 }
475
476 /*
477  * Get the corresponding state of a node
478  */
479 static state_t node_get_state(node_t node) {
480   int i,j;      
481   state_t state = xbt_new0(s_state_t,1);
482   state->id = node->id;
483   for (i=0; i<NEIGHBORHOOD_SIZE; i++)
484     state->neighborhood_set[i] = node->neighborhood_set[i];
485
486   for (i=0; i<LEVELS_COUNT; i++)
487     for (j=0; j<LEVEL_SIZE; j++)
488       state->routing_table[i][j] = node->routing_table[i][j];
489
490   for (i=0; i<NAMESPACE_SIZE; i++)
491     state->namespace_set[i] = node->namespace_set[i];
492
493   return state;
494 }
495
496
497 /**
498  * \brief Node Function
499  * Arguments:
500  * - my id
501  * - the id of a guy I know in the system (except for the first node)
502  * - the time to sleep before I join (except for the first node)
503  * - the deadline time
504  */
505 int node(int argc, char *argv[])
506 {
507   double init_time = MSG_get_clock();
508   msg_task_t task_received = NULL;  
509   int join_success = 0;  
510   double deadline;
511   xbt_assert(argc == 3 || argc == 5, "Wrong number of arguments for this node");
512   s_node_t node = {0};
513   node.id = atoi(argv[1]);
514   node.known_id = -1;
515   node.ready = -1;
516   node.pending_tasks = xbt_fifo_new();
517   get_mailbox(node.id, node.mailbox);
518   XBT_DEBUG("New node with id %s (%08x)", node.mailbox, node.id);
519   
520   int i,j,d;
521   for (i=0; i<LEVELS_COUNT; i++){
522     d = domain(node.id, i);
523     for (j=0; j<LEVEL_SIZE; j++)
524       node.routing_table[i][j] = (d==j) ? node.id : -1;
525   }
526
527   for (i=0; i<NEIGHBORHOOD_SIZE; i++)
528     node.neighborhood_set[i] = -1;
529
530   for (i=0; i<NAMESPACE_SIZE; i++)
531     node.namespace_set[i] = -1;
532
533   if (argc == 3) { // first ring
534     XBT_DEBUG("Hey! Let's create the system.");
535     deadline = atof(argv[2]);
536     create(&node);
537     join_success = 1;
538   }
539   else {
540     node.known_id = atoi(argv[2]);
541     double sleep_time = atof(argv[3]);
542     deadline = atof(argv[4]);
543
544     // sleep before starting
545     XBT_DEBUG("Let's sleep during %f", sleep_time);
546     MSG_process_sleep(sleep_time);
547     XBT_DEBUG("Hey! Let's join the system.");
548
549     join_success = join(&node);
550   }
551
552   if (join_success) {
553     XBT_DEBUG("Waiting ….");
554
555     while (MSG_get_clock() < init_time + deadline
556 //      && MSG_get_clock() < node.last_change_date + 1000
557         && MSG_get_clock() < max_simulation_time) {
558       if (node.comm_receive == NULL) {
559         task_received = NULL;
560         node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
561         // FIXME: do not make MSG_task_irecv() calls from several functions
562       }
563       if (!MSG_comm_test(node.comm_receive)) {
564         MSG_process_sleep(5);
565       } else {
566         // a transfer has occurred
567
568         msg_error_t status = MSG_comm_get_status(node.comm_receive);
569
570         if (status != MSG_OK) {
571           XBT_DEBUG("Failed to receive a task. Nevermind.");
572           MSG_comm_destroy(node.comm_receive);
573           node.comm_receive = NULL;
574         }
575         else {
576           // the task was successfully received
577           MSG_comm_destroy(node.comm_receive);
578           node.comm_receive = NULL;
579           handle_task(&node, task_received);
580         }
581       }
582
583     }
584     print_node(&node);
585   }
586 }
587
588 /*
589  * Node data.
590  */
591 /*typedef struct s_node {
592   int id;                                 // my id
593   char mailbox[MAILBOX_NAME_SIZE];        // my mailbox name (string representation of the id)
594   s_finger_t *fingers;                    // finger table, of size nb_bits (fingers[0] is my successor)
595   int pred_id;                            // predecessor id
596   char pred_mailbox[MAILBOX_NAME_SIZE];   // predecessor's mailbox name
597   int next_finger_to_fix;                 // index of the next finger to fix in fix_fingers()
598   msg_comm_t comm_receive;                // current communication to receive
599   double last_change_date;                // last time I changed a finger or my predecessor
600 } s_node_t, *node_t;*/
601
602 /**
603  * \brief Main function.
604  */
605 int main(int argc, char *argv[])
606 {
607   MSG_init(&argc, argv);
608   if (argc < 3) {
609     printf("Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n", argv[0]);
610     printf("example: %s ../msg_platform.xml chord.xml\n", argv[0]);
611     exit(1);
612   }
613   
614   char **options = &argv[1];
615   while (!strncmp(options[0], "-", 1)) {
616
617     int length = strlen("-nb_bits=");
618     if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
619       nb_bits = atoi(options[0] + length);
620       XBT_DEBUG("Set nb_bits to %d", nb_bits);
621     }
622     else {
623
624       length = strlen("-timeout=");
625       if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
626         timeout = atoi(options[0] + length);
627         XBT_DEBUG("Set timeout to %d", timeout);
628       }
629       else {
630         xbt_die("Invalid chord option '%s'", options[0]);
631       }
632     }
633     options++;
634   }
635
636   const char* platform_file = options[0];
637   const char* application_file = options[1];
638
639   MSG_create_environment(platform_file);
640   
641   MSG_function_register("node", node);
642   MSG_launch_application(application_file);
643
644   msg_error_t res = MSG_main();
645   XBT_CRITICAL("Messages created: %ld", smx_total_comms);
646   XBT_INFO("Simulated time: %g", MSG_get_clock());
647
648   if (res == MSG_OK)
649     return 0;
650   else
651     return 1;
652
653 }