bitcoin/test/functional/p2p_private_broadcast.py
merge-script 8593d96519
Merge bitcoin/bitcoin#33067: test: refactor ValidWitnessMalleatedTx class to helper function
3f5211cba8e73e8eb03781e6ec32ba9c4a263782 test: remove child_one/child_two (w)txid variables (naiyoma)
7cfe790820cf247e8a27bb8091defc54c74d6aec test: replace ValidWitnessMalleatedTx class with function (naiyoma)
81675a781f3ab62a0576a9739d13b4997b63230d test: use pre-generated chain (naiyoma)

Pull request description:

  This PR refactors the `ValidWitnessMalleatedTx` class into a `build_malleated_tx_package` function. As a result, two tests are updated: `mempool_accept_wtxid` and `p2p_private_broadcast`. Also included are a few small refactors in `mempool_accept_wtxid` (switching to MiniWallet, using a pre-mined chain, using txid directly).

  Together, these changes reduce complexity and improve test runtime.

ACKs for top commit:
  stratospher:
    reACK 3f5211c.
  cedwies:
    reACK 3f5211c
  maflcko:
    review ACK 3f5211cba8e73e8eb03781e6ec32ba9c4a263782 👥
  rkrux:
    ACK 3f5211cba8e73e8eb03781e6ec32ba9c4a263782

Tree-SHA512: 1fd02be3432fef6b68e54fbe8b15ed56d2699580bb13d0777b21f9cbe4c6d33bbb710541e3ca2fc93eab771d17bf1c427e4b08fa216d561bdb320cc6b36ac8fc
2026-01-27 10:10:41 +00:00

