Merge bitcoin/bitcoin#34410: test: let connections happen in any order in p2p_private_broadcast.py

da7f70a5322843b70f29456a8bc2227209a0718b test: use port 0 for I2P addresses in p2p_private_broadcast.py (Vasil Dimov)
a8ebcfd34c63f142064b4f5ef7d52299739d4cd6 test: let connections happen in any order in p2p_private_broadcast.py (Vasil Dimov)
67696b207f370e902c8d5fb765e4ff10f6c9e1b4 net: extend log message to include attempted connection type (Vasil Dimov)

Pull request description:

  If the following two events happen:

  * (likely) the automatic 10 initial connections are not made to all
    networks
  * (unlikely) the network-specific logic kicks in almost immediately.
    It is using exponential distribution with a mean of 5 minutes
    (`rng.rand_exp_duration(EXTRA_NETWORK_PEER_INTERVAL)`).

  So if both happen, then the 11th connection may not be the expected
  private broadcast, but a network-specific connection.

  Fix this by retrieving the connection type from
  `destinations_factory()`. This is more flexible because it allows
  connections to happen in any order and does not break if e.g. the 11th
  connection is not the expected first private broadcast.

  This also makes the test run faster:
  before: 19-44 sec
  now: 10-25 sec
  because for example there is no need to wait for the initial 10
  automatic outbound connections to be made in order to proceed.

  Fixes: https://github.com/bitcoin/bitcoin/issues/34387

ACKs for top commit:
  achow101:
    ACK da7f70a5322843b70f29456a8bc2227209a0718b
  andrewtoth:
    ACK da7f70a5322843b70f29456a8bc2227209a0718b
  mzumsande:
    Code Review ACK da7f70a5322843b70f29456a8bc2227209a0718b

Tree-SHA512: 7c293e59c15c148a438e0119343b05eb278205640658c99336d4caf4848c5bae92b48e15f325fa616cbc9d5f394649abfa02406a76e802cffbd3d312a22a6885
This commit is contained in:
Ava Chow 2026-03-02 07:47:53 -08:00
commit 6b0a980de9
No known key found for this signature in database
GPG Key ID: 17565732E08E5E41
2 changed files with 88 additions and 25 deletions

View File

