1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * Copyright (c) 2012. Maximiliano Geier.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h" /* calloc */
14 /* Create a log channel to have nice outputs. */
16 #include "xbt/asserts.h"
18 /** @addtogroup MSG_examples
20 * - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
/* Declares msg_kadeploy as the default log category of this file. */
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25 "Messages specific for kadeploy");
/* Passed as the third argument of MSG_task_create() for every message task. */
27 #define MESSAGE_SIZE 1
/* Number of data tasks the broadcaster sends to the first peer. */
28 #define PIECE_COUNT 1000
/* Size of the buffer allocated for each generated "host%d" name. */
29 #define HOSTNAME_LENGTH 20
/* Added to the MSG clock when a peer starts shutting down; bounds the wait
   for its pending sends. */
31 #define PEER_SHUTDOWN_DEADLINE 6000
37 /* Random iterator for xbt_dynar */
/* Iterator that visits the elements of a dynar in the order given by a
   separately generated list of indices.
   NOTE(review): the code below also uses members named list, current and
   length; their declarations are not among the visible lines. */
38 typedef struct xbt_dynar_iterator_struct {
/* Indices into the iterated list, in visiting order; produced by criteria_fn. */
40 xbt_dynar_t indices_list;
/* Generator that, given an array size, returns the index list to follow. */
43 xbt_dynar_t (*criteria_fn)(int size);
44 } *xbt_dynar_iterator_t;
/* Non-pointer alias; xbt_dynar_iterator_new() uses it to size the allocation. */
45 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
/* Message kind used by task_message_chain_new() for chain-building tasks. */
49 MESSAGE_BUILD_CHAIN = 0,
/* Payload attached as the data of every task built by task_message_new(). */
55 typedef struct s_message {
/* Name passed as issuer by whoever builds the message. */
57 const char *issuer_hostname;
/* Chain neighbours; filled in by task_message_chain_new(). */
59 const char *prev_hostname;
60 const char *next_hostname;
/* Data block and its length; filled in by task_message_data_new(). */
61 const char *data_block;
62 unsigned int data_length;
63 } s_message_t, *message_t;
/* Per-peer state.
   NOTE(review): members init, prev, next, me and pieces are used by the peer
   functions but their declarations are not among the visible lines. */
66 typedef struct s_peer {
/* Dynar of msg_comm_t: asynchronous sends started by peer_forward_msg() that
   process_pending_connections() has not removed yet. */
72 xbt_dynar_t pending_sends;
73 int close_asap; /* TODO: unused */
76 /* Iterator methods */
77 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int));
78 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
79 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
81 /* Iterator generators */
/* Each returns a new dynar of int; these are the functions passed as
   criteria_fn to xbt_dynar_iterator_new(). */
82 xbt_dynar_t forward_indices_list(int size);
83 xbt_dynar_t reverse_indices_list(int size);
84 xbt_dynar_t random_indices_list(int size);
/* Message tasks: one constructor per message kind, plus the destructor. */
87 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
88 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
89 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
90 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
91 void task_message_delete(void *);
/* Process entry points; registered by name in test_all(). */
94 int broadcaster(int argc, char *argv[]);
95 int peer(int argc, char *argv[]);
/* Builds the dynar of generated "host%d" names. */
97 xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
98 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
100 /* Broadcaster: helper functions */
101 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
102 int broadcaster_send_file(const char *first);
103 int broadcaster_finish(xbt_dynar_t host_list);
105 /* Peer: helper functions */
106 msg_error_t peer_wait_for_message(peer_t peer);
107 int peer_execute_task(peer_t peer, msg_task_t task);
108 void peer_init_chain(peer_t peer, message_t msg);
109 void peer_shutdown(peer_t p);
110 void peer_init(peer_t p);
112 /* Initialization stuff */
113 msg_error_t test_all(const char *platform_file,
114 const char *application_file);
117 /**************************************/
/* Shuffle support used by random_indices_list(). */
118 static int rand_int(int n);
119 void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list);
/* Swaps elements i and j of dynar d, reading and writing both as `type`.
 *
 * The expansion is wrapped in do { } while (0) so that it forms a single
 * statement (safe under an unbraced if/else) and keeps its temporary local;
 * terminate the invocation with ';'. The dynar operated on is the d argument,
 * not whatever variable happens to be called indices_list at the call site.
 * i and j are each evaluated twice: pass side-effect-free expressions. */
#define xbt_dynar_swap_elements(d, type, i, j) \
  do { \
    type swap_tmp_ = xbt_dynar_get_as((d), (unsigned int)(j), type); \
    xbt_dynar_set_as((d), (unsigned int)(j), type, \
                     xbt_dynar_get_as((d), (unsigned int)(i), type)); \
    xbt_dynar_set_as((d), (unsigned int)(i), type, swap_tmp_); \
  } while (0)
