1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * Copyright (c) 2012. Maximiliano Geier.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h" /* calloc */
14 /* Create a log channel to have nice outputs. */
16 #include "xbt/asserts.h"
18 /** @addtogroup MSG_examples
20 * - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25 "Messages specific for kadeploy");
27 #define MESSAGE_SIZE 1
28 #define HOSTNAME_LENGTH 20
34 /* Random iterator for xbt_dynar */
35 typedef struct xbt_dynar_iterator_struct {
37 xbt_dynar_t indices_list;
40 int (*criteria_fn)(void* it);
41 } *xbt_dynar_iterator_t;
42 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
46 MESSAGE_BUILD_CHAIN = 0,
52 typedef struct s_message {
54 const char *issuer_hostname;
56 const char *prev_hostname;
57 const char *next_hostname;
58 const char *data_block;
59 unsigned int data_length;
60 } s_message_t, *message_t;
63 typedef struct s_peer {
71 /* Iterator methods */
72 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
73 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
74 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
75 int xbt_dynar_iterator_forward_criteria(void *p);
78 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
79 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
80 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
81 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
82 void task_message_delete(void *);
85 int broadcaster(int argc, char *argv[]);
86 int peer(int argc, char *argv[]);
88 xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
89 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
91 /* Broadcaster: helper functions */
92 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
93 int broadcaster_send_file(const char *first);
94 int broadcaster_finish(xbt_dynar_t host_list);
96 /* Peer: helper functions */
97 msg_error_t peer_wait_for_message();
98 int peer_execute_task(peer_t peer, msg_task_t task);
99 void peer_init_chain(peer_t peer, message_t msg);
101 /* Initialization stuff */
102 msg_error_t test_all(const char *platform_file,
103 const char *application_file);
105 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
106 criteria_fn: given an iterator, it must update the iterator and give the next element's index,
107 less than 0 otherwise*/
108 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
110 xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
113 it->length = xbt_dynar_length(list);
114 it->indices_list = xbt_dynar_new(sizeof(int), NULL);
115 it->criteria_fn = criteria_fn;
119 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
120 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
122 int next = it->criteria_fn((xbt_dynar_iterator_t)it);
123 XBT_INFO("%d current\n", next);
125 XBT_INFO("Nothing to return!\n");
128 xbt_dynar_push(it->indices_list, &next);
129 return xbt_dynar_get_ptr(it->list, next);
133 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
135 xbt_dynar_free_container(&(it->indices_list));
139 int xbt_dynar_iterator_forward_criteria(void *p)
141 xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
143 if (it->current == -1) {
144 /* iterator initialization */
147 if (it->current < it->length) {
155 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
157 message_t msg = xbt_new(s_message_t, 1);
159 msg->issuer_hostname = issuer_hostname;
160 msg->mailbox = mailbox;
161 msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg);
166 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
168 msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
169 message_t msg = MSG_task_get_data(task);
170 msg->prev_hostname = prev;
171 msg->next_hostname = next;
176 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
178 msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
179 message_t msg = MSG_task_get_data(task);
180 msg->data_block = block;
181 msg->data_length = len;
186 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
188 return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
192 void task_message_delete(void *task)
194 message_t msg = MSG_task_get_data(task);
196 MSG_task_destroy(task);
199 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
201 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
202 char *hostname = NULL;
206 for (; i < hostcount+1; i++) {
207 hostname = xbt_new(char, HOSTNAME_LENGTH);
208 snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
209 XBT_INFO("%s", hostname);
210 h = MSG_get_host_by_name(hostname);
212 XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
215 xbt_dynar_push(host_list, &hostname);
221 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
223 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
227 for (; i < argc; i++) {
228 XBT_INFO("host%d = %s", i, argv[i]);
229 h = MSG_get_host_by_name(argv[i]);
231 XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
234 xbt_dynar_push(host_list, &(argv[i]));
240 void delete_hostlist(xbt_dynar_t h)
245 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
247 xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
248 msg_task_t task = NULL;
249 char **cur = (char**)xbt_dynar_iterator_next(it);
250 const char *me = MSG_host_get_name(MSG_host_self());
251 const char *current_host = NULL;
252 const char *prev = NULL;
253 const char *next = NULL;
254 const char *last = NULL;
256 /* Build the chain if there's at least one peer */
258 /* init: prev=NULL, host=current cur, next=next cur */
262 /* This iterator iterates one step ahead: cur is current iterated element,
263 but it's actually the next one in the chain */
265 /* following steps: prev=last, host=next, next=cur */
266 cur = (char**)xbt_dynar_iterator_next(it);
273 XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
275 /* Send message to current peer */
276 task = task_message_chain_new(me, current_host, prev, next);
277 MSG_task_send(task, current_host);
280 } while (cur != NULL);
282 xbt_dynar_iterator_delete(it);
287 int broadcaster_send_file(const char *first)
289 const char *me = MSG_host_get_name(MSG_host_self());
290 msg_task_t task = NULL;
291 msg_comm_t comm = NULL;
294 int piece_count = 10;
297 for (; cur < piece_count; cur++) {
299 task = task_message_data_new(me, first, NULL, 0);
300 XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
301 //comm = MSG_task_isend(task, first);
303 MSG_task_dsend(task, first, task_message_delete);
305 //status = MSG_comm_wait(comm, -1);
306 //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
307 //MSG_comm_destroy(comm);
313 int broadcaster_finish(xbt_dynar_t host_list)
315 xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
316 msg_task_t task = NULL;
317 const char *me = MSG_host_get_name(MSG_host_self());
318 const char *current_host = NULL;
321 /* Send goodbye message to every peer */
322 for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
323 /* Send message to current peer */
325 task = task_message_end_data_new(me, current_host);
326 MSG_task_send(task, current_host);
333 /** Emitter function */
334 int broadcaster(int argc, char *argv[])
336 xbt_dynar_t host_list = NULL;
337 const char *first = NULL;
338 int status = !MSG_OK;
340 XBT_INFO("broadcaster");
342 /* Check that every host given by the hostcount in argv[1] exists and add it
343 to a dynamic array */
344 host_list = build_hostlist_from_hostcount(atoi(argv[1]));
345 /*host_list = build_hostlist_from_argv(argc, argv);*/
347 /* TODO: Error checking */
348 status = broadcaster_build_chain(&first, host_list);
349 status = broadcaster_send_file(first);
350 status = broadcaster_finish(host_list);
352 delete_hostlist(host_list);
357 /*******************************************************
359 *******************************************************/
361 void peer_init_chain(peer_t peer, message_t msg)
363 peer->prev = msg->prev_hostname;
364 peer->next = msg->next_hostname;
368 /* TODO: error checking */
369 void peer_forward_msg(peer_t peer, message_t msg)
372 msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
373 msg_comm_t comm = NULL;
374 XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
377 MSG_task_dsend(task, peer->next, task_message_delete);
379 //status = MSG_comm_wait(comm, -1);
380 xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed");
381 //MSG_comm_destroy(comm);
384 int peer_execute_task(peer_t peer, msg_task_t task)
386 int done = 0, init = 0;
387 message_t msg = MSG_task_get_data(task);
389 XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
391 case MESSAGE_BUILD_CHAIN:
392 peer_init_chain(peer, msg);
394 case MESSAGE_SEND_DATA:
395 xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
396 if (peer->next != NULL)
397 peer_forward_msg(peer, msg);
400 case MESSAGE_END_DATA:
401 xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
403 XBT_INFO("%d pieces receieved", peer->pieces);
407 MSG_task_execute(task);
412 msg_error_t peer_wait_for_message(peer_t peer)
415 msg_comm_t comm = NULL;
416 msg_task_t task = NULL;
420 /* TODO: Error checking is not correct */
423 comm = MSG_task_irecv(&task, peer->me);
425 if (MSG_comm_test(comm)) {
426 status = MSG_comm_get_status(comm);
427 xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
428 MSG_comm_destroy(comm);
430 done = peer_execute_task(peer, task);
431 task_message_delete(task);
434 MSG_process_sleep(0.01);
441 void peer_init(peer_t p)
447 p->me = MSG_host_get_name(MSG_host_self());
451 int peer(int argc, char *argv[])
453 peer_t p = xbt_new(s_peer_t, 1);
459 status = peer_wait_for_message(p);
464 } /* end_of_receiver */
468 msg_error_t test_all(const char *platform_file,
469 const char *application_file)
472 msg_error_t res = MSG_OK;
476 XBT_INFO("test_all");
478 /* Simulation setting */
479 MSG_create_environment(platform_file);
481 /* Application deployment */
482 MSG_function_register("broadcaster", broadcaster);
483 MSG_function_register("peer", peer);
485 MSG_launch_application(application_file);
490 } /* end_of_test_all */
494 int main(int argc, char *argv[])
496 msg_error_t res = MSG_OK;
499 unsigned int prev_exponent_format =
500 _set_output_format(_TWO_DIGIT_EXPONENT);
503 MSG_init(&argc, argv);
507 XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
510 ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
515 /* Options for the workstation/model:
517 KCCFLN05 => for maxmin
518 KCCFLN05_proportional => for proportional (Vegas)
519 KCCFLN05_Vegas => for TCP Vegas
520 KCCFLN05_Reno => for TCP Reno
522 //MSG_config("workstation/model", argv[3]);
524 res = test_all(argv[1], argv[2]);
526 XBT_INFO("Total simulation time: %le", MSG_get_clock());
531 _set_output_format(prev_exponent_format);