"Messages specific for kadeploy");
#define MESSAGE_SIZE 1
+#define PIECE_COUNT 100
#define HOSTNAME_LENGTH 20
+#define PEER_SHUTDOWN_DEADLINE 600
+
/*
Data structures
*/
const char *next;
const char *me;
int pieces;
+ xbt_dynar_t pending_sends;
+ int close_asap; /* TODO: unused */
} s_peer_t, *peer_t;
/* Iterator methods */
int broadcaster_finish(xbt_dynar_t host_list);
/* Peer: helper functions */
-msg_error_t peer_wait_for_message();
+msg_error_t peer_wait_for_message(peer_t peer);
int peer_execute_task(peer_t peer, msg_task_t task);
void peer_init_chain(peer_t peer, message_t msg);
+void peer_shutdown(peer_t p);
+void peer_init(peer_t p);
/* Initialization stuff */
msg_error_t test_all(const char *platform_file,
void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
{
int next = it->criteria_fn((xbt_dynar_iterator_t)it);
- XBT_INFO("%d current\n", next);
+ //XBT_INFO("%d current\n", next);
if (next < 0) {
- XBT_INFO("Nothing to return!\n");
+ //XBT_INFO("Nothing to return!\n");
return NULL;
} else {
xbt_dynar_push(it->indices_list, &next);
msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
{
msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
+ if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
message_t msg = MSG_task_get_data(task);
msg->data_block = block;
msg->data_length = len;
return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
}
-
/*
 * Destroy a message task.
 *
 * Takes the task as a void pointer, reads its attached data via
 * MSG_task_get_data() and then destroys the task with MSG_task_destroy().
 *
 * NOTE(review): `msg` is fetched but never used or released in this function.
 * Confirm whether the message payload is owned by the task or should be freed
 * here.
 */
void task_message_delete(void *task)
{
message_t msg = MSG_task_get_data(task);
MSG_task_destroy(task);
}
+/**
+ * Append a communication handle to a queue of pending communications.
+ *
+ * @param comm handle of the communication to track
+ * @param q    dynar that receives a copy of the handle value
+ */
+void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
+{
+  /* xbt_dynar_push() takes the address of the element and copies it in. */
+  xbt_dynar_push(q, &comm);
+}
+
+/**
+ * Reap every finished communication from a queue of pending communications.
+ *
+ * Each queued handle is tested with MSG_comm_test(). A finished communication
+ * has its status checked (must be MSG_OK, otherwise the assertion fires), is
+ * destroyed, and is removed from the queue. Unfinished ones are left queued.
+ *
+ * @param q dynar of msg_comm_t handles
+ * @return 1 if the last handle examined was still in flight; 0 if the queue
+ *         was empty or the last handle examined had completed.
+ *         NOTE(review): this is not a reliable "anything still pending" flag,
+ *         since earlier unfinished handles do not influence it. Callers that
+ *         need that should check xbt_dynar_length(q) instead.
+ */
+int process_pending_connections(xbt_dynar_t q)
+{
+  unsigned int iter;
+  int status;
+  int last_pending = 0;
+  msg_comm_t comm;
+
+  xbt_dynar_foreach(q, iter, comm) {
+    last_pending = 1;
+    if (MSG_comm_test(comm)) {
+      /* Read the status BEFORE destroying the handle: querying a destroyed
+       * communication is a use-after-free. */
+      status = MSG_comm_get_status(comm);
+      MSG_comm_destroy(comm);
+      xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
+      /* Removes the current element and rewinds the cursor so the foreach
+       * does not skip the element that slides into this slot. */
+      xbt_dynar_cursor_rm(q, &iter);
+      last_pending = 0;
+    }
+  }
+  return last_pending;
+}
+
xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
{
xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
for (; i < hostcount+1; i++) {
hostname = xbt_new(char, HOSTNAME_LENGTH);
snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
- XBT_INFO("%s", hostname);
+ //XBT_INFO("%s", hostname);
h = MSG_get_host_by_name(hostname);
if (h == NULL) {
XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
next = *cur;
else
next = NULL;
- XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
+ //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
/* Send message to current peer */
task = task_message_chain_new(me, current_host, prev, next);
+ //MSG_task_set_category(task, current_host);
MSG_task_send(task, current_host);
last = current_host;
msg_comm_t comm = NULL;
int status;
- int piece_count = 10;
+ int piece_count = PIECE_COUNT;
int cur = 0;
for (; cur < piece_count; cur++) {
/* TODO: stub */
task = task_message_data_new(me, first, NULL, 0);
- XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
+ //XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
//comm = MSG_task_isend(task, first);
- //status =
- MSG_task_dsend(task, first, task_message_delete);
+ status = MSG_task_send(task, first);
+ //MSG_task_dsend(task, first, task_message_delete);
//status = MSG_comm_wait(comm, -1);
- //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+ xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
//MSG_comm_destroy(comm);
}
/* Send message to current peer */
current_host = *cur;
task = task_message_end_data_new(me, current_host);
+ //MSG_task_set_category(task, current_host);
MSG_task_send(task, current_host);
}
peer->init = 1;
}
-/* TODO: error checking */
/*
 * Forward a data message to the next peer in the chain.
 *
 * Builds a new data task from peer->me to peer->next with a NULL block and
 * zero length (the incoming `msg` is not inspected here). The added lines
 * start the send with MSG_task_isend() and record the returned communication
 * in peer->pending_sends instead of waiting for it; the removed lines used
 * MSG_task_dsend().
 *
 * NOTE(review): after this change `status` is declared but no longer used.
 * NOTE(review): nothing here guards against peer->next being NULL - confirm
 * the caller never forwards from the last peer of the chain.
 */
void peer_forward_msg(peer_t peer, message_t msg)
{
int status;
msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
msg_comm_t comm = NULL;
- XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
- //comm =
- //status =
- MSG_task_dsend(task, peer->next, task_message_delete);
-
- //status = MSG_comm_wait(comm, -1);
- xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed");
- //MSG_comm_destroy(comm);
+ //XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
+ comm = MSG_task_isend(task, peer->next);
+ queue_pending_connection(comm, peer->pending_sends);
}
int peer_execute_task(peer_t peer, msg_task_t task)
{
- int done = 0, init = 0;
+ int done = 0;
message_t msg = MSG_task_get_data(task);
- XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
+ //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
switch (msg->type) {
case MESSAGE_BUILD_CHAIN:
peer_init_chain(peer, msg);
msg_task_t task = NULL;
int done = 0;
-
- /* TODO: Error checking is not correct */
while (!done) {
if (comm == NULL)
comm = MSG_task_irecv(&task, peer->me);
if (MSG_comm_test(comm)) {
status = MSG_comm_get_status(comm);
+ //XBT_INFO("peer_wait_for_message: error code = %d", status);
xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
MSG_comm_destroy(comm);
comm = NULL;
task_message_delete(task);
task = NULL;
} else {
+ process_pending_connections(peer->pending_sends);
MSG_process_sleep(0.01);
}
}
p->prev = NULL;
p->next = NULL;
p->pieces = 0;
+ p->close_asap = 0;
+ p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
p->me = MSG_host_get_name(MSG_host_self());
}
+/**
+ * Drain a peer's outstanding asynchronous sends, then release the peer.
+ *
+ * Repeatedly reaps p->pending_sends, sleeping 0.1 between passes, until the
+ * queue is empty or PEER_SHUTDOWN_DEADLINE has elapsed on the MSG clock.
+ * Asserts that no send is left pending, frees the queue and finally frees
+ * the peer structure itself.
+ *
+ * @param p peer to shut down; the pointer is invalid once this returns
+ */
+void peer_shutdown(peer_t p)
+{
+  /* MSG_get_clock() yields a double; holding the deadline in a float would
+   * truncate it and skew the timeout comparison for large clock values. */
+  const double end_time = MSG_get_clock() + PEER_SHUTDOWN_DEADLINE;
+
+  XBT_INFO("Waiting for sends to finish before shutdown...");
+  while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
+    process_pending_connections(p->pending_sends);
+    MSG_process_sleep(0.1);
+  }
+
+  xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
+  xbt_dynar_free(&p->pending_sends);
+
+  xbt_free(p);
+}
+
/** Peer function */
int peer(int argc, char *argv[])
{
peer_init(p);
status = peer_wait_for_message(p);
-
- xbt_free(p);
+ peer_shutdown(p);
return MSG_OK;
} /* end_of_receiver */
/* Simulation setting */
MSG_create_environment(platform_file);
+ /* Trace categories */
+ TRACE_category_with_color("host0", "0 0 1");
+ TRACE_category_with_color("host1", "0 1 0");
+ TRACE_category_with_color("host2", "0 1 1");
+ TRACE_category_with_color("host3", "1 0 0");
+ TRACE_category_with_color("host4", "1 0 1");
+ TRACE_category_with_color("host5", "1 1 0");
+
/* Application deployment */
MSG_function_register("broadcaster", broadcaster);
MSG_function_register("peer", peer);
MSG_init(&argc, argv);
-
/*if (argc <= 3) {
XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
argv[0]);