/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-#include<stdio.h>
+#include <stdio.h>
+#include <stdlib.h>
#include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
#include "xbt/sysdep.h" /* calloc */
"Messages specific for kadeploy");
#define MESSAGE_SIZE 1
+#define HOSTNAME_LENGTH 20
/*
Data structures
/* Messages enum */
typedef enum {
MESSAGE_BUILD_CHAIN = 0, /* used by task_message_chain_new: carries a peer's prev/next host names */
- MESSAGE_SEND_DATA
+ MESSAGE_SEND_DATA, /* used by task_message_data_new: carries a data block and its length */
+ MESSAGE_END_DATA /* used by task_message_end_data_new: sent to every peer by broadcaster_finish */
} e_message_type;
/* Message struct */
/* Message methods */
msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
-msg_task_t task_message_chain_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
-msg_task_t task_message_data_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
+msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
+msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
void task_message_delete(void *);
/* Tasks */
int broadcaster(int argc, char *argv[]);
int peer(int argc, char *argv[]);
-void check_hosts(const int count, char **list);
-xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);
-void build_chain(xbt_dynar_t host_list);
+xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
+/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
+/* Broadcaster: helper functions */
+int broadcaster_build_chain(xbt_dynar_t host_list);
+int broadcaster_send_file(xbt_dynar_t host_list);
+int broadcaster_finish(xbt_dynar_t host_list);
+
+/* Peer: helper functions */
int peer_wait_for_init();
+/* Initialization stuff */
msg_error_t test_all(const char *platform_file,
const char *application_file);
-double task_comm_size_lat = 10e0;
-double task_comm_size_bw = 10e8;
-
/* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as the iteration criterion.
criteria_fn: given an iterator, it must update the iterator and return the next element's index,
or a value less than 0 when there is no next element. */
return task;
}
/* Creates a MESSAGE_BUILD_CHAIN task.
   issuer_hostname and mailbox are passed through to task_message_new.
   prev and next are stored in the message attached to the task as plain
   pointers (the strings are not copied here), so the caller must keep them
   valid for as long as the message is used. Either may be NULL:
   broadcaster_build_chain passes a NULL prev for the first peer and a NULL
   next for the last one.
   Returns the new task. */
-msg_task_t task_message_chain_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
{
- msg_task_t task = task_message_new(type, issuer_hostname, mailbox);
+ msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
/* The message is the data attached to the task by task_message_new. */
message_t msg = MSG_task_get_data(task);
msg->prev_hostname = prev;
msg->next_hostname = next;
return task;
}
/* Creates a MESSAGE_SEND_DATA task.
   issuer_hostname and mailbox are passed through to task_message_new.
   block and len are stored in the message's data_block and data_length
   fields; block is kept as a pointer (not copied here), so it must stay
   valid for as long as the message is used.
   Returns the new task. */
