unsigned int data_length;
} s_message_t, *message_t;
+/* Peer struct: state kept by one peer process while it takes part in the chain */
+typedef struct s_peer {
+ int init; /* 0 after peer_init(); set to 1 once a MESSAGE_BUILD_CHAIN message has been handled */
+ const char *prev; /* prev_hostname taken from the build-chain message; NULL until then */
+ const char *next; /* next_hostname taken from the build-chain message; data is forwarded there when non-NULL */
+ const char *me; /* name of the host running this peer; also the mailbox it receives on */
+} s_peer_t, *peer_t;
+
/* Iterator methods */
xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
/* Broadcaster: helper functions */
-int broadcaster_build_chain(xbt_dynar_t host_list);
-int broadcaster_send_file(xbt_dynar_t host_list);
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list); /* *first is set to the first host name yielded by the host_list iterator, when there is one */
+int broadcaster_send_file(const char *first); /* first: mailbox name the data messages are sent to */
int broadcaster_finish(xbt_dynar_t host_list);
/* Peer: helper functions */
-int peer_wait_for_init();
+/* Receive and process tasks on peer->me until an end message has been handled */
+msg_error_t peer_wait_for_message(peer_t peer);
+/* Handle one received task; returns non-zero once MESSAGE_END_DATA was processed */
+int peer_execute_task(peer_t peer, msg_task_t task);
+/* Record prev/next host names from a build-chain message */
+void peer_init_chain(peer_t peer, message_t msg);
+/* Send a data message to peer->next and wait for the send to finish */
+void peer_forward_msg(peer_t peer, message_t msg);
+/* Reset a peer structure and set its `me` field to the current host name */
+void peer_init(peer_t p);
/* Initialization stuff */
msg_error_t test_all(const char *platform_file,
xbt_dynar_free(&h);
}
-int broadcaster_build_chain(xbt_dynar_t host_list)
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
{
xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
msg_task_t task = NULL;
char **cur = (char**)xbt_dynar_iterator_next(it);
+ const char *me = MSG_host_get_name(MSG_host_self());
const char *current_host = NULL;
const char *prev = NULL;
const char *next = NULL;
- const char *me = MSG_host_get_name(MSG_host_self());
const char *last = NULL;
/* Build the chain if there's at least one peer */
if (cur != NULL) {
/* init: prev=NULL, host=current cur, next=next cur */
next = *cur;
+ *first = next;
/* This iterator iterates one step ahead: cur is current iterated element,
but it's actually the next one in the chain */
return MSG_OK;
}
-int broadcaster_send_file(xbt_dynar_t host_list)
+/*
+ * Send the (stub) file, piece by piece, into the mailbox named `first`.
+ *
+ * Each piece is a data message created with task_message_data_new() and
+ * posted with MSG_task_isend(); the function waits (timeout -1) for every
+ * send before posting the next one and asserts that the wait returned MSG_OK.
+ *
+ * Always returns MSG_OK.
+ */
+int broadcaster_send_file(const char *first)
{
- /* ... */
+  const char *self_name = MSG_host_get_name(MSG_host_self());
+  const int piece_count = 10;
+  int sent;
+
+  for (sent = 0; sent < piece_count; sent++) {
+    /* TODO: stub */
+    msg_task_t piece = task_message_data_new(self_name, first, NULL, 0);
+    msg_comm_t pending;
+    int wait_result;
+
+    XBT_INFO("Sending (isend) from %s into mailbox %s", self_name, first);
+    pending = MSG_task_isend(piece, first);
+
+    wait_result = MSG_comm_wait(pending, -1);
+    xbt_assert(wait_result == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+    MSG_comm_destroy(pending);
+  }
return MSG_OK;
}
/* Send goodbye message to every peer */
for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
- /* Send message to current peer */
- current_host = *cur;
- task = task_message_end_data_new(me, current_host);
- MSG_task_send(task, current_host);
+ /* Send message to current peer */
+ current_host = *cur;
+ task = task_message_end_data_new(me, current_host);
+ MSG_task_send(task, current_host);
}
return MSG_OK;
/*host_list = build_hostlist_from_argv(argc, argv);*/
/* TODO: Error checking */
- status = broadcaster_build_chain(host_list);
- status = broadcaster_send_file(host_list);
+ status = broadcaster_build_chain(&first, host_list);
+ status = broadcaster_send_file(first);
status = broadcaster_finish(host_list);
delete_hostlist(host_list);
- /* Latency */
- /*time = MSG_get_clock();
- sprintf(sprintf_buffer_la, "latency task");
- task_la =
- MSG_task_create(sprintf_buffer_la, 0.0, task_comm_size_lat, NULL);
- task_la->data = xbt_new(double, 1);
- *(double *) task_la->data = time;
- XBT_INFO("task_la->data = %le", *((double *) task_la->data));
- MSG_task_send(task_la, argv[1]);*/
-
- /* Bandwidth */
- /*time = MSG_get_clock();
- sprintf(sprintf_buffer_bw, "bandwidth task");
- task_bw =
- MSG_task_create(sprintf_buffer_bw, 0.0, task_comm_size_bw, NULL);
- task_bw->data = xbt_new(double, 1);
- *(double *) task_bw->data = time;
- XBT_INFO("task_bw->data = %le", *((double *) task_bw->data));
- MSG_task_send(task_bw, argv[1]);
- */
return status;
}
-int peer_wait_for_init()
+/*******************************************************
+ * Peer *
+ *******************************************************/
+
+/*
+ * Record this peer's neighbours from a build-chain message.
+ *
+ * NOTE(review): the host name pointers are stored as-is, not copied.
+ * Confirm that they outlive the message/task they arrive in.
+ */
+void peer_init_chain(peer_t peer, message_t msg)
+{
+  peer->next = msg->next_hostname;
+  peer->prev = msg->prev_hostname;
+}
+
+/*
+ * Send a data message from this peer into the mailbox of the next peer and
+ * wait (timeout -1) for the send to finish.
+ *
+ * `msg` is currently not used: a new message is built with
+ * task_message_data_new(), passing NULL and 0 as its last two arguments.
+ *
+ * TODO: error checking
+ */
+void peer_forward_msg(peer_t peer, message_t msg)
+{
+  msg_task_t outgoing = task_message_data_new(peer->me, peer->next, NULL, 0);
+  msg_comm_t pending;
+  int wait_result;
+
+  XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
+  pending = MSG_task_isend(outgoing, peer->next);
+
+  wait_result = MSG_comm_wait(pending, -1);
+  xbt_assert(wait_result == MSG_OK, __FILE__ ": peer_forward_msg() failed");
+  MSG_comm_destroy(pending);
+}
+
+/*
+ * Process one received task according to the type of the message it carries.
+ *
+ *  - MESSAGE_BUILD_CHAIN: record the neighbours and mark the peer initialized.
+ *  - MESSAGE_SEND_DATA:   forward a data message to the next peer, if any.
+ *  - MESSAGE_END_DATA:    report that the peer is done.
+ *
+ * Data and end messages received before initialization trip an assertion.
+ * The task is passed to MSG_task_execute() in every case.
+ *
+ * Returns 1 once MESSAGE_END_DATA has been handled, 0 otherwise.
+ */
+int peer_execute_task(peer_t peer, msg_task_t task)
+{
+  int done = 0;
+  message_t msg = MSG_task_get_data(task);
+
+  XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
+  switch (msg->type) {
+  case MESSAGE_BUILD_CHAIN:
+    peer_init_chain(peer, msg);
+    peer->init = 1;
+    break;
+  case MESSAGE_SEND_DATA:
+    xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
+    /* The last peer of the chain has no successor to forward to */
+    if (peer->next != NULL) {
+      peer_forward_msg(peer, msg);
+    }
+    break;
+  case MESSAGE_END_DATA:
+    xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
+    done = 1;
+    break;
+  default:
+    /* Other message types need no handling here */
+    break;
+  }
+
+  MSG_task_execute(task);
+
+  return done;
+}
+
+/*
+ * Receive tasks on this peer's mailbox (peer->me) and hand each one to
+ * peer_execute_task() until that function reports completion.
+ *
+ * Every receive is posted with MSG_task_irecv() and waited for with
+ * timeout -1; a wait result other than MSG_OK trips an assertion.
+ * Each task is released with task_message_delete() after processing.
+ *
+ * Returns the result of the last MSG_comm_wait().
+ */
+msg_error_t peer_wait_for_message(peer_t peer)
{
+  msg_comm_t pending;
+  msg_error_t wait_status;
+  int finished;
+
msg_task_t task = NULL;
- const char *me = MSG_host_get_name(MSG_host_self());
- int a = MSG_task_receive(&task, me);
+  /* TODO: Error checking is not correct */
+  for (finished = 0; !finished; task = NULL) {
+    pending = MSG_task_irecv(&task, peer->me);
+    wait_status = MSG_comm_wait(pending, -1);
+    xbt_assert(wait_status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
+    MSG_comm_destroy(pending);
- if (a == MSG_OK) {
- XBT_INFO("Peer %s got message\n", me);
+    finished = peer_execute_task(peer, task);
+    task_message_delete(task);
}
- task_message_delete(task);
+  return wait_status;
+}
- return MSG_OK;
+/*
+ * Put a peer structure into its initial state: not yet initialized, no
+ * neighbours, and `me` set to the name of the host this process runs on.
+ */
+void peer_init(peer_t p)
+{
+  p->me = MSG_host_get_name(MSG_host_self());
+  p->next = NULL;
+  p->prev = NULL;
+  p->init = 0;
}
/** Peer function */
+/*
+ * Process entry point for a peer: allocate and initialize its state, process
+ * incoming messages until peer_wait_for_message() returns, release the state,
+ * and return the status reported by peer_wait_for_message().
+ */
int peer(int argc, char *argv[])
{
- double time, time1, sender_time;
- msg_task_t task_la = NULL;
- msg_task_t task_bw = NULL;
- int a;
- double communication_time = 0;
+  peer_t p = xbt_new(s_peer_t, 1);
+  msg_error_t status;
XBT_INFO("peer");
- time = MSG_get_clock();
-
- a = peer_wait_for_init();
- /* Get Latency */
- /*a = MSG_task_receive(&task_la,MSG_host_get_name(MSG_host_self()));
- if (a == MSG_OK) {
- time1 = MSG_get_clock();
- sender_time = *((double *) (task_la->data));
- time = sender_time;
- communication_time = time1 - time;
- XBT_INFO("Task received : %s", task_la->name);
- xbt_free(task_la->data);
- MSG_task_destroy(task_la);
- XBT_INFO("Communic. time %le", communication_time);
- XBT_INFO("--- la %f ----", communication_time);
- } else {
- xbt_die("Unexpected behavior");
- }*/
+  peer_init(p);
+  status = peer_wait_for_message(p);
+  xbt_free(p);
- /* Get Bandwidth */
- /*a = MSG_task_receive(&task_bw,MSG_host_get_name(MSG_host_self()));
- if (a == MSG_OK) {
- time1 = MSG_get_clock();
- sender_time = *((double *) (task_bw->data));
- time = sender_time;
- communication_time = time1 - time;
- XBT_INFO("Task received : %s", task_bw->name);
- xbt_free(task_bw->data);
- MSG_task_destroy(task_bw);
- XBT_INFO("Communic. time %le", communication_time);
- XBT_INFO("--- bw %f ----", task_comm_size_bw / communication_time);
- } else {
- xbt_die("Unexpected behavior");
- }*/
-
-
- return 0;
+  /* Propagate the result instead of discarding it */
+  return status;
} /* end_of_receiver */