Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
further WIP on the ActivityImplPtr feature. Now it compiles (but fails)
[simgrid.git] / src / s4u / s4u_comm.cpp
1 /* Copyright (c) 2006-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "xbt/log.h"
7 #include "src/msg/msg_private.h"
8
9 #include "simgrid/s4u/Comm.hpp"
10 #include "simgrid/s4u/Mailbox.hpp"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm,s4u_activity,"S4U asynchronous communications");
13
14 namespace simgrid {
15 namespace s4u {
16 Comm::~Comm()
17 {
18   if (state_ == started && not detached_ && (pimpl_ == nullptr || pimpl_->state == SIMIX_RUNNING)) {
19     XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, state_);
20     if (pimpl_ != nullptr)
21       XBT_INFO("pimpl_->state: %d", pimpl_->state);
22     else
23       XBT_INFO("pimpl_ is null");
24     xbt_backtrace_display_current();
25   }
26 }
27
28 s4u::CommPtr Comm::send_init(s4u::MailboxPtr chan)
29 {
30   CommPtr res   = CommPtr(new s4u::Comm());
31   res->sender_ = SIMIX_process_self();
32   res->mailbox_ = chan;
33   return res;
34 }
35
36 s4u::CommPtr Comm::recv_init(s4u::MailboxPtr chan)
37 {
38   CommPtr res    = CommPtr(new s4u::Comm());
39   res->receiver_ = SIMIX_process_self();
40   res->mailbox_ = chan;
41   return res;
42 }
43
44 void Comm::setRate(double rate) {
45   xbt_assert(state_==inited);
46   rate_ = rate;
47 }
48
49 void Comm::setSrcData(void * buff) {
50   xbt_assert(state_==inited);
51   xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
52   srcBuff_ = buff;
53 }
54 void Comm::setSrcDataSize(size_t size){
55   xbt_assert(state_==inited);
56   srcBuffSize_ = size;
57 }
58 void Comm::setSrcData(void * buff, size_t size) {
59   xbt_assert(state_==inited);
60
61   xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
62   srcBuff_ = buff;
63   srcBuffSize_ = size;
64 }
65 void Comm::setDstData(void ** buff) {
66   xbt_assert(state_==inited);
67   xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
68   dstBuff_ = buff;
69 }
70 size_t Comm::getDstDataSize(){
71   xbt_assert(state_==finished);
72   return dstBuffSize_;
73 }
74 void Comm::setDstData(void ** buff, size_t size) {
75   xbt_assert(state_==inited);
76
77   xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
78   dstBuff_ = buff;
79   dstBuffSize_ = size;
80 }
81
82 void Comm::start() {
83   xbt_assert(state_ == inited);
84
85   if (srcBuff_ != nullptr) { // Sender side
86     pimpl_ = simcall_comm_isend(sender_, mailbox_->getImpl(), remains_, rate_,
87         srcBuff_, srcBuffSize_,
88         matchFunction_, cleanFunction_, copyDataFunction_,
89         userData_, detached_);
90   } else if (dstBuff_ != nullptr) { // Receiver side
91     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
92         matchFunction_, copyDataFunction_,
93         userData_, rate_);
94
95   } else {
96     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
97   }
98   state_ = started;
99 }
100 void Comm::wait() {
101   xbt_assert(state_ == started || state_ == inited);
102
103   if (state_ == started)
104     simcall_comm_wait(pimpl_, -1/*timeout*/);
105   else { // state_ == inited. Save a simcall and do directly a blocking send/recv
106     if (srcBuff_ != nullptr) {
107       simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
108           srcBuff_, srcBuffSize_,
109           matchFunction_, copyDataFunction_,
110           userData_, -1 /*timeout*/);
111     } else {
112       simcall_comm_recv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
113           matchFunction_, copyDataFunction_,
114           userData_, -1/*timeout*/, rate_);
115     }
116   }
117   state_ = finished;
118 }
119
120 void Comm::wait(double timeout) {
121   xbt_assert(state_ == started || state_ == inited);
122
123   if (state_ == started) {
124     simcall_comm_wait(pimpl_, timeout);
125     state_ = finished;
126     return;
127   }
128
129   // It's not started yet. Do it in one simcall
130   if (srcBuff_ != nullptr) {
131     simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
132         srcBuff_, srcBuffSize_,
133         matchFunction_, copyDataFunction_,
134         userData_, timeout);
135   } else { // Receiver
136     simcall_comm_recv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
137         matchFunction_, copyDataFunction_,
138         userData_, timeout, rate_);
139   }
140   state_ = finished;
141 }
142
143 void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize)
144 {
145   s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
146   res->setRemains(simulatedSize);
147   res->srcBuff_     = data;
148   res->srcBuffSize_ = sizeof(void*);
149   res->detached_    = true;
150   res->start();
151 }
152
153 s4u::CommPtr Comm::send_async(MailboxPtr dest, void* data, int simulatedSize)
154 {
155   s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
156   res->setRemains(simulatedSize);
157   res->srcBuff_     = data;
158   res->srcBuffSize_ = sizeof(void*);
159   res->start();
160   return res;
161 }
162
163 s4u::CommPtr Comm::recv_async(MailboxPtr dest, void** data)
164 {
165   s4u::CommPtr res = CommPtr(s4u::Comm::recv_init(dest));
166   res->setDstData(data, sizeof(*data));
167   res->start();
168   return res;
169 }
170
171 void Comm::cancel()
172 {
173   simgrid::kernel::activity::CommImplPtr commPimpl =
174       boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(pimpl_);
175   commPimpl->cancel();
176 }
177
178 bool Comm::test() {
179   xbt_assert(state_ == inited || state_ == started || state_ == finished);
180
181   if (state_ == finished)
182     xbt_die("Don't call test on a finished comm.");
183
184   if (state_ == inited) {
185     this->start();
186   }
187
188   if(simcall_comm_test(pimpl_)){
189     state_ = finished;
190     return true;
191   }
192   return false;
193 }
194
195 void intrusive_ptr_release(simgrid::s4u::Comm* c)
196 {
197   if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
198     std::atomic_thread_fence(std::memory_order_acquire);
199     delete c;
200   }
201 }
202 void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
203 {
204   c->refcount_.fetch_add(1, std::memory_order_relaxed);
205 }
206 }
207 } // namespaces