-msg_task_t task_message_data_new(e_message_type type, const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
+msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
{
- msg_task_t task = task_message_new(type, issuer_hostname, mailbox);
+ msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
/* The message is the data attached to the task by task_message_new. */
message_t msg = MSG_task_get_data(task);
msg->data_block = block;
msg->data_length = len;
return task;
}
/* Creates a MESSAGE_END_DATA task. It sets no fields beyond what
   task_message_new does with issuer_hostname and mailbox.
   Returns the new task. */
+msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
+{
+ return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
+}
+
+
/* Destroys a message task: reads the data attached to it with
   MSG_task_get_data, then calls MSG_task_destroy on the task.
   task: the task to destroy, received as void *. */
void task_message_delete(void *task)
{
/* NOTE(review): msg is fetched but not used in the lines visible here; if
   task_message_new heap-allocates the message, it probably has to be freed
   before the task is destroyed -- confirm against task_message_new. */
message_t msg = MSG_task_get_data(task);
MSG_task_destroy(task);
}
/* Builds the list of peer host names "host1" .. "host<hostcount>".
   For each index, a HOSTNAME_LENGTH-byte buffer is allocated with xbt_new and
   filled with snprintf; the name is logged and looked up with
   MSG_get_host_by_name. An unknown host is logged and the program aborts;
   otherwise the pointer to the buffer is pushed onto the dynar.
   hostcount: number of hosts to generate; values below 1 yield an empty list.
   Returns the new dynar of char * elements.
   NOTE(review): the dynar is created with a NULL second argument; if that is
   the per-element free function, the name buffers are not released when the
   list is freed -- confirm whether that is intended (the pointers are also
   handed to the chain messages). */
+xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
+{
+ xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
+ char *hostname = NULL;
+ msg_host_t h = NULL;
+ int i = 1; /* host numbering starts at 1 */
+
+ for (; i < hostcount+1; i++) {
+ hostname = xbt_new(char, HOSTNAME_LENGTH);
+ snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
+ XBT_INFO("%s", hostname);
+ h = MSG_get_host_by_name(hostname);
+ if (h == NULL) {
+ XBT_INFO("Unknown host %s. Stopping Now! ", hostname); /* NOTE(review): fatal condition logged at INFO level */
+ abort();
+ } else {
+ xbt_dynar_push(host_list, &hostname); /* stores the pointer value, not a copy of the string */
+ }
+ }
+ return host_list;
+}
-xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
+/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
{
xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
msg_host_t h = NULL;
}
}
return host_list;
-}
+}*/
/* Releases the host list h by passing the address of the local handle to
   xbt_dynar_free (previously xbt_dynar_free_container). */
void delete_hostlist(xbt_dynar_t h)
{
- xbt_dynar_free_container(&h);
+ xbt_dynar_free(&h);
}
/* Sends every peer in host_list a MESSAGE_BUILD_CHAIN task naming its
   neighbours in the chain.
   host_list: dynar of char * host names, walked through an iterator created
   with xbt_dynar_iterator_forward_criteria. Each host name is also used as
   the mailbox the task is sent to. The first peer gets prev == NULL and the
   last peer gets next == NULL; an empty list sends nothing.
   Returns MSG_OK unconditionally; the result of MSG_task_send is not checked. */
-void build_chain(xbt_dynar_t host_list)
+int broadcaster_build_chain(xbt_dynar_t host_list)
{
xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
+ msg_task_t task = NULL;
+ char **cur = (char**)xbt_dynar_iterator_next(it);
const char *current_host = NULL;
const char *prev = NULL;
const char *next = NULL;
const char *me = MSG_host_get_name(MSG_host_self());
+ const char *last = NULL; /* host handled in the previous loop iteration */
+
+ /* Build the chain if there's at least one peer */
+ if (cur != NULL) {
+ /* init: prev=NULL, host=current cur, next=next cur */
+ next = *cur;
+
+ /* This iterator iterates one step ahead: cur is current iterated element,
+ but it's actually the next one in the chain */
+ do {
+ /* following steps: prev=last, host=next, next=cur */
+ cur = (char**)xbt_dynar_iterator_next(it);
+ prev = last;
+ current_host = next;
+ if (cur != NULL)
+ next = *cur;
+ else
+ next = NULL;
+ XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next); /* NOTE(review): prev/next can be NULL here and are formatted with %s -- confirm XBT_INFO tolerates that */
+
+ /* Send message to current peer */
+ task = task_message_chain_new(me, current_host, prev, next);
+ MSG_task_send(task, current_host); /* return value ignored */
+
+ last = current_host;
+ } while (cur != NULL);
+ }
+ xbt_dynar_iterator_delete(it);
+
+ return MSG_OK;
+}
+
/* Placeholder for sending the file to the peers: the body is still empty
   and host_list is not used yet.
   Returns MSG_OK. */
