Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Another step toward working CommPtr. chord example is broken ATM
[simgrid.git] / src / s4u / s4u_comm.cpp
1 /* Copyright (c) 2006-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "xbt/log.h"
7 #include "src/msg/msg_private.h"
8
9 #include "simgrid/s4u/Comm.hpp"
10 #include "simgrid/s4u/Mailbox.hpp"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm,s4u_activity,"S4U asynchronous communications");
13
14 namespace simgrid {
15 namespace s4u {
16 Comm::~Comm()
17 {
18   if (state_ == started && not detached_ && (pimpl_ == nullptr || pimpl_->state == SIMIX_RUNNING)) {
19     XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, state_);
20     if (pimpl_ != nullptr)
21       XBT_INFO("pimpl_->state: %d", pimpl_->state);
22     else
23       XBT_INFO("pimpl_ is null");
24     xbt_backtrace_display_current();
25   }
26   if (pimpl_)
27     pimpl_->unref();
28 }
29
30 s4u::CommPtr Comm::send_init(s4u::MailboxPtr chan)
31 {
32   CommPtr res   = CommPtr(new s4u::Comm());
33   res->sender_ = SIMIX_process_self();
34   res->mailbox_ = chan;
35   return res;
36 }
37
38 s4u::CommPtr Comm::recv_init(s4u::MailboxPtr chan)
39 {
40   CommPtr res    = CommPtr(new s4u::Comm());
41   res->receiver_ = SIMIX_process_self();
42   res->mailbox_ = chan;
43   return res;
44 }
45
46 void Comm::setRate(double rate) {
47   xbt_assert(state_==inited);
48   rate_ = rate;
49 }
50
51 void Comm::setSrcData(void * buff) {
52   xbt_assert(state_==inited);
53   xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
54   srcBuff_ = buff;
55 }
56 void Comm::setSrcDataSize(size_t size){
57   xbt_assert(state_==inited);
58   srcBuffSize_ = size;
59 }
60 void Comm::setSrcData(void * buff, size_t size) {
61   xbt_assert(state_==inited);
62
63   xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
64   srcBuff_ = buff;
65   srcBuffSize_ = size;
66 }
67 void Comm::setDstData(void ** buff) {
68   xbt_assert(state_==inited);
69   xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
70   dstBuff_ = buff;
71 }
72 size_t Comm::getDstDataSize(){
73   xbt_assert(state_==finished);
74   return dstBuffSize_;
75 }
76 void Comm::setDstData(void ** buff, size_t size) {
77   xbt_assert(state_==inited);
78
79   xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
80   dstBuff_ = buff;
81   dstBuffSize_ = size;
82 }
83
84 void Comm::start() {
85   xbt_assert(state_ == inited);
86
87   if (srcBuff_ != nullptr) { // Sender side
88     pimpl_ = simcall_comm_isend(sender_, mailbox_->getImpl(), remains_, rate_,
89         srcBuff_, srcBuffSize_,
90         matchFunction_, cleanFunction_, copyDataFunction_,
91         userData_, detached_);
92   } else if (dstBuff_ != nullptr) { // Receiver side
93     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
94         matchFunction_, copyDataFunction_,
95         userData_, rate_);
96
97   } else {
98     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
99   }
100   while (refcount_ > 1) { // Pass all the refcounts we had to the underlying pimpl since we are delegating the
101                           // refcounting to it afterward
102     refcount_--;
103     pimpl_->ref();
104   }
105   state_ = started;
106 }
107 void Comm::wait() {
108   xbt_assert(state_ == started || state_ == inited);
109
110   if (state_ == started)
111     simcall_comm_wait(pimpl_, -1/*timeout*/);
112   else { // state_ == inited. Save a simcall and do directly a blocking send/recv
113     if (srcBuff_ != nullptr) {
114       simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
115           srcBuff_, srcBuffSize_,
116           matchFunction_, copyDataFunction_,
117           userData_, -1 /*timeout*/);
118     } else {
119       simcall_comm_recv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
120           matchFunction_, copyDataFunction_,
121           userData_, -1/*timeout*/, rate_);
122     }
123   }
124   state_ = finished;
125 }
126 void Comm::wait(double timeout) {
127   xbt_assert(state_ == started || state_ == inited);
128
129   if (state_ == started) {
130     simcall_comm_wait(pimpl_, timeout);
131     state_ = finished;
132     return;
133   }
134
135   // It's not started yet. Do it in one simcall
136   if (srcBuff_ != nullptr) {
137     simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
138         srcBuff_, srcBuffSize_,
139         matchFunction_, copyDataFunction_,
140         userData_, timeout);
141   } else { // Receiver
142     simcall_comm_recv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
143         matchFunction_, copyDataFunction_,
144         userData_, timeout, rate_);
145   }
146   state_ = finished;
147 }
148
149 void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize)
150 {
151   s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
152   res->setRemains(simulatedSize);
153   res->srcBuff_     = data;
154   res->srcBuffSize_ = sizeof(void*);
155   res->detached_    = true;
156   res->start();
157 }
158 s4u::CommPtr Comm::send_async(MailboxPtr dest, void* data, int simulatedSize)
159 {
160   s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
161   res->setRemains(simulatedSize);
162   res->srcBuff_     = data;
163   res->srcBuffSize_ = sizeof(void*);
164   res->start();
165   return res;
166 }
167
168 s4u::CommPtr Comm::recv_async(MailboxPtr dest, void** data)
169 {
170   s4u::CommPtr res = CommPtr(s4u::Comm::recv_init(dest));
171   res->setDstData(data, sizeof(*data));
172   res->start();
173   return res;
174 }
175
176 void Comm::cancel()
177 {
178   simgrid::kernel::activity::Comm* commPimpl = static_cast<simgrid::kernel::activity::Comm*>(pimpl_);
179   commPimpl->cancel();
180 }
181 bool Comm::test() {
182   xbt_assert(state_ == inited || state_ == started || state_ == finished);
183   
184   if (state_ == finished) 
185     xbt_die("Don't call test on a finished comm.");
186   
187   if (state_ == inited) {
188     this->start();
189   }
190   
191   if(simcall_comm_test(pimpl_)){
192     state_ = finished;
193     return true;
194   }
195   return false;
196 }
197
198 void intrusive_ptr_release(simgrid::s4u::Comm* c)
199 {
200   if (c->pimpl_ != nullptr) {
201     if (c->pimpl_->unref()) {
202       c->pimpl_ = nullptr;
203       delete c;
204     }
205   } else if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
206     std::atomic_thread_fence(std::memory_order_acquire);
207     delete c;
208   }
209 }
210 void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
211 {
212   if (c->pimpl_ != nullptr) {
213     c->pimpl_->ref();
214   } else {
215     c->refcount_.fetch_add(1, std::memory_order_relaxed);
216   }
217 }
218 }
219 } // namespaces