Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Less low-level operations to manage receive buffer.
authorArnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Wed, 5 Apr 2023 20:04:58 +0000 (22:04 +0200)
committerArnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Mon, 10 Apr 2023 20:18:59 +0000 (22:18 +0200)
src/mc/remote/Channel.cpp
src/mc/remote/Channel.hpp

index ac202ed..c7d57fe 100644 (file)
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication");
 
 namespace simgrid::mc {
-Channel::Channel(int sock, Channel const& other) : socket_(sock)
+Channel::Channel(int sock, Channel const& other) : socket_(sock), buffer_(other.buffer_)
 {
-  if (not other.buffer_.empty()) {
-    ssize_t size = other.buffer_.size();
-    XBT_DEBUG("Adopt %d bytes buffered by father channel.", (int)size);
-    buffer_.resize(size);
-    memcpy(buffer_.data(), other.buffer_.data(), size);
-  }
+  XBT_DEBUG("Adopt %zu bytes buffered by father channel.", buffer_.size());
 }
 
 Channel::~Channel()
@@ -51,27 +46,19 @@ int Channel::send(const void* message, size_t size) const
   return 0;
 }
 
-ssize_t Channel::receive(const void* message, size_t size, int flags)
+ssize_t Channel::receive(void* message, size_t size, int flags)
 {
   size_t bufsize = buffer_.size();
   ssize_t copied = 0;
-  void* whereto  = const_cast<void*>(message);
+  char* whereto  = static_cast<char*>(message);
   size_t todo    = size;
   if (bufsize > 0) {
     XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size);
-    if (bufsize >= size) {
-      copied = size;
-      memcpy(whereto, buffer_.data(), size);
-      memcpy(static_cast<void*>(buffer_.data()), buffer_.data() + size, bufsize - size);
-      buffer_.resize(bufsize - size);
-      todo = 0;
-    } else {
-      copied = bufsize;
-      memcpy(whereto, buffer_.data(), bufsize);
-      buffer_.clear();
-      todo -= bufsize;
-      whereto = static_cast<char*>(whereto) + bufsize;
-    }
+    copied = std::min(size, bufsize);
+    std::copy_n(begin(buffer_), copied, whereto);
+    buffer_.erase(begin(buffer_), begin(buffer_) + copied);
+    todo -= copied;
+    whereto += copied;
   }
   ssize_t res = 0;
   if (todo > 0) {
@@ -82,7 +69,7 @@ ssize_t Channel::receive(const void* message, size_t size, int flags)
       res = 0;
     }
   }
-  XBT_DEBUG("%d Wanted %d; Got %d from buffer and %d from network", getpid(), (int)size, (int)copied, (int)res);
+  XBT_DEBUG("%d Wanted %zu; Got %zd from buffer and %zd from network", getpid(), size, copied, res);
   res += copied;
   if (static_cast<size_t>(res) >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
     XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(),
@@ -96,9 +83,7 @@ ssize_t Channel::receive(const void* message, size_t size, int flags)
 void Channel::reinject(const char* data, size_t size)
 {
   xbt_assert(size > 0, "Cannot reinject less than one char (size: %lu)", size);
-  auto prev_size = buffer_.size();
-  XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, prev_size);
-  buffer_.resize(prev_size + size);
-  memcpy(buffer_.data() + prev_size, data, size);
+  XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, buffer_.size());
+  buffer_.insert(end(buffer_), data, data + size);
 }
 } // namespace simgrid::mc
index 7f326ad..06d22b1 100644 (file)
@@ -46,7 +46,7 @@ public:
   }
 
   // Receive
-  ssize_t receive(const void* message, size_t size, int flags = 0);
+  ssize_t receive(void* message, size_t size, int flags = 0);
   template <class M> typename std::enable_if_t<messageType<M>(), ssize_t> receive(M& m)
   {
     return this->receive(&m, sizeof(M), 0);