@ -390,8 +390,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect,
}
}
LogDebug(BCLog::NET, "trying %s connection %s lastseen=%.1fhrs\n",
LogDebug(BCLog::NET, "trying %s connection (%s) to %s, lastseen=%.1fhrs\n",
use_v2transport ? "v2" : "v1",
ConnectionTypeAsString(conn_type),
pszDest ? pszDest : addrConnect.ToStringAddrPort(),
Ticks<HoursDouble>(pszDest ? 0h : Now<NodeSeconds>() - addrConnect.nTime));

View File

@ -6,6 +6,7 @@
Test how locally submitted transactions are sent to the network when private broadcast is used.
"""
import re
import time
import threading
@ -46,9 +47,6 @@ from test_framework.wallet import (
MiniWallet,
)
MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8
MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2
NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS
NUM_PRIVATE_BROADCAST_PER_TX = 3
# Fill addrman with these addresses. Must have enough Tor addresses, so that even
@ -184,24 +182,56 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.destinations_lock = threading.Lock()
def find_connection_type_in_debug_log(to_addr, to_port):
"""
Scan the debug log of tx_originator for a connection attempt to to_addr:to_port.
Return the connection type (outbound-full-relay, private-broadcast, etc) or
None if there is no connection attempt to to_addr:to_port.
"""
with open(self.tx_originator_debug_log_path, mode="r", encoding="utf-8") as debug_log:
for line in debug_log.readlines():
match = re.match(f".*trying v. connection \\((.+)\\) to \\[?{to_addr}]?:{to_port},.*", line)
if match:
return match.group(1)
return None
def destinations_factory(requested_to_addr, requested_to_port):
"""
Instruct the SOCKS5 proxy to redirect connections:
* The first automatic outbound connection -> P2PDataStore
* The first private broadcast connection -> nodes[1]
* Anything else -> P2PInterface
"""
conn_type = None
def found_connection_in_debug_log():
nonlocal conn_type
conn_type = find_connection_type_in_debug_log(requested_to_addr, requested_to_port)
return conn_type is not None
self.wait_until(found_connection_in_debug_log)
with self.destinations_lock:
i = len(self.destinations)
actual_to_addr = ""
actual_to_port = 0
listener = None
if i == NUM_INITIAL_CONNECTIONS:
target_name = ""
if conn_type == "private-broadcast" and not any(dest["conn_type"] == "private-broadcast" for dest in self.destinations):
# Instruct the SOCKS5 server to redirect the first private
# broadcast connection from nodes[0] to nodes[1]
actual_to_addr = "127.0.0.1" # nodes[1] listen address
actual_to_port = tor_port(1) # nodes[1] listen port for Tor
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])")
target_name = "nodes[1]"
else:
# Create a Python P2P listening node and instruct the SOCKS5 proxy to
# redirect the connection to it. The first outbound connection is used
# later to serve GETDATA, thus make it P2PDataStore().
listener = P2PDataStore() if i == 0 else P2PInterface()
if conn_type == "outbound-full-relay" and not any(dest["conn_type"] == "outbound-full-relay" for dest in self.destinations):
listener = P2PDataStore()
target_name = "Python P2PDataStore"
else:
listener = P2PInterface()
target_name = "Python P2PInterface"
listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor)
listener.peer_connect_send_version(services=P2P_SERVICES)
@ -221,11 +251,14 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
callback=on_listen_done)
# Wait until the callback has been called.
self.wait_until(lambda: actual_to_port != 0)
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} ({conn_type}) for "
f"{format_addr_port(requested_to_addr, requested_to_port)} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} ({target_name})")
self.destinations.append({
"requested_to": format_addr_port(requested_to_addr, requested_to_port),
"conn_type": conn_type,
"node": listener,
})
assert_equal(len(self.destinations), i + 1)
@ -263,6 +296,15 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.setup_nodes()
def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations):
def wait_and_get_destination(n):
"""Wait for self.destinations[] to have at least n elements and return the 'n'th."""
def get_destinations_len():
with self.destinations_lock:
return len(self.destinations)
self.wait_until(lambda: get_destinations_len() > n)
with self.destinations_lock:
return self.destinations[n]
broadcasts_done = 0
i = skip_destinations - 1
while broadcasts_done < broadcasts_to_expect:
@ -270,9 +312,10 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.log.debug(f"{label}: waiting for outbound connection i={i}")
# At this point the connection may not yet have been established (A),
# may be active (B), or may have already been closed (C).
self.wait_until(lambda: len(self.destinations) > i)
dest = self.destinations[i]
dest = wait_and_get_destination(i)
peer = dest["node"]
if peer is None:
continue # That is the first private broadcast connection, redirected to nodes[1]
peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False)
# Now it is either (B) or (C).
if peer.last_message["version"].nServices != 0:
@ -314,6 +357,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
def run_test(self):
tx_originator = self.nodes[0]
self.tx_originator_debug_log_path = tx_originator.debug_log_path
tx_receiver = self.nodes[1]
far_observer = tx_receiver.add_p2p_connection(P2PInterface())
@ -321,30 +365,24 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
# Fill tx_originator's addrman.
for addr in ADDRMAN_ADDRESSES:
res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False)
res = tx_originator.addpeeraddress(address=addr, port=0 if addr.endswith(".i2p") else 8333, tried=False)
if not res["success"]:
self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)")
self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS)
# The next opened connection by tx_originator should be "private broadcast"
# for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver.
txs = wallet.create_self_transfer_chain(chain_length=3)
self.log.info(f"Created txid={txs[0]['txid']}: for basic test")
self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast")
self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool")
tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1)
self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, "
"must be the first private broadcast connection")
self.log.info("First private broadcast: waiting for the transaction to reach the recipient")
self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0)
self.log.info("First private broadcast: the recipient received the transaction")
far_observer.wait_for_tx(txs[0]["txid"])
self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: "
"the private broadcast target received and further relayed the transaction")
self.log.info("First private broadcast: the recipient further relayed the transaction")
# One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts.
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1)
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, 0)
self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)")
ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}"
@ -364,8 +402,32 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
wtxid_int = int(txs[0]["wtxid"], 16)
inv = CInv(MSG_WTX, wtxid_int)
tx_returner = None # First outbound-full-relay, will be P2PDataStore.
other_peer = None # Any other outbound-full-relay, we use the second one.
def set_tx_returner_and_other():
nonlocal tx_returner
nonlocal other_peer
tx_returner = None
other_peer = None
with self.destinations_lock:
for dest in self.destinations:
if dest["conn_type"] == "outbound-full-relay" and dest["node"] is not None:
if tx_returner is None:
assert(type(dest["node"]) is P2PDataStore)
tx_returner = dest["node"]
else:
assert(type(dest["node"]) is P2PInterface)
other_peer = dest["node"]
return True
return False
self.wait_until(set_tx_returner_and_other)
tx_returner.wait_for_connect()
other_peer.wait_for_connect()
self.log.info("Sending INV and waiting for GETDATA from node")
tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator.
tx_returner.tx_store[wtxid_int] = txs[0]["tx"]
assert "getdata" not in tx_returner.last_message
received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network"
@ -375,7 +437,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0)
self.log.info("Waiting for normal broadcast to another peer")
self.destinations[1]["node"].wait_for_inv([inv])
other_peer.wait_for_inv([inv])
self.log.info("Checking getprivatebroadcastinfo no longer reports the transaction after it is received back")
pbinfo = tx_originator.getprivatebroadcastinfo()