-/* $Id$ */
-
/* ALL2ALL - all2all of GRAS features */
-/* Copyright (c) 2006 Ahmed Harbaoui. All rights reserved. */
+/* Copyright (c) 2006, 2007, 2009, 2010. The SimGrid Team.
+ * All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
+ * under the terms of the license (GNU LGPL) which comes with this package. */
#include "gras.h"
#include "xbt/ex.h"
-XBT_LOG_NEW_DEFAULT_CATEGORY(all2all,"Messages specific to this example");
+XBT_LOG_NEW_DEFAULT_CATEGORY(all2all, "Messages specific to this example");
/* register data which may be sent (common to client and server) */
-static void register_messages(void) {
- gras_msgtype_declare("data", gras_datadesc_by_name("int"));
+static void register_messages(void)
+{
}
/* Function prototypes */
-int node (int argc,char *argv[]);
+int receiver(int argc, char *argv[]);
+int sender(int argc, char *argv[]);
/* **********************************************************************
- * node code
+ * Receiver code
* **********************************************************************/
+int receiver(int argc, char *argv[])
+{
-/* Global private data */
-typedef struct {
- gras_socket_t sock;
- int done;
-} node_data_t;
+ int myport; /* port on which I receive stuff */
+ int todo; /* amount of messages I should get */
+ char *data; /* message content */
-static void free_host(void *d){
- xbt_host_t h=*(xbt_host_t*)d;
- free(h->name);
- free(h);
-}
+ gras_socket_t mysock; /* socket on which other people contact me */
+ gras_socket_t expeditor; /* to notice who wrote me */
-static void kill_buddy(char *name,int port){
- gras_socket_t sock=gras_socket_client(name,port);
- gras_msg_send(sock,gras_msgtype_by_name("kill"),NULL);
- gras_socket_close(sock);
-}
+ /* Init the GRAS infrastructure */
+ gras_init(&argc, argv);
-static void kill_buddy_dynar(void *b) {
- xbt_host_t buddy=*(xbt_host_t*)b;
- kill_buddy(buddy->name,buddy->port);
-}
+ /* Get my settings from the command line */
+ myport = atoi(argv[1]);
+ todo = atoi(argv[2]);
-static int node_cb_data_handler(gras_msg_cb_ctx_t ctx,
- void *payload_data) {
-
- /* Get the payload into the msg variable */
- int data=*(int*)payload_data;
-
- gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
-
- /* Retrieve the server's state (globals) */
-
- node_data_t *globals=(node_data_t*)gras_userdata_get();
- globals->done = 0;
-
- /* Log which client connected */
- INFO3(">>>>>>>> Got Data(%d) from %s:%d <<<<<<<<",
- data,
- gras_socket_peer_name(expeditor), gras_socket_peer_port(expeditor));
-
- /* Set the done boolean to true (and make sure the server stops after receiving it). */
- globals->done = 1;
-
- /* Make sure we don't leak sockets */
- //gras_socket_close(expeditor);
-
- /* Tell GRAS that we consummed this message */
- return 1;
-} /* end_of_server_cb_ping_handler */
-
-int node (int argc,char *argv[]) {
-
- xbt_ex_t e;
-
- int port,nb_hosts,data,
- i,done;
-
- xbt_host_t h1;
-
- gras_socket_t peer; /* socket to node */
-
- node_data_t *globals;
-
- /* xbt_dynar for hosts */
- xbt_dynar_t hosts = xbt_dynar_new(sizeof(xbt_host_t),&free_host);
-
- /* Init the GRAS infrastructure and declare my globals */
- gras_init(&argc,argv);
-
- globals=gras_userdata_new(node_data_t *);
+ /* Register the known messages */
+ gras_msgtype_declare("data", gras_datadesc_by_name("string"));
+
+ /* Create my master socket */
+ mysock = gras_socket_server(myport);
+
+ /* Get the data */
+ XBT_INFO("Listening on port %d (expecting %d messages)",
+ gras_socket_my_port(mysock), todo);
+ while (todo > 0) {
+ gras_msg_wait(60 /* wait up to one minute */ ,
+ "data", &expeditor, &data);
+ todo--;
+ free(data);
+
+ XBT_INFO("Got Data from %s:%d (still %d to go)",
+ gras_socket_peer_name(expeditor),
+ gras_socket_peer_port(expeditor), todo);
- /* Get the port I should listen on from the command line, if specified */
- if (argc > 2) {
- port=atoi(argv[1]);
}
+ /* Free the allocated resources, and shut GRAS down */
+ gras_socket_close(mysock);
+
+ gras_exit();
+ return 0;
+} /* end_of_receiver */
+
+/* **********************************************************************
+ * Sender code
+ * **********************************************************************/
+
+int sender(int argc, char *argv[])
+{
+
+ unsigned int iter; /* iterator */
+ char *data; /* data exchanged */
+ int datasize; /* size of message */
+ xbt_peer_t h; /* current peer while iterating over the peers dynar */
+ int connected = 0;
+
+ gras_socket_t peer = NULL; /* socket to node */
+
+
+ /* xbt_dynar for peers */
+ xbt_dynar_t peers =
+ xbt_dynar_new(sizeof(xbt_peer_t), &xbt_peer_free_voidp);
+
+ /* Init the GRAS infrastructure */
+ gras_init(&argc, argv);
+
/* Get the node location from argc/argv */
- for (i=2; i<argc; i++){
- xbt_host_t host=xbt_new(s_xbt_host_t,1);
- host->name=strdup(argv[i]);
- host->port=atoi(argv[1]);
- INFO2("New node : %s:%d",host->name,host->port);
- xbt_dynar_push(hosts,&host);
+ for (iter = 1; iter < argc - 1; iter++) {
+ xbt_peer_t peer = xbt_peer_from_string(argv[iter]);
+ xbt_dynar_push(peers, &peer);
}
- nb_hosts = xbt_dynar_length(hosts);
- INFO1("Launch current node (port=%d)", port);
+ datasize = atoi(argv[argc - 1]);
- /* Create my master socket */
- globals->sock = gras_socket_server(port);
+ data = (char *) malloc(datasize + 1); // datasize payload bytes plus the terminating '\0'
+ memset(data, 32, datasize);
+ data[datasize] = '\0';
+
+ XBT_INFO("Launch current node");
/* Register the known messages */
- register_messages();
- register_messages();
-
- /* 3. Wait for others nodesthe startup */
- gras_os_sleep(1);
-
-
- /* Register my callback */
- gras_cb_register(gras_msgtype_by_name("data"),&node_cb_data_handler);
-
- INFO1(">>>>>>>> Listening on port %d <<<<<<<<", gras_socket_my_port(globals->sock));
- globals->done=0;
-
- data =1000;
- xbt_dynar_foreach(hosts,i,h1) {
- peer = gras_socket_client(h1->name,h1->port);
- done=0;
- while (!done){
+ gras_msgtype_declare("data", gras_datadesc_by_name("string"));
+
+
+ /* write to the receivers */
+ xbt_dynar_foreach(peers, iter, h) {
+ connected = 0;
+ while (!connected) {
+ xbt_ex_t e;
TRY {
- gras_msg_handle(0);
- }CATCH(e){
- if (e.category != timeout_error)
- RETHROW;
- xbt_ex_free(e);
- done = 1;
+ peer = gras_socket_client(h->name, h->port);
+ connected = 1;
}
+ CATCH(e) {
+ if (e.category != system_error /*in RL */
+ && e.category != mismatch_error /*in SG */ )
+ RETHROW;
+ xbt_ex_free(e);
+ gras_os_sleep(0.01);
+ }
+ }
+ gras_msg_send(peer, "data", &data);
+ if (gras_if_SG()) {
+ XBT_INFO(" Sent Data from %s to %s", gras_os_myname(), h->name);
+ } else {
+ XBT_INFO(" Sent Data");
}
-
- gras_msg_send(peer,gras_msgtype_by_name("data"),&data);
- INFO3(">>>>>>>> Send Data (%d) from %s to %s <<<<<<<<",
- data,argv[0],h1->name);
+
+ gras_socket_close(peer);
}
- if (!globals->done)
- WARN0("An error occured, the done was not set by the callback");
+ /* Free the allocated resources, and shut GRAS down */
+ free(data);
+ xbt_dynar_free(&peers);
- /* Free the allocated resources, and shut GRAS down */
- gras_socket_close(globals->sock);
- free(globals);
gras_exit();
-
- //xbt_dynar_map(hosts,kill_buddy_dynar);
- //xbt_dynar_free(&hosts);
-
- INFO0("Done.");
return 0;
-} /* end_of_node */
+} /* end_of_sender */