Use shared_ptr for CNode inside CConnman #28222
Conversation
The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

Reviews: See the guideline for information on the review process. If your review is incorrectly listed, please react with 👎 to this comment and the bot will ignore it on the next update.

Conflicts: Reviewers, this pull request conflicts with the following ones: If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.
Also see #10738
Thanks, I hadn't seen this one before. Will take a look tonight. Edit: Really like the last commit. Will see if I can also drop a few locks too.
Force-pushed 190feb2 to 499f267
👍 I guess it must be possible to do that without "reinventing the wheel aka introducing custom smart pointer types", as mentioned in the last comment.
Concept ACK on getting rid of our custom ref counting.
I actually implemented something similar a while ago (dergoegge@b3913b4) but didn't pursue it because it doesn't retain the behavior we currently have w.r.t. which thread actually deletes the CNode. That is also the reason why #10738 is a bit more complex and introduces new pointer types.
The reason "which thread actually deletes the CNode" is important is that we only want to call FinalizeNode once, and we want there to be no other references to that CNode at that time.
The changes in this PR and the current ref-counting don't guarantee that; they just aim to work that way, and it always requires manual review to avoid bugs.
I think I'd prefer an approach that guarantees (at least conceptually) correct behavior (like e.g. the pointer types in #10738).
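One way to make that guarantee structural rather than review-enforced is to tie finalization to the drop of the last reference, for example with a custom deleter. A minimal sketch with illustrative stand-in types (not code from this PR or #10738):

```cpp
#include <cstdio>
#include <memory>

// Toy stand-in for CNode; names here are illustrative only.
struct Node { int id; };

// The deleter runs exactly once, when the final reference is dropped, so by
// construction there are no other references at the point of finalization.
std::shared_ptr<Node> MakeNodeRef(int id)
{
    return std::shared_ptr<Node>(new Node{id}, [](Node* node) {
        std::printf("FinalizeNode(peer=%d)\n", node->id); // stand-in for m_msgproc->FinalizeNode(*node)
        delete node;
    });
}

int main()
{
    auto a = MakeNodeRef(7);
    auto b = a;  // extra reference; nothing is finalized yet
    a.reset();
    b.reset();   // last reference gone -> finalize + delete, exactly once
}
```

Note that this does not by itself control which thread runs the finalization, which is the concern raised above.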
src/net_processing.cpp (Outdated)

@@ -502,7 +502,7 @@ class PeerManagerImpl final : public PeerManager
/** Implement NetEventsInterface */
void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
- void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex);
+ void FinalizeNode(const CNodeRef node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex);
Is there a reason why we need to use the shared_ptr type in net processing?
src/net.h (Outdated)

@@ -651,6 +633,8 @@ class CNode
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session GUARDED_BY(m_sock_mutex);
};

+ typedef std::shared_ptr<CNode> CNodeRef;
(I know this will get pushback, but) nit: Could we call this ConnectionRef? After all, the connection manager is managing... connections.
It would also seem more modern to use a using alias here.
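For comparison, the alias-declaration form of the same thing (a sketch only; the name follows the typedef above rather than the ConnectionRef suggestion):

```cpp
#include <memory>

class CNode;  // forward declaration, as in net.h

// Equivalent to the typedef above, written as an alias declaration.
using CNodeRef = std::shared_ptr<CNode>;
```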
std::vector<CNodeRef> disconnected_still_in_use;
for (auto& pnode : m_nodes_disconnected) {
    // Finalize when we have the final reference inside this block
    if (pnode.use_count() == 1) {
In a way this is worse than the custom ref counting because it is harder to assert that we actually don't take new references after this check.
cc @theuni (just in case you are interested in chiming in)
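To make the check-then-act nature of the use_count() == 1 test concrete, a toy, single-threaded sketch (stand-in types, not the PR's code): nothing structurally prevents new references from being taken after the check has passed.

```cpp
#include <cassert>
#include <memory>
#include <vector>

struct Node { bool finalized = false; };  // stand-in for CNode

int main()
{
    std::vector<std::shared_ptr<Node>> disconnected{std::make_shared<Node>()};

    auto& pnode = disconnected.front();
    if (pnode.use_count() == 1) {
        // The check only reflects the count at the moment it ran; a new
        // reference can still be taken before (or during) finalization.
        std::shared_ptr<Node> late_ref = pnode;  // count is now 2
        pnode->finalized = true;                 // "FinalizeNode" despite the extra ref
        assert(pnode.use_count() == 2);
    }
    return 0;
}
```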
To be used in m_nodes and m_nodes_disconnected.
Switch to using CNodeRef in m_nodes and m_nodes_disconnected. This gives us automatic refcounting and will permit safer deletion of nodes from multiple threads in the future. Some methods continue to use the stored raw pointer for now, as changing them would require larger LOC changes and they already operate synchronously under the shared_ptr reference counting: in ThreadMessageHandler, SendMessages() and ProcessMessages() are called using a ref-counted NodesSnapshot; PushMessage() is called from inside SendMessages() or ForNode(); and ProcessMessage() from inside ProcessMessages().
Use the shared_ptr for refcounting.
Force-pushed 499f267 to 82dc400
Thanks for the conceptual review @dergoegge. I've force-pushed a relatively different approach here, trying to take your review points and #10738 into consideration while not requiring the decaying pointers to be introduced. This has resulted in the following conceptual changes:
The commits still need tidying up, rewording, etc., but otherwise I'd be curious to know whether this new approach looks better to you?
Force-pushed 82dc400 to 36c6fe5
@dergoegge Thanks for the ping.
Why is this a goal? I'm quite nervous about these changes. I've spoken with @dergoegge about this a few times now, but I'm afraid my information and opinions on the code involved are quite dated at this point (2017 to be exact :), so I haven't had much to contribute. I'll say, though, that any changes or refactors for future changes to the threading model should be very well justified upfront, imo. I remember even being uneasy about #10738 at the time; in fact, I might have even talked myself out of rebasing it for merge.
Ensure we don't create new references while trying to drop a node
Force-pushed 36c6fe5 to 0954822
@@ -665,8 +650,11 @@ class CNode
 * Otherwise this unique_ptr is empty.
 */
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session GUARDED_BY(m_sock_mutex);
+ CConnman* m_connman;
Having the managed objects hold a reference to the manager leads to absolute spaghetti code, so I'm leaning towards NACK on that.
 */
class NodesSnapshot
{
public:
-    explicit NodesSnapshot(const CConnman& connman, bool shuffle)
+    explicit NodesSnapshot(const CConnman& connman, bool shuffle = false)
You can just get rid of NodesSnapshot entirely now; it was only needed as a RAII helper for the manual ref counting.
I missed this in the OP and think that this should not be a goal, or should be well justified. I like getting rid of the custom ref counting for other reasons, mainly because it's annoying and bug-prone. Something I have been working on is hiding CNode from the public net interface and turning the connection manager into a NodeId-based manager, e.g.
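A purely hypothetical interface sketch of what a NodeId-keyed manager could look like (all names below are invented for illustration; this is not a proposal from this thread):

```cpp
#include <map>
#include <memory>
#include <optional>

#include <net.h>  // assumes the usual net.h types: NodeId, CNode, CNodeStats, CSerializedNetMsg

// Hypothetical: callers identify peers only by NodeId; CNode never leaves the manager.
class NodeIdConnman
{
public:
    bool DisconnectNode(NodeId id);
    bool PushMessage(NodeId id, CSerializedNetMsg&& msg);
    std::optional<CNodeStats> GetNodeStats(NodeId id) const;

private:
    std::map<NodeId, std::shared_ptr<CNode>> m_nodes;  // keyed by id, never handed out
};
```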
Thanks, and I agree that after working on this I'm not sure this will ever extend to being threadsafe (todo: update PR description here). For context, #27912 is how I came to be looking at this. I feel like the model as (intended to be) implemented here is easier to reason about, as refcounting is taken care of for us via the smart pointers and node cleanup only happens (automatically) once we have a single final reference in
@dergoegge also left some comments on the current implementation of the approach while I was replying here, which I will consider and reply to separately :)
Seeing these failures locally:
What about calling FinalizeNode() from ~CNode()?

[patch]

diff --git i/src/net.h w/src/net.h
index 1ea0ad868a..587d907c3a 100644
--- i/src/net.h
+++ w/src/net.h
@@ -412,13 +412,12 @@ public:
/** fSuccessfullyConnected is set to true on receiving VERACK from the peer. */
std::atomic_bool fSuccessfullyConnected{false};
// Setting fDisconnect to true will cause the node to be disconnected the
// next time DisconnectNodes() runs
std::atomic_bool fDisconnect{false};
CSemaphoreGrant grantOutbound;
- std::atomic<int> nRefCount{0};
const uint64_t nKeyedNetGroup;
std::atomic_bool fPauseRecv{false};
std::atomic_bool fPauseSend{false};
const ConnectionType m_conn_type;
@@ -566,30 +565,27 @@ public:
uint64_t nKeyedNetGroupIn,
uint64_t nLocalHostNonceIn,
const CAddress& addrBindIn,
const std::string& addrNameIn,
ConnectionType conn_type_in,
bool inbound_onion,
+ std::function<void(CNode&)> destruct_cb = {},
CNodeOptions&& node_opts = {});
CNode(const CNode&) = delete;
CNode& operator=(const CNode&) = delete;
+ ~CNode();
+
NodeId GetId() const {
return id;
}
uint64_t GetLocalNonce() const {
return nLocalHostNonce;
}
- int GetRefCount() const
- {
- assert(nRefCount >= 0);
- return nRefCount;
- }
-
/**
* Receive bytes from the buffer and deserialize them into messages.
*
* @param[in] msg_bytes The raw data
* @param[out] complete Set True if at least one message has been
* deserialized and is ready to be processed
@@ -609,23 +605,12 @@ public:
}
CService GetAddrLocal() const EXCLUSIVE_LOCKS_REQUIRED(!m_addr_local_mutex);
//! May not be called more than once
void SetAddrLocal(const CService& addrLocalIn) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_local_mutex);
- CNode* AddRef()
- {
- nRefCount++;
- return this;
- }
-
- void Release()
- {
- nRefCount--;
- }
-
void CloseSocketDisconnect() EXCLUSIVE_LOCKS_REQUIRED(!m_sock_mutex);
void CopyStats(CNodeStats& stats) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv);
std::string ConnectionTypeAsString() const { return ::ConnectionTypeAsString(m_conn_type); }
@@ -662,12 +647,17 @@ private:
* the data socket is closed, the control socket is not going to be used anymore
* and is just taking up resources. So better close it as soon as `m_sock` is
* closed.
* Otherwise this unique_ptr is empty.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session GUARDED_BY(m_sock_mutex);
+
+ /**
+ * A function to be called just before this object is destroyed.
+ */
+ std::function<void(CNode&)> m_destruct_cb;
};
/**
* Interface for message handling
*/
class NetEventsInterface
@@ -802,23 +792,23 @@ public:
using NodeFn = std::function<void(CNode*)>;
void ForEachNode(const NodeFn& func)
{
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
- if (NodeFullyConnected(node))
- func(node);
+ if (NodeFullyConnected(*node))
+ func(node.get());
}
};
void ForEachNode(const NodeFn& func) const
{
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
- if (NodeFullyConnected(node))
- func(node);
+ if (NodeFullyConnected(*node))
+ func(node.get());
}
};
// Addrman functions
/**
* Return all or many randomly selected addresses, optionally by network.
@@ -964,25 +954,25 @@ private:
/**
* Generate a collection of sockets to check for IO readiness.
* @param[in] nodes Select from these nodes' sockets.
* @return sockets to check for readiness
*/
- Sock::EventsPerSock GenerateWaitSockets(Span<CNode* const> nodes);
+ Sock::EventsPerSock GenerateWaitSockets(const std::vector<std::shared_ptr<CNode>>& nodes);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
void SocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] nodes Nodes to process. The socket of each node is checked against `what`.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
- void SocketHandlerConnected(const std::vector<CNode*>& nodes,
+ void SocketHandlerConnected(const std::vector<std::shared_ptr<CNode>>& nodes,
const Sock::EventsPerSock& events_per_sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] events_per_sock Sockets that are ready for IO.
@@ -991,25 +981,25 @@ private:
void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;
- CNode* FindNode(const CNetAddr& ip);
- CNode* FindNode(const CSubNet& subNet);
- CNode* FindNode(const std::string& addrName);
- CNode* FindNode(const CService& addr);
+ std::shared_ptr<CNode> FindNode(const CNetAddr& ip);
+ std::shared_ptr<CNode> FindNode(const CSubNet& subNet);
+ std::shared_ptr<CNode> FindNode(const std::string& addrName);
+ std::shared_ptr<CNode> FindNode(const CService& addr);
/**
* Determine whether we're already connected to a given address, in order to
* avoid initiating duplicate connections.
*/
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
- CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
+ std::shared_ptr<CNode> ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const;
void DeleteNode(CNode* pnode);
NodeId GetNewNodeId();
@@ -1041,13 +1031,13 @@ private:
*
* @return bool Whether a preferred network was found.
*/
bool MaybePickPreferredNetwork(std::optional<Network>& network);
// Whether the node should be passed out in ForEach* callbacks
- static bool NodeFullyConnected(const CNode* pnode);
+ static bool NodeFullyConnected(const CNode& node);
// Network usage totals
mutable Mutex m_total_bytes_sent_mutex;
std::atomic<uint64_t> nTotalBytesRecv{0};
uint64_t nTotalBytesSent GUARDED_BY(m_total_bytes_sent_mutex) {0};
@@ -1072,14 +1062,13 @@ private:
AddrMan& addrman;
const NetGroupManager& m_netgroupman;
std::deque<std::string> m_addr_fetches GUARDED_BY(m_addr_fetches_mutex);
Mutex m_addr_fetches_mutex;
std::vector<std::string> m_added_nodes GUARDED_BY(m_added_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
- std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
- std::list<CNode*> m_nodes_disconnected;
+ std::vector<std::shared_ptr<CNode>> m_nodes GUARDED_BY(m_nodes_mutex);
mutable RecursiveMutex m_nodes_mutex;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
// Stores number of full-tx connections (outbound and manual) per network
std::array<unsigned int, Network::NET_MAX> m_network_conn_counts GUARDED_BY(m_nodes_mutex) = {};
@@ -1227,35 +1216,25 @@ private:
public:
explicit NodesSnapshot(const CConnman& connman, bool shuffle)
{
{
LOCK(connman.m_nodes_mutex);
m_nodes_copy = connman.m_nodes;
- for (auto& node : m_nodes_copy) {
- node->AddRef();
- }
}
if (shuffle) {
Shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{});
}
}
- ~NodesSnapshot()
- {
- for (auto& node : m_nodes_copy) {
- node->Release();
- }
- }
-
- const std::vector<CNode*>& Nodes() const
+ const std::vector<std::shared_ptr<CNode>>& Nodes() const
{
return m_nodes_copy;
}
private:
- std::vector<CNode*> m_nodes_copy;
+ std::vector<std::shared_ptr<CNode>> m_nodes_copy;
};
friend struct ConnmanTestMsg;
};
/** Dump binary message to file, with timestamp */
diff --git i/src/net.cpp w/src/net.cpp
index b51043ba27..c1ad1166f3 100644
--- i/src/net.cpp
+++ w/src/net.cpp
@@ -359,49 +359,49 @@ bool SeenLocal(const CService& addr)
bool IsLocal(const CService& addr)
{
LOCK(g_maplocalhost_mutex);
return mapLocalHost.count(addr) > 0;
}
-CNode* CConnman::FindNode(const CNetAddr& ip)
+std::shared_ptr<CNode> CConnman::FindNode(const CNetAddr& ip)
{
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
+ for (auto& pnode : m_nodes) {
if (static_cast<CNetAddr>(pnode->addr) == ip) {
return pnode;
}
}
return nullptr;
}
-CNode* CConnman::FindNode(const CSubNet& subNet)
+std::shared_ptr<CNode> CConnman::FindNode(const CSubNet& subNet)
{
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
+ for (auto& pnode : m_nodes) {
if (subNet.Match(static_cast<CNetAddr>(pnode->addr))) {
return pnode;
}
}
return nullptr;
}
-CNode* CConnman::FindNode(const std::string& addrName)
+std::shared_ptr<CNode> CConnman::FindNode(const std::string& addrName)
{
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
+ for (auto& pnode : m_nodes) {
if (pnode->m_addr_name == addrName) {
return pnode;
}
}
return nullptr;
}
-CNode* CConnman::FindNode(const CService& addr)
+std::shared_ptr<CNode> CConnman::FindNode(const CService& addr)
{
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
+ for (auto& pnode : m_nodes) {
if (static_cast<CService>(pnode->addr) == addr) {
return pnode;
}
}
return nullptr;
}
@@ -411,13 +411,13 @@ bool CConnman::AlreadyConnectedToAddress(const CAddress& addr)
return FindNode(static_cast<CNetAddr>(addr)) || FindNode(addr.ToStringAddrPort());
}
bool CConnman::CheckIncomingNonce(uint64_t nonce)
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce)
return false;
}
return true;
}
@@ -434,25 +434,23 @@ static CAddress GetBindAddress(const Sock& sock)
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n");
}
}
return addr_bind;
}
-CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type)
+std::shared_ptr<CNode> CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
if (pszDest == nullptr) {
if (IsLocal(addrConnect))
return nullptr;
// Look for an existing connection
- CNode* pnode = FindNode(static_cast<CService>(addrConnect));
- if (pnode)
- {
+ if (FindNode(static_cast<CService>(addrConnect))) {
LogPrintf("Failed to open new connection, already connected\n");
return nullptr;
}
}
LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying connection %s lastseen=%.1fhrs\n",
@@ -471,14 +469,13 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
LogPrint(BCLog::NET, "Resolver returned invalid address %s for %s\n", addrConnect.ToStringAddrPort(), pszDest);
return nullptr;
}
// It is possible that we already have a connection to the IP/port pszDest resolved to.
// In that case, drop the connection that was just created.
LOCK(m_nodes_mutex);
- CNode* pnode = FindNode(static_cast<CService>(addrConnect));
- if (pnode) {
+ if (FindNode(static_cast<CService>(addrConnect))) {
LogPrintf("Failed to open new connection, already connected\n");
return nullptr;
}
}
}
@@ -563,26 +560,26 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
// Add node
NodeId id = GetNewNodeId();
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
if (!addr_bind.IsValid()) {
addr_bind = GetBindAddress(*sock);
}
- CNode* pnode = new CNode(id,
+ auto pnode = std::make_shared<CNode>(id,
std::move(sock),
addrConnect,
CalculateKeyedNetGroup(addrConnect),
nonce,
addr_bind,
pszDest ? pszDest : "",
conn_type,
/*inbound_onion=*/false,
+ [this](CNode& node) {m_msgproc->FinalizeNode(node);},
CNodeOptions{
.i2p_sam_session = std::move(i2p_transient_session),
.recv_flood_size = nReceiveFloodSize,
});
- pnode->AddRef();
// We're making a new connection, harvest entropy from the time (and our peer count)
RandAddEvent((uint32_t)id);
return pnode;
}
@@ -902,13 +899,13 @@ size_t CConnman::SocketSendData(CNode& node) const
bool CConnman::AttemptToEvictConnection()
{
std::vector<NodeEvictionCandidate> vEvictionCandidates;
{
LOCK(m_nodes_mutex);
- for (const CNode* node : m_nodes) {
+ for (const auto& node : m_nodes) {
if (node->fDisconnect)
continue;
NodeEvictionCandidate candidate{
.id = node->GetId(),
.m_connected = node->m_connected,
.m_min_ping_time = node->m_min_ping_time,
@@ -929,13 +926,13 @@ bool CConnman::AttemptToEvictConnection()
}
const std::optional<NodeId> node_id_to_evict = SelectNodeToEvict(std::move(vEvictionCandidates));
if (!node_id_to_evict) {
return false;
}
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->GetId() == *node_id_to_evict) {
LogPrint(BCLog::NET, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionTypeAsString(), pnode->GetId());
pnode->fDisconnect = true;
return true;
}
}
@@ -986,13 +983,13 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
NetPermissions::AddFlag(permission_flags, NetPermissionFlags::Mempool);
NetPermissions::AddFlag(permission_flags, NetPermissionFlags::NoBan);
}
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->IsInboundConn()) nInbound++;
}
}
if (!fNetworkActive) {
LogPrint(BCLog::NET, "connection from %s dropped: not accepting new connections\n", addr.ToStringAddrPort());
@@ -1043,27 +1040,27 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
ServiceFlags nodeServices = nLocalServices;
if (NetPermissions::HasFlag(permission_flags, NetPermissionFlags::BloomFilter)) {
nodeServices = static_cast<ServiceFlags>(nodeServices | NODE_BLOOM);
}
const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end();
- CNode* pnode = new CNode(id,
+ auto pnode = std::make_shared<CNode>(id,
std::move(sock),
addr,
CalculateKeyedNetGroup(addr),
nonce,
addr_bind,
/*addrNameIn=*/"",
ConnectionType::INBOUND,
inbound_onion,
+ [this](CNode& node) {m_msgproc->FinalizeNode(node);},
CNodeOptions{
.permission_flags = permission_flags,
.prefer_evict = discouraged,
.recv_flood_size = nReceiveFloodSize,
});
- pnode->AddRef();
m_msgproc->InitializeNode(*pnode, nodeServices);
LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToStringAddrPort());
{
LOCK(m_nodes_mutex);
@@ -1095,13 +1092,13 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
case ConnectionType::FEELER:
break;
} // no default case, so the compiler can warn about missing cases
// Count existing connections
int existing_connections = WITH_LOCK(m_nodes_mutex,
- return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
+ return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](std::shared_ptr<CNode> node) { return node->m_conn_type == conn_type; }););
// Max connections of specified type already exist
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
// Max total outbound connections already exist
CSemaphoreGrant grant(*semOutbound, true);
@@ -1110,58 +1107,50 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type);
return true;
}
void CConnman::DisconnectNodes()
{
+ std::vector<std::shared_ptr<CNode>> disconnected_nodes;
+
{
LOCK(m_nodes_mutex);
if (!fNetworkActive) {
// Disconnect any connected nodes
- for (CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (!pnode->fDisconnect) {
LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId());
pnode->fDisconnect = true;
}
}
}
// Disconnect unused nodes
- std::vector<CNode*> nodes_copy = m_nodes;
- for (CNode* pnode : nodes_copy)
- {
- if (pnode->fDisconnect)
- {
+ for (auto it = m_nodes.begin(); it != m_nodes.end();) {
+ auto node = *it;
+ if (node->fDisconnect) {
// remove from m_nodes
- m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
+ it = m_nodes.erase(it);
+
+ // Keep a reference to this CNode object to delay its destruction until
+ // after m_nodes_mutex has been released. Destructing a node involves
+ // calling m_msgproc->FinalizeNode() which acquires cs_main. Lock order
+ // should be cs_main, m_nodes_mutex.
+ disconnected_nodes.push_back(node);
// release outbound grant (if any)
- pnode->grantOutbound.Release();
+ node->grantOutbound.Release();
// close socket and cleanup
- pnode->CloseSocketDisconnect();
+ node->CloseSocketDisconnect();
// update connection count by network
- if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()];
-
- // hold in disconnected pool until all refs are released
- pnode->Release();
- m_nodes_disconnected.push_back(pnode);
- }
- }
- }
- {
- // Delete disconnected nodes
- std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
- for (CNode* pnode : nodes_disconnected_copy)
- {
- // Destroy the object only after other threads have stopped using it.
- if (pnode->GetRefCount() <= 0) {
- m_nodes_disconnected.remove(pnode);
- DeleteNode(pnode);
+ if (node->IsManualOrFullOutboundConn()) --m_network_conn_counts[node->addr.GetNetwork()];
+ } else {
+ ++it;
}
}
}
}
void CConnman::NotifyNumConnectionsChanged()
@@ -1214,21 +1203,21 @@ bool CConnman::InactivityCheck(const CNode& node) const
return true;
}
return false;
}
-Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
+Sock::EventsPerSock CConnman::GenerateWaitSockets(const std::vector<std::shared_ptr<CNode>>& nodes)
{
Sock::EventsPerSock events_per_sock;
for (const ListenSocket& hListenSocket : vhListenSocket) {
events_per_sock.emplace(hListenSocket.sock, Sock::Events{Sock::RECV});
}
- for (CNode* pnode : nodes) {
+ for (auto& pnode : nodes) {
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
@@ -1287,18 +1276,18 @@ void CConnman::SocketHandler()
}
// Accept new connections from listening sockets.
SocketHandlerListening(events_per_sock);
}
-void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
+void CConnman::SocketHandlerConnected(const std::vector<std::shared_ptr<CNode>>& nodes,
const Sock::EventsPerSock& events_per_sock)
{
AssertLockNotHeld(m_total_bytes_sent_mutex);
- for (CNode* pnode : nodes) {
+ for (auto& pnode : nodes) {
if (interruptNet)
return;
//
// Receive
//
@@ -1454,13 +1443,13 @@ void CConnman::ThreadDNSAddressSeed()
if (!interruptNet.sleep_for(w)) return;
to_wait -= w;
int nRelevant = 0;
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && pnode->IsFullOutboundConn()) ++nRelevant;
}
}
if (nRelevant >= 2) {
if (found > 0) {
LogPrintf("%d addresses found from DNS seeds\n", found);
@@ -1572,13 +1561,13 @@ void CConnman::StartExtraBlockRelayPeers()
// evict some peer that has finished the handshake)
int CConnman::GetExtraFullOutboundCount() const
{
int full_outbound_peers = 0;
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsFullOutboundConn()) {
++full_outbound_peers;
}
}
}
return std::max(full_outbound_peers - m_max_outbound_full_relay, 0);
@@ -1586,13 +1575,13 @@ int CConnman::GetExtraFullOutboundCount() const
int CConnman::GetExtraBlockRelayCount() const
{
int block_relay_peers = 0;
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) {
++block_relay_peers;
}
}
}
return std::max(block_relay_peers - m_max_outbound_block_relay, 0);
@@ -1734,13 +1723,13 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
int nOutboundBlockRelay = 0;
int outbound_privacy_network_peers = 0;
std::set<std::vector<unsigned char>> outbound_ipv46_peer_netgroups;
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->IsFullOutboundConn()) nOutboundFullRelay++;
if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
// Make sure our persistent outbound slots to ipv4/ipv6 peers belong to different netgroups.
switch (pnode->m_conn_type) {
// We currently don't take inbound connections into account. Since they are
@@ -1954,13 +1943,13 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
}
std::vector<CAddress> CConnman::GetCurrentBlockRelayOnlyConns() const
{
std::vector<CAddress> ret;
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->IsBlockOnlyConn()) {
ret.push_back(pnode->addr);
}
}
return ret;
@@ -1980,13 +1969,13 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
// Build a map of all already connected addresses (by IP:port and by name) to inbound/outbound and resolved CService
std::map<CService, bool> mapConnected;
std::map<std::string, std::pair<bool, CService>> mapConnectedByName;
{
LOCK(m_nodes_mutex);
- for (const CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
if (pnode->addr.IsValid()) {
mapConnected[pnode->addr] = pnode->IsInboundConn();
}
std::string addrName{pnode->m_addr_name};
if (!addrName.empty()) {
mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->IsInboundConn(), static_cast<const CService&>(pnode->addr));
@@ -2068,13 +2057,13 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
if (IsLocal(addrConnect) || banned_or_discouraged || AlreadyConnectedToAddress(addrConnect)) {
return;
}
} else if (FindNode(std::string(pszDest)))
return;
- CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type);
+ auto pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type);
if (!pnode)
return;
if (grantOutbound)
grantOutbound->MoveTo(pnode->grantOutbound);
@@ -2101,23 +2090,23 @@ void CConnman::ThreadMessageHandler()
{
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the m_nodes list.
const NodesSnapshot snap{*this, /*shuffle=*/true};
- for (CNode* pnode : snap.Nodes()) {
+ for (auto& pnode : snap.Nodes()) {
if (pnode->fDisconnect)
continue;
// Receive messages
- bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
+ bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode.get(), flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
- m_msgproc->SendMessages(pnode);
+ m_msgproc->SendMessages(pnode.get());
if (flagInterruptMsgProc)
return;
}
}
@@ -2531,35 +2520,24 @@ void CConnman::StopNodes()
}
DumpAnchors(gArgs.GetDataDirNet() / ANCHORS_DATABASE_FILENAME, anchors_to_dump);
}
}
// Delete peer connections.
- std::vector<CNode*> nodes;
+ std::vector<std::shared_ptr<CNode>> nodes;
WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes));
- for (CNode* pnode : nodes) {
- pnode->CloseSocketDisconnect();
- DeleteNode(pnode);
+ for (auto& node : nodes) {
+ node->CloseSocketDisconnect();
}
+ nodes.clear();
- for (CNode* pnode : m_nodes_disconnected) {
- DeleteNode(pnode);
- }
- m_nodes_disconnected.clear();
vhListenSocket.clear();
semOutbound.reset();
semAddnode.reset();
}
-void CConnman::DeleteNode(CNode* pnode)
-{
- assert(pnode);
- m_msgproc->FinalizeNode(*pnode);
- delete pnode;
-}
-
CConnman::~CConnman()
{
Interrupt();
Stop();
}
@@ -2664,35 +2642,35 @@ uint32_t CConnman::GetMappedAS(const CNetAddr& addr) const
void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) const
{
vstats.clear();
LOCK(m_nodes_mutex);
vstats.reserve(m_nodes.size());
- for (CNode* pnode : m_nodes) {
+ for (const auto& pnode : m_nodes) {
vstats.emplace_back();
pnode->CopyStats(vstats.back());
vstats.back().m_mapped_as = GetMappedAS(pnode->addr);
}
}
bool CConnman::DisconnectNode(const std::string& strNode)
{
LOCK(m_nodes_mutex);
- if (CNode* pnode = FindNode(strNode)) {
+ if (auto pnode = FindNode(strNode)) {
LogPrint(BCLog::NET, "disconnect by address%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->GetId());
pnode->fDisconnect = true;
return true;
}
return false;
}
bool CConnman::DisconnectNode(const CSubNet& subnet)
{
bool disconnected = false;
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
+ for (auto& pnode : m_nodes) {
if (subnet.Match(pnode->addr)) {
LogPrint(BCLog::NET, "disconnect by subnet%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->GetId());
pnode->fDisconnect = true;
disconnected = true;
}
}
@@ -2704,13 +2682,13 @@ bool CConnman::DisconnectNode(const CNetAddr& addr)
return DisconnectNode(CSubNet(addr));
}
bool CConnman::DisconnectNode(NodeId id)
{
LOCK(m_nodes_mutex);
- for(CNode* pnode : m_nodes) {
+ for(auto& pnode : m_nodes) {
if (id == pnode->GetId()) {
LogPrint(BCLog::NET, "disconnect by id peer=%d; disconnecting\n", pnode->GetId());
pnode->fDisconnect = true;
return true;
}
}
@@ -2828,12 +2806,13 @@ CNode::CNode(NodeId idIn,
uint64_t nKeyedNetGroupIn,
uint64_t nLocalHostNonceIn,
const CAddress& addrBindIn,
const std::string& addrNameIn,
ConnectionType conn_type_in,
bool inbound_onion,
+ std::function<void(CNode&)> destruct_cb,
CNodeOptions&& node_opts)
: m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))},
m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())},
m_permission_flags{node_opts.permission_flags},
m_sock{sock},
m_connected{GetTime<std::chrono::seconds>()},
@@ -2844,13 +2823,14 @@ CNode::CNode(NodeId idIn,
m_prefer_evict{node_opts.prefer_evict},
nKeyedNetGroup{nKeyedNetGroupIn},
m_conn_type{conn_type_in},
id{idIn},
nLocalHostNonce{nLocalHostNonceIn},
m_recv_flood_size{node_opts.recv_flood_size},
- m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}
+ m_i2p_sam_session{std::move(node_opts.i2p_sam_session)},
+ m_destruct_cb{destruct_cb}
{
if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND);
for (const std::string &msg : getAllNetMessageTypes())
mapRecvBytesPerMsgType[msg] = 0;
mapRecvBytesPerMsgType[NET_MESSAGE_TYPE_OTHER] = 0;
@@ -2859,12 +2839,19 @@ CNode::CNode(NodeId idIn,
LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", m_addr_name, id);
} else {
LogPrint(BCLog::NET, "Added connection peer=%d\n", id);
}
}
+CNode::~CNode()
+{
+ if (m_destruct_cb) {
+ m_destruct_cb(*this);
+ }
+}
+
void CNode::MarkReceivedMsgsForProcessing()
{
AssertLockNotHeld(m_msg_process_queue_mutex);
size_t nSizeAdded = 0;
for (const auto& msg : vRecvMsg) {
@@ -2890,15 +2877,15 @@ std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
}
-bool CConnman::NodeFullyConnected(const CNode* pnode)
+bool CConnman::NodeFullyConnected(const CNode& node)
{
- return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
+ return node.fSuccessfullyConnected && !node.fDisconnect;
}
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
AssertLockNotHeld(m_total_bytes_sent_mutex);
size_t nMessageSize = msg.data.size();
@@ -2939,21 +2926,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
{
- CNode* found = nullptr;
LOCK(m_nodes_mutex);
for (auto&& pnode : m_nodes) {
if(pnode->GetId() == id) {
- found = pnode;
- break;
+ return NodeFullyConnected(*pnode) && func(pnode.get());
}
}
- return found != nullptr && NodeFullyConnected(found) && func(found);
+ return false;
}
CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
{
return CSipHasher(nSeed0, nSeed1).Write(id);
}
diff --git i/src/test/fuzz/net.cpp w/src/test/fuzz/net.cpp
index ddf919f2e6..19db2c7c50 100644
--- i/src/test/fuzz/net.cpp
+++ w/src/test/fuzz/net.cpp
@@ -40,21 +40,12 @@ FUZZ_TARGET(net, .init = initialize_net)
node.CloseSocketDisconnect();
},
[&] {
CNodeStats stats;
node.CopyStats(stats);
},
- [&] {
- const CNode* add_ref_node = node.AddRef();
- assert(add_ref_node == &node);
- },
- [&] {
- if (node.GetRefCount() > 0) {
- node.Release();
- }
- },
[&] {
const std::optional<CService> service_opt = ConsumeDeserializable<CService>(fuzzed_data_provider);
if (!service_opt) {
return;
}
node.SetAddrLocal(*service_opt);
@@ -66,14 +57,12 @@ FUZZ_TARGET(net, .init = initialize_net)
});
}
(void)node.GetAddrLocal();
(void)node.GetId();
(void)node.GetLocalNonce();
- const int ref_count = node.GetRefCount();
- assert(ref_count >= 0);
(void)node.GetCommonVersion();
const NetPermissionFlags net_permission_flags = ConsumeWeakEnum(fuzzed_data_provider, ALL_NET_PERMISSION_FLAGS);
(void)node.HasPermission(net_permission_flags);
(void)node.ConnectedThroughNetwork();
}
diff --git i/src/test/fuzz/util/net.h w/src/test/fuzz/util/net.h
index 47e4a2fac0..bc68018cdb 100644
--- i/src/test/fuzz/util/net.h
+++ w/src/test/fuzz/util/net.h
@@ -117,23 +117,25 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
keyed_net_group,
local_host_nonce,
addr_bind,
addr_name,
conn_type,
inbound_onion,
+ [](CNode&){},
CNodeOptions{ .permission_flags = permission_flags });
} else {
return CNode{node_id,
sock,
address,
keyed_net_group,
local_host_nonce,
addr_bind,
addr_name,
conn_type,
inbound_onion,
+ {},
CNodeOptions{ .permission_flags = permission_flags }};
}
}
inline std::unique_ptr<CNode> ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional<NodeId>& node_id_in = std::nullopt) { return ConsumeNode<true>(fdp, node_id_in); }
void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);
diff --git i/src/test/util/net.h w/src/test/util/net.h
index b2f6ebb163..e90e3d8401 100644
--- i/src/test/util/net.h
+++ w/src/test/util/net.h
@@ -25,23 +25,20 @@ struct ConnmanTestMsg : public CConnman {
m_peer_connect_timeout = timeout;
}
void AddTestNode(CNode& node)
{
LOCK(m_nodes_mutex);
- m_nodes.push_back(&node);
+ m_nodes.push_back(std::shared_ptr<CNode>(&node));
if (node.IsManualOrFullOutboundConn()) ++m_network_conn_counts[node.addr.GetNetwork()];
}
void ClearTestNodes()
{
LOCK(m_nodes_mutex);
- for (CNode* node : m_nodes) {
- delete node;
- }
m_nodes.clear();
}
void Handshake(CNode& node,
bool successfully_connected,
ServiceFlags remote_services,
🐙 This pull request conflicts with the target branch and needs rebase.
@@ -798,11 +798,11 @@ class CConnman
// alias for thread safety annotations only, not defined
RecursiveMutex& GetNodesMutex() const LOCK_RETURNED(m_nodes_mutex);

- bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
+ bool ForNode(NodeId id, std::function<bool(CNodeRef pnode)> func);
I don't think this makes sense in general? If you have a shared_ptr, you should normally just pass the reference to the object (CNode&) if you've checked it's not null, or the pointer itself (CNode*).
cf http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rf-smart
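A small sketch of the convention that guideline suggests: ownership stays with the shared_ptr at the call site, and a callee that doesn't participate in lifetime takes a plain reference (toy types, not the PR's signatures):

```cpp
#include <functional>
#include <memory>

struct Node { int id = 0; };  // toy stand-in for CNode

// The callee does not share ownership, so it takes a plain reference.
bool Visit(Node& node, const std::function<bool(Node&)>& func)
{
    return func(node);
}

int main()
{
    auto owner = std::make_shared<Node>();  // lifetime is managed here
    // The shared_ptr stays at the call site, keeping the object alive across
    // the call without widening the callee's interface to a smart pointer.
    return Visit(*owner, [](Node& n) { return n.id == 0; }) ? 0 : 1;
}
```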
Are you still working on this?
⌛ There hasn't been much activity lately and the patch still needs rebase. What is the status here?
🤔 There hasn't been much activity lately and the CI seems to be failing. If no one reviewed the current pull request by commit hash, a rebase can be considered. While the CI failure may be a false positive, the CI hasn't been running for some time, so there may be a real issue hiding as well. A rebase triggers the latest CI and makes sure that no silent merge conflicts have snuck in.
Closing for now |
Switch to using smart pointers to CNodes inside of CConnman.

Currently we are manually refcounting CNodes, which is potentially error-prone and makes operations such as deleting them from multiple threads difficult without introducing new locks or other synchronisation operations (see #27912).

Switch to using std::shared_ptr references to CNodes inside of m_nodes and m_nodes_disconnected to give us better memory safety today, and in the future to allow AttemptToEvictConnection (and optionally other sites) to safely and synchronously disconnect nodes when needed.

Opening as draft for now as I want to both gauge feedback on the approach and see which PRs this may conflict with (#27213?) before moving it forwards.
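As a condensed illustration of the memory-safety claim, with toy types (the real code guards m_nodes with m_nodes_mutex and calls FinalizeNode before a node goes away):

```cpp
#include <memory>
#include <vector>

struct Node { bool fDisconnect = false; };  // toy stand-in for CNode

int main()
{
    std::vector<std::shared_ptr<Node>> nodes{std::make_shared<Node>()};

    // e.g. the message-handler thread takes a snapshot of the node list
    std::vector<std::shared_ptr<Node>> snapshot = nodes;

    nodes.clear();                         // "disconnect": the manager drops its reference
    snapshot.front()->fDisconnect = true;  // the snapshot holder can still use the object safely
    snapshot.clear();                      // last reference released -> node destroyed here
    return 0;
}
```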
CC @vasild