diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 77ce3b7585de4f..2dcdca5c39e2d6 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -14,7 +14,10 @@ #include #include #include +#include +#include +#include #include void ConnmanTestMsg::Handshake(CNode& node, @@ -240,3 +243,167 @@ StaticContentsSock& StaticContentsSock::operator=(Sock&& other) assert(false && "Move of Sock into StaticContentsSock not allowed."); return *this; } + +ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags) +{ + WAIT_LOCK(m_mutex, lock); + + if (m_data.empty()) { + if (m_eof) { + return 0; + } + errno = EAGAIN; // Same as recv(2) on a non-blocking socket. + return -1; + } + + const size_t read_bytes{std::min(len, m_data.size())}; + + std::memcpy(buf, m_data.data(), read_bytes); + if ((flags & MSG_PEEK) == 0) { + m_data.erase(m_data.begin(), m_data.begin() + read_bytes); + } + + return read_bytes; +} + +std::optional DynSock::Pipe::GetNetMsg() +{ + V1Transport transport{NodeId{0}}; + + { + WAIT_LOCK(m_mutex, lock); + + WaitForDataOrEof(lock); + if (m_eof && m_data.empty()) { + return std::nullopt; + } + + for (;;) { + Span s{m_data}; + if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s. + return std::nullopt; + } + m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size()); + if (transport.ReceivedMessageComplete()) { + break; + } + if (m_data.empty()) { + WaitForDataOrEof(lock); + if (m_eof && m_data.empty()) { + return std::nullopt; + } + } + } + } + + bool reject{false}; + CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)}; + if (reject) { + return std::nullopt; + } + return std::make_optional(std::move(msg)); +} + +void DynSock::Pipe::PushBytes(const void* buf, size_t len) +{ + LOCK(m_mutex); + const uint8_t* b = static_cast(buf); + m_data.insert(m_data.end(), b, b + len); + m_cond.notify_all(); +} + +void DynSock::Pipe::Eof() +{ + LOCK(m_mutex); + m_eof = true; + m_cond.notify_all(); +} + +void DynSock::Pipe::WaitForDataOrEof(UniqueLock& lock) +{ + Assert(lock.mutex() == &m_mutex); + + m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { + AssertLockHeld(m_mutex); + return !m_data.empty() || m_eof; + }); +} + +DynSock::DynSock(std::shared_ptr pipes, std::shared_ptr accept_sockets) + : m_pipes{pipes}, m_accept_sockets{accept_sockets} +{ +} + +DynSock::~DynSock() +{ + m_pipes->send.Eof(); +} + +ssize_t DynSock::Recv(void* buf, size_t len, int flags) const +{ + return m_pipes->recv.GetBytes(buf, len, flags); +} + +ssize_t DynSock::Send(const void* buf, size_t len, int) const +{ + m_pipes->send.PushBytes(buf, len); + return len; +} + +std::unique_ptr DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const +{ + return m_accept_sockets->Pop().value_or(nullptr); +} + +bool DynSock::Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred) const +{ + EventsPerSock ev; + ev.emplace(this, Events{requested}); + const bool ret{WaitMany(timeout, ev)}; + if (occurred != nullptr) { + *occurred = ev.begin()->second.occurred; + } + return ret; +} + +bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const +{ + const auto deadline = std::chrono::steady_clock::now() + timeout; + bool at_least_one_event_occurred{false}; + + for (;;) { + // Check all sockets for readiness without waiting. + for (auto& [sock, events] : events_per_sock) { + if ((events.requested & Sock::SEND) != 0) { + // Always ready for Send(). + events.occurred |= Sock::SEND; + at_least_one_event_occurred = true; + } + + if ((events.requested & Sock::RECV) != 0) { + auto dyn_sock = reinterpret_cast(sock.get()); + uint8_t b; + if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) { + events.occurred |= Sock::RECV; + at_least_one_event_occurred = true; + } + } + } + + if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) { + break; + } + + std::this_thread::sleep_for(10ms); + } + + return true; +} + +DynSock& DynSock::operator=(Sock&&) +{ + assert(false && "Move of Sock into DynSock not allowed."); + return *this; +} diff --git a/src/test/util/net.h b/src/test/util/net.h index dfea9dd44b83c3..63817f70fee32c 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -6,6 +6,7 @@ #define BITCOIN_TEST_UTIL_NET_H #include +#include #include #include #include @@ -19,9 +20,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -206,4 +209,155 @@ class StaticContentsSock : public ZeroSock mutable size_t m_consumed{0}; }; +/** + * A mocked Sock alternative that allows providing the data to be returned by Recv() + * and inspecting the data that has been supplied to Send(). + */ +class DynSock : public ZeroSock +{ +public: + /** + * Unidirectional bytes or CNetMessage queue (FIFO). + */ + class Pipe + { + public: + /** + * Get bytes and remove them from the pipe. + * @param[in] buf Destination to write bytes to. + * @param[in] len Write up to this number of bytes. + * @param[in] flags Same as the flags of `recv(2)`. Just `MSG_PEEK` is honored. + * @return The number of bytes written to `buf`. `0` if `Eof()` has been called. + * If no bytes are available then `-1` is returned and `errno` is set to `EAGAIN`. + */ + ssize_t GetBytes(void* buf, size_t len, int flags = 0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Deserialize a `CNetMessage` and remove it from the pipe. + * If not enough bytes are available then the function will wait. If parsing fails + * or EOF is signaled to the pipe, then `std::nullopt` is returned. + */ + std::optional GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Push bytes to the pipe. + */ + void PushBytes(const void* buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Construct and push CNetMessage to the pipe. + */ + template + void PushNetMsg(const std::string& type, Args&&... payload) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Signal end-of-file on the receiving end (`GetBytes()` or `GetNetMsg()`). + */ + void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + private: + /** + * Return when there is some data to read or EOF has been signaled. + * @param[in,out] lock Unique lock that must have been derived from `m_mutex` by `WAIT_LOCK(m_mutex, lock)`. + */ + void WaitForDataOrEof(UniqueLock& lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex); + + Mutex m_mutex; + std::condition_variable m_cond; + std::vector m_data GUARDED_BY(m_mutex); + bool m_eof GUARDED_BY(m_mutex){false}; + }; + + struct Pipes { + Pipe recv; + Pipe send; + }; + + /** + * A basic thread-safe queue, used for queuing sockets to be returned by Accept(). + */ + class Queue + { + public: + using S = std::unique_ptr; + + void Push(S s) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + m_queue.push(std::move(s)); + } + + std::optional Pop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + if (m_queue.empty()) { + return std::nullopt; + } + S front{std::move(m_queue.front())}; + m_queue.pop(); + return front; + } + + bool Empty() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + return m_queue.empty(); + } + + private: + mutable Mutex m_mutex; + std::queue m_queue GUARDED_BY(m_mutex); + }; + + /** + * Create a new mocked sock. + * @param[in] pipes Send/recv pipes used by the Send() and Recv() methods. + * @param[in] accept_sockets Sockets to return by the Accept() method. + */ + explicit DynSock(std::shared_ptr pipes, std::shared_ptr accept_sockets); + + ~DynSock(); + + ssize_t Recv(void* buf, size_t len, int flags) const override; + + ssize_t Send(const void* buf, size_t len, int) const override; + + std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override; + + bool Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred = nullptr) const override; + + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override; + +private: + DynSock& operator=(Sock&&) override; + + std::shared_ptr m_pipes; + std::shared_ptr m_accept_sockets; +}; + +template +void DynSock::Pipe::PushNetMsg(const std::string& type, Args&&... payload) +{ + auto msg = NetMsg::Make(type, std::forward(payload)...); + V1Transport transport{NodeId{0}}; + + const bool queued{transport.SetMessageToSend(msg)}; + assert(queued); + + LOCK(m_mutex); + + for (;;) { + const auto& [bytes, _more, _msg_type] = transport.GetBytesToSend(/*have_next_message=*/true); + if (bytes.empty()) { + break; + } + m_data.insert(m_data.end(), bytes.begin(), bytes.end()); + transport.MarkBytesSent(bytes.size()); + } + + m_cond.notify_all(); +} + #endif // BITCOIN_TEST_UTIL_NET_H