sendHandshakeToAllPeers();
XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
- void* data = nullptr;
while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
if (comm_received == nullptr) {
- comm_received = mailbox_->get_async(&data);
+ comm_received = mailbox_->get_async<Message>(&message);
}
if (comm_received->test()) {
- message = static_cast<Message*>(data);
handleMessage();
delete message;
comm_received = nullptr;
double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
XBT_DEBUG("Start seeding.");
// start the main seed loop
- void* data = nullptr;
while (simgrid::s4u::Engine::get_clock() < deadline) {
if (comm_received == nullptr) {
- comm_received = mailbox_->get_async(&data);
+ comm_received = mailbox_->get_async<Message>(&message);
}
if (comm_received->test()) {
- message = static_cast<Message*>(data);
handleMessage();
delete message;
comm_received = nullptr;
void Tracker::operator()()
{
simgrid::s4u::CommPtr comm = nullptr;
- void* received = nullptr;
+ TrackerQuery* query = nullptr;
while (simgrid::s4u::Engine::get_clock() < deadline) {
if (comm == nullptr)
- comm = mailbox->get_async(&received);
+ comm = mailbox->get_async<TrackerQuery>(&query);
if (comm->test()) {
// Retrieve the data sent by the peer.
- xbt_assert(received != nullptr);
- auto* query = static_cast<TrackerQuery*>(received);
+ xbt_assert(query != nullptr);
// Add the peer to our peer list, if not already known.
if (known_peers.find(query->getPeerId()) == known_peers.end()) {
void forwardFile()
{
- void* received;
+ FilePiece* received;
bool done = false;
while (not done) {
- simgrid::s4u::CommPtr comm = me->get_async(&received);
+ simgrid::s4u::CommPtr comm = me->get_async<FilePiece>(&received);
pending_recvs.push_back(comm);
int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
pending_sends.push_back(send);
} else
- delete static_cast<FilePiece*>(received);
+ delete received;
received_pieces++;
received_bytes += PIECE_SIZE;
XBT_INFO("Wait for my first message");
for (bool cont = true; cont;) {
- void* payload;
- simgrid::s4u::CommPtr comm = mbox->get_async(&payload);
+ std::string* received;
+ simgrid::s4u::CommPtr comm = mbox->get_async<std::string>(&received);
if (sleep_test_time > 0) { /* - "test_time" is set to 0, wait */
while (not comm->test()) { /* - Call test() every "sleep_test_time" otherwise */
comm->wait();
}
- const auto* received = static_cast<std::string*>(payload);
XBT_INFO("I got a '%s'.", received->c_str());
if (*received == "finalize")
cont = false; // If it's a finalize message, we're done.
void Node::checkPredecessor()
{
XBT_DEBUG("Checking whether my predecessor is alive");
- void* data = nullptr;
if (pred_id_ == -1)
return;
// receive the answer
XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_,
message->answer_to->get_cname());
- simgrid::s4u::CommPtr comm = return_mailbox->get_async(&data);
+ ChordMessage* answer = nullptr;
+ simgrid::s4u::CommPtr comm = return_mailbox->get_async<ChordMessage>(&answer);
try {
comm->wait_for(timeout);
XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
- delete static_cast<ChordMessage*>(data);
+ delete answer;
} catch (const simgrid::TimeoutException&) {
XBT_DEBUG("Failed to receive the answer to my 'Predecessor Alive' request");
pred_id_ = -1;
int Node::remoteGetPredecessor(int ask_to)
{
int predecessor_id = -1;
- void* data = nullptr;
simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(ask_to));
simgrid::s4u::Mailbox* return_mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_) + "_pred");
// receive the answer
XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to,
message->answer_to->get_cname());
- simgrid::s4u::CommPtr comm = return_mailbox->get_async(&data);
+ ChordMessage* answer = nullptr;
+ simgrid::s4u::CommPtr comm = return_mailbox->get_async<ChordMessage>(&answer);
try {
comm->wait_for(timeout);
- const auto* answer = static_cast<ChordMessage*>(data);
XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to,
answer->answer_id);
predecessor_id = answer->answer_id;
delete answer;
} catch (const simgrid::TimeoutException&) {
XBT_DEBUG("Failed to receive the answer to my 'Get Predecessor' request");
- delete static_cast<ChordMessage*>(data);
+ delete answer;
}
return predecessor_id;
int Node::remoteFindSuccessor(int ask_to, int id)
{
int successor = -1;
- ChordMessage* data = nullptr;
simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(ask_to));
simgrid::s4u::Mailbox* return_mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_) + "_succ");
}
// receive the answer
XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
- simgrid::s4u::CommPtr comm = return_mailbox->get_async(reinterpret_cast<void**>(&data));
+ ChordMessage* answer = nullptr;
+ simgrid::s4u::CommPtr comm = return_mailbox->get_async<ChordMessage>(&answer);
try {
comm->wait_for(timeout);
- const ChordMessage* answer = data;
XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d",
answer->request_id, id_, answer->answer_id);
successor = answer->answer_id;
delete answer;
} catch (const simgrid::TimeoutException&) {
XBT_DEBUG("Failed to receive the answer to my 'Find Successor' request");
- delete data;
+ delete answer;
}
return successor;
if (not joined)
return;
- void* data = nullptr;
+ ChordMessage* message = nullptr;
double now = simgrid::s4u::Engine::get_clock();
double next_stabilize_date = start_time_ + PERIODIC_STABILIZE_DELAY;
double next_fix_fingers_date = start_time_ + PERIODIC_FIX_FINGERS_DELAY;
simgrid::s4u::CommPtr comm_receive = nullptr;
while (now < std::min(start_time_ + deadline_, MAX_SIMULATION_TIME)) {
if (comm_receive == nullptr)
- comm_receive = mailbox_->get_async(&data);
+ comm_receive = mailbox_->get_async<ChordMessage>(&message);
bool comm_completed = true;
try {
if (not comm_receive->test())
}
if (comm_completed) {
- if (data != nullptr) {
- auto* message = static_cast<ChordMessage*>(data);
+ if (message != nullptr) {
handleMessage(message);
- data = nullptr;
+ message = nullptr;
}
comm_receive = nullptr;
} else {
if (comm_receive != nullptr) {
try {
if (comm_receive->test())
- delete static_cast<ChordMessage*>(data);
+ delete message;
else
comm_receive->cancel();
} catch (const simgrid::TimeoutException&) {
simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_));
do {
if (receive_comm == nullptr)
- receive_comm = mailbox->get_async(&received_msg);
+ receive_comm = mailbox->get_async<Message>(&received_msg);
if (receive_comm->test()) {
XBT_DEBUG("Received an answer from the node I know.");
got_answer = true;
// retrieve the node list and ping them.
- const auto* msg = static_cast<Message*>(received_msg);
- const Answer* node_list = msg->answer_.get();
+ const Answer* node_list = received_msg->answer_.get();
if (node_list) {
for (auto const& contact : node_list->getNodes())
routingTableUpdate(contact.first);
} else {
- handleFindNode(msg);
+ handleFindNode(received_msg);
}
- delete msg;
+ delete received_msg;
receive_comm = nullptr;
} else
simgrid::s4u::this_actor::sleep_for(1);
simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_));
do {
if (receive_comm == nullptr)
- receive_comm = mailbox->get_async(&received_msg);
+ receive_comm = mailbox->get_async<Message>(&received_msg);
if (receive_comm->test()) {
- const auto* msg = static_cast<Message*>(received_msg);
// Check if what we have received is what we are looking for.
- if (msg->answer_ && msg->answer_->getDestinationId() == id_to_find) {
- routingTableUpdate(msg->sender_id_);
+ if (received_msg->answer_ && received_msg->answer_->getDestinationId() == id_to_find) {
+ routingTableUpdate(received_msg->sender_id_);
// Handle the answer
for (auto const& contact : node_list->getNodes())
routingTableUpdate(contact.first);
answers++;
- nodes_added = node_list->merge(msg->answer_.get());
- XBT_DEBUG("Received an answer from %s (%s) with %zu nodes on it", msg->answer_to_->get_cname(),
- msg->issuer_host_name_.c_str(), msg->answer_->getSize());
+ nodes_added = node_list->merge(received_msg->answer_.get());
+ XBT_DEBUG("Received an answer from %s (%s) with %zu nodes on it", received_msg->answer_to_->get_cname(),
+ received_msg->issuer_host_name_.c_str(), received_msg->answer_->getSize());
} else {
- if (msg->answer_) {
- routingTableUpdate(msg->sender_id_);
+ if (received_msg->answer_) {
+ routingTableUpdate(received_msg->sender_id_);
XBT_DEBUG("Received a wrong answer for a FIND_NODE");
} else {
- handleFindNode(msg);
+ handleFindNode(received_msg);
}
// Update the timeout if we didn't have our answer
timeout += simgrid::s4u::Engine::get_clock() - time_beginreceive;
time_beginreceive = simgrid::s4u::Engine::get_clock();
}
- delete msg;
+ delete received_msg;
receive_comm = nullptr;
} else {
simgrid::s4u::this_actor::sleep_for(1);
unsigned int find_node_failed = 0; // Number of find_node which have failed.
public:
simgrid::s4u::CommPtr receive_comm = nullptr;
- void* received_msg = nullptr;
+ Message* received_msg = nullptr;
explicit Node(unsigned int node_id) : id_(node_id), table(node_id) {}
Node(const Node&) = delete;
Node& operator=(const Node&) = delete;
while (simgrid::s4u::Engine::get_clock() < deadline) {
if (node.receive_comm == nullptr)
- node.receive_comm = mailbox->get_async(&node.received_msg);
+ node.receive_comm = mailbox->get_async<kademlia::Message>(&node.received_msg);
if (node.receive_comm->test()) {
// There has been a message, we need to handle it !
- const auto* msg = static_cast<kademlia::Message*>(node.received_msg);
- if (msg) {
- node.handleFindNode(msg);
- delete msg;
+ if (node.received_msg) {
+ node.handleFindNode(node.received_msg);
+ delete node.received_msg;
node.receive_comm = nullptr;
} else
simgrid::s4u::this_actor::sleep_for(1);
char* res = mailbox->get<char>();
xbt_free(res);
} else {
- std::vector<void*> data(flow_amount);
+ std::vector<char*> data(flow_amount);
// Start all comms in parallel, and wait for their completion in one shot
std::vector<simgrid::s4u::CommPtr> comms;
for (int i = 0; i < flow_amount; i++)
- comms.push_back(mailbox->get_async(&data[i]));
+ comms.push_back(mailbox->get_async<char>(&data[i]));
simgrid::s4u::Comm::wait_all(&comms);
for (int i = 0; i < flow_amount; i++)
static void server()
{
- void* data1 = nullptr;
- void* data2 = nullptr;
- simgrid::s4u::CommPtr comm_received1 = simgrid::s4u::Mailbox::by_name("mymailbox")->get_async(&data1);
- simgrid::s4u::CommPtr comm_received2 = simgrid::s4u::Mailbox::by_name("mymailbox")->get_async(&data2);
+ int* data1 = nullptr;
+ int* data2 = nullptr;
+ simgrid::s4u::CommPtr comm_received1 = simgrid::s4u::Mailbox::by_name("mymailbox")->get_async<int>(&data1);
+ simgrid::s4u::CommPtr comm_received2 = simgrid::s4u::Mailbox::by_name("mymailbox")->get_async<int>(&data2);
comm_received1->wait();
comm_received2->wait();
XBT_INFO("OK");
- delete static_cast<int*>(data1);
- delete static_cast<int*>(data2);
+ delete data1;
+ delete data2;
}
static void client(int id)
simgrid::s4u::ActorPtr receiver = simgrid::s4u::Actor::create("receiver", all_hosts[1], []() {
assert_exit(true, 2);
int* data;
- simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async((void**)&data);
+ simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
std::vector<simgrid::s4u::CommPtr> pending_comms = {comm};
REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(&pending_comms));
});
static void receiver_fun()
{
XBT_INFO("Receiving");
- void* payload = nullptr;
- simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("Tremblay")->get_async(&payload);
+ std::string* payload = nullptr;
+ simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("Tremblay")->get_async<std::string>(&payload);
comm->wait();
xbt_assert(comm->get_sender(), "No sender received");
XBT_INFO("Got a message sent by '%s'", comm->get_sender()->get_cname());
simgrid::s4u::this_actor::sleep_for(2.0);
XBT_INFO("Did I tell you that I got a message sent by '%s'?", comm->get_sender()->get_cname());
- delete static_cast<std::string*>(payload);
+ delete payload;
}
int main(int argc, char* argv[])
auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
- int * out1;
- auto get1 = mbox->get_async((void**)&out1);
+ int* out1;
+ auto get1 = mbox->get_async<int>(&out1);
- int * out2;
- auto get2 = mbox->get_async((void**)&out2);
+ int* out2;
+ auto get2 = mbox->get_async<int>(&out2);
XBT_INFO("All comms have started");
std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};