/* Returns a uniformly distributed integer in [0, n).
 *
 * Draws from rand() and rejects values at or above the largest multiple of n
 * not exceeding RAND_MAX, which removes the modulo bias of a plain
 * rand() % n (technique from http://stackoverflow.com/a/3348142).
 *
 * Degenerate inputs:
 *  - n <= 0 has no valid range; 0 is returned rather than evaluating
 *    RAND_MAX % 0, which is undefined behaviour.
 *  - n > RAND_MAX cannot be covered by a single rand() draw and would make
 *    the rejection limit 0 (an endless loop); the raw rand() value, which
 *    still lies in [0, n), is returned instead. */
static int rand_int(int n)
{
  if (n <= 0)
    return 0;
  if (n > RAND_MAX)
    return rand();

  int limit = RAND_MAX - RAND_MAX % n;
  int rnd;

  do {
    rnd = rand();
  } while (rnd >= limit);

  return rnd % n;
}
/* Shuffles indices_list (a dynar of int) in place: i walks from the last
   index down to 1 and element i is swapped with element j.
   NOTE(review): the statement that picks j is not among the visible lines;
   presumably it comes from rand_int() - confirm. */
141 void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list)
145 for (i = xbt_dynar_length(indices_list) - 1; i > 0; i--) {
147 xbt_dynar_swap_elements(indices_list, int, i, j);
150 /**************************************/
152 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
153 criteria_fn: given an array size, it must generate a list containing the indices of every item in some order */
154 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int))
156 xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
/* The length is captured once, here; the index list is generated for exactly
   that many elements. */
159 it->length = xbt_dynar_length(list);
160 it->indices_list = criteria_fn(it->length); //xbt_dynar_new(sizeof(int), NULL);
/* Kept so xbt_dynar_iterator_reset() can regenerate the index list. */
161 it->criteria_fn = criteria_fn;
/* Drops the iterator's current index list and generates a fresh one with
   criteria_fn, for the length recorded when the iterator was created. */
165 void xbt_dynar_iterator_reset(xbt_dynar_iterator_t it)
167 xbt_dynar_free_container(&(it->indices_list));
168 it->indices_list = it->criteria_fn(it->length);
/* NOTE(review): only the signature is visible here; from the name this
   presumably moves the iterator to position pos - confirm against the body. */
172 void xbt_dynar_iterator_seek(xbt_dynar_iterator_t it, int pos)
177 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
/* The value returned is what xbt_dynar_get_ptr() yields on it->list for the
   index stored at the current position of the index list. */
178 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
181 //XBT_INFO("%d current\n", next);
/* Exhausted once current has reached the length captured at creation. */
182 if (it->current >= it->length) {
183 //XBT_INFO("Nothing to return!\n");
/* Translate the iteration position into an index of the underlying list. */
186 next = xbt_dynar_get_ptr(it->indices_list, it->current);
188 return xbt_dynar_get_ptr(it->list, *next);
/* Releases the iterator's index list.
   NOTE(review): release of the iterator struct itself is not among the
   visible lines - confirm. */
192 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
194 xbt_dynar_free_container(&(it->indices_list));
/* Builds a new dynar of int holding 0, 1, ..., size-1 in ascending order
   (empty when size <= 0). */
198 xbt_dynar_t forward_indices_list(int size)
200 xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
202 for (i = 0; i < size; i++)
203 xbt_dynar_push_as(indices_list, int, i);
/* Builds a new dynar of int holding size-1, size-2, ..., 0 in descending
   order (empty when size <= 0). */
207 xbt_dynar_t reverse_indices_list(int size)
209 xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
211 for (i = size-1; i >= 0; i--)
212 xbt_dynar_push_as(indices_list, int, i);
/* Builds the ascending index list for size, then shuffles it in place. */
216 xbt_dynar_t random_indices_list(int size)
218 xbt_dynar_t indices_list = forward_indices_list(size);
219 xbt_dynar_shuffle_in_place(indices_list);
/* Allocates a message, records the issuer and mailbox pointers in it (the
   strings are not copied), and attaches it as the data of a new MSG task
   created with a NULL name, 0 as second argument and MESSAGE_SIZE as third.
   NOTE(review): where `type` is stored is not among the visible lines. */
224 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
226 message_t msg = xbt_new(s_message_t, 1);
228 msg->issuer_hostname = issuer_hostname;
229 msg->mailbox = mailbox;
230 msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg);
/* Builds a MESSAGE_BUILD_CHAIN task whose message also carries the
   recipient's previous and next neighbour names (pointers stored, not copied). */