443 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2017-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test how locally submitted transactions are sent to the network when private broadcast is used.
"""
import time
import threading
from test_framework.p2p import (
P2PDataStore,
P2PInterface,
P2P_SERVICES,
P2P_VERSION,
)
from test_framework.messages import (
CAddress,
CInv,
MSG_WTX,
malleate_tx_to_invalid_witness,
msg_inv,
msg_tx,
)
from test_framework.netutil import (
format_addr_port
)
from test_framework.script_util import build_malleated_tx_package
from test_framework.socks5 import (
Socks5Configuration,
Socks5Server,
)
from test_framework.test_framework import (
BitcoinTestFramework,
)
from test_framework.util import (
assert_equal,
assert_not_equal,
assert_raises_rpc_error,
p2p_port,
tor_port,
)
from test_framework.wallet import (
MiniWallet,
)
# How many private broadcast connections the test expects per transaction.
NUM_PRIVATE_BROADCAST_PER_TX = 3

# Outbound connections the originator opens on its own before any private
# broadcast connection is made: full relay ones plus block-relay-only ones.
MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8
MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2
NUM_INITIAL_CONNECTIONS = (
    MAX_OUTBOUND_FULL_RELAY_CONNECTIONS
    + MAX_BLOCK_RELAY_ONLY_CONNECTIONS
)

# Addresses used to populate addrman, 20 per network. There have to be plenty
# of Tor addresses: even if every one of the 10 default connections ends up on
# a Tor address (!?), some must remain available for private broadcast.
ADDRMAN_ADDRESSES = [
    # IPv4: 20.0.0.1, 30.0.0.1, ..., 210.0.0.1
    *(f"{first_octet}.0.0.1" for first_octet in range(20, 211, 10)),
    # IPv6: [20::1], [30::1], ..., [210::1]
    *(f"[{first_group}::1]" for first_group in range(20, 211, 10)),
    # Tor
    "testonlyad777777777777777777777777777777777777777775b6qd.onion",
    "testonlyah77777777777777777777777777777777777777777z7ayd.onion",
    "testonlyal77777777777777777777777777777777777777777vp6qd.onion",
    "testonlyap77777777777777777777777777777777777777777r5qad.onion",
    "testonlyat77777777777777777777777777777777777777777udsid.onion",
    "testonlyax77777777777777777777777777777777777777777yciid.onion",
    "testonlya777777777777777777777777777777777777777777rhgyd.onion",
    "testonlybd77777777777777777777777777777777777777777rs4ad.onion",
    "testonlybp77777777777777777777777777777777777777777zs2ad.onion",
    "testonlybt777777777777777777777777777777777777777777x6id.onion",
    "testonlybx777777777777777777777777777777777777777775styd.onion",
    "testonlyb3777777777777777777777777777777777777777774ckid.onion",
    "testonlycd77777777777777777777777777777777777777777733id.onion",
    "testonlych77777777777777777777777777777777777777777t6kid.onion",
    "testonlycl77777777777777777777777777777777777777777tt3ad.onion",
    "testonlyct77777777777777777777777777777777777777777wvhyd.onion",
    "testonlycx7777777777777777777777777777777777777777774bad.onion",
    "testonlyc377777777777777777777777777777777777777777u6aid.onion",
    "testonlydd777777777777777777777777777777777777777777u5ad.onion",
    "testonlydh77777777777777777777777777777777777777777wgnyd.onion",
    # I2P
    "testonlyad77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyah77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyap77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyat77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyax77777777777777777777777777777777777777777q.b32.i2p",
    "testonlya377777777777777777777777777777777777777777q.b32.i2p",
    "testonlya777777777777777777777777777777777777777777q.b32.i2p",
    "testonlybd77777777777777777777777777777777777777777q.b32.i2p",
    "testonlybh77777777777777777777777777777777777777777q.b32.i2p",
    "testonlybl77777777777777777777777777777777777777777q.b32.i2p",
    "testonlybp77777777777777777777777777777777777777777q.b32.i2p",
    "testonlybt77777777777777777777777777777777777777777q.b32.i2p",
    "testonlybx77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyb777777777777777777777777777777777777777777q.b32.i2p",
    "testonlych77777777777777777777777777777777777777777q.b32.i2p",
    "testonlycp77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyct77777777777777777777777777777777777777777q.b32.i2p",
    "testonlycx77777777777777777777777777777777777777777q.b32.i2p",
    "testonlyc377777777777777777777777777777777777777777q.b32.i2p",
    "testonlyc777777777777777777777777777777777777777777q.b32.i2p",
    # CJDNS: [fc00::1] ... [fc00::23]. Suffixes 4, 14 and 21 are deliberately
    # not part of the set.
    # NOTE(review): presumably those three collide in addrman - confirm
    # before adding them.
    *(
        f"[fc00::{suffix}]"
        for suffix in range(1, 24)
        if suffix not in (4, 14, 21)
    ),
]
class P2PPrivateBroadcast(BitcoinTestFramework):
    """Test how a node started with -privatebroadcast sends local transactions.

    nodes[0] (the transaction originator) makes its outbound connections
    through a SOCKS5 proxy run by this test. Every proxied connection is
    recorded in self.destinations and redirected either to nodes[1] (the
    transaction receiver) or to a Python P2P listener created on demand, so
    that the test can inspect what was sent on each connection.
    """

    def set_test_params(self):
        """Request two nodes and set disable_autoconnect to False."""
        # run_test() waits for nodes[0] to open NUM_INITIAL_CONNECTIONS
        # connections through the proxy before submitting any transaction.
        self.disable_autoconnect = False
        self.num_nodes = 2

    def setup_nodes(self):
        """Start the SOCKS5 proxy, install its redirect policy and start the nodes.

        Sets self.socks5_server, self.destinations (one entry per proxied
        connection, in the order they were requested), self.destinations_lock
        and self.extra_args before delegating to the parent setup_nodes().
        """
        # Start a SOCKS5 proxy server.
        socks5_server_config = Socks5Configuration()
        # self.nodes[0] listens on p2p_port(0),
        # self.nodes[1] listens on p2p_port(1),
        # thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes) (self.num_nodes is 2)
        socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes))
        socks5_server_config.unauth = True
        socks5_server_config.auth = True

        self.socks5_server = Socks5Server(socks5_server_config)
        self.socks5_server.start()

        self.destinations = []

        self.destinations_lock = threading.Lock()

        def destinations_factory(requested_to_addr, requested_to_port):
            """Choose the real target of one proxied connection and record it.

            Installed below as the proxy's destinations_factory.
            NOTE(review): presumably invoked by the SOCKS5 server once per
            incoming connection request - confirm in test_framework.socks5.

            Returns a dict with the keys "actual_to_addr" and "actual_to_port".
            """
            # The lock keeps the index i and the append at the end consistent
            # (see the assert_equal() below); it is held while waiting for the
            # listener to come up.
            with self.destinations_lock:
                i = len(self.destinations)
                actual_to_addr = ""
                actual_to_port = 0
                listener = None
                if i == NUM_INITIAL_CONNECTIONS:
                    # Instruct the SOCKS5 server to redirect the first private
                    # broadcast connection from nodes[0] to nodes[1]
                    actual_to_addr = "127.0.0.1" # nodes[1] listen address
                    actual_to_port = tor_port(1) # nodes[1] listen port for Tor
                    self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
                                   f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])")
                else:
                    # Create a Python P2P listening node and instruct the SOCKS5 proxy to
                    # redirect the connection to it. The first outbound connection is used
                    # later to serve GETDATA, thus make it P2PDataStore().
                    listener = P2PDataStore() if i == 0 else P2PInterface()
                    listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor)
                    listener.peer_connect_send_version(services=P2P_SERVICES)

                    def on_listen_done(addr, port):
                        """Store the address and port the listener was bound to."""
                        nonlocal actual_to_addr
                        nonlocal actual_to_port
                        actual_to_addr = addr
                        actual_to_port = port

                    # Use port=0 to let the OS assign an available port. This
                    # avoids "address already in use" errors when tests run
                    # concurrently or ports are still in TIME_WAIT state.
                    self.network_thread.listen(
                        addr="127.0.0.1",
                        port=0,
                        p2p=listener,
                        callback=on_listen_done)
                    # Wait until the callback has been called.
                    self.wait_until(lambda: actual_to_port != 0)

                    self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
                                   f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")

                # "node" is None for the connection redirected to nodes[1].
                self.destinations.append({
                    "requested_to": format_addr_port(requested_to_addr, requested_to_port),
                    "node": listener,
                })
                assert_equal(len(self.destinations), i + 1)

                return {
                    "actual_to_addr": actual_to_addr,
                    "actual_to_port": actual_to_port,
                }

        self.socks5_server.conf.destinations_factory = destinations_factory

        self.extra_args = [
            [
                # Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable).
                "-cjdnsreachable",
                # Connecting, sending garbage, being disconnected messes up with this test's
                # check_broadcasts() which waits for a particular Python node to receive a connection.
                "-v2transport=0",
                "-test=addrman",
                "-privatebroadcast",
                f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}",
                # To increase coverage, make it think that the I2P network is reachable so that it
                # selects such addresses as well. Pick a proxy address where nobody is listening
                # and connection attempts fail quickly.
                "-i2psam=127.0.0.1:1",
            ],
            [
                "-connect=0",
                f"-bind=127.0.0.1:{tor_port(1)}=onion",
            ],
        ]
        super().setup_nodes()

    def setup_network(self):
        """Only start the nodes via setup_nodes(); nothing else is done here."""
        self.setup_nodes()

    def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations):
        """Check that `broadcasts_to_expect` private broadcasts of `tx` were made.

        Walks self.destinations starting at index `skip_destinations`, waiting
        for entries to appear as needed. Connections whose VERSION message has
        non-zero nServices are skipped; each remaining connection is verified
        and counted.

        label: prefix used in this method's log messages.
        tx: mapping with a "txid" entry, the transaction expected on the wire.
        broadcasts_to_expect: number of private broadcast connections to verify.
        skip_destinations: index of the first entry of self.destinations to
            look at. The entry's "node" must be a Python listener (the entry
            redirected to nodes[1] stores None and cannot be checked here).
        """
        broadcasts_done = 0
        # Incremented at the top of the loop, so the first index examined is
        # skip_destinations.
        i = skip_destinations - 1
        while broadcasts_done < broadcasts_to_expect:
            i += 1
            self.log.debug(f"{label}: waiting for outbound connection i={i}")
            # At this point the connection may not yet have been established (A),
            # may be active (B), or may have already been closed (C).
            self.wait_until(lambda: len(self.destinations) > i)
            dest = self.destinations[i]
            peer = dest["node"]
            peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False)
            # Now it is either (B) or (C).
            if peer.last_message["version"].nServices != 0:
                self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)")
                continue
            self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} must be a private broadcast, checking it")
            peer.wait_for_disconnect()
            # Now it is (C).
            # Exactly these messages, once each, are expected on the connection.
            assert_equal(peer.message_count, {
                "version": 1,
                "verack": 1,
                "inv": 1,
                "tx": 1,
                "ping": 1
            })
            # The VERSION message is expected to carry only fixed, dummy
            # values (see the individual assertions below).
            dummy_address = CAddress()
            dummy_address.nServices = 0
            assert_equal(peer.last_message["version"].nVersion, P2P_VERSION)
            assert_equal(peer.last_message["version"].nServices, 0)
            assert_equal(peer.last_message["version"].nTime, 0)
            assert_equal(peer.last_message["version"].addrTo, dummy_address)
            assert_equal(peer.last_message["version"].addrFrom, dummy_address)
            assert_equal(peer.last_message["version"].strSubVer, "/pynode:0.0.1/")
            assert_equal(peer.last_message["version"].nStartingHeight, 0)
            assert_equal(peer.last_message["version"].relay, 0)
            assert_equal(peer.last_message["tx"].tx.txid_hex, tx["txid"])
            self.log.info(f"{label}: ok: outbound connection i={i} is private broadcast of txid={tx['txid']}")
            broadcasts_done += 1

    def run_test(self):
        """Run the private broadcast scenarios in sequence.

        Steps, in order: basic broadcast (first connection redirected to
        nodes[1]); resubmission of an already scheduled transaction; rejection
        of a malleated transaction with an invalid witness; receiving the
        transaction back from the network; broadcast of a transaction already
        in the mempool; broadcast with a dependency in the mempool; rejection
        when the dependency is missing; rebroadcast after advancing the mock
        clock; two transactions with the same txid but different wtxids; and
        the RPC error when neither Tor nor I2P is reachable.
        """
        tx_originator = self.nodes[0]
        tx_receiver = self.nodes[1]
        far_observer = tx_receiver.add_p2p_connection(P2PInterface())

        wallet = MiniWallet(tx_originator)

        # Fill tx_originator's addrman.
        for addr in ADDRMAN_ADDRESSES:
            res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False)
            if not res["success"]:
                self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)")

        self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS)

        # The next opened connection by tx_originator should be "private broadcast"
        # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver.

        txs = wallet.create_self_transfer_chain(chain_length=3)
        self.log.info(f"Created txid={txs[0]['txid']}: for basic test")
        self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast")
        self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool")
        tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1)

        self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, "
                       "must be the first private broadcast connection")
        self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0)
        far_observer.wait_for_tx(txs[0]["txid"])
        self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: "
                      "the private broadcast target received and further relayed the transaction")

        # One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts.
        self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1)

        self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)")
        ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}"
        with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]):
            tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0)

        self.log.info("Sending a malleated transaction with an invalid witness via RPC")
        malleated_invalid = malleate_tx_to_invalid_witness(txs[0])
        assert_raises_rpc_error(-26, "mempool-script-verify-flag-failed",
                                tx_originator.sendrawtransaction,
                                hexstring=malleated_invalid.serialize_with_witness().hex(),
                                maxfeerate=0.1)

        self.log.info("Checking that the transaction is not in the originator node's mempool")
        assert_equal(len(tx_originator.getrawmempool()), 0)

        # Announce txs[0] to the originator by wtxid from one of its ordinary
        # peers and serve it on request, so the originator gets it back from
        # the network.
        wtxid_int = int(txs[0]["wtxid"], 16)
        inv = CInv(MSG_WTX, wtxid_int)

        self.log.info("Sending INV and waiting for GETDATA from node")
        tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator.
        tx_returner.tx_store[wtxid_int] = txs[0]["tx"]
        assert "getdata" not in tx_returner.last_message
        received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network"
        with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]):
            tx_returner.send_without_ping(msg_inv([inv]))
            tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message)
            self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0)

        self.log.info("Waiting for normal broadcast to another peer")
        self.destinations[1]["node"].wait_for_inv([inv])

        self.log.info("Sending a transaction that is already in the mempool")
        # Only connections opened from now on belong to this submission.
        skip_destinations = len(self.destinations)
        tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0)
        self.check_broadcasts("Broadcast of mempool transaction", txs[0], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations)

        self.log.info("Sending a transaction with a dependency in the mempool")
        skip_destinations = len(self.destinations)
        tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1)
        self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations)

        self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)")
        # Only txs[0] is in the mempool at this point, so the parent of txs[2]
        # (txs[1]) is missing.
        assert_equal(len(tx_originator.getrawmempool()), 1)
        assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent",
                                tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0.1)
        assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent",
                                tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0)

        # Since txs[1] has not been received back by tx_originator,
        # it should be re-broadcast after a while. Advance tx_originator's clock
        # to trigger a re-broadcast. Should be more than the maximum returned by
        # NextTxBroadcast() in net_processing.cpp.
        self.log.info("Checking that rebroadcast works")
        delta = 20 * 60 # 20min
        skip_destinations = len(self.destinations)
        rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}"
        with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]):
            tx_originator.setmocktime(int(time.time()) + delta)
            tx_originator.mockscheduler(delta)
        self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations)
        tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this).

        self.log.info("Sending a pair of transactions with the same txid but different valid wtxids via RPC")
        parent = wallet.create_self_transfer()["tx"]
        # Each step leaves 10000 units of the parent's output value unspent.
        # NOTE(review): presumably this is the fee, in satoshis - confirm
        # against build_malleated_tx_package().
        parent_amount = parent.vout[0].nValue - 10000
        child_amount = parent_amount - 10000
        siblings_parent, sibling1, sibling2 = build_malleated_tx_package(
            parent=parent,
            rebalance_parent_output_amount=parent_amount,
            child_amount=child_amount)
        self.log.info(f" - sibling1: txid={sibling1.txid_hex}, wtxid={sibling1.wtxid_hex}")
        self.log.info(f" - sibling2: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}")
        assert_equal(sibling1.txid_hex, sibling2.txid_hex)
        assert_not_equal(sibling1.wtxid_hex, sibling2.wtxid_hex)
        assert_equal(len(tx_originator.getrawmempool()), 1)
        # Deliver the siblings' parent over P2P from an ordinary peer rather
        # than via RPC.
        tx_returner.send_without_ping(msg_tx(siblings_parent))
        self.wait_until(lambda: len(tx_originator.getrawmempool()) > 1)
        self.log.info(" - siblings' parent added to the mempool")
        tx_originator.sendrawtransaction(hexstring=sibling1.serialize_with_witness().hex(), maxfeerate=0.1)
        self.log.info(" - sent sibling1: ok")
        tx_originator.sendrawtransaction(hexstring=sibling2.serialize_with_witness().hex(), maxfeerate=0.1)
        self.log.info(" - sent sibling2: ok")

        # Stop the SOCKS5 proxy server to avoid it being upset by the bitcoin
        # node disconnecting in the middle of the SOCKS5 handshake when we
        # restart below.
        self.socks5_server.stop()

        self.log.info("Trying to send a transaction when none of Tor or I2P is reachable")
        self.restart_node(0, extra_args=[
            "-privatebroadcast",
            "-v2transport=0",
            # A location where definitely a Tor control is not listening. This would allow
            # Bitcoin Core to start, hoping/assuming that the location of the Tor proxy
            # may be retrieved after startup from the Tor control, but it will not be, so
            # the RPC should throw.
            "-torcontrol=127.0.0.1:1",
            "-listenonion",
        ])
        assert_raises_rpc_error(-1, "none of the Tor or I2P networks is reachable",
                                tx_originator.sendrawtransaction, hexstring=txs[0]["hex"], maxfeerate=0.1)
if __name__ == "__main__":
    # Script entry point: build the test object from this file's path and run it.
    private_broadcast_test = P2PPrivateBroadcast(__file__)
    private_broadcast_test.main()