Algorithmique Numérique Distribuée
Public GIT Repository
projects
/
simgrid.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
| inline |
side by side
chainsend: fix compilation errors
[simgrid.git]
/
examples
/
msg
/
chainsend
/
broadcaster.c
diff --git a/examples/msg/chainsend/broadcaster.c b/examples/msg/chainsend/broadcaster.c
index 2d0b1c0..c87efc9 100644 (file)
--- a/examples/msg/chainsend/broadcaster.c
+++ b/examples/msg/chainsend/broadcaster.c
@@ -7,52 +7,21 @@
xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
{
xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
char *hostname = NULL;
- msg_host_t h = NULL;
int i = 1;
for (; i < hostcount+1; i++) {
hostname = xbt_new(char, HOSTNAME_LENGTH);
snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
- //XBT_INFO("%s", hostname);
- /*h = MSG_get_host_by_name(hostname);
- if (h == NULL) {
- XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
- abort();
- } else {*/
- xbt_dynar_push(host_list, &hostname);
- /*}*/
+ XBT_DEBUG("%s", hostname);
+ xbt_dynar_push(host_list, &hostname);
}
return host_list;
}
-/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
-{
- xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
- msg_host_t h = NULL;
- int i = 1;
-
- for (; i < argc; i++) {
- XBT_INFO("host%d = %s", i, argv[i]);
- h = MSG_get_host_by_name(argv[i]);
- if (h == NULL) {
- XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
- abort();
- } else {
- xbt_dynar_push(host_list, &(argv[i]));
- }
- }
- return host_list;
-}*/
-
-void delete_hostlist(xbt_dynar_t h)
-{
- xbt_dynar_free(&h);
-}
-
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it)
+int broadcaster_build_chain(broadcaster_t bc)
{
msg_task_t task = NULL;
- char **cur = (char**)xbt_dynar_iterator_next(it);
+ char **cur = (char**)xbt_dynar_iterator_next(
bc->
it);
const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
const char *current_host = NULL;
const char *prev = NULL;
@@ -63,20 +32,20 @@
int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar
if (cur != NULL) {
/* init: prev=NULL, host=current cur, next=next cur */
next = *cur;
-
*
first = next;
+
bc->
first = next;
/* This iterator iterates one step ahead: cur is current iterated element,
but it's actually the next one in the chain */
do {
/* following steps: prev=last, host=next, next=cur */
- cur = (char**)xbt_dynar_iterator_next(it);
+ cur = (char**)xbt_dynar_iterator_next(
bc->
it);
prev = last;
current_host = next;
if (cur != NULL)
next = *cur;
else
next = NULL;
-
//XBT_INFO
("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
+
XBT_DEBUG
("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
/* Send message to current peer */
task = task_message_chain_new(me, current_host, prev, next);
@@ -90,38 +59,41 @@
int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar
return MSG_OK;
}
-int broadcaster_send_file(
const char *first
)
+int broadcaster_send_file(
broadcaster_t bc
)
{
- const char *me = MSG_host_get_name(MSG_host_self());
- msg_task_t task = NULL;
+ const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
msg_comm_t comm = NULL;
-
int status
;
+
msg_task_t task = NULL
;
- int piece_count = PIECE_COUNT;
- int cur = 0;
+ bc->current_piece = 0;
- for (; cur < piece_count; cur++) {
- task = task_message_data_new(me, first, NULL, 0);
- XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
- status = MSG_task_send(task, first);
-
- xbt_assert(status == MSG_OK, "broadcaster_send_file() failed");
+ while (bc->current_piece < bc->piece_count) {
+ if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) {
+ task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
+ XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %lu)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends));
+ comm = MSG_task_isend(task, bc->first);
+ queue_pending_connection(comm, bc->pending_sends);
+ bc->current_piece++;
+ } else {
+ MSG_process_sleep(0.01);
+ }
+ process_pending_connections(bc->pending_sends);
}
return MSG_OK;
}
-int broadcaster_finish(
xbt_dynar_iterator_t it
)
+int broadcaster_finish(
broadcaster_t bc
)
{
msg_task_t task = NULL;
const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
const char *current_host = NULL;
char **cur = NULL;
- xbt_dynar_iterator_seek(it, 0);
+ xbt_dynar_iterator_seek(
bc->
it, 0);
/* Send goodbye message to every peer in the order generated by iterator it */
- for (cur = (char**)xbt_dynar_iterator_next(
it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(
it)) {
+ for (cur = (char**)xbt_dynar_iterator_next(
bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->
it)) {
/* Send message to current peer */
current_host = *cur;
task = task_message_end_data_new(me, current_host);
@@ -132,31 +104,51 @@
int broadcaster_finish(xbt_dynar_iterator_t it)
return MSG_OK;
}
+broadcaster_t broadcaster_init(xbt_dynar_t host_list)
+{
+ int status;
+ broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
+
+ bc->piece_count = PIECE_COUNT;
+ bc->current_piece = 0;
+ bc->host_list = host_list;
+ bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
+ bc->max_pending_sends = MAX_PENDING_SENDS;
+ bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+
+ status = broadcaster_build_chain(bc);
+ xbt_assert(status == MSG_OK, "Chain initialization failed");
+
+ return bc;
+}
+
+static void broadcaster_destroy(broadcaster_t bc)
+{
+ /* Destroy iterator and hostlist */
+ xbt_dynar_iterator_delete(bc->it);
+ xbt_dynar_free(&bc->pending_sends);
+ xbt_dynar_free(&bc->host_list);
+}
/** Emitter function */
int broadcaster(int argc, char *argv[])
{
+ broadcaster_t bc = NULL;
xbt_dynar_t host_list = NULL;
- const char *first = NULL;
- int status = !MSG_OK;
+ int status;
XBT_INFO("broadcaster");
/* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
host_list = build_hostlist_from_hostcount(atoi(argv[1]));
- /*host_list = build_hostlist_from_argv(argc, argv);*/
- /* Initialize iterator */
- xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+ bc = broadcaster_init(host_list);
/* TODO: Error checking */
- status = broadcaster_build_chain(&first, host_list, it);
- status = broadcaster_send_file(first);
- status = broadcaster_finish(it);
+ status = broadcaster_send_file(bc);
+ status = broadcaster_finish(bc);
- /* Destroy iterator and hostlist */
- xbt_dynar_iterator_delete(it);
- delete_hostlist(host_list);
+ broadcaster_destroy(bc);
return status;
}