/* Forward a message to the next peer.
 *
 * Builds a new task with task_message_data_new() from msg->data_length (first argument NULL),
 * posts it with MSG_task_isend() to the mailbox named by peer->next, and hands the resulting
 * communication to queue_pending_connection() together with peer->pending_sends.
 *
 * peer: the forwarding peer; its `me`, `next` and `pending_sends` fields are read here.
 * msg:  the message being forwarded; only its `data_length` field is read here.
 */
static void peer_forward_msg(peer_t peer, message_t msg)
{
msg_task_t task = task_message_data_new(NULL, msg->data_length);
- msg_comm_t comm = NULL;
XBT_DEBUG("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
- comm = MSG_task_isend(task, peer->next);
+ msg_comm_t comm = MSG_task_isend(task, peer->next);
/* Keep the communication handle with the peer's pending sends
 * (presumably so it can be completed or cleaned up later -- verify against the callers). */
queue_pending_connection(comm, peer->pending_sends);
}
}
}
- // send the SUCCESSOR_LEAVING to our predecessor
- ChordMessage* succ_msg = new ChordMessage(SUCCESSOR_LEAVING);
- succ_msg->request_id = fingers_[0];
- succ_msg->answer_to = mailbox_;
- XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
-
- try {
- simgrid::s4u::this_actor::send(simgrid::s4u::Mailbox::byName(std::to_string(pred_id_)), succ_msg, 10, timeout);
- } catch (xbt_ex& e) {
- if (e.category == timeout_error) {
- XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
- delete succ_msg;
+ if (pred_id_ != -1) {
+ // send the SUCCESSOR_LEAVING to our predecessor (only if I have one)
+ ChordMessage* succ_msg = new ChordMessage(SUCCESSOR_LEAVING);
+ succ_msg->request_id = fingers_[0];
+ succ_msg->answer_to = mailbox_;
+ XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
+
+ try {
+ simgrid::s4u::this_actor::send(simgrid::s4u::Mailbox::byName(std::to_string(pred_id_)), succ_msg, 10, timeout);
+ } catch (xbt_ex& e) {
+ if (e.category == timeout_error) {
+ XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
+ delete succ_msg;
+ }
}
}
}
try {
comm->wait(timeout);
XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
- delete message;
} catch (xbt_ex& e) {
if (e.category == timeout_error) {
XBT_DEBUG("Failed to receive the answer to my 'Predecessor Alive' request");
pred_id_ = -1;
}
}
+ delete message;
}
/* Asks a remote node for its predecessor
double next_fix_fingers_date = start_time_ + PERIODIC_FIX_FINGERS_DELAY;
double next_check_predecessor_date = start_time_ + PERIODIC_CHECK_PREDECESSOR_DELAY;
double next_lookup_date = start_time_ + PERIODIC_LOOKUP_DELAY;
-
+ simgrid::s4u::CommPtr comm_receive = nullptr;
while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME) {
- data = nullptr;
- simgrid::s4u::CommPtr comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data);
+ if (comm_receive == nullptr)
+ comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data);
while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME && not comm_receive->test()) {
// no task was received: make some periodic calls
if (now >= next_stabilize_date) {
if (data != nullptr) {
ChordMessage* message = static_cast<ChordMessage*>(data);
handleMessage(message);
- } else {
- comm_receive->cancel();
+ comm_receive = nullptr;
+ data = nullptr;
}
now = simgrid::s4u::Engine::getClock();
}
xbt_die("Cannot open %s", argv[1]);
char *line = NULL;
size_t n = 0;
- int instance_size = 0;
const char* instance_id = NULL;
while (xbt_getline(&line, &n, fp) != -1 ){
xbt_dynar_t elems = xbt_str_split_quoted_in_place(line);
const char** line_char= xbt_dynar_to_array(elems);
instance_id = line_char[0];
- instance_size = xbt_str_parse_int(line_char[2], "Invalid size: %s");
+ int instance_size = xbt_str_parse_int(line_char[2], "Invalid size: %s");
XBT_INFO("Initializing instance %s of size %d", instance_id, instance_size);
SMPI_app_instance_register(instance_id, smpi_replay,instance_size);
friend void intrusive_ptr_add_ref(Comm * c);
protected:
- Activity();
- virtual ~Activity();
+ Activity() = default;
+ ~Activity() = default;
public:
Activity(Activity const&) = delete;
#include <simgrid/forward.h>
#include <simgrid/s4u/Activity.hpp>
#include <simgrid/s4u/forward.hpp>
-
namespace simgrid {
namespace s4u {
/** @brief Communication async
void ref();
/** @brief Reduces the refcount */
void unref();
- // boost::intrusive_ptr<Activity> support:
+
+ // boost::intrusive_ptr<Activity> support:
friend void intrusive_ptr_add_ref(ActivityImpl * activity);
friend void intrusive_ptr_release(ActivityImpl * activity);
namespace simgrid {
namespace s4u {
-Activity::Activity() {
-
-}
-Activity::~Activity() {
-
-}
-
void Activity::setRemains(double remains) {
xbt_assert(state_ == inited, "Cannot change the remaining amount of work once the Activity is started");
remains_ = remains;
{
return Comm::send_async(chan, payload, simulatedSize);
}
+
/** @brief Send a payload on a mailbox in detached mode.
 *
 * Thin wrapper that delegates to Comm::send_detached(); nothing is returned to the caller.
 *
 * @param chan          mailbox to send on
 * @param payload       opaque pointer to the data being sent
 * @param simulatedSize simulated size of the message; converted to int for Comm::send_detached()
 */
void dsend(MailboxPtr chan, void* payload, double simulatedSize)
{
  // Comm::send_detached() takes an int size: spell out the double -> int conversion.
  Comm::send_detached(chan, payload, static_cast<int>(simulatedSize));
}
}
state_ = finished;
+ if (pimpl_)
+ pimpl_->unref();
}
+
void Comm::wait(double timeout) {
xbt_assert(state_ == started || state_ == inited);
if (state_ == started) {
simcall_comm_wait(pimpl_, timeout);
state_ = finished;
+ pimpl_->unref();
return;
}
userData_, timeout, rate_);
}
state_ = finished;
+ if (pimpl_)
+ pimpl_->unref();
}
void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize)
res->detached_ = true;
res->start();
}
+
s4u::CommPtr Comm::send_async(MailboxPtr dest, void* data, int simulatedSize)
{
s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
simgrid::kernel::activity::CommImpl* commPimpl = static_cast<simgrid::kernel::activity::CommImpl*>(pimpl_);
commPimpl->cancel();
}
+
bool Comm::test() {
xbt_assert(state_ == inited || state_ == started || state_ == finished);
if(simcall_comm_test(pimpl_)){
state_ = finished;
+ pimpl_->unref();
return true;
}
return false;
*/
void SIMIX_process_cleanup(smx_actor_t process)
{
- XBT_DEBUG("Cleanup process %s (%p), waiting synchro %p",
- process->name.c_str(), process, process->waiting_synchro);
+ XBT_DEBUG("Cleanup process %s (%p), waiting synchro %p", process->name.c_str(), process, process->waiting_synchro);
process->finished = true;
SIMIX_process_on_exit_runall(process);
/* Unregister from the kill timer if any */
if (process->kill_timer != nullptr)
- SIMIX_timer_remove(process->kill_timer);
+ SIMIX_timer_remove(process->kill_timer);
xbt_os_mutex_acquire(simix_global->mutex);
/* make sure no one will finish the comm after this process is destroyed,
* because src_proc or dst_proc would be an invalid pointer */
- comm->cancel();
if (comm->src_proc == process) {
XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d, src = %p, dst = %p",
}
process->comms.pop_front();
synchro = static_cast<smx_activity_t>(process->comms.front());
+ comm->cancel();
}
XBT_DEBUG("%p should not be run anymore",process);
other_comm->state = SIMIX_READY;
other_comm->type = SIMIX_COMM_READY;
-
}
src_proc->comms.push_back(other_comm);