Message* message = nullptr; // current message being received
public:
explicit Peer(std::vector<std::string> args);
+ Peer(const Peer&) = delete;
~Peer();
void operator()();
smpi_bench_end();
if (request == nullptr) {
- retval = MPI_ERR_ARG;
+ retval = MPI_ERR_ARG;
} else if (comm == MPI_COMM_NULL) {
- retval = MPI_ERR_COMM;
+ retval = MPI_ERR_COMM;
} else if (not datatype->is_valid()) {
retval = MPI_ERR_TYPE;
} else if (dst == MPI_PROC_NULL) {
- retval = MPI_SUCCESS;
+ retval = MPI_SUCCESS;
} else {
- *request = simgrid::smpi::Request::send_init(buf, count, datatype, dst, tag, comm);
- retval = MPI_SUCCESS;
+ *request = simgrid::smpi::Request::send_init(buf, count, datatype, dst, tag, comm);
+ retval = MPI_SUCCESS;
}
smpi_bench_begin();
if (retval != MPI_SUCCESS && request != nullptr)
return retval;
}
-int PMPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void *recvbuf,
- int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status * status)
+int PMPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf,
+ int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status)
{
int retval = 0;
retval = MPI_ERR_TAG;
} else {
- int rank = comm != MPI_COMM_NULL ? smpi_process()->index() : -1;
- int dst_traced = comm->group()->index(dst);
- int src_traced = comm->group()->index(src);
- instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
- extra->type = TRACING_SENDRECV;
- extra->src = src_traced;
- extra->dst = dst_traced;
- int known=0;
- extra->datatype1 = encode_datatype(sendtype, &known);
- int dt_size_send = 1;
- if(known==0)
- dt_size_send = sendtype->size();
- extra->send_size = sendcount*dt_size_send;
- extra->datatype2 = encode_datatype(recvtype, &known);
- int dt_size_recv = 1;
- if(known==0)
- dt_size_recv = recvtype->size();
- extra->recv_size = recvcount*dt_size_recv;
-
- TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
- TRACE_smpi_send(rank, rank, dst_traced, sendtag,sendcount*sendtype->size());
-
- simgrid::smpi::Request::sendrecv(sendbuf, sendcount, sendtype, dst, sendtag, recvbuf, recvcount, recvtype, src, recvtag, comm,
- status);
- retval = MPI_SUCCESS;
-
- TRACE_smpi_ptp_out(rank, dst_traced, __FUNCTION__);
- TRACE_smpi_recv(src_traced, rank, recvtag);
+ int rank = comm != MPI_COMM_NULL ? smpi_process()->index() : -1;
+ int dst_traced = comm->group()->index(dst);
+ int src_traced = comm->group()->index(src);
+ instr_extra_data extra = xbt_new0(s_instr_extra_data_t, 1);
+ extra->type = TRACING_SENDRECV;
+ extra->src = src_traced;
+ extra->dst = dst_traced;
+ int known = 0;
+ extra->datatype1 = encode_datatype(sendtype, &known);
+ int dt_size_send = 1;
+ if (known == 0)
+ dt_size_send = sendtype->size();
+ extra->send_size = sendcount * dt_size_send;
+ extra->datatype2 = encode_datatype(recvtype, &known);
+ int dt_size_recv = 1;
+ if (known == 0)
+ dt_size_recv = recvtype->size();
+ extra->recv_size = recvcount * dt_size_recv;
+
+ TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
+ TRACE_smpi_send(rank, rank, dst_traced, sendtag, sendcount * sendtype->size());
+
+ simgrid::smpi::Request::sendrecv(sendbuf, sendcount, sendtype, dst, sendtag, recvbuf, recvcount, recvtype, src,
+ recvtag, comm, status);
+ retval = MPI_SUCCESS;
+
+ TRACE_smpi_ptp_out(rank, dst_traced, __FUNCTION__);
+ TRACE_smpi_recv(src_traced, rank, recvtag);
}
smpi_bench_begin();
void* recvbuf = xbt_new0(char, size);
retval = MPI_Sendrecv(buf, count, datatype, dst, sendtag, recvbuf, count, datatype, src, recvtag, comm, status);
if(retval==MPI_SUCCESS){
- simgrid::smpi::Datatype::copy(recvbuf, count, datatype, buf, count, datatype);
+ simgrid::smpi::Datatype::copy(recvbuf, count, datatype, buf, count, datatype);
}
xbt_free(recvbuf);
TRACE_smpi_ptp_out(rank, dst_traced, __FUNCTION__);
if (is_wait_for_receive) {
if(src_traced==MPI_ANY_SOURCE)
- src_traced = (status!=MPI_STATUS_IGNORE) ?
- comm->group()->rank(status->MPI_SOURCE) :
- src_traced;
+ src_traced = (status != MPI_STATUS_IGNORE) ? comm->group()->rank(status->MPI_SOURCE) : src_traced;
TRACE_smpi_recv(src_traced, dst_traced, tag_traced);
}
}
int is_wait_for_receive = savedvals[*index].recv;
if (is_wait_for_receive) {
if(savedvals[*index].src==MPI_ANY_SOURCE)
- src_traced = (status != MPI_STATUSES_IGNORE)
- ? savedvals[*index].comm->group()->rank(status->MPI_SOURCE)
- : savedvals[*index].src;
+ src_traced = (status != MPI_STATUSES_IGNORE) ? savedvals[*index].comm->group()->rank(status->MPI_SOURCE)
+ : savedvals[*index].src;
TRACE_smpi_recv(src_traced, dst_traced, savedvals[*index].tag);
}
TRACE_smpi_ptp_out(rank_traced, dst_traced, __FUNCTION__);
for (int i = 0; i < count; i++) {
if(savedvals[i].valid){
- //the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
+ // the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
int src_traced = savedvals[i].src;
int dst_traced = savedvals[i].dst;
int is_wait_for_receive = savedvals[i].recv;
if (is_wait_for_receive) {
if(src_traced==MPI_ANY_SOURCE)
- src_traced = (status!=MPI_STATUSES_IGNORE) ?
- savedvals[i].comm->group()->rank(status[i].MPI_SOURCE) : savedvals[i].src;
+ src_traced = (status != MPI_STATUSES_IGNORE) ? savedvals[i].comm->group()->rank(status[i].MPI_SOURCE)
+ : savedvals[i].src;
TRACE_smpi_recv(src_traced, dst_traced,savedvals[i].tag);
}
}
std::fprintf(stderr, "{");
for (auto elt : vec) {
std::fprintf(stderr, "(0x%zx, 0x%zx),", elt.first, elt.second);
- }
- std::fprintf(stderr, "}\n");
+ }
+ std::fprintf(stderr, "}\n");
}
// Copy only the process-private byte ranges of a (partially shared) buffer.
// Each entry of private_blocks is a [first, second) pair of byte offsets;
// bytes of dest outside these ranges are left untouched.
static void memcpy_private(void* dest, const void* src, const std::vector<std::pair<size_t, size_t>>& private_blocks)
{
  for (const auto& block : private_blocks) // const ref: avoid copying each pair per iteration
    memcpy(static_cast<uint8_t*>(dest) + block.first, static_cast<const uint8_t*>(src) + block.first,
           block.second - block.first);
}
static void check_blocks(std::vector<std::pair<size_t, size_t>> &private_blocks, size_t buff_size) {
- for(auto block : private_blocks) {
+ for (auto block : private_blocks)
xbt_assert(block.first <= block.second && block.second <= buff_size, "Oops, bug in shared malloc.");
- }
}
void smpi_comm_copy_buffer_callback(smx_activity_t synchro, void *buff, size_t buff_size)
//xbt_free(comm->comm.src_data);// inside SMPI the request is kept inside the user data and should be free
comm->src_buff = nullptr;
}
- if(tmpbuff!=buff)xbt_free(tmpbuff);
-
+ if (tmpbuff != buff)
+ xbt_free(tmpbuff);
}
void smpi_comm_null_copy_buffer_callback(smx_activity_t comm, void *buff, size_t buff_size)
std::string str = std::string(xbt_cfg_get_string("smpi/papi-events"));
Tokenizer tokens(str, separator_units);
- // Iterate over all the computational units. This could be
- // processes, hosts, threads, ranks... You name it. I'm not exactly
- // sure what we will support eventually, so I'll leave it at the
- // general term "units".
+ // Iterate over all the computational units. This could be processes, hosts, threads, ranks... You name it.
+ // I'm not exactly sure what we will support eventually, so I'll leave it at the general term "units".
for (auto& unit_it : tokens) {
boost::char_separator<char> separator_events(":");
Tokenizer event_tokens(unit_it, separator_events);
// Note that we need to remove the name of the unit
// (that could also be the "default" value), which always comes first.
// Hence, we start at ++(events.begin())!
- for (Tokenizer::iterator events_it = ++(event_tokens.begin()); events_it != event_tokens.end(); events_it++) {
+ for (Tokenizer::iterator events_it = ++(event_tokens.begin()); events_it != event_tokens.end(); ++events_it) {
int event_code = PAPI_NULL;
char* event_name = const_cast<char*>((*events_it).c_str());
* E --> F
* G --> H
*/
- for (Tokenizer::iterator token_iter = tokens.begin(); token_iter != tokens.end(); token_iter++) {
+ for (Tokenizer::iterator token_iter = tokens.begin(); token_iter != tokens.end(); ++token_iter) {
XBT_DEBUG("token : %s", token_iter->c_str());
Tokenizer factor_values(*token_iter, factor_separator);
s_smpi_factor_t fact;
xbt_die("Malformed radical for smpi factor: '%s'", smpi_coef_string);
}
unsigned int iteration = 0;
- for (Tokenizer::iterator factor_iter = factor_values.begin(); factor_iter != factor_values.end(); factor_iter++) {
+ for (Tokenizer::iterator factor_iter = factor_values.begin(); factor_iter != factor_values.end(); ++factor_iter) {
iteration++;
if (factor_iter == factor_values.begin()) { /* first element */
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
if(op==MPI_OP_NULL){
- request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src, dst, tag,
- comm, RMA | NON_PERSISTENT | RECV | PREPARED);
+ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, src, dst, tag, comm,
+ RMA | NON_PERSISTENT | RECV | PREPARED);
}else{
- request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src, dst, tag,
- comm, RMA | NON_PERSISTENT | RECV | PREPARED | ACCUMULATE);
+ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, src, dst, tag, comm,
+ RMA | NON_PERSISTENT | RECV | PREPARED | ACCUMULATE);
request->op_ = op;
}
return request;
MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
  // Build a receive request flagged PERSISTENT | RECV | PREPARED (initialized
  // but not started). MPI_BOTTOM maps to a null payload pointer, and
  // MPI_ANY_SOURCE is kept as-is instead of being translated to a group index.
  void* payload = (buf == MPI_BOTTOM) ? nullptr : buf;
  int src_index = (src == MPI_ANY_SOURCE) ? MPI_ANY_SOURCE : comm->group()->index(src);
  return new Request(payload, count, datatype, src_index, smpi_process()->index(), tag, comm,
                     PERSISTENT | RECV | PREPARED);
}
MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
  /* Keep the request variable null until construction: MC needs the comm to be
   * set to nullptr during the call. */
  MPI_Request request = nullptr;
  void* payload = (buf == MPI_BOTTOM) ? nullptr : buf;
  request = new Request(payload, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm,
                        NON_PERSISTENT | ISEND | SEND);
  // Unlike the *_init variants, this request is started immediately.
  request->start();
  return request;
}
MPI_Request Request::issend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
  /* Keep the request variable null until construction: MC needs the comm to be
   * set to nullptr during the call. */
  MPI_Request request = nullptr;
  void* payload = (buf == MPI_BOTTOM) ? nullptr : buf;
  request = new Request(payload, count, datatype, smpi_process()->index(), comm->group()->index(dst), tag, comm,
                        NON_PERSISTENT | ISEND | SSEND | SEND);
  // Same as isend() but with the SSEND flag set; started immediately.
  request->start();
  return request;
}
MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
  /* Keep the request variable null until construction: MC needs the comm to be
   * set to nullptr during the call. */
  MPI_Request request = nullptr;
  void* payload = (buf == MPI_BOTTOM) ? nullptr : buf;
  // MPI_ANY_SOURCE is passed through unchanged; a concrete rank is translated
  // to its index in the communicator's group.
  int src_index = (src == MPI_ANY_SOURCE) ? MPI_ANY_SOURCE : comm->group()->index(src);
  request = new Request(payload, count, datatype, src_index, smpi_process()->index(), tag, comm,
                        NON_PERSISTENT | RECV);
  request->start();
  return request;
}
void Request::send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
- request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
- comm->group()->index(dst), tag, comm, NON_PERSISTENT | SEND);
+ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
+ comm->group()->index(dst), tag, comm, NON_PERSISTENT | SEND);
request->start();
wait(&request, MPI_STATUS_IGNORE);
void Request::ssend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
- request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
- comm->group()->index(dst), tag, comm, NON_PERSISTENT | SSEND | SEND);
+ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
+ comm->group()->index(dst), tag, comm, NON_PERSISTENT | SSEND | SEND);
request->start();
wait(&request,MPI_STATUS_IGNORE);
std::map<std::string, sg_size_t>* parse_content = new std::map<std::string, sg_size_t>();
- std::ifstream* fs = surf_ifsopen(filename.c_str());
+ std::ifstream* fs = surf_ifsopen(filename);
std::string line;
std::vector<std::string> tokens;