+int broadcaster_send_file(xbt_dynar_t host_list)
+{
+ /* ... */
+
+ return MSG_OK;
+}
+
/* Sends a MESSAGE_END_DATA task to every peer in host_list, using each host
   name as the destination mailbox.
   Returns MSG_OK unconditionally; the result of MSG_task_send is not checked.
   NOTE(review): the iterator created below is not deleted on the added lines
   of this function (the only xbt_dynar_iterator_delete(it) here is on a
   removed line) -- looks like a leak; confirm. */
+int broadcaster_finish(xbt_dynar_t host_list)
+{
+ xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
+ msg_task_t task = NULL;
+ const char *me = MSG_host_get_name(MSG_host_self());
+ const char *current_host = NULL;
char **cur = NULL;
+ /* Send goodbye message to every peer */
for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
- current_host = *cur;
- XBT_INFO("Building chain broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
-
- msg_task_t msg = task_message_chain_new(MESSAGE_BUILD_CHAIN, me, current_host, prev, next);
- MSG_task_send(msg, current_host);
- task_message_delete(msg);
+ /* Send message to current peer */
+ current_host = *cur;
+ task = task_message_end_data_new(me, current_host);
+ MSG_task_send(task, current_host);
}
- xbt_dynar_iterator_delete(it);
+ return MSG_OK;
}
+
/** Emitter function: entry point of the broadcaster process.
 *  Builds the peer list from the host count in argv[1], then runs the three
 *  broadcaster steps in order (build chain, send file, finish) and frees the
 *  list with delete_hostlist.
 *  Returns the status of the last step only; earlier statuses are overwritten
 *  (see the TODO below).
 *  NOTE(review): argv[1] is read without checking argc, and atoi gives no way
 *  to detect a malformed count -- confirm the deployment always supplies it. */
int broadcaster(int argc, char *argv[])
{
- double time;
xbt_dynar_t host_list = NULL;
- msg_task_t task_la = NULL;
- msg_task_t task_bw = NULL;
- char sprintf_buffer_la[64];
- char sprintf_buffer_bw[64];
+ const char *first = NULL; /* not used in the lines visible here */
+ int status = !MSG_OK;
XBT_INFO("broadcaster");
- /* Check that every host in the command line actually exists and add it to a dynamic array */
- host_list = build_hostlist_from_argv(argc, argv);
+ /* Check that every host given by the hostcount in argv[1] exists and add it
+ to a dynamic array */
+ host_list = build_hostlist_from_hostcount(atoi(argv[1]));
+ /*host_list = build_hostlist_from_argv(argc, argv);*/
- build_chain(host_list);
+ /* TODO: Error checking */
+ status = broadcaster_build_chain(host_list);
+ status = broadcaster_send_file(host_list);
+ status = broadcaster_finish(host_list);
delete_hostlist(host_list);
XBT_INFO("task_bw->data = %le", *((double *) task_bw->data));
MSG_task_send(task_bw, argv[1]);
*/
- return 0;
-} /* end_of_client */
+ return status;
+}
/* Receives one task on the mailbox named after this host, logs a message if
   the receive succeeded, then deletes the task with task_message_delete.
   Returns MSG_OK regardless of the receive status.
   NOTE(review): task_message_delete(task) runs even when MSG_task_receive
   does not return MSG_OK; task would then presumably still be NULL --
   confirm that task_message_delete tolerates a NULL task. */
int peer_wait_for_init()
{
- msg_task_t msg = NULL;
+ msg_task_t task = NULL;
const char *me = MSG_host_get_name(MSG_host_self());
- int a = MSG_task_receive(&msg, me);
+ int a = MSG_task_receive(&task, me);
if (a == MSG_OK) {
XBT_INFO("Peer %s got message\n", me);
}
+ task_message_delete(task);
+
return MSG_OK;
}
MSG_init(&argc, argv);
- if (argc != 3) {
+ /*if (argc <= 3) {
XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
argv[0]);
XBT_CRITICAL
("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
argv[0]);
exit(1);
- }
+ }*/
/* Options for the workstation/model: