/* register data which may be sent (common to client and server) */
static void register_messages(void) {
- gras_msgtype_declare("data", gras_datadesc_by_name("int"));
}
/* Function prototypes */
-int node (int argc,char *argv[]);
+int receiver (int argc,char *argv[]);
+int sender (int argc,char *argv[]);
/* **********************************************************************
- * node code
+ * Receiver code
* **********************************************************************/
+int receiver (int argc,char *argv[]) {
-/* Global private data */
-typedef struct {
- gras_socket_t sock;
- int done;
-} node_data_t;
-
-static void free_host(void *d){
- xbt_host_t h=*(xbt_host_t*)d;
- free(h->name);
- free(h);
-}
-
-static void kill_buddy(char *name,int port){
- gras_socket_t sock=gras_socket_client(name,port);
- gras_msg_send(sock,gras_msgtype_by_name("kill"),NULL);
- gras_socket_close(sock);
-}
-
-static void kill_buddy_dynar(void *b) {
- xbt_host_t buddy=*(xbt_host_t*)b;
- kill_buddy(buddy->name,buddy->port);
-}
-
-static int node_cb_data_handler(gras_msg_cb_ctx_t ctx,
- void *payload_data) {
-
- /* Get the payload into the msg variable */
- int data=*(int*)payload_data;
+ int myport; /* port on which I receive stuff */
+ int todo; /* amount of messages I should get */
+ int data; /* message content */
+
+ gras_socket_t mysock; /* socket on which other people contact me */
+ gras_socket_t expeditor; /* to notice who wrote me */
+
+ /* Init the GRAS infrastructure and declare my globals */
+ gras_init(&argc,argv);
+
+ /* Get my settings from the command line */
+ myport=atoi(argv[1]);
+ todo=atoi(argv[2]);
- gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
+ /* Create my master socket */
+ mysock = gras_socket_server(myport);
- /* Retrieve the server's state (globals) */
+ /* Register the known messages */
+ gras_msgtype_declare("data", gras_datadesc_by_name("int"));
- node_data_t *globals=(node_data_t*)gras_userdata_get();
- globals->done = 0;
+ /* Get the data */
+
+ INFO2("Listening on port %d (expecting %d messages)",
+ gras_socket_my_port(mysock),
+ todo);
+ while (todo>0) {
+ gras_msg_wait(60 /* wait up to one minute */,
+ gras_msgtype_by_name("data"),
+ &expeditor,
+ &data);
+ todo--;
+
+ INFO4("Got Data(%d) from %s:%d (still %d to go)",
+ data,
+ gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor),
+ todo);
- /* Log which client connected */
- INFO3(">>>>>>>> Got Data(%d) from %s:%d <<<<<<<<",
- data,
- gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
-
- /* Set the done boolean to true (and make sure the server stops after receiving it). */
- globals->done = 1;
-
- /* Make sure we don't leak sockets */
- //gras_socket_close(expeditor);
+ }
- /* Tell GRAS that we consummed this message */
- return 1;
-} /* end_of_server_cb_ping_handler */
+ /* Free the allocated resources, and shut GRAS down */
+ gras_socket_close(mysock);
+
+ gras_exit();
+ return 0;
+} /* end_of_receiver */
+
+/* **********************************************************************
+ * Sender code
+ * **********************************************************************/
-int node (int argc,char *argv[]) {
+int sender (int argc,char *argv[]) {
- xbt_ex_t e;
-
- int port,nb_hosts,data,
- i,done;
+ int i; /* iterator */
+ int data; /* data exchanged */
- xbt_host_t h1;
+ xbt_host_t h; /* iterator */
gras_socket_t peer; /* socket to node */
- node_data_t *globals;
-
/* xbt_dynar for hosts */
- xbt_dynar_t hosts = xbt_dynar_new(sizeof(xbt_host_t),&free_host);
+ xbt_dynar_t hosts = xbt_dynar_new(sizeof(xbt_host_t),&xbt_host_free_voidp);
/* Init the GRAS infrastructure and declare my globals */
gras_init(&argc,argv);
- globals=gras_userdata_new(node_data_t *);
-
- /* Get the port I should listen on from the command line, if specified */
- if (argc > 2) {
- port=atoi(argv[1]);
- }
-
/* Get the node location from argc/argv */
for (i=2; i<argc; i++){
- xbt_host_t host=xbt_new(s_xbt_host_t,1);
- host->name=strdup(argv[i]);
- host->port=atoi(argv[1]);
- INFO2("New node : %s:%d",host->name,host->port);
+ xbt_host_t host = xbt_host_new(argv[i],atoi(argv[1]));
xbt_dynar_push(hosts,&host);
}
- nb_hosts = xbt_dynar_length(hosts);
- INFO1("Launch current node (port=%d)", port);
-
- /* Create my master socket */
- globals->sock = gras_socket_server(port);
+ INFO0("Launch current node");
/* Register the known messages */
- register_messages();
- register_messages();
+ gras_msgtype_declare("data", gras_datadesc_by_name("int"));
- /* 3. Wait for others nodesthe startup */
+ /* Wait for receivers to startup */
gras_os_sleep(1);
-
-
- /* Register my callback */
- gras_cb_register(gras_msgtype_by_name("data"),&node_cb_data_handler);
- INFO1(">>>>>>>> Listening on port %d <<<<<<<<", gras_socket_my_port(globals->sock));
- globals->done=0;
-
+ /* write 'em */
data =1000;
- xbt_dynar_foreach(hosts,i,h1) {
- peer = gras_socket_client(h1->name,h1->port);
- done=0;
- while (!done){
- TRY {
- gras_msg_handle(0);
- }CATCH(e){
- if (e.category != timeout_error)
- RETHROW;
- xbt_ex_free(e);
- done = 1;
- }
- }
-
- gras_msg_send(peer,gras_msgtype_by_name("data"),&data);
- INFO3(">>>>>>>> Send Data (%d) from %s to %s <<<<<<<<",
- data,argv[0],h1->name);
+ xbt_dynar_foreach(hosts,i,h) {
+
+ peer = gras_socket_client(h->name,h->port);
+ gras_msg_send(peer,gras_msgtype_by_name("data"),&data);
+ INFO3(" Sent Data (%d) from %s to %s",
+ data,gras_os_myname(),h->name);
+ gras_socket_close(peer);
}
- if (!globals->done)
- WARN0("An error occured, the done was not set by the callback");
-
- /* Free the allocated resources, and shut GRAS down */
- gras_socket_close(globals->sock);
- free(globals);
+ /* Free the allocated resources, and shut GRAS down */
+ xbt_dynar_free(&hosts);
+
gras_exit();
-
- //xbt_dynar_map(hosts,kill_buddy_dynar);
- //xbt_dynar_free(&hosts);
-
- INFO0("Done.");
return 0;
-} /* end_of_node */
+} /* end_of_sender */
<!DOCTYPE platform_description SYSTEM "surfxml.dtd">
<platform_description>
<!-- The Tremblay node, arguments :: all others nodes -->
- <process host="Tremblay" function="node">
+ <process host="Tremblay" function="sender">
<argument value="4000"/>
<argument value="Jupiter"/>
<argument value="Fafard"/>
<argument value="Bourassa"/>
</process>
<!-- The Jupiter node, arguments :: all others nodes -->
- <process host="Jupiter" function="node">
+ <process host="Jupiter" function="sender">
<argument value="4000"/>
<argument value="Tremblay"/>
<argument value="Fafard"/>
<argument value="Bourassa"/>
</process>
<!-- The Fafard node, arguments :: all others nodes -->
- <process host="Fafard" function="node">
+ <process host="Fafard" function="sender">
<argument value="4000"/>
<argument value="Tremblay"/>
<argument value="Jupiter"/>
<argument value="Bourassa"/>
</process>
<!-- The Ginette node, arguments :: all others nodes -->
- <process host="Ginette" function="node">
+ <process host="Ginette" function="sender">
<argument value="4000"/>
<argument value="Tremblay"/>
<argument value="Jupiter"/>
<argument value="Bourassa"/>
</process>
<!-- The Bourassa node, arguments :: all others nodes -->
- <process host="Bourassa" function="node">
+ <process host="Bourassa" function="sender">
<argument value="4000"/>
<argument value="Tremblay"/>
<argument value="Jupiter"/>
<argument value="Fafard"/>
<argument value="Ginette"/>
</process>
+
+
+ <process host="Tremblay" function="receiver">
+ <argument value="4000"/>
+ <argument value="4"/>
+ </process>
+ <process host="Jupiter" function="receiver">
+ <argument value="4000"/>
+ <argument value="4"/>
+ </process>
+ <process host="Fafard" function="receiver">
+ <argument value="4000"/>
+ <argument value="4"/>
+ </process>
+ <process host="Ginette" function="receiver">
+ <argument value="4000"/>
+ <argument value="4"/>
+ </process>
+ <process host="Bourassa" function="receiver">
+ <argument value="4000"/>
+ <argument value="4"/>
+ </process>
</platform_description>