From e74d54e04896a86cad4e4b1bd9641afcc3a026c2 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 17 May 2023 17:19:49 +0200 Subject: [PATCH] test: add functional test for private broadcast --- test/functional/p2p_private_broadcast.py | 443 +++++++++++++++++++++++ test/functional/test_runner.py | 1 + 2 files changed, 444 insertions(+) create mode 100755 test/functional/p2p_private_broadcast.py diff --git a/test/functional/p2p_private_broadcast.py b/test/functional/p2p_private_broadcast.py new file mode 100755 index 00000000000..4c3739d8a04 --- /dev/null +++ b/test/functional/p2p_private_broadcast.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python3 +# Copyright (c) 2017-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test how locally submitted transactions are sent to the network when private broadcast is used. +""" + +import time +import threading + +from test_framework.p2p import ( + P2PDataStore, + P2PInterface, + P2P_SERVICES, + P2P_VERSION, +) +from test_framework.messages import ( + CAddress, + CInv, + COIN, + MSG_WTX, + malleate_tx_to_invalid_witness, + msg_inv, + msg_tx, +) +from test_framework.netutil import ( + format_addr_port +) +from test_framework.script_util import ValidWitnessMalleatedTx +from test_framework.socks5 import ( + Socks5Configuration, + Socks5Server, +) +from test_framework.test_framework import ( + BitcoinTestFramework, +) +from test_framework.util import ( + MAX_NODES, + assert_equal, + assert_not_equal, + assert_raises_rpc_error, + p2p_port, + tor_port, +) +from test_framework.wallet import ( + MiniWallet, +) + +MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8 +MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2 +NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS +NUM_PRIVATE_BROADCAST_PER_TX = 3 + +# Fill addrman with these addresses. Must have enough Tor addresses, so that even +# if all 10 default connections are opened to a Tor address (!?) there must be more +# for private broadcast. +ADDRMAN_ADDRESSES = [ + "20.0.0.1", + "30.0.0.1", + "40.0.0.1", + "50.0.0.1", + "60.0.0.1", + "70.0.0.1", + "80.0.0.1", + "90.0.0.1", + "100.0.0.1", + "110.0.0.1", + "120.0.0.1", + "130.0.0.1", + "140.0.0.1", + "150.0.0.1", + "160.0.0.1", + "170.0.0.1", + "180.0.0.1", + "190.0.0.1", + "200.0.0.1", + "210.0.0.1", + + "[20::1]", + "[30::1]", + "[40::1]", + "[50::1]", + "[60::1]", + "[70::1]", + "[80::1]", + "[90::1]", + "[100::1]", + "[110::1]", + "[120::1]", + "[130::1]", + "[140::1]", + "[150::1]", + "[160::1]", + "[170::1]", + "[180::1]", + "[190::1]", + "[200::1]", + "[210::1]", + + "testonlyad777777777777777777777777777777777777777775b6qd.onion", + "testonlyah77777777777777777777777777777777777777777z7ayd.onion", + "testonlyal77777777777777777777777777777777777777777vp6qd.onion", + "testonlyap77777777777777777777777777777777777777777r5qad.onion", + "testonlyat77777777777777777777777777777777777777777udsid.onion", + "testonlyax77777777777777777777777777777777777777777yciid.onion", + "testonlya777777777777777777777777777777777777777777rhgyd.onion", + "testonlybd77777777777777777777777777777777777777777rs4ad.onion", + "testonlybp77777777777777777777777777777777777777777zs2ad.onion", + "testonlybt777777777777777777777777777777777777777777x6id.onion", + "testonlybx777777777777777777777777777777777777777775styd.onion", + "testonlyb3777777777777777777777777777777777777777774ckid.onion", + "testonlycd77777777777777777777777777777777777777777733id.onion", + "testonlych77777777777777777777777777777777777777777t6kid.onion", + "testonlycl77777777777777777777777777777777777777777tt3ad.onion", + "testonlyct77777777777777777777777777777777777777777wvhyd.onion", + "testonlycx7777777777777777777777777777777777777777774bad.onion", + "testonlyc377777777777777777777777777777777777777777u6aid.onion", + "testonlydd777777777777777777777777777777777777777777u5ad.onion", + "testonlydh77777777777777777777777777777777777777777wgnyd.onion", + + "testonlyad77777777777777777777777777777777777777777q.b32.i2p", + "testonlyah77777777777777777777777777777777777777777q.b32.i2p", + "testonlyap77777777777777777777777777777777777777777q.b32.i2p", + "testonlyat77777777777777777777777777777777777777777q.b32.i2p", + "testonlyax77777777777777777777777777777777777777777q.b32.i2p", + "testonlya377777777777777777777777777777777777777777q.b32.i2p", + "testonlya777777777777777777777777777777777777777777q.b32.i2p", + "testonlybd77777777777777777777777777777777777777777q.b32.i2p", + "testonlybh77777777777777777777777777777777777777777q.b32.i2p", + "testonlybl77777777777777777777777777777777777777777q.b32.i2p", + "testonlybp77777777777777777777777777777777777777777q.b32.i2p", + "testonlybt77777777777777777777777777777777777777777q.b32.i2p", + "testonlybx77777777777777777777777777777777777777777q.b32.i2p", + "testonlyb777777777777777777777777777777777777777777q.b32.i2p", + "testonlych77777777777777777777777777777777777777777q.b32.i2p", + "testonlycp77777777777777777777777777777777777777777q.b32.i2p", + "testonlyct77777777777777777777777777777777777777777q.b32.i2p", + "testonlycx77777777777777777777777777777777777777777q.b32.i2p", + "testonlyc377777777777777777777777777777777777777777q.b32.i2p", + "testonlyc777777777777777777777777777777777777777777q.b32.i2p", + + "[fc00::1]", + "[fc00::2]", + "[fc00::3]", + "[fc00::5]", + "[fc00::6]", + "[fc00::7]", + "[fc00::8]", + "[fc00::9]", + "[fc00::10]", + "[fc00::11]", + "[fc00::12]", + "[fc00::13]", + "[fc00::15]", + "[fc00::16]", + "[fc00::17]", + "[fc00::18]", + "[fc00::19]", + "[fc00::20]", + "[fc00::22]", + "[fc00::23]", +] + + +class P2PPrivateBroadcast(BitcoinTestFramework): + def set_test_params(self): + self.disable_autoconnect = False + self.num_nodes = 2 + + def setup_nodes(self): + # Start a SOCKS5 proxy server. + socks5_server_config = Socks5Configuration() + # self.nodes[0] listens on p2p_port(0), + # self.nodes[1] listens on p2p_port(1), + # thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes) (self.num_nodes is 2) + socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes)) + socks5_server_config.unauth = True + socks5_server_config.auth = True + + self.socks5_server = Socks5Server(socks5_server_config) + self.socks5_server.start() + + # Tor ports are the highest among p2p/rpc/tor, so this should be the first available port. + ports_base = tor_port(MAX_NODES) + 1 + + self.destinations = [] + + self.destinations_lock = threading.Lock() + + def destinations_factory(requested_to_addr, requested_to_port): + with self.destinations_lock: + i = len(self.destinations) + actual_to_addr = "" + actual_to_port = 0 + listener = None + if i == NUM_INITIAL_CONNECTIONS: + # Instruct the SOCKS5 server to redirect the first private + # broadcast connection from nodes[0] to nodes[1] + actual_to_addr = "127.0.0.1" # nodes[1] listen address + actual_to_port = tor_port(1) # nodes[1] listen port for Tor + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])") + else: + # Create a Python P2P listening node and instruct the SOCKS5 proxy to + # redirect the connection to it. The first outbound connection is used + # later to serve GETDATA, thus make it P2PDataStore(). + listener = P2PDataStore() if i == 0 else P2PInterface() + listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor) + listener.peer_connect_send_version(services=P2P_SERVICES) + + def on_listen_done(addr, port): + nonlocal actual_to_addr + nonlocal actual_to_port + actual_to_addr = addr + actual_to_port = port + + self.network_thread.listen( + addr="127.0.0.1", + port=ports_base + i, + p2p=listener, + callback=on_listen_done) + # Wait until the callback has been called. + self.wait_until(lambda: actual_to_port != 0) + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)") + + self.destinations.append({ + "requested_to": format_addr_port(requested_to_addr, requested_to_port), + "node": listener, + }) + assert_equal(len(self.destinations), i + 1) + + return { + "actual_to_addr": actual_to_addr, + "actual_to_port": actual_to_port, + } + + self.socks5_server.conf.destinations_factory = destinations_factory + + self.extra_args = [ + [ + # Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable). + "-cjdnsreachable", + # Connecting, sending garbage, being disconnected messes up with this test's + # check_broadcasts() which waits for a particular Python node to receive a connection. + "-v2transport=0", + "-test=addrman", + "-privatebroadcast", + f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}", + # To increase coverage, make it think that the I2P network is reachable so that it + # selects such addresses as well. Pick a proxy address where nobody is listening + # and connection attempts fail quickly. + "-i2psam=127.0.0.1:1", + ], + [ + "-connect=0", + f"-bind=127.0.0.1:{tor_port(1)}=onion", + ], + ] + super().setup_nodes() + + def setup_network(self): + self.setup_nodes() + + def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations): + broadcasts_done = 0 + i = skip_destinations - 1 + while broadcasts_done < broadcasts_to_expect: + i += 1 + self.log.debug(f"{label}: waiting for outbound connection i={i}") + # At this point the connection may not yet have been established (A), + # may be active (B), or may have already been closed (C). + self.wait_until(lambda: len(self.destinations) > i) + dest = self.destinations[i] + peer = dest["node"] + peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False) + # Now it is either (B) or (C). + if peer.last_message["version"].nServices != 0: + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)") + continue + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} must be a private broadcast, checking it") + peer.wait_for_disconnect() + # Now it is (C). + assert_equal(peer.message_count, { + "version": 1, + "verack": 1, + "inv": 1, + "tx": 1, + "ping": 1 + }) + dummy_address = CAddress() + dummy_address.nServices = 0 + assert_equal(peer.last_message["version"].nVersion, P2P_VERSION) + assert_equal(peer.last_message["version"].nServices, 0) + assert_equal(peer.last_message["version"].nTime, 0) + assert_equal(peer.last_message["version"].addrTo, dummy_address) + assert_equal(peer.last_message["version"].addrFrom, dummy_address) + assert_equal(peer.last_message["version"].strSubVer, "/pynode:0.0.1/") + assert_equal(peer.last_message["version"].nStartingHeight, 0) + assert_equal(peer.last_message["version"].relay, 0) + assert_equal(peer.last_message["tx"].tx.txid_hex, tx["txid"]) + self.log.info(f"{label}: ok: outbound connection i={i} is private broadcast of txid={tx['txid']}") + broadcasts_done += 1 + + def run_test(self): + tx_originator = self.nodes[0] + tx_receiver = self.nodes[1] + far_observer = tx_receiver.add_p2p_connection(P2PInterface()) + + wallet = MiniWallet(tx_originator) + + # Fill tx_originator's addrman. + for addr in ADDRMAN_ADDRESSES: + res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False) + if not res["success"]: + self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)") + + self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS) + + # The next opened connection by tx_originator should be "private broadcast" + # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver. + + txs = wallet.create_self_transfer_chain(chain_length=3) + self.log.info(f"Created txid={txs[0]['txid']}: for basic test") + self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast") + self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool") + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1) + + self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, " + "must be the first private broadcast connection") + self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0) + far_observer.wait_for_tx(txs[0]["txid"]) + self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: " + "the private broadcast target received and further relayed the transaction") + + # One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts. + self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1) + + self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)") + ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]): + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + + self.log.info("Sending a malleated transaction with an invalid witness via RPC") + malleated_invalid = malleate_tx_to_invalid_witness(txs[0]) + assert_raises_rpc_error(-26, "mempool-script-verify-flag-failed", + tx_originator.sendrawtransaction, + hexstring=malleated_invalid.serialize_with_witness().hex(), + maxfeerate=0.1) + + self.log.info("Checking that the transaction is not in the originator node's mempool") + assert_equal(len(tx_originator.getrawmempool()), 0) + + wtxid_int = int(txs[0]["wtxid"], 16) + inv = CInv(MSG_WTX, wtxid_int) + + self.log.info("Sending INV and waiting for GETDATA from node") + tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator. + tx_returner.tx_store[wtxid_int] = txs[0]["tx"] + assert "getdata" not in tx_returner.last_message + received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network" + with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]): + tx_returner.send_without_ping(msg_inv([inv])) + tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0) + + self.log.info("Waiting for normal broadcast to another peer") + self.destinations[1]["node"].wait_for_inv([inv]) + + self.log.info("Sending a transaction that is already in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + self.check_broadcasts("Broadcast of mempool transaction", txs[0], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1) + self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)") + assert_equal(len(tx_originator.getrawmempool()), 1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0.1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0) + + # Since txs[1] has not been received back by tx_originator, + # it should be re-broadcast after a while. Advance tx_originator's clock + # to trigger a re-broadcast. Should be more than the maximum returned by + # NextTxBroadcast() in net_processing.cpp. + self.log.info("Checking that rebroadcast works") + delta = 20 * 60 # 20min + skip_destinations = len(self.destinations) + rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]): + tx_originator.setmocktime(int(time.time()) + delta) + tx_originator.mockscheduler(delta) + self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations) + tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this). + + self.log.info("Sending a pair of transactions with the same txid but different valid wtxids via RPC") + txgen = ValidWitnessMalleatedTx() + funding = wallet.get_utxo() + fee_sat = 1000 + siblings_parent = txgen.build_parent_tx(funding["txid"], amount=funding["value"] * COIN - fee_sat) + sibling1, sibling2 = txgen.build_malleated_children(siblings_parent.txid_hex, amount=siblings_parent.vout[0].nValue - fee_sat) + self.log.info(f" - sibling1: txid={sibling1.txid_hex}, wtxid={sibling1.wtxid_hex}") + self.log.info(f" - sibling2: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}") + assert_equal(sibling1.txid_hex, sibling2.txid_hex) + assert_not_equal(sibling1.wtxid_hex, sibling2.wtxid_hex) + wallet.sign_tx(siblings_parent) + assert_equal(len(tx_originator.getrawmempool()), 1) + tx_returner.send_without_ping(msg_tx(siblings_parent)) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 1) + self.log.info(" - siblings' parent added to the mempool") + tx_originator.sendrawtransaction(hexstring=sibling1.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sent sibling1: ok") + tx_originator.sendrawtransaction(hexstring=sibling2.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sent sibling2: ok") + + # Stop the SOCKS5 proxy server to avoid it being upset by the bitcoin + # node disconnecting in the middle of the SOCKS5 handshake when we + # restart below. + self.socks5_server.stop() + + self.log.info("Trying to send a transaction when none of Tor or I2P is reachable") + self.restart_node(0, extra_args=[ + "-privatebroadcast", + "-v2transport=0", + # A location where definitely a Tor control is not listening. This would allow + # Bitcoin Core to start, hoping/assuming that the location of the Tor proxy + # may be retrieved after startup from the Tor control, but it will not be, so + # the RPC should throw. + "-torcontrol=127.0.0.1:1", + "-listenonion", + ]) + assert_raises_rpc_error(-1, "none of the Tor or I2P networks is reachable", + tx_originator.sendrawtransaction, hexstring=txs[0]["hex"], maxfeerate=0.1) + + +if __name__ == "__main__": + P2PPrivateBroadcast(__file__).main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index d63dbadefd3..037aa7579ef 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -300,6 +300,7 @@ BASE_SCRIPTS = [ 'rpc_dumptxoutset.py', 'feature_minchainwork.py', 'rpc_estimatefee.py', + 'p2p_private_broadcast.py', 'rpc_getblockstats.py', 'feature_port.py', 'feature_bind_port_externalip.py',