X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6760cb07d6b57be16928d95339d71e57c4e24f36..53cde8dfb94134348e908b3c2845200ffc582dc7:/src/amok/Bandwidth/bandwidth.c diff --git a/src/amok/Bandwidth/bandwidth.c b/src/amok/Bandwidth/bandwidth.c index 32e9890ec0..6e917ced0c 100644 --- a/src/amok/Bandwidth/bandwidth.c +++ b/src/amok/Bandwidth/bandwidth.c @@ -54,34 +54,35 @@ static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload); void amok_bw_bw_init() { - gras_datadesc_type_t bw_request_desc, bw_res_desc; + xbt_datadesc_type_t bw_request_desc, bw_res_desc; /* Build the Bandwidth datatype descriptions */ - bw_request_desc = gras_datadesc_struct("s_bw_request_t"); - gras_datadesc_struct_append(bw_request_desc, "peer", - gras_datadesc_by_name("s_xbt_peer_t")); - gras_datadesc_struct_append(bw_request_desc, "buf_size", - gras_datadesc_by_name("unsigned long int")); - gras_datadesc_struct_append(bw_request_desc, "msg_size", - gras_datadesc_by_name("unsigned long int")); - gras_datadesc_struct_append(bw_request_desc, "msg_amount", - gras_datadesc_by_name("unsigned long int")); - gras_datadesc_struct_append(bw_request_desc, "min_duration", - gras_datadesc_by_name("double")); - gras_datadesc_struct_close(bw_request_desc); - bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc); - - bw_res_desc = gras_datadesc_struct("s_bw_res_t"); - gras_datadesc_struct_append(bw_res_desc, "timestamp", - gras_datadesc_by_name("unsigned int")); - gras_datadesc_struct_append(bw_res_desc, "seconds", - gras_datadesc_by_name("double")); - gras_datadesc_struct_append(bw_res_desc, "bw", - gras_datadesc_by_name("double")); - gras_datadesc_struct_close(bw_res_desc); - bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc); - - gras_msgtype_declare_rpc("BW handshake", bw_request_desc, bw_request_desc); + bw_request_desc = xbt_datadesc_struct("s_bw_request_t"); + xbt_datadesc_struct_append(bw_request_desc, "peer", + xbt_datadesc_by_name("s_xbt_peer_t")); + xbt_datadesc_struct_append(bw_request_desc, "buf_size", + xbt_datadesc_by_name("unsigned long int")); + xbt_datadesc_struct_append(bw_request_desc, "msg_size", + xbt_datadesc_by_name("unsigned long int")); + xbt_datadesc_struct_append(bw_request_desc, "msg_amount", + xbt_datadesc_by_name("unsigned long int")); + xbt_datadesc_struct_append(bw_request_desc, "min_duration", + xbt_datadesc_by_name("double")); + xbt_datadesc_struct_close(bw_request_desc); + bw_request_desc = xbt_datadesc_ref("bw_request_t", bw_request_desc); + + bw_res_desc = xbt_datadesc_struct("s_bw_res_t"); + xbt_datadesc_struct_append(bw_res_desc, "timestamp", + xbt_datadesc_by_name("unsigned int")); + xbt_datadesc_struct_append(bw_res_desc, "seconds", + xbt_datadesc_by_name("double")); + xbt_datadesc_struct_append(bw_res_desc, "bw", + xbt_datadesc_by_name("double")); + xbt_datadesc_struct_close(bw_res_desc); + bw_res_desc = xbt_datadesc_ref("bw_res_t", bw_res_desc); + + gras_msgtype_declare_rpc("BW handshake", bw_request_desc, + bw_request_desc); gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL); gras_msgtype_declare("BW stop", NULL); @@ -132,7 +133,7 @@ void amok_bw_bw_leave() * bytes in a fat pipe. * */ -void amok_bw_test(gras_socket_t peer, +void amok_bw_test(xbt_socket_t peer, unsigned long int buf_size, unsigned long int msg_size, unsigned long int msg_amount, @@ -140,8 +141,8 @@ void amok_bw_test(gras_socket_t peer, { /* Measurement sockets for the experiments */ - gras_socket_t measMasterIn = NULL, measIn, measOut = NULL; - int port; + volatile xbt_socket_t measMasterIn = NULL, measIn, measOut = NULL; + volatile int port; bw_request_t request, request_ack; xbt_ex_t e; int first_pass; @@ -153,7 +154,7 @@ void amok_bw_test(gras_socket_t peer, CATCH(e) { measMasterIn = NULL; if (port == 10000 - 1) { - RETHROW0("Error caught while opening a measurement socket: %s"); + RETHROWF("Error caught while opening a measurement socket: %s"); } else { xbt_ex_free(e); } @@ -165,33 +166,34 @@ void amok_bw_test(gras_socket_t peer, request->msg_size = msg_size; request->msg_amount = msg_amount; request->peer.name = NULL; - request->peer.port = gras_socket_my_port(measMasterIn); - DEBUG6 - ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)", - gras_socket_peer_name(peer), gras_socket_peer_port(peer), - request->peer.port, request->buf_size, request->msg_size, - request->msg_amount); + request->peer.port = xbt_socket_my_port(measMasterIn); + XBT_DEBUG + ("Handshaking with %s:%d to connect it back on my %d (bufsize=%lu, msg_size=%lu, msg_amount=%lu)", + xbt_socket_peer_name(peer), xbt_socket_peer_port(peer), + request->peer.port, request->buf_size, request->msg_size, + request->msg_amount); TRY { gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack); } - CATCH(e) { - RETHROW0("Error encountered while sending the BW request: %s"); + CATCH_ANONYMOUS { + RETHROWF("Error encountered while sending the BW request: %s"); } - measIn = gras_socket_meas_accept(measMasterIn); + measIn = xbt_socket_meas_accept(measMasterIn); TRY { - measOut = gras_socket_client_ext(gras_socket_peer_name(peer), + measOut = gras_socket_client_ext(xbt_socket_peer_name(peer), request_ack->peer.port, request->buf_size, 1); } - CATCH(e) { - RETHROW2 - ("Error encountered while opening the measurement socket to %s:%d for BW test: %s", - gras_socket_peer_name(peer), request_ack->peer.port); + CATCH_ANONYMOUS { + RETHROWF + ("Error encountered while opening the measurement socket to %s:%d for BW test: %s", + xbt_socket_peer_name(peer), request_ack->peer.port); } - DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)", - request->msg_size, request->msg_amount); + XBT_DEBUG + ("Got ACK; conduct the experiment (msg_size = %lu, msg_amount=%lu)", + request->msg_size, request->msg_amount); *sec = 0; first_pass = 1; @@ -215,21 +217,22 @@ void amok_bw_test(gras_socket_t peer, And then increase the number of messages to compensate (check for overflow there, too) */ if (request->msg_size > 64 * 1024 * 1024) { unsigned long int new_amount = - ((request->msg_size / ((double) 64 * 1024 * 1024)) - * request->msg_amount) + 1; + ((request->msg_size / ((double) 64 * 1024 * 1024)) + * request->msg_amount) + 1; - xbt_assert0(new_amount > request->msg_amount, + xbt_assert(new_amount > request->msg_amount, "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform"); request->msg_amount = new_amount; request->msg_size = 64 * 1024 * 1024; } - VERB5 - ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)", - meas_duration, min_duration, request->msg_size, request->msg_amount, - ((double) request->msg_size) * ((double) request->msg_amount / - (*sec) / 1024.0 / 1024.0)); + XBT_VERB + ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)", + meas_duration, min_duration, request->msg_size, + request->msg_amount, + ((double) request->msg_size) * ((double) request->msg_amount / + (*sec) / 1024.0 / 1024.0)); gras_msg_rpccall(peer, 60, "BW reask", &request, NULL); } @@ -237,31 +240,33 @@ void amok_bw_test(gras_socket_t peer, first_pass = 0; *sec = gras_os_time(); TRY { - gras_socket_meas_send(measOut, 120, request->msg_size, + xbt_socket_meas_send(measOut, 120, request->msg_size, request->msg_amount); - DEBUG0("Data sent. Wait ACK"); - gras_socket_meas_recv(measIn, 120, 1, 1); - } CATCH(e) { + XBT_DEBUG("Data sent. Wait ACK"); + xbt_socket_meas_recv(measIn, 120, 1, 1); + } + CATCH_ANONYMOUS { gras_socket_close(measOut); gras_socket_close(measMasterIn); gras_socket_close(measIn); - RETHROW0("Unable to conduct the experiment: %s"); + RETHROWF("Unable to conduct the experiment: %s"); } *sec = gras_os_time() - *sec; if (*sec != 0.0) { *bw = - ((double) request->msg_size) * ((double) request->msg_amount) / - (*sec); + ((double) request->msg_size) * ((double) request->msg_amount) / + (*sec); } - DEBUG1("Experiment done ; it took %f sec", *sec); + XBT_DEBUG("Experiment done ; it took %f sec", *sec); if (*sec <= 0) { - CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec); + XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec); } } while (*sec < min_duration); - DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer", - *sec, *bw); + XBT_DEBUG + ("This measurement was long enough (%f sec; found %f b/s). Stop peer", + *sec, *bw); gras_msg_send(peer, "BW stop", NULL); free(request_ack); @@ -283,9 +288,9 @@ void amok_bw_test(gras_socket_t peer, */ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) { - gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx); - gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL; - bw_request_t request = *(bw_request_t *) payload; + xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx); + volatile xbt_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL; + volatile bw_request_t request = *(bw_request_t *) payload; bw_request_t answer; xbt_ex_t e; int port; @@ -293,10 +298,10 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) gras_msg_cb_ctx_t ctx_reask; static xbt_dynar_t msgtwaited = NULL; - DEBUG5 - ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)", - gras_socket_peer_name(expeditor), request->peer.port, request->buf_size, - request->msg_size, request->msg_amount); + XBT_DEBUG + ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)", + xbt_socket_peer_name(expeditor), request->peer.port, + request->buf_size, request->msg_size, request->msg_amount); /* Build our answer */ answer = xbt_new0(s_bw_request_t, 1); @@ -311,52 +316,52 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) xbt_ex_free(e); else /* FIXME: tell error to remote */ - RETHROW0 - ("Error encountered while opening a measurement server socket: %s"); + RETHROWF + ("Error encountered while opening a measurement server socket: %s"); } } answer->buf_size = request->buf_size; answer->msg_size = request->msg_size; answer->msg_amount = request->msg_amount; - answer->peer.port = gras_socket_my_port(measMasterIn); + answer->peer.port = xbt_socket_my_port(measMasterIn); TRY { gras_msg_rpcreturn(60, ctx, &answer); } - CATCH(e) { + CATCH_ANONYMOUS { gras_socket_close(measMasterIn); /* FIXME: tell error to remote */ - RETHROW0("Error encountered while sending the answer: %s"); + RETHROWF("Error encountered while sending the answer: %s"); } /* Don't connect asap to leave time to other side to enter the accept() */ TRY { - measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor), + measOut = gras_socket_client_ext(xbt_socket_peer_name(expeditor), request->peer.port, request->buf_size, 1); } - CATCH(e) { - RETHROW2 - ("Error encountered while opening a measurement socket back to %s:%d : %s", - gras_socket_peer_name(expeditor), request->peer.port); + CATCH_ANONYMOUS { + RETHROWF + ("Error encountered while opening a measurement socket back to %s:%d : %s", + xbt_socket_peer_name(expeditor), request->peer.port); /* FIXME: tell error to remote */ } TRY { - measIn = gras_socket_meas_accept(measMasterIn); - DEBUG4 - ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d", - answer->buf_size, answer->msg_size, answer->msg_amount, - answer->peer.port); + measIn = xbt_socket_meas_accept(measMasterIn); + XBT_DEBUG + ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d", + answer->buf_size, answer->msg_size, answer->msg_amount, + answer->peer.port); } - CATCH(e) { + CATCH_ANONYMOUS { gras_socket_close(measMasterIn); gras_socket_close(measIn); gras_socket_close(measOut); /* FIXME: tell error to remote ? */ - RETHROW0("Error encountered while opening the meas socket: %s"); + RETHROWF("Error encountered while opening the meas socket: %s"); } if (!msgtwaited) { @@ -369,15 +374,16 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) void *payload; int msggot; TRY { - gras_socket_meas_recv(measIn, 120, request->msg_size, + xbt_socket_meas_recv(measIn, 120, request->msg_size, request->msg_amount); - gras_socket_meas_send(measOut, 120, 1, 1); - } CATCH(e) { + xbt_socket_meas_send(measOut, 120, 1, 1); + } + CATCH_ANONYMOUS { gras_socket_close(measMasterIn); gras_socket_close(measIn); gras_socket_close(measOut); /* FIXME: tell error to remote ? */ - RETHROW0("Error encountered while receiving the experiment: %s"); + RETHROWF("Error encountered while receiving the experiment: %s"); } gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload); switch (msggot) { @@ -388,7 +394,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) tooshort = 1; free(request); request = (bw_request_t) payload; - VERB0("Return the reasking RPC"); + XBT_VERB("Return the reasking RPC"); gras_msg_rpcreturn(60, ctx_reask, NULL); } gras_msg_cb_ctx_free(ctx_reask); @@ -400,7 +406,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload) gras_socket_close(measOut); free(answer); free(request); - VERB0("BW experiment done."); + XBT_VERB("BW experiment done."); return 0; } @@ -437,7 +443,7 @@ void amok_bw_request(const char *from_name, unsigned int from_port, double min_duration, /*OUT*/ double *sec, double *bw) { - gras_socket_t sock; + xbt_socket_t sock; /* The request */ bw_request_t request; bw_res_t result; @@ -456,7 +462,7 @@ void amok_bw_request(const char *from_name, unsigned int from_port, - DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name, from_port, + XBT_DEBUG("Ask for a BW test between %s:%u and %s:%u", from_name, from_port, to_name, to_port); gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result); @@ -465,7 +471,7 @@ void amok_bw_request(const char *from_name, unsigned int from_port, if (bw) *bw = result->bw; - VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)", + XBT_VERB("BW test (%s:%u -> %s:%u) took %f sec (%f kb/s)", from_name, from_port, to_name, to_port, result->sec, ((double) result->bw) / 1024.0); @@ -480,11 +486,11 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload) /* specification of the test to run, and our answer */ bw_request_t request = *(bw_request_t *) payload; bw_res_t result = xbt_new0(s_bw_res_t, 1); - gras_socket_t peer, asker; + xbt_socket_t peer, asker; asker = gras_msg_cb_ctx_from(ctx); - VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)", - gras_socket_peer_name(asker), gras_socket_peer_port(asker), + XBT_VERB("Asked by %s:%d to conduct a bw XP with %s:%d (request: %lu %lu)", + xbt_socket_peer_name(asker), xbt_socket_peer_port(asker), request->peer.name, request->peer.port, request->msg_size, request->msg_amount); peer = gras_socket_client(request->peer.name, request->peer.port); @@ -529,8 +535,8 @@ double *amok_bw_matrix(xbt_dynar_t peers, if (i != j) { /* Mesurements of Bandwidth */ amok_bw_request(p1->name, p1->port, p2->name, p2->port, - buf_size_bw, msg_size_bw, msg_amount_bw, min_duration, - &sec, &matrix_res[i * len + j]); + buf_size_bw, msg_size_bw, msg_amount_bw, + min_duration, &sec, &matrix_res[i * len + j]); } } }