X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/78c37d1780d1243aec405e7f38751e0aa5037c38..56bbc06a5abb010645afccbcbd4c3c188e438c59:/src/amok/Bandwidth/bandwidth.c diff --git a/src/amok/Bandwidth/bandwidth.c b/src/amok/Bandwidth/bandwidth.c index a1a8623ab9..d5fb44130b 100644 --- a/src/amok/Bandwidth/bandwidth.c +++ b/src/amok/Bandwidth/bandwidth.c @@ -1,10 +1,7 @@ -/* $Id$ */ - /* amok_bandwidth - Bandwidth tests facilities */ -/* Copyright (c) 2003-6 Martin Quinson. */ -/* Copyright (c) 2006 Ahmed Harbaoui. */ -/* All rights reserved. */ +/* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team. + * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -13,7 +10,7 @@ #include "amok/Bandwidth/bandwidth_private.h" #include "gras/messages.h" -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing"); /****************************** @@ -23,13 +20,14 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing"); static short _amok_bw_initialized = 0; /** @brief module initialization; all participating nodes must run this */ -void amok_bw_init(void) { +void amok_bw_init(void) +{ - if (! _amok_bw_initialized) { + if (!_amok_bw_initialized) { amok_bw_bw_init(); amok_bw_sat_init(); } - + amok_bw_bw_join(); amok_bw_sat_join(); @@ -37,10 +35,11 @@ void amok_bw_init(void) { } /** @brief module finalization */ -void amok_bw_exit(void) { - if (! _amok_bw_initialized) +void amok_bw_exit(void) +{ + if (!_amok_bw_initialized) return; - + amok_bw_bw_leave(); amok_bw_sat_leave(); @@ -53,45 +52,54 @@ void amok_bw_exit(void) { static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload); static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload); -void amok_bw_bw_init() { - gras_datadesc_type_t bw_request_desc, bw_res_desc; - - /* Build the Bandwidth datatype descriptions */ - bw_request_desc = gras_datadesc_struct("s_bw_request_t"); - gras_datadesc_struct_append(bw_request_desc,"peer", - gras_datadesc_by_name("s_xbt_peer_t")); - gras_datadesc_struct_append(bw_request_desc,"buf_size", - gras_datadesc_by_name("unsigned long int")); - gras_datadesc_struct_append(bw_request_desc,"msg_size", - gras_datadesc_by_name("unsigned long int")); - gras_datadesc_struct_append(bw_request_desc,"msg_amount", - gras_datadesc_by_name("unsigned long int")); - gras_datadesc_struct_append(bw_request_desc,"min_duration", - gras_datadesc_by_name("double")); - gras_datadesc_struct_close(bw_request_desc); - bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc); - - bw_res_desc = gras_datadesc_struct("s_bw_res_t"); - gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int")); - gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double")); - gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double")); - gras_datadesc_struct_close(bw_res_desc); - bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc); - - gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc); - - gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL); +void amok_bw_bw_init() +{ + xbt_datadesc_type_t bw_request_desc, bw_res_desc; + + /* Build the Bandwidth datatype descriptions */ + bw_request_desc = xbt_datadesc_struct("s_bw_request_t"); + xbt_datadesc_struct_append(bw_request_desc, "peer", + xbt_datadesc_by_name("s_xbt_peer_t")); + xbt_datadesc_struct_append(bw_request_desc, "buf_size", + xbt_datadesc_by_name("unsigned long int")); + xbt_datadesc_struct_append(bw_request_desc, "msg_size", + xbt_datadesc_by_name("unsigned long int")); + xbt_datadesc_struct_append(bw_request_desc, "msg_amount", + xbt_datadesc_by_name("unsigned long int")); + xbt_datadesc_struct_append(bw_request_desc, "min_duration", + xbt_datadesc_by_name("double")); + xbt_datadesc_struct_close(bw_request_desc); + bw_request_desc = xbt_datadesc_ref("bw_request_t", bw_request_desc); + + bw_res_desc = xbt_datadesc_struct("s_bw_res_t"); + xbt_datadesc_struct_append(bw_res_desc, "timestamp", + xbt_datadesc_by_name("unsigned int")); + xbt_datadesc_struct_append(bw_res_desc, "seconds", + xbt_datadesc_by_name("double")); + xbt_datadesc_struct_append(bw_res_desc, "bw", + xbt_datadesc_by_name("double")); + xbt_datadesc_struct_close(bw_res_desc); + bw_res_desc = xbt_datadesc_ref("bw_res_t", bw_res_desc); + + gras_msgtype_declare_rpc("BW handshake", bw_request_desc, + bw_request_desc); + + gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL); gras_msgtype_declare("BW stop", NULL); - gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc); + gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc); } -void amok_bw_bw_join() { - gras_cb_register("BW request", &amok_bw_cb_bw_request); - gras_cb_register("BW handshake",&amok_bw_cb_bw_handshake); + +void amok_bw_bw_join() +{ + gras_cb_register("BW request", &amok_bw_cb_bw_request); + gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake); } -void amok_bw_bw_leave() { - gras_cb_unregister("BW request", &amok_bw_cb_bw_request); - gras_cb_unregister("BW handshake",&amok_bw_cb_bw_handshake); + +void amok_bw_bw_leave() +{ + gras_cb_unregister("BW request", &amok_bw_cb_bw_request); + gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake); } /** @@ -125,126 +133,141 @@ void amok_bw_bw_leave() { * bytes in a fat pipe. * */ -void amok_bw_test(gras_socket_t peer, - unsigned long int buf_size, - unsigned long int msg_size, - unsigned long int msg_amount, - double min_duration, - /*OUT*/ double *sec, double *bw) { +void amok_bw_test(xbt_socket_t peer, + unsigned long int buf_size, + unsigned long int msg_size, + unsigned long int msg_amount, + double min_duration, /*OUT*/ double *sec, double *bw) +{ /* Measurement sockets for the experiments */ - gras_socket_t measMasterIn=NULL,measIn,measOut=NULL; - int port; - bw_request_t request,request_ack; + volatile xbt_socket_t measMasterIn = NULL, measIn, measOut = NULL; + volatile int port; + bw_request_t request, request_ack; xbt_ex_t e; - int first_pass; - + int first_pass; + for (port = 5000; port < 10000 && measMasterIn == NULL; port++) { TRY { - measMasterIn = gras_socket_server_ext(++port,buf_size,1); - } CATCH(e) { + measMasterIn = gras_socket_server_ext(++port, buf_size, 1); + } + CATCH(e) { measMasterIn = NULL; - if (port == 10000 -1) { - RETHROW0("Error caught while opening a measurement socket: %s"); + if (port == 10000 - 1) { + RETHROWF("Error caught while opening a measurement socket: %s"); } else { - xbt_ex_free(e); + xbt_ex_free(e); } } } - - request=xbt_new0(s_bw_request_t,1); - request->buf_size=buf_size; - request->msg_size=msg_size; - request->msg_amount=msg_amount; + + request = xbt_new0(s_bw_request_t, 1); + request->buf_size = buf_size; + request->msg_size = msg_size; + request->msg_amount = msg_amount; request->peer.name = NULL; - request->peer.port = gras_socket_my_port(measMasterIn); - DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)", - gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port, - request->buf_size,request->msg_size,request->msg_amount); + request->peer.port = xbt_socket_my_port(measMasterIn); + XBT_DEBUG + ("Handshaking with %s:%d to connect it back on my %d (bufsize=%lu, msg_size=%lu, msg_amount=%lu)", + xbt_socket_peer_name(peer), xbt_socket_peer_port(peer), + request->peer.port, request->buf_size, request->msg_size, + request->msg_amount); TRY { - gras_msg_rpccall(peer,15,"BW handshake",&request, &request_ack); - } CATCH(e) { - RETHROW0("Error encountered while sending the BW request: %s"); + gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack); + } + CATCH_ANONYMOUS { + RETHROWF("Error encountered while sending the BW request: %s"); } - measIn = gras_socket_meas_accept(measMasterIn); - + measIn = xbt_socket_meas_accept(measMasterIn); + TRY { - measOut=gras_socket_client_ext(gras_socket_peer_name(peer), - request_ack->peer.port, - request->buf_size,1); - } CATCH(e) { - RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s", - gras_socket_peer_name(peer),request_ack->peer.port); + measOut = gras_socket_client_ext(xbt_socket_peer_name(peer), + request_ack->peer.port, + request->buf_size, 1); + } + CATCH_ANONYMOUS { + RETHROWF + ("Error encountered while opening the measurement socket to %s:%d for BW test: %s", + xbt_socket_peer_name(peer), request_ack->peer.port); } - DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)", - request->msg_size, request->msg_amount); + XBT_DEBUG + ("Got ACK; conduct the experiment (msg_size = %lu, msg_amount=%lu)", + request->msg_size, request->msg_amount); *sec = 0; first_pass = 1; do { if (first_pass == 0) { - double meas_duration=*sec; + double meas_duration = *sec; double increase; - if (*sec != 0.0 ) { - increase = (min_duration / meas_duration) * 1.1; + if (*sec != 0.0) { + increase = (min_duration / meas_duration) * 1.1; } else { - increase = 4; + increase = 4; } - /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/ + /* Do not increase the exp size too fast since our decision would be based on wrong measurements */ if (increase > 20) - increase = 20; - + increase = 20; + request->msg_size = request->msg_size * increase; /* Do not do too large experiments messages or the sensors - will start to swap to store one of them. - And then increase the number of messages to compensate (check for overflow there, too) */ - if (request->msg_size > 64*1024*1024) { - unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024)) - * request->msg_amount ) + 1; - - xbt_assert0(new_amount > request->msg_amount, - "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform"); - request->msg_amount = new_amount; - - request->msg_size = 64*1024*1024; + will start to swap to store one of them. + And then increase the number of messages to compensate (check for overflow there, too) */ + if (request->msg_size > 64 * 1024 * 1024) { + unsigned long int new_amount = + ((request->msg_size / ((double) 64 * 1024 * 1024)) + * request->msg_amount) + 1; + + xbt_assert(new_amount > request->msg_amount, + "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform"); + request->msg_amount = new_amount; + + request->msg_size = 64 * 1024 * 1024; } - VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)", - meas_duration, min_duration, - request->msg_size, request->msg_amount, - ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0)); + XBT_VERB + ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)", + meas_duration, min_duration, request->msg_size, + request->msg_amount, + ((double) request->msg_size) * ((double) request->msg_amount / + (*sec) / 1024.0 / 1024.0)); - gras_msg_rpccall(peer, 60, "BW reask",&request, NULL); + gras_msg_rpccall(peer, 60, "BW reask", &request, NULL); } first_pass = 0; - *sec=gras_os_time(); + *sec = gras_os_time(); TRY { - gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount); - DEBUG0("Data sent. Wait ACK"); - gras_socket_meas_recv(measIn,120,1,1); - } CATCH(e) { + xbt_socket_meas_send(measOut, 120, request->msg_size, + request->msg_amount); + XBT_DEBUG("Data sent. Wait ACK"); + xbt_socket_meas_recv(measIn, 120, 1, 1); + } + CATCH_ANONYMOUS { gras_socket_close(measOut); gras_socket_close(measMasterIn); gras_socket_close(measIn); - RETHROW0("Unable to conduct the experiment: %s"); + RETHROWF("Unable to conduct the experiment: %s"); } *sec = gras_os_time() - *sec; - if (*sec != 0.0) { - *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec); + if (*sec != 0.0) { + *bw = + ((double) request->msg_size) * ((double) request->msg_amount) / + (*sec); } - DEBUG1("Experiment done ; it took %f sec", *sec); + XBT_DEBUG("Experiment done ; it took %f sec", *sec); if (*sec <= 0) { - CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec); + XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec); } } while (*sec < min_duration); - DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer", - *sec,*bw); - gras_msg_send(peer, "BW stop", NULL); + XBT_DEBUG + ("This measurement was long enough (%f sec; found %f b/s). Stop peer", + *sec, *bw); + gras_msg_send(peer, "BW stop", NULL); free(request_ack); free(request); @@ -263,105 +286,116 @@ void amok_bw_test(gras_socket_t peer, sizes are in byte */ -int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, - void *payload) { - gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx); - gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL; - bw_request_t request=*(bw_request_t*)payload; +int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) +{ + xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx); + volatile xbt_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL; + volatile bw_request_t request = *(bw_request_t *) payload; bw_request_t answer; xbt_ex_t e; int port; int tooshort = 1; gras_msg_cb_ctx_t ctx_reask; - static xbt_dynar_t msgtwaited=NULL; - - DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)", - gras_socket_peer_name(expeditor),request->peer.port, - request->buf_size,request->msg_size, request->msg_amount); + static xbt_dynar_t msgtwaited = NULL; + + XBT_DEBUG + ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)", + xbt_socket_peer_name(expeditor), request->peer.port, + request->buf_size, request->msg_size, request->msg_amount); /* Build our answer */ - answer = xbt_new0(s_bw_request_t,1); - + answer = xbt_new0(s_bw_request_t, 1); + for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) { TRY { - measMasterIn = gras_socket_server_ext(port,request->buf_size,1); - } CATCH(e) { + measMasterIn = gras_socket_server_ext(port, request->buf_size, 1); + } + CATCH(e) { measMasterIn = NULL; if (port < 10000) - xbt_ex_free(e); + xbt_ex_free(e); else - /* FIXME: tell error to remote */ - RETHROW0("Error encountered while opening a measurement server socket: %s"); + /* FIXME: tell error to remote */ + RETHROWF + ("Error encountered while opening a measurement server socket: %s"); } } - - answer->buf_size=request->buf_size; - answer->msg_size=request->msg_size; - answer->msg_amount=request->msg_amount; - answer->peer.port=gras_socket_my_port(measMasterIn); + + answer->buf_size = request->buf_size; + answer->msg_size = request->msg_size; + answer->msg_amount = request->msg_amount; + answer->peer.port = xbt_socket_my_port(measMasterIn); TRY { - gras_msg_rpcreturn(60,ctx,&answer); - } CATCH(e) { + gras_msg_rpcreturn(60, ctx, &answer); + } + CATCH_ANONYMOUS { gras_socket_close(measMasterIn); /* FIXME: tell error to remote */ - RETHROW0("Error encountered while sending the answer: %s"); + RETHROWF("Error encountered while sending the answer: %s"); } /* Don't connect asap to leave time to other side to enter the accept() */ TRY { - measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor), - request->peer.port, - request->buf_size,1); - } CATCH(e) { - RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", - gras_socket_peer_name(expeditor),request->peer.port); + measOut = gras_socket_client_ext(xbt_socket_peer_name(expeditor), + request->peer.port, + request->buf_size, 1); + } + CATCH_ANONYMOUS { + RETHROWF + ("Error encountered while opening a measurement socket back to %s:%d : %s", + xbt_socket_peer_name(expeditor), request->peer.port); /* FIXME: tell error to remote */ } TRY { - measIn = gras_socket_meas_accept(measMasterIn); - DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d", - answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port); - } CATCH(e) { + measIn = xbt_socket_meas_accept(measMasterIn); + XBT_DEBUG + ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d", + answer->buf_size, answer->msg_size, answer->msg_amount, + answer->peer.port); + } + CATCH_ANONYMOUS { gras_socket_close(measMasterIn); gras_socket_close(measIn); gras_socket_close(measOut); /* FIXME: tell error to remote ? */ - RETHROW0("Error encountered while opening the meas socket: %s"); + RETHROWF("Error encountered while opening the meas socket: %s"); } if (!msgtwaited) { - msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL); - xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop")); - xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask")); + msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL); + xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop")); + xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask")); } while (tooshort) { - void *payload; + void *payloadgot; int msggot; TRY { - gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount); - gras_socket_meas_send(measOut,120,1,1); - } CATCH(e) { + xbt_socket_meas_recv(measIn, 120, request->msg_size, + request->msg_amount); + xbt_socket_meas_send(measOut, 120, 1, 1); + } + CATCH_ANONYMOUS { gras_socket_close(measMasterIn); gras_socket_close(measIn); gras_socket_close(measOut); /* FIXME: tell error to remote ? */ - RETHROW0("Error encountered while receiving the experiment: %s"); + RETHROWF("Error encountered while receiving the experiment: %s"); } - gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload); - switch(msggot) { - case 0: /* BW stop */ + gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payloadgot); + switch (msggot) { + case 0: /* BW stop */ tooshort = 0; break; - case 1: /* BW reask */ + case 1: /* BW reask */ tooshort = 1; free(request); - request = (bw_request_t)payload; - VERB0("Return the reasking RPC"); - gras_msg_rpcreturn(60,ctx_reask,NULL); + request = (bw_request_t) payloadgot; + XBT_VERB("Return the reasking RPC"); + gras_msg_rpcreturn(60, ctx_reask, NULL); } gras_msg_cb_ctx_free(ctx_reask); } @@ -372,7 +406,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, gras_socket_close(measOut); free(answer); free(request); - VERB0("BW experiment done."); + XBT_VERB("BW experiment done."); return 0; } @@ -401,78 +435,77 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, * * Results are reported in last args, and sizes are in bytes. */ -void amok_bw_request(const char* from_name,unsigned int from_port, - const char* to_name,unsigned int to_port, - unsigned long int buf_size, - unsigned long int msg_size, - unsigned long int msg_amount, - double min_duration, - /*OUT*/ double *sec, double*bw) { - - gras_socket_t sock; +void amok_bw_request(const char *from_name, unsigned int from_port, + const char *to_name, unsigned int to_port, + unsigned long int buf_size, + unsigned long int msg_size, + unsigned long int msg_amount, + double min_duration, /*OUT*/ double *sec, double *bw) +{ + + xbt_socket_t sock; /* The request */ bw_request_t request; bw_res_t result; - request=xbt_new0(s_bw_request_t,1); - request->buf_size=buf_size; - request->msg_size=msg_size; - request->msg_amount=msg_amount; + request = xbt_new0(s_bw_request_t, 1); + request->buf_size = buf_size; + request->msg_size = msg_size; + request->msg_amount = msg_amount; request->min_duration = min_duration; - request->peer.name = (char*)to_name; + request->peer.name = (char *) to_name; request->peer.port = to_port; - sock = gras_socket_client(from_name,from_port); - - - - DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port); - gras_msg_rpccall(sock,20*60,"BW request", &request, &result); + sock = gras_socket_client(from_name, from_port); + + + + XBT_DEBUG("Ask for a BW test between %s:%u and %s:%u", from_name, from_port, + to_name, to_port); + gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result); if (sec) - *sec=result->sec; + *sec = result->sec; if (bw) - *bw =result->bw; + *bw = result->bw; - VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)", - from_name,from_port, to_name,to_port, - result->sec,((double)result->bw)/1024.0); + XBT_VERB("BW test (%s:%u -> %s:%u) took %f sec (%f kb/s)", + from_name, from_port, to_name, to_port, + result->sec, ((double) result->bw) / 1024.0); gras_socket_close(sock); free(result); free(request); } -int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, - void *payload) { - - /* specification of the test to run, and our answer */ - bw_request_t request = *(bw_request_t*)payload; - bw_res_t result = xbt_new0(s_bw_res_t,1); - gras_socket_t peer,asker; - - asker=gras_msg_cb_ctx_from(ctx); - VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)", - gras_socket_peer_name(asker),gras_socket_peer_port(asker), +int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload) +{ - request->peer.name,request->peer.port, - request->msg_size,request->msg_amount); - peer = gras_socket_client(request->peer.name,request->peer.port); + /* specification of the test to run, and our answer */ + bw_request_t request = *(bw_request_t *) payload; + bw_res_t result = xbt_new0(s_bw_res_t, 1); + xbt_socket_t peer, asker; + + asker = gras_msg_cb_ctx_from(ctx); + XBT_VERB("Asked by %s:%d to conduct a bw XP with %s:%d (request: %lu %lu)", + xbt_socket_peer_name(asker), xbt_socket_peer_port(asker), + request->peer.name, request->peer.port, + request->msg_size, request->msg_amount); + peer = gras_socket_client(request->peer.name, request->peer.port); amok_bw_test(peer, - request->buf_size,request->msg_size,request->msg_amount, - request->min_duration, - &(result->sec),&(result->bw)); - - gras_msg_rpcreturn(240,ctx,&result); + request->buf_size, request->msg_size, request->msg_amount, + request->min_duration, &(result->sec), &(result->bw)); + + gras_msg_rpcreturn(240, ctx, &result); gras_os_sleep(1); - gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */ + gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */ free(request->peer.name); free(request); free(result); - + return 0; } @@ -483,26 +516,28 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, * was changed for the fool wanting to send more than MAXINT * bytes in a fat pipe. */ -double * amok_bw_matrix(xbt_dynar_t peers, - int buf_size_bw, int msg_size_bw, int msg_amount_bw, - double min_duration) { +double *amok_bw_matrix(xbt_dynar_t peers, + int buf_size_bw, int msg_size_bw, int msg_amount_bw, + double min_duration) +{ double sec; /* construction of matrices for bandwith and latency */ - int i,j,len=xbt_dynar_length(peers); + unsigned int i, j; + int len = xbt_dynar_length(peers); - double *matrix_res = xbt_new0(double, len*len); - xbt_peer_t p1,p2; + double *matrix_res = xbt_new0(double, len * len); + xbt_peer_t p1, p2; - xbt_dynar_foreach (peers,i,p1) { - xbt_dynar_foreach (peers,j,p2) { - if (i!=j) { + xbt_dynar_foreach(peers, i, p1) { + xbt_dynar_foreach(peers, j, p2) { + if (i != j) { /* Mesurements of Bandwidth */ - amok_bw_request(p1->name,p1->port,p2->name,p2->port, - buf_size_bw,msg_size_bw,msg_amount_bw,min_duration, - &sec,&matrix_res[i*len + j]); - } + amok_bw_request(p1->name, p1->port, p2->name, p2->port, + buf_size_bw, msg_size_bw, msg_amount_bw, + min_duration, &sec, &matrix_res[i * len + j]); + } } } return matrix_res;