Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / mc / remote / Channel.cpp
1 /* Copyright (c) 2015-2023. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/mc/remote/Channel.hpp"
7 #include <xbt/log.h>
8
9 #include <algorithm>
10 #include <cerrno>
11 #include <cstring>
12 #include <sys/socket.h>
13 #include <sys/types.h>
14 #include <unistd.h>
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication");
17
18 namespace simgrid::mc {
19 Channel::Channel(int sock, Channel const& other) : socket_(sock), buffer_(other.buffer_)
20 {
21   XBT_DEBUG("Adopt %zu bytes buffered by father channel.", buffer_.size());
22 }
23
24 Channel::~Channel()
25 {
26   if (this->socket_ >= 0)
27     close(this->socket_);
28 }
29
30 /** @brief Send a message; returns 0 on success or errno on failure */
31 int Channel::send(const void* message, size_t size) const
32 {
33   if (size >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
34     XBT_DEBUG("Sending %s (%zu bytes sent)", to_c_str(*static_cast<const MessageType*>(message)), size);
35   } else {
36     XBT_DEBUG("Sending bytes directly (from address %p) (%zu bytes sent)", message, size);
37     if (size == 0)
38       XBT_WARN("Request to send a 0-sized message! Proceeding anyway.");
39   }
40
41   while (::send(this->socket_, message, size, 0) == -1) {
42     if (errno != EINTR) {
43       XBT_ERROR("Channel::send failure: %s", strerror(errno));
44       return errno;
45     }
46   }
47   return 0;
48 }
49
50 ssize_t Channel::receive(void* message, size_t size, int flags)
51 {
52   size_t bufsize = buffer_.size();
53   ssize_t copied = 0;
54   auto* whereto  = static_cast<char*>(message);
55   size_t todo    = size;
56   if (bufsize > 0) {
57     XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size);
58     copied = std::min(size, bufsize);
59     std::copy_n(begin(buffer_), copied, whereto);
60     buffer_.erase(begin(buffer_), begin(buffer_) + copied);
61     todo -= copied;
62     whereto += copied;
63   }
64   ssize_t res = 0;
65   if (todo > 0) {
66     errno = 0;
67     res   = recv(this->socket_, whereto, todo, flags);
68     xbt_assert(res != -1 || errno == EAGAIN, "Channel::receive failure: %s", strerror(errno));
69     if (res == -1) {
70       res = 0;
71     }
72   }
73   XBT_DEBUG("%d Wanted %zu; Got %zd from buffer and %zd from network", getpid(), size, copied, res);
74   res += copied;
75   if (static_cast<size_t>(res) >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
76     XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(),
77               to_c_str(*static_cast<const MessageType*>(message)), size, res, message);
78   } else {
79     XBT_DEBUG("Receive %zd bytes", res);
80   }
81   return res;
82 }
83
84 void Channel::reinject(const char* data, size_t size)
85 {
86   xbt_assert(size > 0, "Cannot reinject less than one char (size: %lu)", size);
87   XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, buffer_.size());
88   buffer_.insert(end(buffer_), data, data + size);
89 }
90 } // namespace simgrid::mc