* In gras_msg_handle, do not discard messages without callback.
They are probably messages to be explicitly awaited later (ie, proofs of
mis-synchronization in userland since they are sent before being awaited)
- No big deal usually
+ No big deal usually.
+ * gras_socket_meas_send/recv: semantics changed!
+ The numerical arguments used to be (1) the total amount of data to send
+ and (2) msg_size. This was changed to (1) msg_size and (2) amount of
+ messages. This was need for the fool willing to send more than MAXINT
+ bytes on quite fat pipes.
+
AMOK:
* Do really rename the hostmanagement module to peermanagement. [Mt]
Ie, rename functions from amok_hm_* to amok_pm_*. This breaks the API,
but this is rather new and this was documented in the module
documentation (poor excuses, I admit)
+ * Bandwidth measurement semantics changed! This follows the changes to
+ gras_socket_meas_send/recv explained above.
SIMDAG:
* A sequential mode has been added to the workstations. When a workstation
int maestro(int argc,char *argv[]) {
double sec, bw;
int buf_size=32 *1024;
- int exp_size=512 *1024;
int msg_size=512 *1024;
+ int msg_amount = 1;
double min_duration = 1;
gras_socket_t peer;
peer = gras_socket_client(h1->name, h1->port);
INFO0("Test the BW between me and one of the sensors");
- amok_bw_test(peer,buf_size,exp_size,msg_size,min_duration,&sec,&bw);
- INFO6("Experience between me and %s:%d (%d bytes in msgs of %d bytes) took %f sec, achieving %f kb/s",
+ amok_bw_test(peer,buf_size,msg_size,msg_amount,min_duration,&sec,&bw);
+ INFO7("Experience between me and %s:%d (initially %d msgs of %d bytes, maybe modified to fill the pipe at least %.1fs) took %f sec, achieving %f kb/s",
h1->name, h1->port,
- exp_size,msg_size,
+ msg_amount,msg_size,min_duration,
sec,((double)bw)/1024.0);
INFO4("Test the BW between %s:%d and %s:%d", h1->name, h1->port, h2->name, h2->port);
amok_bw_request(h1->name, h1->port, h2->name, h2->port,
- buf_size,exp_size,msg_size,min_duration,&sec,&bw);
+ buf_size,msg_size,msg_amount,min_duration,&sec,&bw);
INFO6("Experience between %s:%d and %s:%d took took %f sec, achieving %f kb/s",
h1->name, h1->port, h2->name, h2->port,
sec,((double)bw)/1024.0);
/* XP setups */
const int buf_size = 0;
-const int exp_size = 100 * 1024;
const int msg_size = 50 * 1024;
+const int msg_amount = 2;
const int sat_size = 1024 * 1024 * 10;
const double min_duration = 1;
gras_os_sleep(5.0); /* wait for the sensors to show up */
/* Test BW without saturation */
amok_bw_request(bw1,4000,bw2,4000,
- buf_size,exp_size,msg_size,min_duration,&sec,&bw);
+ buf_size,msg_size,msg_amount,min_duration,&sec,&bw);
INFO4("BW(%s,%s) => %f sec, achieving %f Mb/s",
bw1, bw2, sec, (bw/1024.0/1024.0));
gras_os_sleep(1.0); /* let it start */
amok_bw_request(bw1,4000,bw2,4000,
- buf_size,exp_size,msg_size,min_duration,&sec_sat,&bw_sat);
+ buf_size,msg_size,msg_amount,min_duration,&sec_sat,&bw_sat);
INFO6("BW(%s,%s//%s,%s) => %f sec, achieving %f Mb/s",
bw1,bw2,sat1,sat2,sec,bw/1024.0/1024.0);
begin=time(NULL);
begin_simulated=gras_os_time();
- bw=amok_bw_matrix(peers,buf_size,exp_size,msg_size,min_duration);
+ bw=amok_bw_matrix(peers,buf_size,msg_size,msg_amount,min_duration);
INFO2("Did all BW tests in %ld sec (%.2f simulated(?) sec)",
time(NULL)-begin,gras_os_time()-begin_simulated);
VERB4("TEST %s %s // %s %s",
h1->name,h2->name,h3->name,h4->name);
amok_bw_request(h3->name,h3->port, h4->name,h4->port,
- buf_size,exp_size,msg_size,min_duration,
+ buf_size,msg_size,msg_amount,min_duration,
NULL,&(bw_sat[k*nb_peers + l]));
ratio=bw_sat[k*nb_peers + l] / bw[k*nb_peers + l];
/*OUT*/ double *sec, double*bw);
XBT_PUBLIC double * amok_bw_matrix(xbt_dynar_t hosts, /* dynar of xbt_host_t */
- int buf_size_bw, int exp_size_bw, int msg_size_bw, double min_duration);
+ int buf_size_bw, int msg_size_bw, int msg_amount_bw, double min_duration);
/* ***************************************************************************
* Link saturation
XBT_PUBLIC int gras_socket_is_meas(gras_socket_t sock);
XBT_PUBLIC void gras_socket_meas_send(gras_socket_t peer,
- unsigned int timeout,
- unsigned long int expSize,
- unsigned long int msgSize);
+ unsigned int timeout,
+ unsigned long int msgSize,
+ unsigned long int msgAmount);
XBT_PUBLIC void gras_socket_meas_recv(gras_socket_t peer,
- unsigned int timeout,
- unsigned long int expSize,
- unsigned long int msgSize);
+ unsigned int timeout,
+ unsigned long int msgSize,
+ unsigned long int msgAmount);
XBT_PUBLIC gras_socket_t gras_socket_meas_accept(gras_socket_t peer);
/* @}*/
gras_datadesc_by_name("s_xbt_peer_t"));
gras_datadesc_struct_append(bw_request_desc,"buf_size",
gras_datadesc_by_name("unsigned long int"));
- gras_datadesc_struct_append(bw_request_desc,"exp_size",
- gras_datadesc_by_name("unsigned long int"));
gras_datadesc_struct_append(bw_request_desc,"msg_size",
gras_datadesc_by_name("unsigned long int"));
+ gras_datadesc_struct_append(bw_request_desc,"msg_amount",
+ gras_datadesc_by_name("unsigned long int"));
gras_datadesc_struct_append(bw_request_desc,"min_duration",
gras_datadesc_by_name("double"));
gras_datadesc_struct_close(bw_request_desc);
*
* \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
* \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
- * \arg exp_size: Total size of data sent across the network
- * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
+ * \arg msg_size: Size of each message sent.
+ * \arg msg_amount: Amount of such messages to exchange
* \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
* \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
* \arg bw: observed Bandwidth (in byte/s)
* This call is blocking until the end of the experiment.
*
* If the asked experiment lasts less than \a min_duration, another one will be
- * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
- * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
+ * launched (and others, if needed). msg_size will be multiplied by
+ * MIN(20, (\a min_duration / measured_duration) *1.1) (the 1.1 factor adds 10% to be sure to eventually
* reach the \a min_duration). In that case, the reported bandwidth and
* duration are the ones of the last run. \a msg_size cannot go over 64Mb
* because we need to malloc a block of this size in RL to conduct the
- * experiment, and we still don't want to visit the swap.
+ * experiment, and we still don't want to visit the swap. In such case, the
+ * number of messages is increased instead of their size.
*
* Results are reported in last args, and sizes are in byte.
+ *
+ * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
+ * as the total amount of data to send and the msg_size. This
+ * was changed for the fool wanting to send more than MAXINT
+ * bytes in a fat pipe.
+ *
*/
void amok_bw_test(gras_socket_t peer,
unsigned long int buf_size,
- unsigned long int exp_size,
unsigned long int msg_size,
+ unsigned long int msg_amount,
double min_duration,
/*OUT*/ double *sec, double *bw) {
bw_request_t request,request_ack;
xbt_ex_t e;
int first_pass;
- int nb_messages = (exp_size % msg_size == 0) ?
- (exp_size / msg_size) : (exp_size / msg_size + 1);
for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
TRY {
request=xbt_new0(s_bw_request_t,1);
request->buf_size=buf_size;
- request->exp_size=msg_size * nb_messages;
request->msg_size=msg_size;
+ request->msg_amount=msg_amount;
request->peer.name = NULL;
request->peer.port = gras_socket_my_port(measMasterIn);
DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)",
RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
gras_socket_peer_name(peer),request_ack->peer.port);
}
- DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
- request->exp_size, request->msg_size);
+ DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
+ request->msg_size, request->msg_amount);
*sec = 0;
first_pass = 1;
/* Do not do too large experiments messages or the sensors
will start to swap to store one of them.
- And then increase the number of messages to compensate */
+ And then increase the number of messages to compensate (check for overflow there, too) */
if (request->msg_size > 64*1024*1024) {
- nb_messages = ( (request->msg_size / ((double)64*1024*1024))
- * nb_messages ) + 1;
- request->msg_size = 64*1024*1024;
+ unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024))
+ * request->msg_amount ) + 1;
+
+ xbt_assert0(new_amount > request->msg_amount,
+ "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
+ request->msg_amount = new_amount;
+
+ request->msg_size = 64*1024*1024;
}
- VERB6("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (nb_messages=%d) (got %fkb/s)",
+ VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
meas_duration, min_duration,
- request->exp_size, request->msg_size, nb_messages,
- ((double)request->exp_size) / *sec/1024);
-
- xbt_assert0(request->exp_size > request->msg_size * nb_messages,
- "Overflow on the experiment size! You must have a *really* fat pipe. Please fix your platform");
- request->exp_size = request->msg_size * nb_messages;
-
+ request->msg_size, request->msg_amount,
+ ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
}
first_pass = 0;
*sec=gras_os_time();
TRY {
- gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
+ gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
DEBUG0("Data sent. Wait ACK");
gras_socket_meas_recv(measIn,120,1,1);
} CATCH(e) {
RETHROW0("Unable to conduct the experiment: %s");
}
*sec = gras_os_time() - *sec;
- if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
+ if (*sec != 0.0) {
+ *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
+ }
DEBUG1("Experiment done ; it took %f sec", *sec);
if (*sec <= 0) {
CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
opens a server measurement socket,
indicate its port in an "BW handshaked" message,
receive the corresponding data on the measurement socket,
- close the measurment socket
+ close the measurement socket
sizes are in byte
*/
gras_msg_cb_ctx_t ctx_reask;
static xbt_dynar_t msgtwaited=NULL;
- DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
+ DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
gras_socket_peer_name(expeditor),request->peer.port,
- request->buf_size,request->exp_size,request->msg_size);
+ request->buf_size,request->msg_size, request->msg_amount);
/* Build our answer */
answer = xbt_new0(s_bw_request_t,1);
}
answer->buf_size=request->buf_size;
- answer->exp_size=request->exp_size;
answer->msg_size=request->msg_size;
+ answer->msg_amount=request->msg_amount;
answer->peer.port=gras_socket_my_port(measMasterIn);
TRY {
TRY {
measIn = gras_socket_meas_accept(measMasterIn);
- DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
- answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
+ DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
+ answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
} CATCH(e) {
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
void *payload;
int msggot;
TRY {
- gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
+ gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
gras_socket_meas_send(measOut,120,1,1);
} CATCH(e) {
gras_socket_close(measMasterIn);
* give a measurement socket here. The needed measurement sockets will be created
* automatically and negociated between the peers)
* \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
- * \arg exp_size: Total size of data sent across the network
- * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
+ * \arg msg_size: Size of each message sent.
+ * \arg msg_amount: Amount of such messages to exchange
* \arg sec: where the result (in seconds) should be stored.
* \arg bw: observed Bandwidth (in byte/s)
*
* Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
* This call is blocking until the end of the experiment.
*
+ * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
+ * as the total amount of data to send and the msg_size. This
+ * was changed for the fool wanting to send more than MAXINT
+ * bytes in a fat pipe.
+ *
* Results are reported in last args, and sizes are in bytes.
*/
void amok_bw_request(const char* from_name,unsigned int from_port,
const char* to_name,unsigned int to_port,
unsigned long int buf_size,
- unsigned long int exp_size,
unsigned long int msg_size,
+ unsigned long int msg_amount,
double min_duration,
/*OUT*/ double *sec, double*bw) {
bw_res_t result;
request=xbt_new0(s_bw_request_t,1);
request->buf_size=buf_size;
- request->exp_size=exp_size;
request->msg_size=msg_size;
+ request->msg_amount=msg_amount;
request->min_duration = min_duration;
request->peer.name,request->peer.port);
peer = gras_socket_client(request->peer.name,request->peer.port);
amok_bw_test(peer,
- request->buf_size,request->exp_size,request->msg_size,
+ request->buf_size,request->msg_size,request->msg_amount,
request->min_duration,
&(result->sec),&(result->bw));
return 1;
}
-/** \brief builds a matrix of results of bandwidth measurement */
+/** \brief builds a matrix of results of bandwidth measurement
+ *
+ * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
+ * as the total amount of data to send and the msg_size. This
+ * was changed for the fool wanting to send more than MAXINT
+ * bytes in a fat pipe.
+ */
double * amok_bw_matrix(xbt_dynar_t peers,
- int buf_size_bw, int exp_size_bw, int msg_size_bw,
+ int buf_size_bw, int msg_size_bw, int msg_amount_bw,
double min_duration) {
double sec;
/* construction of matrices for bandwith and latency */
if (i!=j) {
/* Mesurements of Bandwidth */
amok_bw_request(p1->name,p1->port,p2->name,p2->port,
- buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
+ buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
&sec,&matrix_res[i*len + j]);
}
}
typedef struct {
s_xbt_peer_t peer; /* peer+raw socket to use */
unsigned long int buf_size;
- unsigned long int exp_size;
unsigned long int msg_size;
+ unsigned long int msg_amount;
double min_duration;
} s_bw_request_t,*bw_request_t;
*
* @param peer measurement socket to use for the experiment
* @param timeout timeout (in seconds)
- * @param exp_size total amount of data to send (in bytes).
* @param msg_size size of each chunk sent over the socket (in bytes).
+ * @param msg_amount how many of these packets you want to send.
*
* Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
* each side of the socket should be paired.
* The exchanged data is zeroed to make sure it's initialized, but
* there is no way to control what is sent (ie, you cannot use these
* functions to exchange data out of band).
+ *
+ * @warning: in SimGrid version 3.1 and previous, the numerical arguments
+ * were the total amount of data to send and the msg_size. This
+ * was changed for the fool wanting to send more than MAXINT
+ * bytes in a fat pipe.
*/
void gras_socket_meas_send(gras_socket_t peer,
unsigned int timeout,
- unsigned long int exp_size,
- unsigned long int msg_size) {
+ unsigned long int msg_size,
+ unsigned long int msg_amount) {
char *chunk=NULL;
- unsigned long int exp_sofar;
- unsigned long int chunk_size = 0;
+ unsigned long int sent_sofar;
XBT_IN;
xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
xbt_assert0(peer->outgoing,"Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
- for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += chunk_size) {
- chunk_size = exp_sofar + msg_size > exp_size ?
- exp_size - exp_sofar : msg_size;
- CDEBUG6(gras_trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d, sending %lu",
- exp_sofar,exp_size,msg_size,
- gras_socket_peer_name(peer), gras_socket_peer_port(peer),
- chunk_size);
- (*peer->plugin->raw_send)(peer,chunk,chunk_size);
+ for (sent_sofar=0; sent_sofar < msg_amount; sent_sofar++) {
+ CDEBUG5(gras_trp_meas,"Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
+ sent_sofar,msg_amount,msg_size,
+ gras_socket_peer_name(peer), gras_socket_peer_port(peer));
+ (*peer->plugin->raw_send)(peer,chunk,msg_size);
}
- CDEBUG5(gras_trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d",
- exp_sofar,exp_size,msg_size,
+ CDEBUG5(gras_trp_meas,"Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
+ sent_sofar,msg_amount,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
if (gras_if_RL())
*
* Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
* each side of the socket should be paired.
+ *
+ * @warning: in SimGrid version 3.1 and previous, the numerical arguments
+ * were the total amount of data to send and the msg_size. This
+ * was changed for the fool wanting to send more than MAXINT
+ * bytes in a fat pipe.
*/
void gras_socket_meas_recv(gras_socket_t peer,
unsigned int timeout,
- unsigned long int exp_size,
- unsigned long int msg_size){
+ unsigned long int msg_size,
+ unsigned long int msg_amount){
char *chunk=NULL;
- unsigned long int exp_sofar;
- unsigned long int chunk_size = 0;
+ unsigned long int got_sofar;
XBT_IN;
"Asked to receive measurement data on a regular socket");
xbt_assert0(peer->incoming,"Socket not suited for data receive");
- for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += chunk_size) {
- chunk_size = exp_sofar + msg_size > exp_size ?
- exp_size - exp_sofar : msg_size;
- CDEBUG6(gras_trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d, receiving %lu",
- exp_sofar,exp_size,msg_size,
- gras_socket_peer_name(peer), gras_socket_peer_port(peer),
- chunk_size);
- (peer->plugin->raw_recv)(peer,chunk,chunk_size);
+ for (got_sofar=0; got_sofar < msg_amount; got_sofar ++) {
+ CDEBUG5(gras_trp_meas,"Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
+ got_sofar,msg_amount,msg_size,
+ gras_socket_peer_name(peer), gras_socket_peer_port(peer));
+ (peer->plugin->raw_recv)(peer,chunk,msg_size);
}
- CDEBUG5(gras_trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d",
- exp_sofar,exp_size,msg_size,
+ CDEBUG5(gras_trp_meas,"Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
+ got_sofar,msg_amount,msg_size,
gras_socket_peer_name(peer), gras_socket_peer_port(peer));
if (gras_if_RL())