From fb0c855fd154347c6ee4e43362d7509c7c5eafef Mon Sep 17 00:00:00 2001 From: illogict Date: Thu, 6 Apr 2006 13:07:43 +0000 Subject: [PATCH] Update for chord example, indentation, cleaning out... git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@2090 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- examples/gras/p2p/chord/chord.c | 520 +++++++++++-------- examples/gras/p2p/chord/chord_deployment.xml | 6 +- 2 files changed, 314 insertions(+), 212 deletions(-) diff --git a/examples/gras/p2p/chord/chord.c b/examples/gras/p2p/chord/chord.c index 5789cb1452..2fd9b6a6be 100644 --- a/examples/gras/p2p/chord/chord.c +++ b/examples/gras/p2p/chord/chord.c @@ -1,5 +1,5 @@ /* - * vim:ts=2:sw=2:noexpandtab + * vim:ts=2:sw=2:expandtab */ #include "xbt/sysdep.h" @@ -11,272 +11,372 @@ static void check_predecessor(void); XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example"); typedef enum msg_typus{ - PING, - PONG, - GET_PRE, - REP_PRE, - GET_SUC, - REP_SUC, - STD, + PING, + PONG, + GET_PRE, + REP_PRE, + GET_SUC, + REP_SUC, + STD, }msg_typus; /*GRAS_DEFINE_TYPE(s_pbio, - struct s_pbio{ - msg_typus type; - int dest; - char msg[1024]; - }; + struct s_pbio{ + msg_typus type; + int dest; + char msg[1024]; + }; ); typedef struct s_pbio pbio_t;*/ -//GRAS_DEFINE_TYPE(s_ping, - struct s_ping{ - int id; - }; -//); +/*GRAS_DEFINE_TYPE(s_ping,*/ + struct s_ping{ + int id; + }; +/*);*/ typedef struct s_ping ping_t; -//GRAS_DEFINE_TYPE(s_pong, - struct s_pong{ - int id; - int failed; - }; -//); +/*GRAS_DEFINE_TYPE(s_pong,*/ + struct s_pong{ + int id; + int failed; + }; +/*);*/ typedef struct s_pong pong_t; +GRAS_DEFINE_TYPE(s_notify, + struct s_notify{ + int id; + char host[1024]; + int port; + }; +); +typedef struct s_notify notify_t; + GRAS_DEFINE_TYPE(s_get_suc, - struct s_get_suc{ - int id; - }; + struct s_get_suc{ + int id; + }; ); typedef struct s_get_suc get_suc_t; GRAS_DEFINE_TYPE(s_rep_suc, - struct s_rep_suc{ - int id; - char host[1024]; - int port; - }; + struct s_rep_suc{ + int id; + char host[1024]; + int port; + }; ); typedef struct s_rep_suc rep_suc_t; typedef struct finger_elem{ - int id; - char host[1024]; - int port; + int id; + char host[1024]; + int port; }finger_elem; static void register_messages(){ -/* gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/ - gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc)); - gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc)); +/* gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/ + gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc)); + gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc)); + gras_msgtype_declare("chord_notify",gras_datadesc_by_symbol(s_notify)); } /* Global private data */ typedef struct{ - gras_socket_t sock; /* server socket on which I'm listening */ - int id; /* my id number */ - char host[1024]; /* my host name */ - int port; /* port on which I'm listening FIXME */ - int fingers; /* how many fingers */ - finger_elem *finger; /* finger table */ - char pre_host[1024]; /* predecessor host */ - int pre_port; /* predecessor port */ + gras_socket_t sock; /* server socket on which I'm listening */ + int id; /* my id number */ + char host[1024]; /* my host name */ + int port; /* port on which I'm listening FIXME */ + int fingers; /* how many fingers */ + finger_elem *finger; /* finger table */ + int next_to_fix; /* next in the finger list to be checked */ + int pre_id; /* predecessor id */ + char pre_host[1024]; /* predecessor host */ + int pre_port; /* predecessor port */ }node_data_t; int node(int argc,char **argv); /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){ - xbt_ex_t e; - pbio_t pbio_i=*(pbio_t*)payload_data; + xbt_ex_t e; + pbio_t pbio_i=*(pbio_t*)payload_data; - node_data_t *globals=(node_data_t*)gras_userdata_get(); + node_data_t *globals=(node_data_t*)gras_userdata_get(); - INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest); + INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest); }*/ static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){ - gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx); - xbt_ex_t e; - get_suc_t incoming=*(get_suc_t*)payload_data; - rep_suc_t outgoing; - node_data_t *globals=(node_data_t*)gras_userdata_get(); - INFO2("Received a get_successor message from %s for %d",gras_socket_peer_name(expeditor),incoming.id); - if((globals->id==globals->finger[0].id)||(incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){ - outgoing.id=globals->finger[0].id; - snprintf(outgoing.host,1024,globals->finger[0].host); - outgoing.port=globals->finger[0].port; - INFO0("My successor is his successor!"); - }else{ - gras_socket_t temp_sock; - int contact=closest_preceding_node(incoming.id); - get_suc_t asking;asking.id=incoming.id; - TRY{ - temp_sock=gras_socket_client(globals->finger[contact].host,globals->finger[contact].port); - }CATCH(e){ - RETHROW0("Unable to connect!: %s"); - } - TRY{ - gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking); - }CATCH(e){ - RETHROW0("Unable to ask!: %s"); - } - gras_msg_wait(10.0,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,&outgoing); - } - - TRY{ - gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing); - INFO0("Successor information sent!"); - }CATCH(e){ - RETHROW2("%s:Timeout sending successor information to %s: %s",globals->host,gras_socket_peer_name(expeditor)); - } - gras_socket_close(expeditor); - return(1); + gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx); + xbt_ex_t e; + get_suc_t incoming=*(get_suc_t*)payload_data; + rep_suc_t outgoing; + node_data_t *globals=(node_data_t*)gras_userdata_get(); + INFO2("Received a get_successor message from %s for %d", + gras_socket_peer_name(expeditor),incoming.id); + if((globals->id==globals->finger[0].id)|| + (incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){ + outgoing.id=globals->finger[0].id; + snprintf(outgoing.host,1024,globals->finger[0].host); + outgoing.port=globals->finger[0].port; + INFO0("My successor is his successor!"); + }else{ + gras_socket_t temp_sock; + int contact=closest_preceding_node(incoming.id); + if(contact==-1){ + outgoing.id=globals->finger[0].id; + snprintf(outgoing.host,1024,globals->finger[0].host); + outgoing.port=globals->finger[0].port; + INFO0("My successor is his successor!"); + }else{ + get_suc_t asking;asking.id=incoming.id; + TRY{ + temp_sock=gras_socket_client(globals->finger[contact].host, + globals->finger[contact].port); + }CATCH(e){ + RETHROW0("Unable to connect!: %s"); + } + TRY{ + gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking); + }CATCH(e){ + RETHROW0("Unable to ask!: %s"); + } + gras_msg_wait(10.,gras_msgtype_by_name("chord_rep_suc"),&temp_sock, + &outgoing); + } + } + + TRY{ + gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing); + INFO0("Successor information sent!"); + }CATCH(e){ + RETHROW2("%s:Timeout sending successor information to %s: %s", + globals->host,gras_socket_peer_name(expeditor)); + } + gras_socket_close(expeditor); + return(1); } static int closest_preceding_node(int id){ - node_data_t *globals=(node_data_t*)gras_userdata_get(); - int i; - for(i=globals->fingers-1;i>=0;i--){ - if(globals->finger[i].id>globals->id&&globals->finger[i].idfingers-1;i>=0;i--){ + if(globals->finger[i].id>globals->id&&globals->finger[i].idpre_id==-1|| + (incoming.id>globals->pre_id&&incoming.idid)){ + globals->pre_id=incoming.id; + snprintf(globals->pre_host,1024,incoming.host); + globals->pre_port=incoming.port; + INFO0("Set as my new predecessor!"); + } + return(1); +} -static void check_predecessor() { +static void fix_fingers(){ + xbt_ex_t e; + gras_socket_t temp_sock=NULL; + gras_socket_t temp_sock2=NULL; + node_data_t *globals=(node_data_t*)gras_userdata_get(); + + TRY{ + temp_sock=gras_socket_client(globals->host,globals->port); + }CATCH(e){ + RETHROW0("Unable to contact known host: %s"); + } + get_suc_t get_suc_msg;get_suc_msg.id=globals->id; + TRY{ + gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg); + }CATCH(e){ + gras_socket_close(temp_sock); + RETHROW0("Unable to contact known host to get successor!: %s"); + } + rep_suc_t rep_suc_msg; + TRY{ + INFO0("Waiting for reply!"); + gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2, + &rep_suc_msg); + }CATCH(e){ + RETHROW1("%s: Error waiting for successor:%s",globals->host); + } + globals->finger[0].id=rep_suc_msg.id; + snprintf(globals->finger[0].host,1024,rep_suc_msg.host); + globals->finger[0].port=rep_suc_msg.port; + INFO1("→ Finger %d fixed!",globals->next_to_fix); + gras_socket_close(temp_sock); + + globals->next_to_fix=(++globals->next_to_fix==globals->fingers)? + 0:globals->next_to_fix; +} + +static void check_predecessor(){ node_data_t *globals = (node_data_t*)gras_userdata_get(); gras_socket_t temp_sock; xbt_ex_t e; -if (globals->pre_host[0] == 0) - { - return; - } -TRY - { - temp_sock = gras_socket_client( globals->pre_host, globals->pre_port ); - } -CATCH(e) - { - globals->pre_host[0] = 0; - globals->pre_port = 0; - } -ping_t ping_kong; -pong_t king_pong; -ping_kong.id = 0; -TRY - { - gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping_kong); - } -CATCH(e) - { - globals->pre_host[0] = 0; - globals->pre_port = 0; - } -TRY - { - gras_msg_wait( 6969, gras_msgtype_by_name("chord_pong"), &temp_sock, &king_pong); - } -CATCH(e) - { - globals->pre_host[0] = 0; - globals->pre_port = 0; - } -gras_socket_close(temp_sock); + if (globals->pre_id == -1){ + return; + } + TRY{ + temp_sock = gras_socket_client( globals->pre_host, globals->pre_port ); + }CATCH(e){ + globals->pre_id = -1; + globals->pre_host[0] = 0; + globals->pre_port = 0; + } + ping_t ping; + pong_t pong; + ping.id = 0; + TRY{ + gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping); + }CATCH(e){ + globals->pre_id = -1; + globals->pre_host[0] = 0; + globals->pre_port = 0; + } + TRY{ + gras_msg_wait( 60, gras_msgtype_by_name("chord_pong"), &temp_sock, &pong); + }CATCH(e){ + globals->pre_id = -1; + globals->pre_host[0] = 0; + globals->pre_port = 0; + } + gras_socket_close(temp_sock); } int node(int argc,char **argv){ - node_data_t *globals=NULL; - gras_socket_t temp_sock=NULL; - gras_socket_t temp_sock2=NULL; + node_data_t *globals=NULL; + gras_socket_t temp_sock=NULL; + gras_socket_t temp_sock2=NULL; - xbt_ex_t e; + xbt_ex_t e; - int create=0; - int other_port=-1; - char *other_host; + int create=0; + int other_port=-1; + char *other_host; - /* 1. Init the GRAS infrastructure and declare my globals */ - gras_init(&argc,argv); + /* 1. Init the GRAS infrastructure and declare my globals */ + gras_init(&argc,argv); - gras_os_sleep((15-gras_os_getpid())*20); + gras_os_sleep((15-gras_os_getpid())*20); - globals=gras_userdata_new(node_data_t); - - globals->id=atoi(argv[1]); - globals->port=atoi(argv[2]); - globals->fingers=0; - globals->finger=NULL; - globals->pre_host[0]=0; - globals->pre_port=-1; - - snprintf(globals->host,1024,gras_os_myname()); - - if(argc==3){ - create=1; - }else{ - asprintf(&other_host,"%s",argv[3]); - other_port=atoi(argv[4]); - } - - globals->sock=gras_socket_server(globals->port); - gras_os_sleep(1.0); - - register_messages(); - register_messages(); - - globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem)); - INFO2("Launching node %s:%d",globals->host,globals->port); - if(create){ - INFO0("→Creating ring"); - globals->finger[0].id=globals->id; - snprintf(globals->finger[0].host,1024,globals->host); - globals->finger[0].port=globals->port; - }else{ - INFO2("→Known node %s:%d",other_host,other_port); - INFO0("→Contacting to determine successor"); - TRY{ - temp_sock=gras_socket_client(other_host,other_port); - }CATCH(e){ - RETHROW0("Unable to contact known host!: %s"); - } - get_suc_t get_suc_msg;get_suc_msg.id=globals->id; - TRY{ - gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg); - }CATCH(e){ - gras_socket_close(temp_sock); - RETHROW0("Unable to contact known host to get successor!: %s"); - } - rep_suc_t rep_suc_msg; - TRY{ - INFO0("Waiting for reply!"); - gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,&rep_suc_msg); - }CATCH(e){ - RETHROW1("%s: Error waiting for successor:%s",globals->host); - } - globals->finger[0].id=rep_suc_msg.id; - snprintf(globals->finger[0].host,1024,rep_suc_msg.host); - globals->finger[0].port=rep_suc_msg.port; - INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,globals->finger[0].host,globals->finger[0].port); - gras_socket_close(temp_sock); - } - - gras_cb_register(gras_msgtype_by_name("chord_get_suc"),&node_cb_get_suc_handler); -// gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler); - - gras_msg_handle(60.0); - - gras_socket_close(globals->sock); - free(globals); - gras_exit(); - INFO0("Done"); - return(0); + globals=gras_userdata_new(node_data_t); + + globals->id=atoi(argv[1]); + globals->port=atoi(argv[2]); + globals->fingers=0; + globals->finger=NULL; + globals->pre_id=-1; + globals->pre_host[0]=0; + globals->pre_port=-1; + + snprintf(globals->host,1024,gras_os_myname()); + + if(argc==3){ + create=1; + }else{ + asprintf(&other_host,"%s",argv[3]); + other_port=atoi(argv[4]); + } + + globals->sock=gras_socket_server(globals->port); + gras_os_sleep(1.0); + + register_messages(); + register_messages(); + + globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem)); + INFO2("Launching node %s:%d",globals->host,globals->port); + if(create){ + INFO0("→Creating ring"); + globals->finger[0].id=globals->id; + snprintf(globals->finger[0].host,1024,globals->host); + globals->finger[0].port=globals->port; + }else{ + INFO2("→Known node %s:%d",other_host,other_port); + INFO0("→Contacting to determine successor"); + TRY{ + temp_sock=gras_socket_client(other_host,other_port); + }CATCH(e){ + RETHROW0("Unable to contact known host: %s"); + } + get_suc_t get_suc_msg;get_suc_msg.id=globals->id; + TRY{ + gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"), + &get_suc_msg); + }CATCH(e){ + gras_socket_close(temp_sock); + RETHROW0("Unable to contact known host to get successor!: %s"); + } + rep_suc_t rep_suc_msg; + TRY{ + INFO0("Waiting for reply!"); + gras_msg_wait(10.,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2, + &rep_suc_msg); + }CATCH(e){ + RETHROW1("%s: Error waiting for successor:%s",globals->host); + } + globals->finger[0].id=rep_suc_msg.id; + snprintf(globals->finger[0].host,1024,rep_suc_msg.host); + globals->finger[0].port=rep_suc_msg.port; + INFO3("→Got successor : %d-%s:%d",globals->finger[0].id, + globals->finger[0].host,globals->finger[0].port); + gras_socket_close(temp_sock); + TRY{ + temp_sock=gras_socket_client(globals->finger[0].host, + globals->finger[0].port); + }CATCH(e){ + RETHROW0("Unable to contact successor: %s"); + } + notify_t notify_msg; + notify_msg.id=globals->id; + snprintf(notify_msg.host,1024,globals->host); + notify_msg.port=globals->port; + TRY{ + gras_msg_send(temp_sock,gras_msgtype_by_name("chord_notify"),¬ify_msg); + }CATCH(e){ + RETHROW0("Unable to notify successor! %s"); + } + } + + gras_cb_register(gras_msgtype_by_name("chord_get_suc"), + &node_cb_get_suc_handler); + gras_cb_register(gras_msgtype_by_name("chord_notify"), + &node_cb_notify_handler); + /*gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);*/ + /* gras_timer_repeat(600.,fix_fingers);*/ + /*while(1){*/ + int l; + for(l=0;l<50;l++){ + TRY{ + gras_msg_handle(6000000.0); + }CATCH(e){ + } + } + /*}*/ + + gras_socket_close(globals->sock); + free(globals); + gras_exit(); + INFO0("Done"); + return(0); } diff --git a/examples/gras/p2p/chord/chord_deployment.xml b/examples/gras/p2p/chord/chord_deployment.xml index 2e0aa713db..d1bb789b34 100644 --- a/examples/gras/p2p/chord/chord_deployment.xml +++ b/examples/gras/p2p/chord/chord_deployment.xml @@ -5,8 +5,10 @@ - + + + @@ -81,7 +83,7 @@ - + -- 2.20.1