return host_list;
}
-static void delete_hostlist(xbt_dynar_t h)
-{
- xbt_dynar_free(&h);
-}
-
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it)
+int broadcaster_build_chain(broadcaster_t bc)
{
msg_task_t task = NULL;
- char **cur = (char**)xbt_dynar_iterator_next(it);
+ char **cur = (char**)xbt_dynar_iterator_next(bc->it);
const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
const char *current_host = NULL;
const char *prev = NULL;
if (cur != NULL) {
/* init: prev=NULL, host=current cur, next=next cur */
next = *cur;
- *first = next;
+ bc->first = next;
/* This iterator iterates one step ahead: cur is current iterated element,
but it's actually the next one in the chain */
do {
/* following steps: prev=last, host=next, next=cur */
- cur = (char**)xbt_dynar_iterator_next(it);
+ cur = (char**)xbt_dynar_iterator_next(bc->it);
prev = last;
current_host = next;
if (cur != NULL)
return MSG_OK;
}
-int broadcaster_send_file(const char *first)
+int broadcaster_send_file(broadcaster_t bc)
{
- const char *me = MSG_host_get_name(MSG_host_self());
+ const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
+ msg_comm_t comm = NULL;
msg_task_t task = NULL;
int status;
- int piece_count = PIECE_COUNT;
- int cur = 0;
-
- for (; cur < piece_count; cur++) {
- task = task_message_data_new(me, first, NULL, 0);
- XBT_DEBUG("Sending (send) from %s into mailbox %s", me, first);
- status = MSG_task_send(task, first);
-
- xbt_assert(status == MSG_OK, "broadcaster_send_file() failed");
+ bc->current_piece = 0;
+
+ while (bc->current_piece < bc->piece_count) {
+ if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) {
+ task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
+ XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %d)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends));
+ comm = MSG_task_isend(task, bc->first);
+ queue_pending_connection(comm, bc->pending_sends);
+ bc->current_piece++;
+ } else {
+ MSG_process_sleep(0.01);
+ }
+ process_pending_connections(bc->pending_sends);
}
return MSG_OK;
}
-int broadcaster_finish(xbt_dynar_iterator_t it)
+int broadcaster_finish(broadcaster_t bc)
{
msg_task_t task = NULL;
const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
const char *current_host = NULL;
char **cur = NULL;
- xbt_dynar_iterator_seek(it, 0);
+ xbt_dynar_iterator_seek(bc->it, 0);
/* Send goodbye message to every peer in the order generated by iterator it */
- for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
+ for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
/* Send message to current peer */
current_host = *cur;
task = task_message_end_data_new(me, current_host);
return MSG_OK;
}
+broadcaster_t broadcaster_init(xbt_dynar_t host_list)
+{
+ int status;
+ broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
+
+ bc->piece_count = PIECE_COUNT;
+ bc->current_piece = 0;
+ bc->host_list = host_list;
+ bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
+ bc->max_pending_sends = MAX_PENDING_SENDS;
+ bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+
+ status = broadcaster_build_chain(bc);
+ xbt_assert(status == MSG_OK, "Chain initialization failed");
+
+ return bc;
+}
+
+static void broadcaster_destroy(broadcaster_t bc)
+{
+ /* Destroy iterator and hostlist */
+ xbt_dynar_iterator_delete(bc->it);
+ xbt_dynar_free(&bc->pending_sends);
+ xbt_dynar_free(&bc->host_list);
+}
/** Emitter function */
int broadcaster(int argc, char *argv[])
{
+ broadcaster_t bc = NULL;
xbt_dynar_t host_list = NULL;
const char *first = NULL;
- int status = !MSG_OK;
+ int status;
XBT_INFO("broadcaster");
/* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
host_list = build_hostlist_from_hostcount(atoi(argv[1]));
- /*host_list = build_hostlist_from_argv(argc, argv);*/
- /* Initialize iterator */
- xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+ bc = broadcaster_init(host_list);
/* TODO: Error checking */
- status = broadcaster_build_chain(&first, host_list, it);
- status = broadcaster_send_file(first);
- status = broadcaster_finish(it);
+ status = broadcaster_send_file(bc);
+ status = broadcaster_finish(bc);
- /* Destroy iterator and hostlist */
- xbt_dynar_iterator_delete(it);
- delete_hostlist(host_list);
+ broadcaster_destroy(bc);
return status;
}
#include "iterator.h"
#include "common.h"
+/* Connection parameters */
+#define MAX_PENDING_SENDS 10
+
+/* ``File'' details */
+#define PIECE_SIZE 16384
#define PIECE_COUNT 50
+/* Broadcaster struct */
+typedef struct s_broadcaster {
+ const char *first;
+ int piece_count;
+ int current_piece;
+ xbt_dynar_t host_list;
+ xbt_dynar_iterator_t it;
+ int max_pending_sends;
+ xbt_dynar_t pending_sends;
+} s_broadcaster_t, *broadcaster_t;
+
xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
/* Broadcaster: helper functions */
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it);
-int broadcaster_send_file(const char *first);
-int broadcaster_finish(xbt_dynar_iterator_t it);
+broadcaster_t broadcaster_init(xbt_dynar_t host_list);
+int broadcaster_build_chain(broadcaster_t bc);
+int broadcaster_send_file(broadcaster_t bc);
+int broadcaster_finish(broadcaster_t bc);
+static void broadcaster_destroy(broadcaster_t bc);
/* Tasks */
int broadcaster(int argc, char *argv[]);
> [ 0.000000] (7:peer@host6) peer
> [ 0.000000] (8:peer@host7) peer
> [ 0.000000] (9:peer@host8) peer
-> [ 93.000000] (2:peer@host1) Waiting for sends to finish before shutdown...
-> [ 225.700000] (3:peer@host2) Waiting for sends to finish before shutdown...
-> [ 294.700000] (4:peer@host3) Waiting for sends to finish before shutdown...
-> [ 298.600000] (5:peer@host4) Waiting for sends to finish before shutdown...
-> [ 309.100000] (6:peer@host5) Waiting for sends to finish before shutdown...
-> [ 314.300000] (7:peer@host6) Waiting for sends to finish before shutdown...
-> [ 318.300000] (8:peer@host7) Waiting for sends to finish before shutdown...
-> [ 318.400000] (0:@) Total simulation time: 3.184000e+02
-> [ 318.400000] (9:peer@host8) Waiting for sends to finish before shutdown...
+> [ 88.950000] (2:peer@host1) Waiting for sends to finish before shutdown...
+> [ 221.070000] (3:peer@host2) Waiting for sends to finish before shutdown...
+> [ 289.980000] (4:peer@host3) Waiting for sends to finish before shutdown...
+> [ 293.890000] (5:peer@host4) Waiting for sends to finish before shutdown...
+> [ 304.300000] (6:peer@host5) Waiting for sends to finish before shutdown...
+> [ 310.940000] (7:peer@host6) Waiting for sends to finish before shutdown...
+> [ 314.850000] (8:peer@host7) Waiting for sends to finish before shutdown...
+> [ 314.930000] (9:peer@host8) Waiting for sends to finish before shutdown...
+> [ 316.850000] (0:@) Total simulation time: 3.168500e+02
#include "messages.h"
-msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
+msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox)
{
message_t msg = xbt_new(s_message_t, 1);
msg->type = type;
msg->issuer_hostname = issuer_hostname;
msg->mailbox = mailbox;
- msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg);
+ msg_task_t task = MSG_task_create(NULL, 0, len, msg);
return task;
}
msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
{
- msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
+ msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, MESSAGE_BUILD_CHAIN_SIZE, issuer_hostname, mailbox);
message_t msg = MSG_task_get_data(task);
msg->prev_hostname = prev;
msg->next_hostname = next;
msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
{
- msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
+ msg_task_t task = task_message_new(MESSAGE_SEND_DATA, MESSAGE_SEND_DATA_HEADER_SIZE + len, issuer_hostname, mailbox);
//if (strcmp(mailbox, "host4") == 0)
//MSG_task_set_category(task, mailbox);
message_t msg = MSG_task_get_data(task);
msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
{
- return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
+ return task_message_new(MESSAGE_END_DATA, MESSAGE_END_DATA_SIZE, issuer_hostname, mailbox);
}
void task_message_delete(void *task)
#include "msg/msg.h"
#include "xbt/sysdep.h"
-#define MESSAGE_SIZE 1
+#define MESSAGE_BUILD_CHAIN_SIZE 40
+#define MESSAGE_SEND_DATA_HEADER_SIZE 10
+#define MESSAGE_END_DATA_SIZE 20
/* Messages enum */
typedef enum {
} s_message_t, *message_t;
/* Message methods */
-msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
+msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox);
msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
task = NULL;
} else {
process_pending_connections(peer->pending_sends);
- MSG_process_sleep(0.1);
+ MSG_process_sleep(0.01);
}
}
XBT_INFO("Waiting for sends to finish before shutdown...");
while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
process_pending_connections(p->pending_sends);
- MSG_process_sleep(0.1);
+ MSG_process_sleep(1);
}
xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
#include "messages.h"
#include "common.h"
-#define PEER_SHUTDOWN_DEADLINE 6000
+#define PEER_SHUTDOWN_DEADLINE 60000
/* Peer struct */
typedef struct s_peer {