diff --git i/src/sv2/connman.cpp w/src/sv2/connman.cpp index bb63994383..c5f6cd003b 100644 --- i/src/sv2/connman.cpp +++ w/src/sv2/connman.cpp @@ -35,14 +35,12 @@ bool Sv2Connman::Start(Sv2EventsInterface* msgproc, std::string host, uint16_t p if (!Bind(host, port)) return false; SockMan::Options sockman_options; StartSocketsThreads(sockman_options); - m_thread_sv2_handler = std::thread(&util::TraceThread, "sv2connman", [this] { ThreadSv2Handler(); }); - return true; } bool Sv2Connman::Bind(std::string host, uint16_t port) { const CService addr_bind = LookupNumeric(host, port); @@ -81,117 +79,21 @@ void Sv2Connman::DisconnectFlagged() // m_sv2_clients.erase( // std::remove_if(m_sv2_clients.begin(), m_sv2_clients.end(), [](const auto *client) { // return client->second->m_disconnect_flag; // }), m_sv2_clients.end()); } -void Sv2Connman::ThreadSv2Handler() EXCLUSIVE_LOCKS_REQUIRED(!m_clients_mutex) +void Sv2Connman::EventIOLoopCompletedForAllPeers() { - AssertLockNotHeld(m_clients_mutex); - - while (!m_flag_interrupt_sv2) { - { - LOCK(m_clients_mutex); - DisconnectFlagged(); - } - - constexpr auto timeout = std::chrono::milliseconds(50); - - // TODO: what? - continue; - - Sock::EventsPerSock events_per_sock; // wait sockets - - LOCK(m_clients_mutex); - // Process messages from and for connected sv2_clients. - for (auto& c : m_sv2_clients) { - const NodeId node_id{c.first}; - const auto& client{c.second}; - bool has_received_data = false; - bool has_error_occurred = false; - - const auto socket_it = events_per_sock.end(); // events_per_sock.find(client->m_sock); - if (socket_it != events_per_sock.end()) { - has_received_data = socket_it->second.occurred & Sock::RECV; - has_error_occurred = socket_it->second.occurred & Sock::ERR; - } - - if (has_error_occurred) { - LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Socket receive error, disconnecting client id=%zu\n", - node_id); - client->m_disconnect_flag = true; - continue; - } - - // Process message queue and any outbound bytes still held by the transport - auto it = client->m_send_messages.begin(); - std::optional expected_more; - while(true) { - if (it != client->m_send_messages.end()) { - // If possible, move one message from the send queue to the transport. - // This fails when there is an existing message still being sent, - // or when the handshake has not yet completed. - // - // Wrap Sv2NetMsg inside CSerializedNetMsg for transport - CSerializedNetMsg net_msg{*it}; - if (client->m_transport->SetMessageToSend(net_msg)) { - ++it; - } - } - - const auto& [data, more, _m_message_type] = client->m_transport->GetBytesToSend(/*have_next_message=*/it != client->m_send_messages.end()); - size_t total_sent = 0; - - // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more - // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check, - // verify that the previously returned 'more' was correct. - if (expected_more.has_value()) Assume(!data.empty() == *expected_more); - expected_more = more; - ssize_t sent = 0; - - if (!data.empty()) { - int flags = MSG_NOSIGNAL | MSG_DONTWAIT; -#ifdef MSG_MORE - if (more) { - flags |= MSG_MORE; - } -#endif - LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Send %d bytes to client id=%zu\n", - data.size() - total_sent, client->m_id); - sent = false; // client->m_sock->Send(data.data() + total_sent, data.size() - total_sent, flags); - } - if (sent > 0) { - // Notify transport that bytes have been processed. - client->m_transport->MarkBytesSent(sent); - if ((size_t)sent != data.size()) { - // could not send full message; stop sending more - break; - } - } else { - if (sent < 0) { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { - LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Socket send error for client id=%zu: %s\n", - client->m_id, NetworkErrorString(nErr)); - client->m_disconnect_flag = true; - } - } - break; - } - } - // Clear messages that have been handed to transport from the queue - client->m_send_messages.erase(client->m_send_messages.begin(), it); - - } - } + LOCK(m_clients_mutex); + DisconnectFlagged(); } void Sv2Connman::Interrupt() { - m_flag_interrupt_sv2 = true; + interruptNet(); } void Sv2Connman::StopThreads() { JoinSocketsThreads(); } @@ -278,12 +180,15 @@ void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv) CloseConnection(node_id); } break; } } + // Clear messages that have been handed to transport from the queue + client->m_send_messages.erase(client->m_send_messages.begin(), it); + // If both receiving and (non-optimistic) sending were possible, we first attempt // sending. If that succeeds, but does not fully drain the send queue, do not // attempt to receive. This avoids needlessly queueing data if the remote peer // is slow at receiving data, by means of TCP flow control. We only do this when // sending actually succeeded to make sure progress is always made; otherwise a // deadlock would be possible when both sides have data to send, but neither is diff --git i/src/sv2/connman.h w/src/sv2/connman.h index f131ae37f9..c1e2a39d22 100644 --- i/src/sv2/connman.h +++ w/src/sv2/connman.h @@ -120,36 +120,19 @@ private: /** * A map of all connected stratum v2 clients. */ using Clients = std::unordered_map>; Clients m_sv2_clients GUARDED_BY(m_clients_mutex); - /** - * The main thread for connection handling. - */ - std::thread m_thread_sv2_handler; - - /** - * Signal for handling interrupts and stopping the template provider event loop. - */ - std::atomic m_flag_interrupt_sv2{false}; - CThreadInterrupt m_interrupt_sv2; - /** * Creates a socket and binds the port for new stratum v2 connections. */ [[nodiscard]] bool Bind(std::string host, uint16_t port); void DisconnectFlagged() EXCLUSIVE_LOCKS_REQUIRED(m_clients_mutex); - /** - * The main thread for the template provider, contains an event loop handling - * all tasks for the template provider. - */ - void ThreadSv2Handler(); - /** * Create a `Sv2Client` object and add it to the `m_sv2_clients` member. * @param[in] node_id Id of the newly accepted connection. * @param[in] me The address and port at our side of the connection. * @param[in] them The address and port at the peer's side of the connection. * @retval true on success @@ -168,12 +151,14 @@ private: virtual void EventGotEOF(NodeId node_id) override EXCLUSIVE_LOCKS_REQUIRED(!m_clients_mutex) { fprintf(stderr, "EventGotEOF\n"); }; // TODO in sv2/conmman.cpp virtual void EventGotPermanentReadError(NodeId node_id, const std::string& errmsg) override EXCLUSIVE_LOCKS_REQUIRED(!m_clients_mutex) { fprintf(stderr, "EventGotPermanentReadError\n"); }; // TODO in sv2/conmman.cpp + virtual void EventIOLoopCompletedForAllPeers() override; + /** * Encrypt the header and message payload and send it. * @throws std::runtime_error if encrypting the message fails. */ bool EncryptAndSendMessage(Sv2Client& client, node::Sv2NetMsg& net_msg); diff --git i/src/test/sv2_connman_tests.cpp w/src/test/sv2_connman_tests.cpp index 8836cbee0f..c6eb60d578 100644 --- i/src/test/sv2_connman_tests.cpp +++ w/src/test/sv2_connman_tests.cpp @@ -14,13 +14,13 @@ BOOST_FIXTURE_TEST_SUITE(sv2_connman_tests, TestChain100Setup) * A class for testing the Sv2Connman. Each ConnTester encapsulates a * Sv2Connman (the one being tested) as well as a Sv2Cipher * to act as the other side. */ class ConnTester : Sv2EventsInterface { private: - std::unique_ptr m_peer_transport; //!< Transport for peer + std::unique_ptr m_remote_transport; //!< Transport for peer // Sockets that will be returned by the Sv2Connman's listening socket Accept() method. std::shared_ptr m_sv2connman_accepted_sockets{std::make_shared()}; std::shared_ptr m_current_client_pipes; XOnlyPubKey m_connman_authority_pubkey; @@ -56,24 +56,23 @@ public: ~ConnTester() { CreateSock = CreateSockOS; } - void SendPeerBytes() + void RemoteToLocalBytes() { - const auto& [data, more, _m_message_type] = m_peer_transport->GetBytesToSend(/*have_next_message=*/false); + const auto& [data, more, _m_message_type] = m_remote_transport->GetBytesToSend(/*have_next_message=*/false); BOOST_REQUIRE(data.size() > 0); // Schedule data to be returned by the next Recv() call from // Sv2Connman on the socket it has accepted. m_current_client_pipes->recv.PushBytes(data.data(), data.size()); - m_peer_transport->MarkBytesSent(data.size()); + m_remote_transport->MarkBytesSent(data.size()); } - // Have the peer receive and process bytes: - size_t PeerReceiveBytes() + size_t LocalToRemoteBytes() { uint8_t buf[0x10000]; // Get the data that has been written to the accepted socket with Send() by Sv2Connman. // Wait until the bytes appear in the "send" pipe. ssize_t n; for (;;) { @@ -81,49 +80,49 @@ public: if (n != -1 || errno != EAGAIN) { break; } UninterruptibleSleep(50ms); } - // Inform client's transport that some bytes have been received (sent by Sv2Connman). + // Inform remote transport that some bytes have been received (sent by the local Sv2Connman). if (n > 0) { Span s(buf, n); - BOOST_REQUIRE(m_peer_transport->ReceivedBytes(s)); + BOOST_REQUIRE(m_remote_transport->ReceivedBytes(s)); } return n; } /* Create a new client and perform handshake */ void handshake() { - m_peer_transport.reset(); + m_remote_transport.reset(); auto peer_static_key{GenerateRandomKey()}; - m_peer_transport = std::make_unique(std::move(peer_static_key), m_connman_authority_pubkey); + m_remote_transport = std::make_unique(std::move(peer_static_key), m_connman_authority_pubkey); // Have Sv2Connman's listen socket's Accept() simulate a newly arrived connection. m_current_client_pipes = std::make_shared(); m_sv2connman_accepted_sockets->Push( std::make_unique(m_current_client_pipes, std::make_shared())); // Flush transport for handshake part 1 - SendPeerBytes(); + RemoteToLocalBytes(); // Read handshake part 2 from transport - BOOST_REQUIRE_EQUAL(PeerReceiveBytes(), Sv2HandshakeState::HANDSHAKE_STEP2_SIZE); + BOOST_REQUIRE_EQUAL(LocalToRemoteBytes(), Sv2HandshakeState::HANDSHAKE_STEP2_SIZE); BOOST_REQUIRE(IsConnected()); } - void receiveMessage(Sv2NetMsg& msg) + void RemoteToLocalMsg(Sv2NetMsg& msg) { // Client encrypts message and puts it on the transport: CSerializedNetMsg net_msg{std::move(msg)}; - BOOST_REQUIRE(m_peer_transport->SetMessageToSend(net_msg)); - SendPeerBytes(); + BOOST_REQUIRE(m_remote_transport->SetMessageToSend(net_msg)); + RemoteToLocalBytes(); } bool IsConnected() { LOCK(m_connman->m_clients_mutex); return m_connman->ConnectedClients() > 0; @@ -166,37 +165,37 @@ BOOST_AUTO_TEST_CASE(client_tests) ConnTester tester{}; BOOST_REQUIRE(!tester.IsConnected()); tester.handshake(); BOOST_REQUIRE(!tester.IsFullyConnected()); - // After the handshake the client must send a SetupConnection message to the + // After the handshake the remote peer must send a SetupConnection message to the // Template Provider. // An empty SetupConnection message should cause disconnection node::Sv2NetMsg sv2_msg{node::Sv2MsgType::SETUP_CONNECTION, {}}; - tester.receiveMessage(sv2_msg); - BOOST_REQUIRE_EQUAL(tester.PeerReceiveBytes(), 0); + tester.RemoteToLocalMsg(sv2_msg); + BOOST_REQUIRE_EQUAL(tester.LocalToRemoteBytes(), 0); BOOST_REQUIRE(!tester.IsConnected()); BOOST_TEST_MESSAGE("Reconnect after empty message"); // Reconnect tester.handshake(); BOOST_TEST_MESSAGE("Handshake done, send SetupConnectionMsg"); node::Sv2NetMsg setup{tester.SetupConnectionMsg()}; - tester.receiveMessage(setup); + tester.RemoteToLocalMsg(setup); // SetupConnection.Success is 6 bytes - BOOST_REQUIRE_EQUAL(tester.PeerReceiveBytes(), SV2_HEADER_ENCRYPTED_SIZE + 6 + Poly1305::TAGLEN); + BOOST_REQUIRE_EQUAL(tester.LocalToRemoteBytes(), SV2_HEADER_ENCRYPTED_SIZE + 6 + Poly1305::TAGLEN); BOOST_REQUIRE(tester.IsFullyConnected()); std::vector coinbase_output_max_additional_size_bytes{ 0x01, 0x00, 0x00, 0x00 }; node::Sv2NetMsg msg{node::Sv2MsgType::COINBASE_OUTPUT_DATA_SIZE, std::move(coinbase_output_max_additional_size_bytes)}; // No reply expected, not yet implemented - tester.receiveMessage(msg); + tester.RemoteToLocalMsg(msg); } BOOST_AUTO_TEST_SUITE_END()