Reduce number of connections, if needed

This makes the best attempt to reduce the number of connections by
reusing the eviction logic. This reuses the core of that algorithm at
the expense of deterministic behavior. In other words, starting with a
max connections value of 125 and then changing the max connections to 3
does not mean that this will immediately evict 122 connections.

Eventually the number of connections will reduce to the limit.

While the body of the condition will add latency to network processing,
the integer comparison between max connections and size of the
connections vector should be quick in most cases.

Note the extraction of connection disconnect/delete helpers so as to
reuse the same logic in multiple places. While this may not be strictly
necessary for the algorithm, it reduces the possibility that this entire
loop will get stuck doing busy work when trying to evict connections to
get under the maximum threshold.
This commit is contained in:
chromatic 2021-11-20 13:21:29 -08:00
parent d871cda81a
commit 4b5dcc77f6
3 changed files with 107 additions and 47 deletions

View File

@ -62,10 +62,15 @@ class SetMaxConnectionCountTest (BitcoinTestFramework):
def run_test(self):
self.test_rpc_argument_validation()
self.test_node_connection_changes(5)
self.test_node_connection_changes(10)
self.test_node_connection_changes(3)
# max_count has to be at least 20
# min_count can be closer to 20
self.test_node_disconnections(40, 20)
def test_rpc_argument_validation(self):
first_node = self.nodes[0]
@ -118,5 +123,42 @@ class SetMaxConnectionCountTest (BitcoinTestFramework):
x = first_node.getconnectioncount()
assert(x == 0)
def test_node_disconnections(self, max_count, min_count):
first_node = self.nodes[0]
attempted_nodes = []
# 9 is 8 outgoing connections plus 1 feeler
first_node.setmaxconnections(9 + max_count)
client_nodes = []
self.connect_nodes(client_nodes, max_count)
x = first_node.getconnectioncount()
assert(x == max_count)
first_node.setmaxconnections(min_count)
def nodes_disconnected():
disc_count = 0
for node in attempted_nodes:
if node.peer_disconnected:
disc_count += 1
else:
node.sync_with_ping(0.1)
if disc_count < max_count - min_count:
return False
return True
wait_until(nodes_disconnected, timeout=10)
# try asserting this two ways, for debugging the test
x = first_node.getconnectioncount()
assert(x < max_count)
assert(x == min_count)
for node in attempted_nodes:
node.close()
if __name__ == '__main__':
SetMaxConnectionCountTest().main()

View File

@ -1107,6 +1107,56 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
}
}
void CConnman::DisconnectUnusedNodes()
{
LOCK(cs_vNodes);
// Disconnect unused nodes
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
// release outbound grant (if any)
pnode->grantOutbound.Release();
// close socket and cleanup
pnode->CloseSocketDisconnect();
// hold in disconnected pool until all refs are released
pnode->Release();
vNodesDisconnected.push_back(pnode);
}
}
}
void CConnman::DeleteDisconnectedNodes()
{
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy)
{
// wait until threads are done using it
if (pnode->GetRefCount() <= 0) {
bool fDelete = false;
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) {
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
fDelete = true;
}
}
}
if (fDelete) {
vNodesDisconnected.remove(pnode);
DeleteNode(pnode);
}
}
}
}
void CConnman::ThreadSocketHandler()
{
unsigned int nPrevNodeCount = 0;
@ -1115,53 +1165,8 @@ void CConnman::ThreadSocketHandler()
//
// Disconnect nodes
//
{
LOCK(cs_vNodes);
// Disconnect unused nodes
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
// release outbound grant (if any)
pnode->grantOutbound.Release();
// close socket and cleanup
pnode->CloseSocketDisconnect();
// hold in disconnected pool until all refs are released
pnode->Release();
vNodesDisconnected.push_back(pnode);
}
}
}
{
// Delete disconnected nodes
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy)
{
// wait until threads are done using it
if (pnode->GetRefCount() <= 0) {
bool fDelete = false;
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) {
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
fDelete = true;
}
}
}
if (fDelete) {
vNodesDisconnected.remove(pnode);
DeleteNode(pnode);
}
}
}
}
DisconnectUnusedNodes();
DeleteDisconnectedNodes();
size_t vNodesSize;
{
LOCK(cs_vNodes);
@ -1398,6 +1403,17 @@ void CConnman::ThreadSocketHandler()
}
}
}
//
// Reduce number of connections, if needed
//
if (vNodesCopy.size() > (size_t)nMaxConnections)
{
LogPrintf("%s: attempting to reduce connections: max=%u current=%u", __func__, vNodesCopy.size(), nMaxConnections);
DisconnectUnusedNodes();
DeleteDisconnectedNodes();
AttemptToEvictConnection();
}
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodesCopy)

View File

@ -317,6 +317,8 @@ private:
bool IsWhitelistedRange(const CNetAddr &addr);
void DeleteNode(CNode* pnode);
void DisconnectUnusedNodes();
void DeleteDisconnectedNodes();
NodeId GetNewNodeId();