Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ae5dc08ae24433c9eeeaa7642d50f124fe64f7bb
[simgrid.git] / examples / msg / kadeploy / kadeploy.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * Copyright (c) 2012. Maximiliano Geier.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h"         /* calloc */
13
14 /* Create a log channel to have nice outputs. */
15 #include "xbt/log.h"
16 #include "xbt/asserts.h"
17
18 /** @addtogroup MSG_examples
19  * 
20  *  - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
21  */
22
23
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25                              "Messages specific for kadeploy");
26
27 #define MESSAGE_SIZE 1
28 #define HOSTNAME_LENGTH 20
29
30 /*
31  Data structures
32  */
33
34 /* Random iterator for xbt_dynar */
35 typedef struct xbt_dynar_iterator_struct {
36   xbt_dynar_t list;
37   xbt_dynar_t indices_list;
38   int current;
39   unsigned long length;
40   int (*criteria_fn)(void* it);
41 } *xbt_dynar_iterator_t;
42 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
43
44 /* Messages enum */
45 typedef enum {
46   MESSAGE_BUILD_CHAIN = 0,
47   MESSAGE_SEND_DATA,
48   MESSAGE_END_DATA
49 } e_message_type;
50
51 /* Message struct */
52 typedef struct s_message {
53   e_message_type type;
54   const char *issuer_hostname;
55   const char *mailbox;
56   const char *prev_hostname;
57   const char *next_hostname;
58   const char *data_block;
59   unsigned int data_length;
60 } s_message_t, *message_t;
61
62 /* Peer struct */
63 typedef struct s_peer {
64   int init;
65   const char *prev;
66   const char *next;
67   const char *me;
68   int pieces;
69 } s_peer_t, *peer_t;
70
71 /* Iterator methods */
72 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
73 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
74 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
75 int xbt_dynar_iterator_forward_criteria(void *p);
76
77 /* Message methods */
78 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
79 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
80 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
81 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
82 void task_message_delete(void *);
83
84 /* Tasks */
85 int broadcaster(int argc, char *argv[]);
86 int peer(int argc, char *argv[]);
87
88 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
89 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
90
91 /* Broadcaster: helper functions */
92 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
93 int broadcaster_send_file(const char *first);
94 int broadcaster_finish(xbt_dynar_t host_list);
95
96 /* Peer: helper functions */
97 msg_error_t peer_wait_for_message();
98 int peer_execute_task(peer_t peer, msg_task_t task);
99 void peer_init_chain(peer_t peer, message_t msg);
100
101 /* Initialization stuff */
102 msg_error_t test_all(const char *platform_file,
103                      const char *application_file);
104
105 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
106    criteria_fn: given an iterator, it must update the iterator and give the next element's index, 
107    less than 0 otherwise*/
108 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
109 {
110   xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
111   
112   it->list = list;
113   it->length = xbt_dynar_length(list);
114   it->indices_list = xbt_dynar_new(sizeof(int), NULL);
115   it->criteria_fn = criteria_fn;
116   it->current = -1;
117 }
118
119 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
120 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
121 {
122   int next = it->criteria_fn((xbt_dynar_iterator_t)it);
123   XBT_INFO("%d current\n", next);
124   if (next < 0) {
125     XBT_INFO("Nothing to return!\n");
126     return NULL;
127   } else {
128     xbt_dynar_push(it->indices_list, &next);
129     return xbt_dynar_get_ptr(it->list, next);
130   }
131 }
132
133 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
134 {
135   xbt_dynar_free_container(&(it->indices_list));
136   xbt_free_ref(&it);
137 }
138
139 int xbt_dynar_iterator_forward_criteria(void *p)
140 {
141   xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
142   int r = -1;
143   if (it->current == -1) {
144     /* iterator initialization */
145     it->current = 0;
146   }
147   if (it->current < it->length) {
148     r = it->current;
149     it->current++;
150   }
151
152   return r;
153 }
154
155 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
156 {
157   message_t msg = xbt_new(s_message_t, 1);
158   msg->type = type;
159   msg->issuer_hostname = issuer_hostname;
160   msg->mailbox = mailbox;
161   msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
162
163   return task;
164 }
165
166 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
167 {
168   msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
169   message_t msg = MSG_task_get_data(task);
170   msg->prev_hostname = prev;
171   msg->next_hostname = next;
172
173   return task;
174 }
175
176 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
177 {
178   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
179   message_t msg = MSG_task_get_data(task);
180   msg->data_block = block;
181   msg->data_length = len;
182
183   return task;
184 }
185
186 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
187 {
188   return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
189 }
190
191
192 void task_message_delete(void *task)
193 {
194   message_t msg = MSG_task_get_data(task);
195   xbt_free(msg);
196   MSG_task_destroy(task);
197 }
198
199 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
200 {
201   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
202   char *hostname = NULL;
203   msg_host_t h = NULL;
204   int i = 1;
205   
206   for (; i < hostcount+1; i++) {
207     hostname = xbt_new(char, HOSTNAME_LENGTH);
208     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
209     XBT_INFO("%s", hostname);
210     h = MSG_get_host_by_name(hostname);
211     if (h == NULL) {
212       XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
213       abort();
214     } else {
215       xbt_dynar_push(host_list, &hostname);
216     }
217   }
218   return host_list;
219 }
220
221 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
222 {
223   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
224   msg_host_t h = NULL;
225   int i = 1;
226   
227   for (; i < argc; i++) {
228     XBT_INFO("host%d = %s", i, argv[i]);
229     h = MSG_get_host_by_name(argv[i]);
230     if (h == NULL) {
231       XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
232       abort();
233     } else {
234       xbt_dynar_push(host_list, &(argv[i]));
235     }
236   }
237   return host_list;
238 }*/
239
240 void delete_hostlist(xbt_dynar_t h)
241 {
242   xbt_dynar_free(&h);
243 }
244
245 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
246 {
247   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
248   msg_task_t task = NULL;
249   char **cur = (char**)xbt_dynar_iterator_next(it);
250   const char *me = MSG_host_get_name(MSG_host_self());
251   const char *current_host = NULL;
252   const char *prev = NULL;
253   const char *next = NULL;
254   const char *last = NULL;
255
256   /* Build the chain if there's at least one peer */
257   if (cur != NULL) {
258     /* init: prev=NULL, host=current cur, next=next cur */
259     next = *cur;
260     *first = next;
261
262     /* This iterator iterates one step ahead: cur is current iterated element, 
263        but it's actually the next one in the chain */
264     do {
265       /* following steps: prev=last, host=next, next=cur */
266       cur = (char**)xbt_dynar_iterator_next(it);
267       prev = last;
268       current_host = next;
269       if (cur != NULL)
270         next = *cur;
271       else
272         next = NULL;
273       XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
274     
275       /* Send message to current peer */
276       task = task_message_chain_new(me, current_host, prev, next);
277       MSG_task_send(task, current_host);
278
279       last = current_host;
280     } while (cur != NULL);
281   }
282   xbt_dynar_iterator_delete(it);
283
284   return MSG_OK;
285 }
286
287 int broadcaster_send_file(const char *first)
288 {
289   const char *me = MSG_host_get_name(MSG_host_self());
290   msg_task_t task = NULL;
291   msg_comm_t comm = NULL;
292   int status;
293
294   int piece_count = 10;
295   int cur = 0;
296
297   for (; cur < piece_count; cur++) {
298     /* TODO: stub */
299     task = task_message_data_new(me, first, NULL, 0);
300     XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
301     //comm = MSG_task_isend(task, first);
302     //status = 
303     MSG_task_dsend(task, first, task_message_delete);
304    
305     //status = MSG_comm_wait(comm, -1);
306     //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
307     //MSG_comm_destroy(comm);
308   }
309
310   return MSG_OK;
311 }
312
313 int broadcaster_finish(xbt_dynar_t host_list)
314 {
315   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
316   msg_task_t task = NULL;
317   const char *me = MSG_host_get_name(MSG_host_self());
318   const char *current_host = NULL;
319   char **cur = NULL;
320
321   /* Send goodbye message to every peer */
322   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
323     /* Send message to current peer */
324     current_host = *cur;
325     task = task_message_end_data_new(me, current_host);
326     MSG_task_send(task, current_host);
327   }
328
329   return MSG_OK;
330 }
331
332
333 /** Emitter function  */
334 int broadcaster(int argc, char *argv[])
335 {
336   xbt_dynar_t host_list = NULL;
337   const char *first = NULL;
338   int status = !MSG_OK;
339
340   XBT_INFO("broadcaster");
341
342   /* Check that every host given by the hostcount in argv[1] exists and add it
343      to a dynamic array */
344   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
345   /*host_list = build_hostlist_from_argv(argc, argv);*/
346   
347   /* TODO: Error checking */
348   status = broadcaster_build_chain(&first, host_list);
349   status = broadcaster_send_file(first);
350   status = broadcaster_finish(host_list);
351
352   delete_hostlist(host_list);
353
354   return status;
355 }
356
357 /*******************************************************
358  *                     Peer                            *
359  *******************************************************/
360
361 void peer_init_chain(peer_t peer, message_t msg)
362 {
363   peer->prev = msg->prev_hostname;
364   peer->next = msg->next_hostname;
365   peer->init = 1;
366 }
367
368 /* TODO: error checking */
369 void peer_forward_msg(peer_t peer, message_t msg)
370 {
371   int status;
372   msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
373   msg_comm_t comm = NULL;
374   XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
375   //comm =
376   //status = 
377   MSG_task_dsend(task, peer->next, task_message_delete);
378    
379   //status = MSG_comm_wait(comm, -1);
380   xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed");
381   //MSG_comm_destroy(comm);
382 }
383
384 int peer_execute_task(peer_t peer, msg_task_t task)
385 {
386   int done = 0, init = 0;
387   message_t msg = MSG_task_get_data(task);
388   
389   XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
390   switch (msg->type) {
391     case MESSAGE_BUILD_CHAIN:
392       peer_init_chain(peer, msg);
393       break;
394     case MESSAGE_SEND_DATA:
395       xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
396       if (peer->next != NULL)
397         peer_forward_msg(peer, msg);
398       peer->pieces++;
399       break;
400     case MESSAGE_END_DATA:
401       xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
402       done = 1;
403       XBT_INFO("%d pieces receieved", peer->pieces);
404       break;
405   }
406
407   MSG_task_execute(task);
408
409   return done;
410 }
411
412 msg_error_t peer_wait_for_message(peer_t peer)
413 {
414   msg_error_t status;
415   msg_comm_t comm = NULL;
416   msg_task_t task = NULL;
417   int done = 0;
418
419
420   /* TODO: Error checking is not correct */
421   while (!done) {
422     if (comm == NULL)
423       comm = MSG_task_irecv(&task, peer->me);
424
425     if (MSG_comm_test(comm)) {
426       status = MSG_comm_get_status(comm);
427       xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
428       MSG_comm_destroy(comm);
429       comm = NULL;
430       done = peer_execute_task(peer, task);
431       task_message_delete(task);
432       task = NULL;
433     } else {
434       MSG_process_sleep(0.01);
435     }
436   }
437
438   return status;
439 }
440
441 void peer_init(peer_t p)
442 {
443   p->init = 0;
444   p->prev = NULL;
445   p->next = NULL;
446   p->pieces = 0;
447   p->me = MSG_host_get_name(MSG_host_self());
448 }
449
450 /** Peer function  */
451 int peer(int argc, char *argv[])
452 {
453   peer_t p = xbt_new(s_peer_t, 1);
454   msg_error_t status;
455
456   XBT_INFO("peer");
457
458   peer_init(p);
459   status = peer_wait_for_message(p);
460
461   xbt_free(p);
462
463   return MSG_OK;
464 }                               /* end_of_receiver */
465
466
467 /** Test function */
468 msg_error_t test_all(const char *platform_file,
469                      const char *application_file)
470 {
471
472   msg_error_t res = MSG_OK;
473
474
475
476   XBT_INFO("test_all");
477
478   /*  Simulation setting */
479   MSG_create_environment(platform_file);
480
481   /*   Application deployment */
482   MSG_function_register("broadcaster", broadcaster);
483   MSG_function_register("peer", peer);
484
485   MSG_launch_application(application_file);
486
487   res = MSG_main();
488
489   return res;
490 }                               /* end_of_test_all */
491
492
493 /** Main function */
494 int main(int argc, char *argv[])
495 {
496   msg_error_t res = MSG_OK;
497
498 #ifdef _MSC_VER
499   unsigned int prev_exponent_format =
500       _set_output_format(_TWO_DIGIT_EXPONENT);
501 #endif
502
503   MSG_init(&argc, argv);
504
505
506   /*if (argc <= 3) {
507     XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
508               argv[0]);
509     XBT_CRITICAL
510         ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
511          argv[0]);
512     exit(1);
513   }*/
514
515   /* Options for the workstation/model:
516
517      KCCFLN05              => for maxmin
518      KCCFLN05_proportional => for proportional (Vegas)
519      KCCFLN05_Vegas        => for TCP Vegas
520      KCCFLN05_Reno         => for TCP Reno
521    */
522   //MSG_config("workstation/model", argv[3]);
523
524   res = test_all(argv[1], argv[2]);
525
526   XBT_INFO("Total simulation time: %le", MSG_get_clock());
527
528   MSG_clean();
529
530 #ifdef _MSC_VER
531   _set_output_format(prev_exponent_format);
532 #endif
533
534   if (res == MSG_OK)
535     return 0;
536   else
537     return 1;
538 }                               /* end_of_main */