235 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
237 msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
238 message_t msg = MSG_task_get_data(task);
239 msg->prev_hostname = prev;
240 msg->next_hostname = next;
/* Builds a MESSAGE_SEND_DATA task whose message carries block and len
   (the block pointer is stored, not copied). */
245 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
247 msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
/* NOTE(review): only tasks addressed to the literal mailbox "host4" get a
   category; the name is hard-coded, presumably a tracing experiment -
   confirm it is intended. */
248 if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
249 message_t msg = MSG_task_get_data(task);
250 msg->data_block = block;
251 msg->data_length = len;
/* Builds a MESSAGE_END_DATA task; it carries nothing beyond the issuer and
   mailbox set by task_message_new(). */
256 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
258 return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
/* Destroys a task built by the task_message_*_new() constructors. task is
   declared void * but is handed to the MSG task functions as-is.
   NOTE(review): msg is fetched here, but the statement that releases it is
   not among the visible lines - confirm the message is freed. */
261 void task_message_delete(void *task)
263 message_t msg = MSG_task_get_data(task);
265 MSG_task_destroy(task);
268 inline void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
270 xbt_dynar_push(q, &comm);
273 int process_pending_connections(xbt_dynar_t q)
280 xbt_dynar_foreach(q, iter, comm) {
282 if (MSG_comm_test(comm)) {
283 MSG_comm_destroy(comm);
284 status = MSG_comm_get_status(comm);
285 xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
286 xbt_dynar_cursor_rm(q, &iter);
/* Builds a dynar of char* host names of the form "host<i>", for i up to and
   including hostcount, looking each name up with MSG_get_host_by_name().
   NOTE(review): the starting value of i and the handling of a failed lookup
   (beyond the log line) are not among the visible lines. */
293 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
295 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
296 char *hostname = NULL;
300 for (; i < hostcount+1; i++) {
/* A fresh buffer per host: the list stores the pointer, not a copy. */
301 hostname = xbt_new(char, HOSTNAME_LENGTH);
302 snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
303 //XBT_INFO("%s", hostname);
304 h = MSG_get_host_by_name(hostname);
306 XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
309 xbt_dynar_push(host_list, &hostname);
315 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
317 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
321 for (; i < argc; i++) {
322 XBT_INFO("host%d = %s", i, argv[i]);
323 h = MSG_get_host_by_name(argv[i]);
325 XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
328 xbt_dynar_push(host_list, &(argv[i]));
/* Called by broadcaster() on its host list once the broadcast is over.
   NOTE(review): only the signature is visible; presumably it releases the
   list built by build_hostlist_from_hostcount() - confirm against the body. */
334 void delete_hostlist(xbt_dynar_t h)
/* Walks host_list in forward order and sends each host a MESSAGE_BUILD_CHAIN
   task naming its previous and next neighbour in the chain.
   NOTE(review): the assignments to *first, current_host, prev, next and last,
   and the value returned, are not among the visible lines. The result of
   MSG_task_send() is not checked here, unlike in broadcaster_send_file(). */
339 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
341 xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
342 msg_task_t task = NULL;
/* host_list holds char* names, so the iterator hands back char**. */
343 char **cur = (char**)xbt_dynar_iterator_next(it);
344 const char *me = MSG_host_get_name(MSG_host_self());
345 const char *current_host = NULL;
346 const char *prev = NULL;
347 const char *next = NULL;
348 const char *last = NULL;
350 /* Build the chain if there's at least one peer */
352 /* init: prev=NULL, host=current cur, next=next cur */
356 /* This iterator iterates one step ahead: cur is current iterated element,
357 but it's actually the next one in the chain */
359 /* following steps: prev=last, host=next, next=cur */
360 cur = (char**)xbt_dynar_iterator_next(it);
367 //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
369 /* Send message to current peer */
/* The peer's host name doubles as the mailbox the task is sent to. */
370 task = task_message_chain_new(me, current_host, prev, next);
371 //MSG_task_set_category(task, current_host);
372 MSG_task_send(task, current_host);
/* The iterator returns NULL once every host has been visited. */
375 } while (cur != NULL);
377 xbt_dynar_iterator_delete(it);
/* Sends PIECE_COUNT MESSAGE_SEND_DATA tasks to the mailbox named first, one
   MSG_task_send() per piece, asserting that every send returns MSG_OK. Each
   task carries a NULL block of length 0.
   NOTE(review): comm is not used in the visible lines; the initial value of
   cur and the value returned are not visible either. */
382 int broadcaster_send_file(const char *first)
384 const char *me = MSG_host_get_name(MSG_host_self());
385 msg_task_t task = NULL;
386 msg_comm_t comm = NULL;
389 int piece_count = PIECE_COUNT;
392 for (; cur < piece_count; cur++) {
393 task = task_message_data_new(me, first, NULL, 0);
394 XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
395 status = MSG_task_send(task, first);
397 xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
403 /* FIXME: I should iterate nodes in the same order as the one used to build the chain */
/* Sends a MESSAGE_END_DATA task to every host of host_list, walked in
   forward order, using each host name as the mailbox.
   NOTE(review): the assignment of current_host from cur and the value
   returned are not among the visible lines; the result of MSG_task_send()
   is not checked. */
404 int broadcaster_finish(xbt_dynar_t host_list)
406 xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
407 msg_task_t task = NULL;
408 const char *me = MSG_host_get_name(MSG_host_self());
409 const char *current_host = NULL;
412 /* Send goodbye message to every peer */
413 for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
414 /* Send message to current peer */
416 task = task_message_end_data_new(me, current_host);
417 //MSG_task_set_category(task, current_host);
418 MSG_task_send(task, current_host);
425 /** Emitter function */
/* Broadcaster process: builds the host list from the count in argv[1], sets
   up the chain, sends the file pieces to the first peer, tells every peer
   the transfer is over, then releases the host list.
   NOTE(review): argv[1] is read without checking argc, and atoi() cannot
   report a malformed count. */
426 int broadcaster(int argc, char *argv[])
428 xbt_dynar_t host_list = NULL;
429 const char *first = NULL;
430 int status = !MSG_OK;
432 XBT_INFO("broadcaster");
434 /* Check that every host given by the hostcount in argv[1] exists and add it
435 to a dynamic array */
436 host_list = build_hostlist_from_hostcount(atoi(argv[1]));
437 /*host_list = build_hostlist_from_argv(argc, argv);*/
439 /* TODO: Error checking */
/* Each call overwrites status without it being inspected in between. */
440 status = broadcaster_build_chain(&first, host_list);
441 status = broadcaster_send_file(first);
442 status = broadcaster_finish(host_list);
444 delete_hostlist(host_list);
449 /*******************************************************
451 *******************************************************/
/* Records this peer's chain neighbours from a chain-building message. The
   pointers are stored as-is, not copied.
   NOTE(review): setting of peer->init is not among the visible lines. */
453 void peer_init_chain(peer_t peer, message_t msg)
455 peer->prev = msg->prev_hostname;
456 peer->next = msg->next_hostname;
/* Starts an asynchronous send of a data task to the next peer in the chain
   and queues the communication in peer->pending_sends for later reaping.
   NOTE(review): in the visible lines msg is not used; a fresh data task with
   a NULL block of length 0 is sent instead of the received payload. */
460 void peer_forward_msg(peer_t peer, message_t msg)
463 msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
464 msg_comm_t comm = NULL;
465 XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
466 comm = MSG_task_isend(task, peer->next);
467 queue_pending_connection(comm, peer->pending_sends);
/* Handles one received task according to the type of its message, then
   executes the task with MSG_task_execute(). The int returned is used by
   peer_wait_for_message() as its `done` value.
   NOTE(review): the switch header, the updates of peer->pieces and the
   statements producing the return value are not among the visible lines. */
470 int peer_execute_task(peer_t peer, msg_task_t task)
473 message_t msg = MSG_task_get_data(task);
475 //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
/* Chain set-up: remember the neighbours. */
477 case MESSAGE_BUILD_CHAIN:
478 peer_init_chain(peer, msg);
/* Data piece: only valid once initialized; relayed when there is a next peer. */
480 case MESSAGE_SEND_DATA:
481 xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
482 if (peer->next != NULL)
483 peer_forward_msg(peer, msg);
/* End of transfer: only valid once initialized; logs the piece count. */
486 case MESSAGE_END_DATA:
487 xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
489 XBT_INFO("%d pieces receieved", peer->pieces);
493 MSG_task_execute(task);
/* Receive loop of a peer: posts an asynchronous receive on the mailbox named
   after this host and polls it. When a task has arrived its status is
   asserted to be MSG_OK, the communication is destroyed, the task is handled
   by peer_execute_task() and then deleted. The visible lines also reap
   finished sends and sleep 0.1 between polls.
   NOTE(review): the loop header, the branch structure around the poll and
   the value returned are not among the visible lines. */
498 msg_error_t peer_wait_for_message(peer_t peer)
501 msg_comm_t comm = NULL;
502 msg_task_t task = NULL;
507 comm = MSG_task_irecv(&task, peer->me);
509 if (MSG_comm_test(comm)) {
/* Status is read before the communication is destroyed. */
510 status = MSG_comm_get_status(comm);
511 //XBT_INFO("peer_wait_for_message: error code = %d", status);
512 xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
513 MSG_comm_destroy(comm);
/* The handler's result decides whether the loop is done. */
515 done = peer_execute_task(peer, task);
516 task_message_delete(task);
519 process_pending_connections(peer->pending_sends);
520 MSG_process_sleep(0.1);
/* Initializes a freshly allocated peer: creates the queue of pending sends
   (elements are msg_comm_t) and records the name of the host it runs on.
   NOTE(review): initialization of the other members is not among the
   visible lines. */
527 void peer_init(peer_t p)
534 p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
535 p->me = MSG_host_get_name(MSG_host_self());
/* Waits for the peer's pending sends to complete, polling every 0.1 on the
   MSG clock until none are left or PEER_SHUTDOWN_DEADLINE has elapsed;
   asserts the queue is empty, then frees it.
   NOTE(review): whether p itself is released here is not among the visible
   lines - confirm. */
538 void peer_shutdown(peer_t p)
/* NOTE(review): the clock values are held in float; presumably
   MSG_get_clock() yields a double (main() prints it with %le), in which case
   precision is lost here - confirm and consider double. */
540 float start_time = MSG_get_clock();
541 float end_time = start_time + PEER_SHUTDOWN_DEADLINE;
543 XBT_INFO("Waiting for sends to finish before shutdown...");
544 while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
545 process_pending_connections(p->pending_sends);
546 MSG_process_sleep(0.1);
549 xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
550 xbt_dynar_free(&p->pending_sends);
/* Peer process entry point: allocates the peer state and runs the receive
   loop.
   NOTE(review): the calls to peer_init()/peer_shutdown(), any use of status
   and the value returned are not among the visible lines. */
556 int peer(int argc, char *argv[])
558 peer_t p = xbt_new(s_peer_t, 1);
564 status = peer_wait_for_message(p);
568 } /* end_of_receiver */
/* Sets up the simulation: loads the platform file, declares one trace
   category per host name "host0".."host5", registers the broadcaster and
   peer functions under those names, and launches the deployment described
   by application_file.
   NOTE(review): the call that actually runs the simulation and the
   assignment of the returned res are not among the visible lines. */
572 msg_error_t test_all(const char *platform_file,
573 const char *application_file)
576 msg_error_t res = MSG_OK;
580 XBT_INFO("test_all");
582 /* Simulation setting */
583 MSG_create_environment(platform_file);
585 /* Trace categories */
/* Category names match the generated host names; the second argument is a
   colour given as three space-separated components. */
586 TRACE_category_with_color("host0", "0 0 1");
587 TRACE_category_with_color("host1", "0 1 0");
588 TRACE_category_with_color("host2", "0 1 1");
589 TRACE_category_with_color("host3", "1 0 0");
590 TRACE_category_with_color("host4", "1 0 1");
591 TRACE_category_with_color("host5", "1 1 0");
593 /* Application deployment */
/* The names registered here are the ones a deployment file refers to. */
594 MSG_function_register("broadcaster", broadcaster);
595 MSG_function_register("peer", peer);
597 MSG_launch_application(application_file);
602 } /* end_of_test_all */
/* Program entry point: initializes MSG, runs test_all() on the platform file
   (argv[1]) and deployment file (argv[2]), then logs the final simulation
   clock. The usage text mentions a third <model> argument, but the
   MSG_config() call that would consume it is commented out.
   NOTE(review): the argument-count check guarding the usage message, the
   conditional compilation presumably surrounding the _set_output_format()
   calls (a Windows C runtime function), and the process exit status are not
   among the visible lines. */
606 int main(int argc, char *argv[])
608 msg_error_t res = MSG_OK;
611 unsigned int prev_exponent_format =
612 _set_output_format(_TWO_DIGIT_EXPONENT);
615 MSG_init(&argc, argv);
618 XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
621 ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
626 /* Options for the workstation/model:
628 KCCFLN05 => for maxmin
629 KCCFLN05_proportional => for proportional (Vegas)
630 KCCFLN05_Vegas => for TCP Vegas
631 KCCFLN05_Reno => for TCP Reno
633 //MSG_config("workstation/model", argv[3]);
635 res = test_all(argv[1], argv[2]);
637 XBT_INFO("Total simulation time: %le", MSG_get_clock());
642 _set_output_format(prev_exponent_format);