Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
171f3d1bbc71962983af000a70c9ac7ca7d2024c
[simgrid.git] / examples / msg / kadeploy / kadeploy.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * Copyright (c) 2012. Maximiliano Geier.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h"         /* calloc */
13
14 /* Create a log channel to have nice outputs. */
15 #include "xbt/log.h"
16 #include "xbt/asserts.h"
17
18 /** @addtogroup MSG_examples
19  * 
20  *  - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
21  */
22
23
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25                              "Messages specific for kadeploy");
26
27 #define MESSAGE_SIZE 1
28 #define PIECE_COUNT 1000
29 #define HOSTNAME_LENGTH 20
30
31 #define PEER_SHUTDOWN_DEADLINE 6000
32
33 /*
34  Data structures
35  */
36
37 /* Random iterator for xbt_dynar */
38 typedef struct xbt_dynar_iterator_struct {
39   xbt_dynar_t list;
40   xbt_dynar_t indices_list;
41   int current;
42   unsigned long length;
43   xbt_dynar_t (*criteria_fn)(int size);
44 } *xbt_dynar_iterator_t;
45 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
46
47 /* Messages enum */
48 typedef enum {
49   MESSAGE_BUILD_CHAIN = 0,
50   MESSAGE_SEND_DATA,
51   MESSAGE_END_DATA
52 } e_message_type;
53
54 /* Message struct */
55 typedef struct s_message {
56   e_message_type type;
57   const char *issuer_hostname;
58   const char *mailbox;
59   const char *prev_hostname;
60   const char *next_hostname;
61   const char *data_block;
62   unsigned int data_length;
63 } s_message_t, *message_t;
64
65 /* Peer struct */
66 typedef struct s_peer {
67   int init;
68   const char *prev;
69   const char *next;
70   const char *me;
71   int pieces;
72   xbt_dynar_t pending_sends;
73   int close_asap; /* TODO: unused */
74 } s_peer_t, *peer_t;
75
76 /* Iterator methods */
77 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int));
78 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
79 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
80
81 /* Iterator generators */
82 xbt_dynar_t forward_indices_list(int size);
83 xbt_dynar_t reverse_indices_list(int size);
84 xbt_dynar_t random_indices_list(int size);
85
86 /* Message methods */
87 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
88 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
89 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
90 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
91 void task_message_delete(void *);
92
93 /* Tasks */
94 int broadcaster(int argc, char *argv[]);
95 int peer(int argc, char *argv[]);
96
97 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
98 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
99
100 /* Broadcaster: helper functions */
101 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
102 int broadcaster_send_file(const char *first);
103 int broadcaster_finish(xbt_dynar_t host_list);
104
105 /* Peer: helper functions */
106 msg_error_t peer_wait_for_message(peer_t peer);
107 int peer_execute_task(peer_t peer, msg_task_t task);
108 void peer_init_chain(peer_t peer, message_t msg);
109 void peer_shutdown(peer_t p);
110 void peer_init(peer_t p);
111
112 /* Initialization stuff */
113 msg_error_t test_all(const char *platform_file,
114                      const char *application_file);
115
116 /* Shuffle */
117 /**************************************/
118 static int rand_int(int n);
119 void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list);
120
121 #define xbt_dynar_swap_elements(d, type, i, j) \
122   type tmp; \
123   tmp = xbt_dynar_get_as(indices_list, (unsigned int)j, type); \
124   xbt_dynar_set_as(indices_list, (unsigned int)j, type, \
125     xbt_dynar_get_as(indices_list, (unsigned int)i, type)); \
126   xbt_dynar_set_as(indices_list, (unsigned int)i, type, tmp);
127
128 /* http://stackoverflow.com/a/3348142 */
129 static int rand_int(int n)
130 {
131   int limit = RAND_MAX - RAND_MAX % n;
132   int rnd;
133
134   do {
135     rnd = rand();
136   } while (rnd >= limit);
137   
138   return rnd % n;
139 }
140
141 void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list)
142 {
143   int i, j;
144
145   for (i = xbt_dynar_length(indices_list) - 1; i > 0; i--) {
146     j = rand_int(i + 1);
147     xbt_dynar_swap_elements(indices_list, int, i, j);
148   }
149 }
150 /**************************************/
151
152 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
153    criteria_fn: given an array size, it must generate a list containing the indices of every item in some order */
154 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int))
155 {
156   xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
157   
158   it->list = list;
159   it->length = xbt_dynar_length(list);
160   it->indices_list = criteria_fn(it->length); //xbt_dynar_new(sizeof(int), NULL);
161   it->criteria_fn = criteria_fn;
162   it->current = 0;
163 }
164
165 void xbt_dynar_iterator_reset(xbt_dynar_iterator_t it)
166 {
167   xbt_dynar_free_container(&(it->indices_list));
168   it->indices_list = it->criteria_fn(it->length);
169   it->current = 0;
170 }
171
172 void xbt_dynar_iterator_seek(xbt_dynar_iterator_t it, int pos)
173 {
174   it->current = pos;
175 }
176
177 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
178 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
179 {
180   int *next;
181   //XBT_INFO("%d current\n", next);
182   if (it->current >= it->length) {
183     //XBT_INFO("Nothing to return!\n");
184     return NULL;
185   } else {
186     next = xbt_dynar_get_ptr(it->indices_list, it->current);
187     it->current++;
188     return xbt_dynar_get_ptr(it->list, *next);
189   }
190 }
191
192 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
193 {
194   xbt_dynar_free_container(&(it->indices_list));
195   xbt_free_ref(&it);
196 }
197
198 xbt_dynar_t forward_indices_list(int size)
199 {
200   xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
201   int i;
202   for (i = 0; i < size; i++)
203     xbt_dynar_push_as(indices_list, int, i);
204   return indices_list;
205 }
206
207 xbt_dynar_t reverse_indices_list(int size)
208 {
209   xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
210   int i;
211   for (i = size-1; i >= 0; i--)
212     xbt_dynar_push_as(indices_list, int, i);
213   return indices_list;
214 }
215
216 xbt_dynar_t random_indices_list(int size)
217 {
218   xbt_dynar_t indices_list = forward_indices_list(size);
219   xbt_dynar_shuffle_in_place(indices_list);
220   return indices_list;
221 }
222
223
224 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
225 {
226   message_t msg = xbt_new(s_message_t, 1);
227   msg->type = type;
228   msg->issuer_hostname = issuer_hostname;
229   msg->mailbox = mailbox;
230   msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
231
232   return task;
233 }
234
235 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
236 {
237   msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
238   message_t msg = MSG_task_get_data(task);
239   msg->prev_hostname = prev;
240   msg->next_hostname = next;
241
242   return task;
243 }
244
245 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
246 {
247   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
248   if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
249   message_t msg = MSG_task_get_data(task);
250   msg->data_block = block;
251   msg->data_length = len;
252
253   return task;
254 }
255
256 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
257 {
258   return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
259 }
260
261 void task_message_delete(void *task)
262 {
263   message_t msg = MSG_task_get_data(task);
264   xbt_free(msg);
265   MSG_task_destroy(task);
266 }
267
268 inline void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
269 {
270   xbt_dynar_push(q, &comm);
271 }
272
273 int process_pending_connections(xbt_dynar_t q)
274 {
275   unsigned int iter;
276   int status;
277   int empty = 0;
278   msg_comm_t comm;
279
280   xbt_dynar_foreach(q, iter, comm) {
281     empty = 1;
282     if (MSG_comm_test(comm)) {
283       MSG_comm_destroy(comm);
284       status = MSG_comm_get_status(comm);
285       xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
286       xbt_dynar_cursor_rm(q, &iter);
287       empty = 0;
288     }
289   }
290   return empty;
291 }
292
293 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
294 {
295   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
296   char *hostname = NULL;
297   msg_host_t h = NULL;
298   int i = 1;
299   
300   for (; i < hostcount+1; i++) {
301     hostname = xbt_new(char, HOSTNAME_LENGTH);
302     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
303     //XBT_INFO("%s", hostname);
304     h = MSG_get_host_by_name(hostname);
305     if (h == NULL) {
306       XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
307       abort();
308     } else {
309       xbt_dynar_push(host_list, &hostname);
310     }
311   }
312   return host_list;
313 }
314
315 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
316 {
317   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
318   msg_host_t h = NULL;
319   int i = 1;
320   
321   for (; i < argc; i++) {
322     XBT_INFO("host%d = %s", i, argv[i]);
323     h = MSG_get_host_by_name(argv[i]);
324     if (h == NULL) {
325       XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
326       abort();
327     } else {
328       xbt_dynar_push(host_list, &(argv[i]));
329     }
330   }
331   return host_list;
332 }*/
333
334 void delete_hostlist(xbt_dynar_t h)
335 {
336   xbt_dynar_free(&h);
337 }
338
339 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
340 {
341   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
342   msg_task_t task = NULL;
343   char **cur = (char**)xbt_dynar_iterator_next(it);
344   const char *me = MSG_host_get_name(MSG_host_self());
345   const char *current_host = NULL;
346   const char *prev = NULL;
347   const char *next = NULL;
348   const char *last = NULL;
349
350   /* Build the chain if there's at least one peer */
351   if (cur != NULL) {
352     /* init: prev=NULL, host=current cur, next=next cur */
353     next = *cur;
354     *first = next;
355
356     /* This iterator iterates one step ahead: cur is current iterated element, 
357        but it's actually the next one in the chain */
358     do {
359       /* following steps: prev=last, host=next, next=cur */
360       cur = (char**)xbt_dynar_iterator_next(it);
361       prev = last;
362       current_host = next;
363       if (cur != NULL)
364         next = *cur;
365       else
366         next = NULL;
367       //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
368     
369       /* Send message to current peer */
370       task = task_message_chain_new(me, current_host, prev, next);
371       //MSG_task_set_category(task, current_host);
372       MSG_task_send(task, current_host);
373
374       last = current_host;
375     } while (cur != NULL);
376   }
377   xbt_dynar_iterator_delete(it);
378
379   return MSG_OK;
380 }
381
382 int broadcaster_send_file(const char *first)
383 {
384   const char *me = MSG_host_get_name(MSG_host_self());
385   msg_task_t task = NULL;
386   msg_comm_t comm = NULL;
387   int status;
388
389   int piece_count = PIECE_COUNT;
390   int cur = 0;
391
392   for (; cur < piece_count; cur++) {
393     task = task_message_data_new(me, first, NULL, 0);
394     XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
395     status = MSG_task_send(task, first);
396    
397     xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
398   }
399
400   return MSG_OK;
401 }
402
403 /* FIXME: I should iterate nodes in the same order as the one used to build the chain */
404 int broadcaster_finish(xbt_dynar_t host_list)
405 {
406   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
407   msg_task_t task = NULL;
408   const char *me = MSG_host_get_name(MSG_host_self());
409   const char *current_host = NULL;
410   char **cur = NULL;
411
412   /* Send goodbye message to every peer */
413   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
414     /* Send message to current peer */
415     current_host = *cur;
416     task = task_message_end_data_new(me, current_host);
417     //MSG_task_set_category(task, current_host);
418     MSG_task_send(task, current_host);
419   }
420
421   return MSG_OK;
422 }
423
424
425 /** Emitter function  */
426 int broadcaster(int argc, char *argv[])
427 {
428   xbt_dynar_t host_list = NULL;
429   const char *first = NULL;
430   int status = !MSG_OK;
431
432   XBT_INFO("broadcaster");
433
434   /* Check that every host given by the hostcount in argv[1] exists and add it
435      to a dynamic array */
436   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
437   /*host_list = build_hostlist_from_argv(argc, argv);*/
438   
439   /* TODO: Error checking */
440   status = broadcaster_build_chain(&first, host_list);
441   status = broadcaster_send_file(first);
442   status = broadcaster_finish(host_list);
443
444   delete_hostlist(host_list);
445
446   return status;
447 }
448
449 /*******************************************************
450  *                     Peer                            *
451  *******************************************************/
452
453 void peer_init_chain(peer_t peer, message_t msg)
454 {
455   peer->prev = msg->prev_hostname;
456   peer->next = msg->next_hostname;
457   peer->init = 1;
458 }
459
460 void peer_forward_msg(peer_t peer, message_t msg)
461 {
462   int status;
463   msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
464   msg_comm_t comm = NULL;
465   XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
466   comm = MSG_task_isend(task, peer->next);
467   queue_pending_connection(comm, peer->pending_sends);
468 }
469
470 int peer_execute_task(peer_t peer, msg_task_t task)
471 {
472   int done = 0;
473   message_t msg = MSG_task_get_data(task);
474   
475   //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
476   switch (msg->type) {
477     case MESSAGE_BUILD_CHAIN:
478       peer_init_chain(peer, msg);
479       break;
480     case MESSAGE_SEND_DATA:
481       xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
482       if (peer->next != NULL)
483         peer_forward_msg(peer, msg);
484       peer->pieces++;
485       break;
486     case MESSAGE_END_DATA:
487       xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
488       done = 1;
489       XBT_INFO("%d pieces receieved", peer->pieces);
490       break;
491   }
492
493   MSG_task_execute(task);
494
495   return done;
496 }
497
498 msg_error_t peer_wait_for_message(peer_t peer)
499 {
500   msg_error_t status;
501   msg_comm_t comm = NULL;
502   msg_task_t task = NULL;
503   int done = 0;
504
505   while (!done) {
506     if (comm == NULL)
507       comm = MSG_task_irecv(&task, peer->me);
508
509     if (MSG_comm_test(comm)) {
510       status = MSG_comm_get_status(comm);
511       //XBT_INFO("peer_wait_for_message: error code = %d", status);
512       xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
513       MSG_comm_destroy(comm);
514       comm = NULL;
515       done = peer_execute_task(peer, task);
516       task_message_delete(task);
517       task = NULL;
518     } else {
519       process_pending_connections(peer->pending_sends);
520       MSG_process_sleep(0.1);
521     }
522   }
523
524   return status;
525 }
526
527 void peer_init(peer_t p)
528 {
529   p->init = 0;
530   p->prev = NULL;
531   p->next = NULL;
532   p->pieces = 0;
533   p->close_asap = 0;
534   p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
535   p->me = MSG_host_get_name(MSG_host_self());
536 }
537
538 void peer_shutdown(peer_t p)
539 {
540   float start_time = MSG_get_clock();
541   float end_time = start_time + PEER_SHUTDOWN_DEADLINE;
542
543   XBT_INFO("Waiting for sends to finish before shutdown...");
544   while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
545     process_pending_connections(p->pending_sends);
546     MSG_process_sleep(0.1);
547   }
548
549   xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
550   xbt_dynar_free(&p->pending_sends);
551
552   xbt_free(p);
553 }
554
555 /** Peer function  */
556 int peer(int argc, char *argv[])
557 {
558   peer_t p = xbt_new(s_peer_t, 1);
559   msg_error_t status;
560
561   XBT_INFO("peer");
562
563   peer_init(p);
564   status = peer_wait_for_message(p);
565   peer_shutdown(p);
566
567   return MSG_OK;
568 }                               /* end_of_receiver */
569
570
571 /** Test function */
572 msg_error_t test_all(const char *platform_file,
573                      const char *application_file)
574 {
575
576   msg_error_t res = MSG_OK;
577
578
579
580   XBT_INFO("test_all");
581
582   /*  Simulation setting */
583   MSG_create_environment(platform_file);
584
585   /* Trace categories */
586   TRACE_category_with_color("host0", "0 0 1");
587   TRACE_category_with_color("host1", "0 1 0");
588   TRACE_category_with_color("host2", "0 1 1");
589   TRACE_category_with_color("host3", "1 0 0");
590   TRACE_category_with_color("host4", "1 0 1");
591   TRACE_category_with_color("host5", "1 1 0");
592
593   /*   Application deployment */
594   MSG_function_register("broadcaster", broadcaster);
595   MSG_function_register("peer", peer);
596
597   MSG_launch_application(application_file);
598
599   res = MSG_main();
600
601   return res;
602 }                               /* end_of_test_all */
603
604
605 /** Main function */
606 int main(int argc, char *argv[])
607 {
608   msg_error_t res = MSG_OK;
609
610 #ifdef _MSC_VER
611   unsigned int prev_exponent_format =
612       _set_output_format(_TWO_DIGIT_EXPONENT);
613 #endif
614
615   MSG_init(&argc, argv);
616
617   /*if (argc <= 3) {
618     XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
619               argv[0]);
620     XBT_CRITICAL
621         ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
622          argv[0]);
623     exit(1);
624   }*/
625
626   /* Options for the workstation/model:
627
628      KCCFLN05              => for maxmin
629      KCCFLN05_proportional => for proportional (Vegas)
630      KCCFLN05_Vegas        => for TCP Vegas
631      KCCFLN05_Reno         => for TCP Reno
632    */
633   //MSG_config("workstation/model", argv[3]);
634
635   res = test_all(argv[1], argv[2]);
636
637   XBT_INFO("Total simulation time: %le", MSG_get_clock());
638
639   MSG_clean();
640
641 #ifdef _MSC_VER
642   _set_output_format(prev_exponent_format);
643 #endif
644
645   if (res == MSG_OK)
646     return 0;
647   else
648     return 1;
649 }                               /* end_of_main */