diff --git a/doc/release-notes-29415.md b/doc/release-notes-29415.md new file mode 100644 index 00000000000..d5040a3193d --- /dev/null +++ b/doc/release-notes-29415.md @@ -0,0 +1,14 @@ +P2P and network changes +----------------------- + +- Normally local transactions are broadcast to all connected peers with + which we do transaction relay. Now, for the `sendrawtransaction` RPC + this behavior can be changed to only do the broadcast via the Tor or + I2P networks. A new boolean option `-privatebroadcast` has been added + to enable this behavior. This improves the privacy of the transaction + originator in two aspects: + 1. Their IP address (and thus geolocation) is never known to the + recipients. + 2. If the originator sends two otherwise unrelated transactions, they + will not be linkable. This is because a separate connection is used + for broadcasting each transaction. (#29415) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 43d67f40201..a1e0c5f131a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -244,6 +244,7 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL policy/rbf.cpp policy/settings.cpp policy/truc_policy.cpp + private_broadcast.cpp rest.cpp rpc/blockchain.cpp rpc/external_signer.cpp diff --git a/src/bitcoin-cli.cpp b/src/bitcoin-cli.cpp index 88d0665e72a..724620aa730 100644 --- a/src/bitcoin-cli.cpp +++ b/src/bitcoin-cli.cpp @@ -452,6 +452,7 @@ private: if (conn_type == "block-relay-only") return "block"; if (conn_type == "manual" || conn_type == "feeler") return conn_type; if (conn_type == "addr-fetch") return "addr"; + if (conn_type == "private-broadcast") return "priv"; return ""; } std::string FormatServices(const UniValue& services) @@ -703,6 +704,7 @@ public: " \"manual\" - peer we manually added using RPC addnode or the -addnode/-connect config options\n" " \"feeler\" - short-lived connection for testing addresses\n" " \"addr\" - address fetch; short-lived connection for requesting addresses\n" + " \"priv\" - private broadcast; short-lived connection for broadcasting our transactions\n" " net Network the peer connected through (\"ipv4\", \"ipv6\", \"onion\", \"i2p\", \"cjdns\", or \"npr\" (not publicly routable))\n" " serv Services offered by the peer\n" " \"n\" - NETWORK: peer can serve the full block chain\n" diff --git a/src/init.cpp b/src/init.cpp index f2af858eb46..1823af1fd89 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -542,7 +542,7 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) argsman.AddArg("-forcednsseed", strprintf("Always query for peer addresses via DNS lookup (default: %u)", DEFAULT_FORCEDNSSEED), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listen", strprintf("Accept connections from outside (default: %u if no -proxy, -connect or -maxconnections=0)", DEFAULT_LISTEN), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); - argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); + argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u. It does not apply to short-lived private broadcast connections either, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS, MAX_PRIVATE_BROADCAST_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxreceivebuffer=", strprintf("Maximum per-connection receive buffer, *1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxsendbuffer=", strprintf("Maximum per-connection memory usage for the send buffer, *1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxuploadtarget=", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -670,6 +670,15 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) OptionsCategory::NODE_RELAY); argsman.AddArg("-minrelaytxfee=", strprintf("Fees (in %s/kvB) smaller than this are considered zero fee for relaying, mining and transaction creation (default: %s)", CURRENCY_UNIT, FormatMoney(DEFAULT_MIN_RELAY_TX_FEE)), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); + argsman.AddArg("-privatebroadcast", + strprintf( + "Broadcast transactions submitted via sendrawtransaction RPC using short-lived " + "connections through the Tor or I2P networks, without putting them in the mempool first. " + "Transactions submitted through the wallet are not affected by this option " + "(default: %u)", + DEFAULT_PRIVATE_BROADCAST), + ArgsManager::ALLOW_ANY, + OptionsCategory::NODE_RELAY); argsman.AddArg("-whitelistforcerelay", strprintf("Add 'forcerelay' permission to whitelisted peers with default permissions. This will relay transactions even if the transactions were already in the mempool. (default: %d)", DEFAULT_WHITELISTFORCERELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); argsman.AddArg("-whitelistrelay", strprintf("Add 'relay' permission to whitelisted peers with default permissions. This will accept relayed transactions even when not relaying transactions (default: %d)", DEFAULT_WHITELISTRELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); @@ -1009,11 +1018,14 @@ bool AppInitParameterInteraction(const ArgsManager& args) if (user_max_connection < 0) { return InitError(Untranslated("-maxconnections must be greater or equal than zero")); } + const size_t max_private{args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST) + ? MAX_PRIVATE_BROADCAST_CONNECTIONS + : 0}; // Reserve enough FDs to account for the bare minimum, plus any manual connections, plus the bound interfaces int min_required_fds = MIN_CORE_FDS + MAX_ADDNODE_CONNECTIONS + nBind; // Try raising the FD limit to what we need (available_fds may be smaller than the requested amount if this fails) - available_fds = RaiseFileDescriptorLimit(user_max_connection + min_required_fds); + available_fds = RaiseFileDescriptorLimit(user_max_connection + max_private + min_required_fds); // If we are using select instead of poll, our actual limit may be even smaller #ifndef USE_POLL available_fds = std::min(FD_SETSIZE, available_fds); @@ -1732,13 +1744,13 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } + const bool listenonion{args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)}; if (onion_proxy.IsValid()) { SetProxy(NET_ONION, onion_proxy); } else { // If -listenonion is set, then we will (try to) connect to the Tor control port // later from the torcontrol thread and may retrieve the onion proxy from there. - const bool listenonion_disabled{!args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)}; - if (onlynet_used_with_onion && listenonion_disabled) { + if (onlynet_used_with_onion && !listenonion) { return InitError( _("Outbound connections restricted to Tor (-onlynet=onion) but the proxy for " "reaching the Tor network is not provided: none of -proxy, -onion or " @@ -2119,7 +2131,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.onion_binds.push_back(onion_service_target); } - if (args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)) { + if (listenonion) { if (connOptions.onion_binds.size() > 1) { InitWarning(strprintf(_("More than one onion bind address is provided. Using %s " "for the automatically created Tor onion service."), @@ -2192,6 +2204,32 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) conflict->ToStringAddrPort())); } + if (args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + // If -listenonion is set, then NET_ONION may not be reachable now + // but may become reachable later, thus only error here if it is not + // reachable and will not become reachable for sure. + const bool onion_may_become_reachable{listenonion && (!args.IsArgSet("-onlynet") || onlynet_used_with_onion)}; + if (!g_reachable_nets.Contains(NET_I2P) && + !g_reachable_nets.Contains(NET_ONION) && + !onion_may_become_reachable) { + return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), " + "but none of Tor or I2P networks is reachable")); + } + if (!connOptions.m_use_addrman_outgoing) { + return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), " + "but -connect is also configured. They are incompatible because the " + "private broadcast needs to open new connections to randomly " + "chosen Tor or I2P peers. Consider using -maxconnections=0 -addnode=... " + "instead")); + } + if (!proxyRandomize && (g_reachable_nets.Contains(NET_ONION) || onion_may_become_reachable)) { + InitWarning(_("Private broadcast of own transactions requested (-privatebroadcast) and " + "-proxyrandomize is disabled. Tor circuits for private broadcast connections " + "may be correlated to other connections over Tor. For maximum privacy set " + "-proxyrandomize=1.")); + } + } + if (!node.connman->Start(scheduler, connOptions)) { return false; } diff --git a/src/logging.cpp b/src/logging.cpp index 113fc3462fa..1c3e30e9807 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -202,6 +202,7 @@ static const std::map> LOG_CATEGORIES_ {"scan", BCLog::SCAN}, {"txpackages", BCLog::TXPACKAGES}, {"kernel", BCLog::KERNEL}, + {"privatebroadcast", BCLog::PRIVBROADCAST}, }; static const std::unordered_map LOG_CATEGORIES_BY_FLAG{ diff --git a/src/logging.h b/src/logging.h index 2105039f511..2ab21071a06 100644 --- a/src/logging.h +++ b/src/logging.h @@ -116,6 +116,7 @@ namespace BCLog { SCAN = (CategoryMask{1} << 27), TXPACKAGES = (CategoryMask{1} << 28), KERNEL = (CategoryMask{1} << 29), + PRIVBROADCAST = (CategoryMask{1} << 30), ALL = ~NONE, }; enum class Level { diff --git a/src/net.cpp b/src/net.cpp index bd3d0bb1f32..d92cb72ccc3 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -354,7 +354,16 @@ bool CConnman::CheckIncomingNonce(uint64_t nonce) { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { - if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce) + // Omit private broadcast connections from this check to prevent this privacy attack: + // - We connect to a peer in an attempt to privately broadcast a transaction. From our + // VERSION message the peer deducts that this is a short-lived connection for + // broadcasting a transaction, takes our nonce and delays their VERACK. + // - The peer starts connecting to (clearnet) nodes and sends them a VERSION message + // which contains our nonce. If the peer manages to connect to us we would disconnect. + // - Upon a disconnect, the peer knows our clearnet address. They go back to the short + // lived privacy broadcast connection and continue with VERACK. + if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && !pnode->IsPrivateBroadcastConn() && + pnode->GetLocalNonce() == nonce) return false; } return true; @@ -458,7 +467,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, i2p::Connection conn; bool connected{false}; - if (m_i2p_sam_session) { + // If an I2P SAM session already exists, normally we would re-use it. But in the case of + // private broadcast we force a new transient session. A Connect() using m_i2p_sam_session + // would use our permanent I2P address as a source address. + if (m_i2p_sam_session && conn_type != ConnectionType::PRIVATE_BROADCAST) { connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed); } else { { @@ -1876,6 +1888,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ switch (conn_type) { case ConnectionType::INBOUND: case ConnectionType::MANUAL: + case ConnectionType::PRIVATE_BROADCAST: return false; case ConnectionType::OUTBOUND_FULL_RELAY: max_connections = m_max_outbound_full_relay; @@ -2666,6 +2679,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std // peers from addrman. case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: + case ConnectionType::PRIVATE_BROADCAST: break; case ConnectionType::MANUAL: case ConnectionType::OUTBOUND_FULL_RELAY: @@ -3050,6 +3064,74 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, return true; } +std::optional CConnman::PrivateBroadcast::PickNetwork(std::optional& proxy) const +{ + prevector<4, Network> nets; + std::optional clearnet_proxy; + proxy.reset(); + if (g_reachable_nets.Contains(NET_ONION)) { + nets.push_back(NET_ONION); + + clearnet_proxy = ProxyForIPv4or6(); + if (clearnet_proxy.has_value()) { + if (g_reachable_nets.Contains(NET_IPV4)) { + nets.push_back(NET_IPV4); + } + if (g_reachable_nets.Contains(NET_IPV6)) { + nets.push_back(NET_IPV6); + } + } + } + if (g_reachable_nets.Contains(NET_I2P)) { + nets.push_back(NET_I2P); + } + + if (nets.empty()) { + return std::nullopt; + } + + const Network net{nets[FastRandomContext{}.randrange(nets.size())]}; + if (net == NET_IPV4 || net == NET_IPV6) { + proxy = clearnet_proxy; + } + return net; +} + +size_t CConnman::PrivateBroadcast::NumToOpen() const +{ + return m_num_to_open; +} + +void CConnman::PrivateBroadcast::NumToOpenAdd(size_t n) +{ + m_num_to_open += n; + m_num_to_open.notify_all(); +} + +size_t CConnman::PrivateBroadcast::NumToOpenSub(size_t n) +{ + size_t current_value{m_num_to_open.load()}; + size_t new_value; + do { + new_value = current_value > n ? current_value - n : 0; + } while (!m_num_to_open.compare_exchange_strong(current_value, new_value)); + return new_value; +} + +void CConnman::PrivateBroadcast::NumToOpenWait() const +{ + m_num_to_open.wait(0); +} + +std::optional CConnman::PrivateBroadcast::ProxyForIPv4or6() const +{ + Proxy tor_proxy; + if (m_outbound_tor_ok_at_least_once.load() && GetProxy(NET_ONION, tor_proxy)) { + return tor_proxy; + } + return std::nullopt; +} + Mutex NetEventsInterface::g_msgproc_mutex; void CConnman::ThreadMessageHandler() @@ -3134,6 +3216,74 @@ void CConnman::ThreadI2PAcceptIncoming() } } +void CConnman::ThreadPrivateBroadcast() +{ + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + + size_t addrman_num_bad_addresses{0}; + while (!m_interrupt_net->interrupted()) { + + if (!fNetworkActive) { + m_interrupt_net->sleep_for(5s); + continue; + } + + CountingSemaphoreGrant<> conn_max_grant{m_private_broadcast.m_sem_conn_max}; // Would block if too many are opened. + + m_private_broadcast.NumToOpenWait(); + + if (m_interrupt_net->interrupted()) { + break; + } + + std::optional proxy; + const std::optional net{m_private_broadcast.PickNetwork(proxy)}; + if (!net.has_value()) { + LogWarning("[privatebroadcast] Connections needed but none of the Tor or I2P networks is reachable"); + m_interrupt_net->sleep_for(5s); + continue; + } + + const auto [addr, _] = addrman.Select(/*new_only=*/false, {net.value()}); + + if (!addr.IsValid() || IsLocal(addr)) { + ++addrman_num_bad_addresses; + if (addrman_num_bad_addresses > 100) { + LogDebug(BCLog::PRIVBROADCAST, "Connections needed but addrman keeps returning bad addresses, will retry"); + m_interrupt_net->sleep_for(500ms); + } + continue; + } + addrman_num_bad_addresses = 0; + + auto target_str{addr.ToStringAddrPort()}; + if (proxy.has_value()) { + target_str += " through the proxy at " + proxy->ToString(); + } + + const bool use_v2transport(addr.nServices & GetLocalServices() & NODE_P2P_V2); + + if (OpenNetworkConnection(addr, + /*fCountFailure=*/true, + std::move(conn_max_grant), + /*pszDest=*/nullptr, + ConnectionType::PRIVATE_BROADCAST, + use_v2transport, + proxy)) { + const size_t remaining{m_private_broadcast.NumToOpenSub(1)}; + LogDebug(BCLog::PRIVBROADCAST, "Socket connected to %s; remaining connections to open: %d", target_str, remaining); + } else { + const size_t remaining{m_private_broadcast.NumToOpen()}; + if (remaining == 0) { + LogDebug(BCLog::PRIVBROADCAST, "Failed to connect to %s, will not retry, no more connections needed", target_str); + } else { + LogDebug(BCLog::PRIVBROADCAST, "Failed to connect to %s, will retry to a different address; remaining connections to open: %d", target_str, remaining); + m_interrupt_net->sleep_for(100ms); // Prevent busy loop if OpenNetworkConnection() fails fast repeatedly. + } + } + } +} + bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions) { int nOne = 1; @@ -3414,6 +3564,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); }); } + if (gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + threadPrivateBroadcast = + std::thread(&util::TraceThread, "privbcast", [this] { ThreadPrivateBroadcast(); }); + } + // Dump network addresses scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL); @@ -3463,10 +3618,16 @@ void CConnman::Interrupt() semAddnode->release(); } } + + m_private_broadcast.m_sem_conn_max.release(); + m_private_broadcast.NumToOpenAdd(1); // Just unblock NumToOpenWait() to be able to continue with shutdown. } void CConnman::StopThreads() { + if (threadPrivateBroadcast.joinable()) { + threadPrivateBroadcast.join(); + } if (threadI2PAcceptIncoming.joinable()) { threadI2PAcceptIncoming.join(); } @@ -3896,9 +4057,33 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } +/// Private broadcast connections only need to send certain message types. +/// Other messages are not needed and may degrade privacy. +static bool IsOutboundMessageAllowedInPrivateBroadcast(std::string_view type) noexcept +{ + return type == NetMsgType::VERSION || + type == NetMsgType::VERACK || + type == NetMsgType::INV || + type == NetMsgType::TX || + type == NetMsgType::PING; +} + void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { AssertLockNotHeld(m_total_bytes_sent_mutex); + + if (pnode->IsPrivateBroadcastConn() && !IsOutboundMessageAllowedInPrivateBroadcast(msg.m_type)) { + LogDebug(BCLog::PRIVBROADCAST, "Omitting send of message '%s', peer=%d%s", msg.m_type, pnode->GetId(), pnode->LogIP(fLogIPs)); + return; + } + + if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() && + pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) { + // If we are sending the peer VERACK that means we successfully sent + // and received another message to/from that peer (VERSION). + m_private_broadcast.m_outbound_tor_ok_at_least_once.store(true); + } + size_t nMessageSize = msg.data.size(); LogDebug(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId()); if (m_capture_messages) { diff --git a/src/net.h b/src/net.h index eedfdfaf483..e7047c5bff2 100644 --- a/src/net.h +++ b/src/net.h @@ -73,6 +73,8 @@ static const int MAX_ADDNODE_CONNECTIONS = 8; static const int MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2; /** Maximum number of feeler connections */ static const int MAX_FEELER_CONNECTIONS = 1; +/** Maximum number of private broadcast connections */ +static constexpr size_t MAX_PRIVATE_BROADCAST_CONNECTIONS{64}; /** -listen default */ static const bool DEFAULT_LISTEN = true; /** The maximum number of peer connections to maintain. */ @@ -83,6 +85,8 @@ static const std::string DEFAULT_MAX_UPLOAD_TARGET{"0M"}; static const bool DEFAULT_BLOCKSONLY = false; /** -peertimeout default */ static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60; +/** Default for -privatebroadcast. */ +static constexpr bool DEFAULT_PRIVATE_BROADCAST{false}; /** Number of file descriptors required for message capture **/ static const int NUM_FDS_MESSAGE_CAPTURE = 1; /** Interval for ASMap Health Check **/ @@ -773,6 +777,7 @@ public: case ConnectionType::MANUAL: case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: + case ConnectionType::PRIVATE_BROADCAST: return false; } // no default case, so the compiler can warn about missing cases @@ -794,6 +799,7 @@ public: case ConnectionType::FEELER: case ConnectionType::BLOCK_RELAY: case ConnectionType::ADDR_FETCH: + case ConnectionType::PRIVATE_BROADCAST: return false; case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::MANUAL: @@ -815,6 +821,11 @@ public: return m_conn_type == ConnectionType::ADDR_FETCH; } + bool IsPrivateBroadcastConn() const + { + return m_conn_type == ConnectionType::PRIVATE_BROADCAST; + } + bool IsInboundConn() const { return m_conn_type == ConnectionType::INBOUND; } @@ -828,6 +839,7 @@ public: case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::BLOCK_RELAY: case ConnectionType::ADDR_FETCH: + case ConnectionType::PRIVATE_BROADCAST: return true; } // no default case, so the compiler can warn about missing cases @@ -1177,6 +1189,70 @@ public: const std::optional& proxy_override = std::nullopt) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + /// Group of private broadcast related members. + class PrivateBroadcast + { + public: + /** + * Remember if we ever established at least one outbound connection to a + * Tor peer, including sending and receiving P2P messages. If this is + * true then the Tor proxy indeed works and is a proxy to the Tor network, + * not a misconfigured ordinary SOCKS5 proxy as -proxy or -onion. If that + * is the case, then we assume that connecting to an IPv4 or IPv6 address + * via that proxy will be done through the Tor network and a Tor exit node. + */ + std::atomic_bool m_outbound_tor_ok_at_least_once{false}; + + /** + * Semaphore used to guard against opening too many connections. + * Opening private broadcast connections will be paused if this is equal to 0. + */ + std::counting_semaphore<> m_sem_conn_max{MAX_PRIVATE_BROADCAST_CONNECTIONS}; + + /** + * Choose a network to open a connection to. + * @param[out] proxy Optional proxy to override the normal proxy selection. + * Will be set if !std::nullopt is returned. Could be set to `std::nullopt` + * if there is no need to override the proxy that would be used for connecting + * to the returned network. + * @retval std::nullopt No network could be selected. + * @retval !std::nullopt The network was selected and `proxy` is set (maybe to `std::nullopt`). + */ + std::optional PickNetwork(std::optional& proxy) const; + + /// Get the pending number of connections to open. + size_t NumToOpen() const; + + /** + * Increment the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Increment by this number. + */ + void NumToOpenAdd(size_t n); + + /** + * Decrement the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Decrement by this number. + * @return The number of connections that remain to be opened after the operation. + */ + size_t NumToOpenSub(size_t n); + + /// Wait for the number of needed connections to become greater than 0. + void NumToOpenWait() const; + + private: + /** + * Check if private broadcast can be done to IPv4 or IPv6 peers and if so via which proxy. + * If private broadcast connections should not be opened to IPv4 or IPv6, then this will + * return an empty optional. + */ + std::optional ProxyForIPv4or6() const; + + /// Number of `ConnectionType::PRIVATE_BROADCAST` connections to open. + std::atomic_size_t m_num_to_open{0}; + } m_private_broadcast; + bool CheckIncomingNonce(uint64_t nonce); void ASMapHealthCheck(); @@ -1351,6 +1427,7 @@ private: void ThreadOpenConnections(std::vector connect, std::span seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(); + void ThreadPrivateBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AcceptConnection(const ListenSocket& hListenSocket); /** @@ -1641,6 +1718,7 @@ private: std::thread threadOpenConnections; std::thread threadMessageHandler; std::thread threadI2PAcceptIncoming; + std::thread threadPrivateBroadcast; /** flag for deciding to connect to an extra outbound peer, * in excess of m_max_outbound_full_relay diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 542d3c5081b..84c0ba9c41b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -196,6 +197,10 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1}; static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; /** The compactblocks version we support. See BIP 152. */ static constexpr uint64_t CMPCTBLOCKS_VERSION{2}; +/** For private broadcast, send a transaction to this many peers. */ +static constexpr size_t NUM_PRIVATE_BROADCAST_PER_TX{3}; +/** Private broadcast connections must complete within this time. Disconnect the peer if it takes longer. */ +static constexpr auto PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME{3min}; // Internal stuff namespace { @@ -538,7 +543,8 @@ public: std::vector GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex); PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void RelayTransaction(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitiateTxBroadcastPrivate(const CTransactionRef& tx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestBlock(int height, std::chrono::seconds time) override { m_best_height = height; @@ -561,6 +567,9 @@ private: /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + /** Rebroadcast stale private transactions (already broadcast but not received back from the network). */ + void ReattemptPrivateBroadcast(CScheduler& scheduler); + /** Get a shared pointer to the Peer object. * May return an empty shared_ptr if the Peer object can't be found. */ PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -720,8 +729,8 @@ private: /** Send a version message to a peer */ void PushNodeVersion(CNode& pnode, const Peer& peer); - /** Send a ping message every PING_INTERVAL or if requested via RPC. May - * mark the peer to be disconnected if a ping has timed out. + /** Send a ping message every PING_INTERVAL or if requested via RPC (peer.m_ping_queued is true). + * May mark the peer to be disconnected if a ping has timed out. * We use mockable time for ping timeouts, so setmocktime may cause pings * to time out. */ void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); @@ -959,6 +968,14 @@ private: void ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const BlockTransactions& block_transactions) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + /** + * Schedule an INV for a transaction to be sent to the given peer (via `PushMessage()`). + * The transaction is picked from the list of transactions for private broadcast. + * It is assumed that the connection to the peer is `ConnectionType::PRIVATE_BROADCAST`. + * Avoid calling this for other peers since it will degrade privacy. + */ + void PushPrivateBroadcastTx(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + /** * When a peer sends us a valid block, instruct it to announce blocks to us * using CMPCTBLOCK if possible by adding its nodeid to the end of @@ -1071,6 +1088,9 @@ private: void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); void LogBlockHeader(const CBlockIndex& index, const CNode& peer, bool via_compact_block); + + /// The transactions to be broadcast privately. + PrivateBroadcast m_tx_for_private_broadcast; }; const CNodeState* PeerManagerImpl::State(NodeId pnode) const @@ -1522,27 +1542,52 @@ void PeerManagerImpl::FindNextBlocks(std::vector& vBlocks, c void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) { - uint64_t my_services{peer.m_our_services}; - const int64_t nTime{count_seconds(GetTime())}; - uint64_t nonce = pnode.GetLocalNonce(); - const int nNodeStartingHeight{m_best_height}; - NodeId nodeid = pnode.GetId(); - CAddress addr = pnode.addr; - - CService addr_you = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService(); - uint64_t your_services{addr.nServices}; - - const bool tx_relay{!RejectIncomingTxs(pnode)}; - MakeAndPushMessage(pnode, NetMsgType::VERSION, PROTOCOL_VERSION, my_services, nTime, - your_services, CNetAddr::V1(addr_you), // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) - my_services, CNetAddr::V1(CService{}), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) - nonce, strSubVersion, nNodeStartingHeight, tx_relay); - - if (fLogIPs) { - LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addr_you.ToStringAddrPort(), tx_relay, nodeid); + uint64_t my_services; + int64_t my_time; + uint64_t your_services; + CService your_addr; + std::string my_user_agent; + int my_height; + bool my_tx_relay; + if (pnode.IsPrivateBroadcastConn()) { + my_services = NODE_NONE; + my_time = 0; + your_services = NODE_NONE; + your_addr = CService{}; + my_user_agent = "/pynode:0.0.1/"; // Use a constant other than the default (or user-configured). See https://github.com/bitcoin/bitcoin/pull/27509#discussion_r1214671917 + my_height = 0; + my_tx_relay = false; } else { - LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, tx_relay, nodeid); + const CAddress& addr{pnode.addr}; + my_services = peer.m_our_services; + my_time = count_seconds(GetTime()); + your_services = addr.nServices; + your_addr = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? CService{addr} : CService{}; + my_user_agent = strSubVersion; + my_height = m_best_height; + my_tx_relay = !RejectIncomingTxs(pnode); } + + MakeAndPushMessage( + pnode, + NetMsgType::VERSION, + PROTOCOL_VERSION, + my_services, + my_time, + // your_services + CNetAddr::V1(your_addr) is the pre-version-31402 serialization of your_addr (without nTime) + your_services, CNetAddr::V1(your_addr), + // same, for a dummy address + my_services, CNetAddr::V1(CService{}), + pnode.GetLocalNonce(), + my_user_agent, + my_height, + my_tx_relay); + + LogDebug( + BCLog::NET, "send version message: version=%d, blocks=%d%s, txrelay=%d, peer=%d\n", + PROTOCOL_VERSION, my_height, + fLogIPs ? strprintf(", them=%s", your_addr.ToStringAddrPort()) : "", + my_tx_relay, pnode.GetId()); } void PeerManagerImpl::UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) @@ -1580,7 +1625,7 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) CTransactionRef tx = m_mempool.get(txid); if (tx != nullptr) { - RelayTransaction(txid, tx->GetWitnessHash()); + InitiateTxBroadcastToAll(txid, tx->GetWitnessHash()); } else { m_mempool.RemoveUnbroadcastTx(txid, true); } @@ -1592,6 +1637,37 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); } +void PeerManagerImpl::ReattemptPrivateBroadcast(CScheduler& scheduler) +{ + // Remove stale transactions that are no longer relevant (e.g. already in + // the mempool or mined) and count the remaining ones. + size_t num_for_rebroadcast{0}; + const auto stale_txs = m_tx_for_private_broadcast.GetStale(); + if (!stale_txs.empty()) { + LOCK(cs_main); + for (const auto& stale_tx : stale_txs) { + auto mempool_acceptable = m_chainman.ProcessTransaction(stale_tx, /*test_accept=*/true); + if (mempool_acceptable.m_result_type == MempoolAcceptResult::ResultType::VALID) { + LogDebug(BCLog::PRIVBROADCAST, + "Reattempting broadcast of stale txid=%s wtxid=%s", + stale_tx->GetHash().ToString(), stale_tx->GetWitnessHash().ToString()); + ++num_for_rebroadcast; + } else { + LogInfo("[privatebroadcast] Giving up broadcast attempts for txid=%s wtxid=%s: %s", + stale_tx->GetHash().ToString(), stale_tx->GetWitnessHash().ToString(), + mempool_acceptable.m_state.ToString()); + m_tx_for_private_broadcast.Remove(stale_tx); + } + } + + // This could overshoot, but that is ok - we will open some private connections in vain. + m_connman.m_private_broadcast.NumToOpenAdd(num_for_rebroadcast); + } + + const auto delta{2min + FastRandomContext().randrange(1min)}; + scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, delta); +} + void PeerManagerImpl::FinalizeNode(const CNode& node) { NodeId nodeid = node.GetId(); @@ -1649,16 +1725,23 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) } } // cs_main if (node.fSuccessfullyConnected && - !node.IsBlockOnlyConn() && !node.IsInboundConn()) { + !node.IsBlockOnlyConn() && !node.IsPrivateBroadcastConn() && !node.IsInboundConn()) { // Only change visible addrman state for full outbound peers. We don't // call Connected() for feeler connections since they don't have - // fSuccessfullyConnected set. + // fSuccessfullyConnected set. Also don't call Connected() for private broadcast + // connections since they could leak information in addrman. m_addrman.Connected(node.addr); } { LOCK(m_headers_presync_mutex); m_headers_presync_stats.erase(nodeid); } + if (node.IsPrivateBroadcastConn() && + !m_tx_for_private_broadcast.DidNodeConfirmReception(nodeid) && + m_tx_for_private_broadcast.HavePendingTransactions()) { + + m_connman.m_private_broadcast.NumToOpenAdd(1); + } LogDebug(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); } @@ -1923,6 +2006,10 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler) // schedule next run for 10-15 minutes in the future const auto delta = 10min + FastRandomContext().randrange(5min); scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); + + if (m_opts.private_broadcast) { + scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, 0min); + } } void PeerManagerImpl::ActiveTipChange(const CBlockIndex& new_tip, bool is_ibd) @@ -2125,7 +2212,7 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::RelayTransaction(const Txid& txid, const Wtxid& wtxid) +void PeerManagerImpl::InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) { LOCK(m_peer_mutex); for(auto& it : m_peer_map) { @@ -2148,6 +2235,17 @@ void PeerManagerImpl::RelayTransaction(const Txid& txid, const Wtxid& wtxid) } } +void PeerManagerImpl::InitiateTxBroadcastPrivate(const CTransactionRef& tx) +{ + const auto txstr{strprintf("txid=%s, wtxid=%s", tx->GetHash().ToString(), tx->GetWitnessHash().ToString())}; + if (m_tx_for_private_broadcast.Add(tx)) { + LogDebug(BCLog::PRIVBROADCAST, "Requesting %d new connections due to %s", NUM_PRIVATE_BROADCAST_PER_TX, txstr); + m_connman.m_private_broadcast.NumToOpenAdd(NUM_PRIVATE_BROADCAST_PER_TX); + } else { + LogDebug(BCLog::PRIVBROADCAST, "Ignoring unnecessary request to schedule an already scheduled transaction: %s", txstr); + } +} + void PeerManagerImpl::RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) @@ -3032,7 +3130,7 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c tx->GetWitnessHash().ToString(), m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000); - RelayTransaction(tx->GetHash(), tx->GetWitnessHash()); + InitiateTxBroadcastToAll(tx->GetHash(), tx->GetWitnessHash()); for (const CTransactionRef& removedTx : replaced_transactions) { AddToCompactExtraTransactions(removedTx); @@ -3426,6 +3524,25 @@ void PeerManagerImpl::LogBlockHeader(const CBlockIndex& index, const CNode& peer } } +void PeerManagerImpl::PushPrivateBroadcastTx(CNode& node) +{ + Assume(node.IsPrivateBroadcastConn()); + + const auto opt_tx{m_tx_for_private_broadcast.PickTxForSend(node.GetId())}; + if (!opt_tx) { + LogDebug(BCLog::PRIVBROADCAST, "Disconnecting: no more transactions for private broadcast (connected in vain), peer=%d%s", node.GetId(), node.LogIP(fLogIPs)); + node.fDisconnect = true; + return; + } + const CTransactionRef& tx{*opt_tx}; + + LogInfo("[privatebroadcast] P2P handshake completed, sending INV for txid=%s%s, peer=%d%s", + tx->GetHash().ToString(), tx->HasWitness() ? strprintf(", wtxid=%s", tx->GetWitnessHash().ToString()) : "", + node.GetId(), node.LogIP(fLogIPs)); + + MakeAndPushMessage(node, NetMsgType::INV, std::vector{{CInv{MSG_TX, tx->GetHash().ToUint256()}}}); +} + void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) @@ -3524,19 +3641,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, pfrom.SetCommonVersion(greatest_common_version); pfrom.nVersion = nVersion; - if (greatest_common_version >= WTXID_RELAY_VERSION) { - MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); - } - - // Signal ADDRv2 support (BIP155). - if (greatest_common_version >= 70016) { - // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some - // implementations reject messages they don't know. As a courtesy, don't send - // it to nodes with a version before 70016, as no software is known to support - // BIP155 that doesn't announce at least that protocol version number. - MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2); - } - pfrom.m_has_all_wanted_services = HasAllDesirableServiceFlags(nServices); peer->m_their_services = nServices; pfrom.SetAddrLocal(addrMe); @@ -3563,6 +3667,36 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (fRelay) pfrom.m_relays_txs = true; } + const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; + LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", + cleanSubVer, pfrom.nVersion, + peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), + pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); + + if (pfrom.IsPrivateBroadcastConn()) { + if (fRelay) { + MakeAndPushMessage(pfrom, NetMsgType::VERACK); + } else { + LogInfo("[privatebroadcast] Disconnecting: does not support transactions relay (connected in vain), peer=%d%s", + pfrom.GetId(), pfrom.LogIP(fLogIPs)); + pfrom.fDisconnect = true; + } + return; + } + + if (greatest_common_version >= WTXID_RELAY_VERSION) { + MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); + } + + // Signal ADDRv2 support (BIP155). + if (greatest_common_version >= 70016) { + // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some + // implementations reject messages they don't know. As a courtesy, don't send + // it to nodes with a version before 70016, as no software is known to support + // BIP155 that doesn't announce at least that protocol version number. + MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2); + } + if (greatest_common_version >= WTXID_RELAY_VERSION && m_txreconciliation) { // Per BIP-330, we announce txreconciliation support if: // - protocol version per the peer's VERSION message supports WTXID_RELAY; @@ -3628,12 +3762,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_addrman.Good(pfrom.addr); } - const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; - LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", - cleanSubVer, pfrom.nVersion, - peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), - pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); - peer->m_time_offset = NodeSeconds{std::chrono::seconds{nTime}} - Now(); if (!pfrom.IsInboundConn()) { // Don't use timedata samples from inbound peers to make it @@ -3686,6 +3814,31 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogInfo("%s", new_peer_msg()); } + if (auto tx_relay = peer->GetTxRelay()) { + // `TxRelay::m_tx_inventory_to_send` must be empty before the + // version handshake is completed as + // `TxRelay::m_next_inv_send_time` is first initialised in + // `SendMessages` after the verack is received. Any transactions + // received during the version handshake would otherwise + // immediately be advertised without random delay, potentially + // leaking the time of arrival to a spy. + Assume(WITH_LOCK( + tx_relay->m_tx_inventory_mutex, + return tx_relay->m_tx_inventory_to_send.empty() && + tx_relay->m_next_inv_send_time == 0s)); + } + + if (pfrom.IsPrivateBroadcastConn()) { + pfrom.fSuccessfullyConnected = true; + // The peer may intend to later send us NetMsgType::FEEFILTER limiting + // cheap transactions, but we don't wait for that and thus we may send + // them a transaction below their threshold. This is ok because this + // relay logic is designed to work even in cases when the peer drops + // the transaction (due to it being too cheap, or for other reasons). + PushPrivateBroadcastTx(pfrom); + return; + } + if (pfrom.GetCommonVersion() >= SHORT_IDS_BLOCKS_VERSION) { // Tell our peer we are willing to provide version 2 cmpctblocks. // However, we do not request new block announcements using @@ -3704,20 +3857,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } - if (auto tx_relay = peer->GetTxRelay()) { - // `TxRelay::m_tx_inventory_to_send` must be empty before the - // version handshake is completed as - // `TxRelay::m_next_inv_send_time` is first initialised in - // `SendMessages` after the verack is received. Any transactions - // received during the version handshake would otherwise - // immediately be advertised without random delay, potentially - // leaking the time of arrival to a spy. - Assume(WITH_LOCK( - tx_relay->m_tx_inventory_mutex, - return tx_relay->m_tx_inventory_to_send.empty() && - tx_relay->m_next_inv_send_time == 0s)); - } - { LOCK2(::cs_main, m_tx_download_mutex); const CNodeState* state = State(pfrom.GetId()); @@ -3851,6 +3990,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } + if (pfrom.IsPrivateBroadcastConn()) { + if (msg_type != NetMsgType::PONG && msg_type != NetMsgType::GETDATA) { + LogDebug(BCLog::PRIVBROADCAST, "Ignoring incoming message '%s', peer=%d%s", msg_type, pfrom.GetId(), pfrom.LogIP(fLogIPs)); + return; + } + } + if (msg_type == NetMsgType::ADDR || msg_type == NetMsgType::ADDRV2) { const auto ser_params{ msg_type == NetMsgType::ADDRV2 ? @@ -4054,6 +4200,33 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogDebug(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId()); } + if (pfrom.IsPrivateBroadcastConn()) { + const auto pushed_tx_opt{m_tx_for_private_broadcast.GetTxForNode(pfrom.GetId())}; + if (!pushed_tx_opt) { + LogInfo("[privatebroadcast] Disconnecting: got GETDATA without sending an INV, peer=%d%s", + pfrom.GetId(), fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); + pfrom.fDisconnect = true; + return; + } + + const CTransactionRef& pushed_tx{*pushed_tx_opt}; + + // The GETDATA request must contain exactly one inv and it must be for the transaction + // that we INVed to the peer earlier. + if (vInv.size() == 1 && vInv[0].IsMsgTx() && vInv[0].hash == pushed_tx->GetHash().ToUint256()) { + + MakeAndPushMessage(pfrom, NetMsgType::TX, TX_WITH_WITNESS(*pushed_tx)); + + peer->m_ping_queued = true; // Ensure a ping will be sent: mimic a request via RPC. + MaybeSendPing(pfrom, *peer, GetTime()); + } else { + LogInfo("[privatebroadcast] Disconnecting: got an unexpected GETDATA message, peer=%d%s", + pfrom.GetId(), fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); + pfrom.fDisconnect = true; + } + return; + } + { LOCK(peer->m_getdata_requests_mutex); peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end()); @@ -4291,6 +4464,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const uint256& hash = peer->m_wtxid_relay ? wtxid.ToUint256() : txid.ToUint256(); AddKnownTx(*peer, hash); + if (const auto num_broadcasted{m_tx_for_private_broadcast.Remove(ptx)}) { + LogInfo("[privatebroadcast] Received our privately broadcast transaction (txid=%s) from the " + "network from peer=%d%s; stopping private broadcast attempts", + txid.ToString(), pfrom.GetId(), pfrom.LogIP(fLogIPs)); + if (NUM_PRIVATE_BROADCAST_PER_TX > num_broadcasted.value()) { + // Not all of the initial NUM_PRIVATE_BROADCAST_PER_TX connections were needed. + // Tell CConnman it does not need to start the remaining ones. + m_connman.m_private_broadcast.NumToOpenSub(NUM_PRIVATE_BROADCAST_PER_TX - num_broadcasted.value()); + } + } + LOCK2(cs_main, m_tx_download_mutex); const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx); @@ -4305,7 +4489,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } else { LogInfo("Force relaying tx %s (wtxid=%s) from peer=%d\n", txid.ToString(), wtxid.ToString(), pfrom.GetId()); - RelayTransaction(txid, wtxid); + InitiateTxBroadcastToAll(txid, wtxid); } } @@ -4795,6 +4979,12 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (ping_time.count() >= 0) { // Let connman know about this successful ping-pong pfrom.PongReceived(ping_time); + if (pfrom.IsPrivateBroadcastConn()) { + m_tx_for_private_broadcast.NodeConfirmedReception(pfrom.GetId()); + LogInfo("[privatebroadcast] Got a PONG (the transaction will probably reach the network), marking for disconnect, peer=%d%s", + pfrom.GetId(), pfrom.LogIP(fLogIPs)); + pfrom.fDisconnect = true; + } } else { // This should never happen sProblem = "Timing mishap"; @@ -5502,6 +5692,18 @@ bool PeerManagerImpl::SendMessages(CNode* pto) const auto current_time{GetTime()}; + // The logic below does not apply to private broadcast peers, so skip it. + // Also in CConnman::PushMessage() we make sure that unwanted messages are + // not sent. This here is just an optimization. + if (pto->IsPrivateBroadcastConn()) { + if (pto->m_connected + PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME < current_time) { + LogInfo("[privatebroadcast] Disconnecting: did not complete the transaction send within %d seconds, peer=%d%s", + count_seconds(PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME), pto->GetId(), pto->LogIP(fLogIPs)); + pto->fDisconnect = true; + } + return true; + } + if (pto->IsAddrFetchConn() && current_time - pto->m_connected > 10 * AVG_ADDRESS_BROADCAST_INTERVAL) { LogDebug(BCLog::NET, "addrfetch connection timeout, %s\n", pto->DisconnectMsg(fLogIPs)); pto->fDisconnect = true; diff --git a/src/net_processing.h b/src/net_processing.h index cf75e8f6da3..09f348c86bd 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -90,6 +90,8 @@ public: //! Number of headers sent in one getheaders message result (this is //! a test-only option). uint32_t max_headers_result{MAX_HEADERS_RESULTS}; + //! Whether private broadcast is used for sending transactions. + bool private_broadcast{DEFAULT_PRIVATE_BROADCAST}; }; static std::unique_ptr make(CConnman& connman, AddrMan& addrman, @@ -116,8 +118,19 @@ public: /** Get peer manager info. */ virtual PeerManagerInfo GetInfo() const = 0; - /** Relay transaction to all peers. */ - virtual void RelayTransaction(const Txid& txid, const Wtxid& wtxid) = 0; + /** + * Initiate a transaction broadcast to eligible peers. + * Queue the witness transaction id to `Peer::TxRelay::m_tx_inventory_to_send` + * for each peer. Later, depending on `Peer::TxRelay::m_next_inv_send_time` and if + * the transaction is in the mempool, an `INV` about it may be sent to the peer. + */ + virtual void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) = 0; + + /** + * Initiate a private transaction broadcast. This is done + * asynchronously via short-lived connections to peers on privacy networks. + */ + virtual void InitiateTxBroadcastPrivate(const CTransactionRef& tx) = 0; /** Send ping message to all peers */ virtual void SendPings() = 0; diff --git a/src/node/connection_types.cpp b/src/node/connection_types.cpp index 6ce2f269dc8..b2448352c22 100644 --- a/src/node/connection_types.cpp +++ b/src/node/connection_types.cpp @@ -20,6 +20,8 @@ std::string ConnectionTypeAsString(ConnectionType conn_type) return "block-relay-only"; case ConnectionType::ADDR_FETCH: return "addr-fetch"; + case ConnectionType::PRIVATE_BROADCAST: + return "private-broadcast"; } // no default case, so the compiler can warn about missing cases assert(false); diff --git a/src/node/connection_types.h b/src/node/connection_types.h index a00895e2a8a..eeb106b616d 100644 --- a/src/node/connection_types.h +++ b/src/node/connection_types.h @@ -75,6 +75,13 @@ enum class ConnectionType { * AddrMan is empty. */ ADDR_FETCH, + + /** + * Private broadcast connections are short-lived and only opened to + * privacy networks (Tor, I2P) for relaying privacy-sensitive data (like + * our own transactions) and closed afterwards. + */ + PRIVATE_BROADCAST, }; /** Convert ConnectionType enum to a string value */ diff --git a/src/node/peerman_args.cpp b/src/node/peerman_args.cpp index 59dc592d66d..9745d69d5ae 100644 --- a/src/node/peerman_args.cpp +++ b/src/node/peerman_args.cpp @@ -23,6 +23,8 @@ void ApplyArgsManOptions(const ArgsManager& argsman, PeerManager::Options& optio if (auto value{argsman.GetBoolArg("-capturemessages")}) options.capture_messages = *value; if (auto value{argsman.GetBoolArg("-blocksonly")}) options.ignore_incoming_txs = *value; + + if (auto value{argsman.GetBoolArg("-privatebroadcast")}) options.private_broadcast = *value; } } // namespace node diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index 0b68fab9629..ee3bdd1cbf5 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -74,13 +74,14 @@ TransactionError BroadcastTransaction(NodeContext& node, wtxid = mempool_tx->GetWitnessHash(); } else { // Transaction is not already in the mempool. - if (max_tx_fee > 0) { + const bool check_max_fee{max_tx_fee > 0}; + if (check_max_fee || broadcast_method == TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST) { // First, call ATMP with test_accept and check the fee. If ATMP // fails here, return error immediately. const MempoolAcceptResult result = node.chainman->ProcessTransaction(tx, /*test_accept=*/ true); if (result.m_result_type != MempoolAcceptResult::ResultType::VALID) { return HandleATMPError(result.m_state, err_string); - } else if (result.m_base_fees.value() > max_tx_fee) { + } else if (check_max_fee && result.m_base_fees.value() > max_tx_fee) { return TransactionError::MAX_FEE_EXCEEDED; } } @@ -104,6 +105,8 @@ TransactionError BroadcastTransaction(NodeContext& node, node.mempool->AddUnbroadcastTx(txid); } break; + case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + break; } if (wait_callback && node.validation_signals) { @@ -133,7 +136,10 @@ TransactionError BroadcastTransaction(NodeContext& node, case TxBroadcast::MEMPOOL_NO_BROADCAST: break; case TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL: - node.peerman->RelayTransaction(txid, wtxid); + node.peerman->InitiateTxBroadcastToAll(txid, wtxid); + break; + case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + node.peerman->InitiateTxBroadcastPrivate(tx); break; } diff --git a/src/node/types.h b/src/node/types.h index 8b9f24f2911..53dc60f0e8d 100644 --- a/src/node/types.h +++ b/src/node/types.h @@ -108,6 +108,9 @@ enum class TxBroadcast : uint8_t { MEMPOOL_AND_BROADCAST_TO_ALL, /// Add the transaction to the mempool, but don't broadcast to anybody. MEMPOOL_NO_BROADCAST, + /// Omit the mempool and directly send the transaction via a few dedicated connections to + /// peers on privacy networks. + NO_MEMPOOL_PRIVATE_BROADCAST, }; } // namespace node diff --git a/src/private_broadcast.cpp b/src/private_broadcast.cpp new file mode 100644 index 00000000000..c7c311c0e89 --- /dev/null +++ b/src/private_broadcast.cpp @@ -0,0 +1,133 @@ +// Copyright (c) 2023-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#include +#include + +#include + +/// If a transaction is not received back from the network for this duration +/// after it is broadcast, then we consider it stale / for rebroadcasting. +static constexpr auto STALE_DURATION{1min}; + +bool PrivateBroadcast::Add(const CTransactionRef& tx) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const bool inserted{m_transactions.try_emplace(tx).second}; + return inserted; +} + +std::optional PrivateBroadcast::Remove(const CTransactionRef& tx) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto handle{m_transactions.extract(tx)}; + if (handle) { + const auto p{DerivePriority(handle.mapped())}; + return p.num_confirmed; + } + return std::nullopt; +} + +std::optional PrivateBroadcast::PickTxForSend(const NodeId& will_send_to_nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + + const auto it{std::ranges::max_element( + m_transactions, + [](const auto& a, const auto& b) { return a < b; }, + [](const auto& el) { return DerivePriority(el.second); })}; + + if (it != m_transactions.end()) { + auto& [tx, sent_to]{*it}; + sent_to.emplace_back(will_send_to_nodeid, NodeClock::now()); + return tx; + } + + return std::nullopt; +} + +std::optional PrivateBroadcast::GetTxForNode(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto tx_and_status{GetSendStatusByNode(nodeid)}; + if (tx_and_status.has_value()) { + return tx_and_status.value().tx; + } + return std::nullopt; +} + +void PrivateBroadcast::NodeConfirmedReception(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto tx_and_status{GetSendStatusByNode(nodeid)}; + if (tx_and_status.has_value()) { + tx_and_status.value().send_status.confirmed = NodeClock::now(); + } +} + +bool PrivateBroadcast::DidNodeConfirmReception(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto tx_and_status{GetSendStatusByNode(nodeid)}; + if (tx_and_status.has_value()) { + return tx_and_status.value().send_status.confirmed.has_value(); + } + return false; +} + +bool PrivateBroadcast::HavePendingTransactions() + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + return !m_transactions.empty(); +} + +std::vector PrivateBroadcast::GetStale() const + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto stale_time{NodeClock::now() - STALE_DURATION}; + std::vector stale; + for (const auto& [tx, send_status] : m_transactions) { + const Priority p{DerivePriority(send_status)}; + if (p.last_confirmed < stale_time) { + stale.push_back(tx); + } + } + return stale; +} + +PrivateBroadcast::Priority PrivateBroadcast::DerivePriority(const std::vector& sent_to) +{ + Priority p; + p.num_picked = sent_to.size(); + for (const auto& send_status : sent_to) { + p.last_picked = std::max(p.last_picked, send_status.picked); + if (send_status.confirmed.has_value()) { + ++p.num_confirmed; + p.last_confirmed = std::max(p.last_confirmed, send_status.confirmed.value()); + } + } + return p; +} + +std::optional PrivateBroadcast::GetSendStatusByNode(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(m_mutex) +{ + AssertLockHeld(m_mutex); + for (auto& [tx, sent_to] : m_transactions) { + for (auto& send_status : sent_to) { + if (send_status.nodeid == nodeid) { + return TxAndSendStatusForNode{.tx = tx, .send_status = send_status}; + } + } + } + return std::nullopt; +} diff --git a/src/private_broadcast.h b/src/private_broadcast.h new file mode 100644 index 00000000000..e88db6bbb7b --- /dev/null +++ b/src/private_broadcast.h @@ -0,0 +1,166 @@ +// Copyright (c) 2023-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#ifndef BITCOIN_PRIVATE_BROADCAST_H +#define BITCOIN_PRIVATE_BROADCAST_H + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +/** + * Store a list of transactions to be broadcast privately. Supports the following operations: + * - Add a new transaction + * - Remove a transaction + * - Pick a transaction for sending to one recipient + * - Query which transaction has been picked for sending to a given recipient node + * - Mark that a given recipient node has confirmed receipt of a transaction + * - Query whether a given recipient node has confirmed reception + * - Query whether any transactions that need sending are currently on the list + */ +class PrivateBroadcast +{ +public: + /** + * Add a transaction to the storage. + * @param[in] tx The transaction to add. + * @retval true The transaction was added. + * @retval false The transaction was already present. + */ + bool Add(const CTransactionRef& tx) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Forget a transaction. + * @param[in] tx Transaction to forget. + * @retval !nullopt The number of times the transaction was sent and confirmed + * by the recipient (if the transaction existed and was removed). + * @retval nullopt The transaction was not in the storage. + */ + std::optional Remove(const CTransactionRef& tx) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Pick the transaction with the fewest send attempts, and confirmations, + * and oldest send/confirm times. + * @param[in] will_send_to_nodeid Will remember that the returned transaction + * was picked for sending to this node. + * @return Most urgent transaction or nullopt if there are no transactions. + */ + std::optional PickTxForSend(const NodeId& will_send_to_nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Get the transaction that was picked for sending to a given node by PickTxForSend(). + * @param[in] nodeid Node to which a transaction is being (or was) sent. + * @return Transaction or nullopt if the nodeid is unknown. + */ + std::optional GetTxForNode(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Mark that the node has confirmed reception of the transaction we sent it by + * responding with `PONG` to our `PING` message. + * @param[in] nodeid Node that we sent a transaction to. + */ + void NodeConfirmedReception(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Check if the node has confirmed reception of the transaction. + * @retval true Node has confirmed, `NodeConfirmedReception()` has been called. + * @retval false Node has not confirmed, `NodeConfirmedReception()` has not been called. + */ + bool DidNodeConfirmReception(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Check if there are transactions that need to be broadcast. + */ + bool HavePendingTransactions() + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Get the transactions that have not been broadcast recently. + */ + std::vector GetStale() const + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + +private: + /// Status of a transaction sent to a given node. + struct SendStatus { + const NodeId nodeid; /// Node to which the transaction will be sent (or was sent). + const NodeClock::time_point picked; ///< When was the transaction picked for sending to the node. + std::optional confirmed; ///< When was the transaction reception confirmed by the node (by PONG). + + SendStatus(const NodeId& nodeid, const NodeClock::time_point& picked) : nodeid{nodeid}, picked{picked} {} + }; + + /// Cumulative stats from all the send attempts for a transaction. Used to prioritize transactions. + struct Priority { + size_t num_picked{0}; ///< Number of times the transaction was picked for sending. + NodeClock::time_point last_picked{}; ///< The most recent time when the transaction was picked for sending. + size_t num_confirmed{0}; ///< Number of nodes that have confirmed reception of a transaction (by PONG). + NodeClock::time_point last_confirmed{}; ///< The most recent time when the transaction was confirmed. + + auto operator<=>(const Priority& other) const + { + // Invert `other` and `this` in the comparison because smaller num_picked, num_confirmed or + // earlier times mean greater priority. In other words, if this.num_picked < other.num_picked + // then this > other. + return std::tie(other.num_picked, other.num_confirmed, other.last_picked, other.last_confirmed) <=> + std::tie(num_picked, num_confirmed, last_picked, last_confirmed); + } + }; + + /// A pair of a transaction and a sent status for a given node. Convenience return type of GetSendStatusByNode(). + struct TxAndSendStatusForNode { + const CTransactionRef& tx; + SendStatus& send_status; + }; + + // No need for salted hasher because we are going to store just a bunch of locally originating transactions. + + struct CTransactionRefHash { + size_t operator()(const CTransactionRef& tx) const + { + return static_cast(tx->GetWitnessHash().ToUint256().GetUint64(0)); + } + }; + + struct CTransactionRefComp { + bool operator()(const CTransactionRef& a, const CTransactionRef& b) const + { + return a->GetWitnessHash() == b->GetWitnessHash(); // If wtxid equals, then txid also equals. + } + }; + + /** + * Derive the sending priority of a transaction. + * @param[in] sent_to List of nodes that the transaction has been sent to. + */ + static Priority DerivePriority(const std::vector& sent_to); + + /** + * Find which transaction we sent to a given node (marked by PickTxForSend()). + * @return That transaction together with the send status or nullopt if we did not + * send any transaction to the given node. + */ + std::optional GetSendStatusByNode(const NodeId& nodeid) + EXCLUSIVE_LOCKS_REQUIRED(m_mutex); + + mutable Mutex m_mutex; + std::unordered_map, CTransactionRefHash, CTransactionRefComp> + m_transactions GUARDED_BY(m_mutex); +}; + +#endif // BITCOIN_PRIVATE_BROADCAST_H diff --git a/src/qt/guiutil.cpp b/src/qt/guiutil.cpp index 3aa4808878c..1619227a419 100644 --- a/src/qt/guiutil.cpp +++ b/src/qt/guiutil.cpp @@ -722,6 +722,8 @@ QString ConnectionTypeToQString(ConnectionType conn_type, bool prepend_direction case ConnectionType::FEELER: return prefix + QObject::tr("Feeler"); //: Short-lived peer connection type that solicits known addresses from a peer. case ConnectionType::ADDR_FETCH: return prefix + QObject::tr("Address Fetch"); + //: Short-lived peer connection type that is used for broadcasting privacy-sensitive data. + case ConnectionType::PRIVATE_BROADCAST: return prefix + QObject::tr("Private Broadcast"); } // no default case, so the compiler can warn about missing cases assert(false); } diff --git a/src/qt/rpcconsole.cpp b/src/qt/rpcconsole.cpp index f401b65ed05..056fedc18d3 100644 --- a/src/qt/rpcconsole.cpp +++ b/src/qt/rpcconsole.cpp @@ -484,7 +484,10 @@ RPCConsole::RPCConsole(interfaces::Node& node, const PlatformStyle *_platformSty tr("Outbound Feeler: short-lived, for testing addresses"), /*: Explanatory text for a short-lived outbound peer connection that is used to request addresses from a peer. */ - tr("Outbound Address Fetch: short-lived, for soliciting addresses")}; + tr("Outbound Address Fetch: short-lived, for soliciting addresses"), + /*: Explanatory text for a short-lived outbound peer connection that is used + to broadcast privacy-sensitive data (like our transactions). */ + tr("Private broadcast: short-lived, for broadcasting privacy-sensitive transactions")}; const QString connection_types_list{"
  • " + Join(CONNECTION_TYPE_DOC, QString("
  • ")) + "
"}; ui->peerConnectionTypeLabel->setToolTip(ui->peerConnectionTypeLabel->toolTip().arg(connection_types_list)); const std::vector TRANSPORT_TYPE_DOC{ diff --git a/src/rpc/mempool.cpp b/src/rpc/mempool.cpp index 1c176c6c8fb..363e5e382e9 100644 --- a/src/rpc/mempool.cpp +++ b/src/rpc/mempool.cpp @@ -8,10 +8,12 @@ #include #include +#include #include #include #include #include +#include #include #include #include @@ -44,11 +46,21 @@ static RPCHelpMan sendrawtransaction() { return RPCHelpMan{ "sendrawtransaction", - "Submit a raw transaction (serialized, hex-encoded) to local node and network.\n" - "\nThe transaction will be sent unconditionally to all peers, so using sendrawtransaction\n" - "for manual rebroadcast may degrade privacy by leaking the transaction's origin, as\n" - "nodes will normally not rebroadcast non-wallet transactions already in their mempool.\n" + "Submit a raw transaction (serialized, hex-encoded) to the network.\n" + + "\nIf -privatebroadcast is disabled, then the transaction will be put into the\n" + "local mempool of the node and will be sent unconditionally to all currently\n" + "connected peers, so using sendrawtransaction for manual rebroadcast will degrade\n" + "privacy by leaking the transaction's origin, as nodes will normally not\n" + "rebroadcast non-wallet transactions already in their mempool.\n" + + "\nIf -privatebroadcast is enabled, then the transaction will be sent only via\n" + "dedicated, short-lived connections to Tor or I2P peers or IPv4/IPv6 peers\n" + "via the Tor network. This conceals the transaction's origin. The transaction\n" + "will only enter the local mempool when it is received back from the network.\n" + "\nA specific exception, RPC_TRANSACTION_ALREADY_IN_UTXO_SET, may throw if the transaction cannot be added to the mempool.\n" + "\nRelated RPCs: createrawtransaction, signrawtransactionwithkey\n", { {"hexstring", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The hex string of the raw transaction"}, @@ -98,11 +110,23 @@ static RPCHelpMan sendrawtransaction() std::string err_string; AssertLockNotHeld(cs_main); NodeContext& node = EnsureAnyNodeContext(request.context); + const bool private_broadcast_enabled{gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)}; + if (private_broadcast_enabled && + !g_reachable_nets.Contains(NET_ONION) && + !g_reachable_nets.Contains(NET_I2P)) { + throw JSONRPCError(RPC_MISC_ERROR, + "-privatebroadcast is enabled, but none of the Tor or I2P networks is " + "reachable. Maybe the location of the Tor proxy couldn't be retrieved " + "from the Tor daemon at startup. Check whether the Tor daemon is running " + "and that -torcontrol, -torpassword and -i2psam are configured properly."); + } + const auto method = private_broadcast_enabled ? node::TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST + : node::TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL; const TransactionError err = BroadcastTransaction(node, tx, err_string, max_raw_tx_fee, - node::TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL, + method, /*wait_callback=*/true); if (TransactionError::OK != err) { throw JSONRPCTransactionError(err, err_string); diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index b07555e97a0..c6359c37dc7 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -48,7 +48,8 @@ const std::vector CONNECTION_TYPE_DOC{ "inbound (initiated by the peer)", "manual (added via addnode RPC or -addnode/-connect configuration options)", "addr-fetch (short-lived automatic connection for soliciting addresses)", - "feeler (short-lived automatic connection for testing addresses)" + "feeler (short-lived automatic connection for testing addresses)", + "private-broadcast (short-lived automatic connection for broadcasting privacy-sensitive transactions)" }; const std::vector TRANSPORT_TYPE_DOC{ diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 83cb989aa9b..e70eaf5cf89 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -76,6 +76,7 @@ add_executable(test_bitcoin pool_tests.cpp pow_tests.cpp prevector_tests.cpp + private_broadcast_tests.cpp raii_event_tests.cpp random_tests.cpp rbf_tests.cpp diff --git a/src/test/private_broadcast_tests.cpp b/src/test/private_broadcast_tests.cpp new file mode 100644 index 00000000000..6c5ef36fc34 --- /dev/null +++ b/src/test/private_broadcast_tests.cpp @@ -0,0 +1,96 @@ +// Copyright (c) 2025-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include + +#include + +BOOST_FIXTURE_TEST_SUITE(private_broadcast_tests, BasicTestingSetup) + +static CTransactionRef MakeDummyTx(uint32_t id, size_t num_witness) +{ + CMutableTransaction mtx; + mtx.vin.resize(1); + mtx.vin[0].nSequence = id; + if (num_witness > 0) { + mtx.vin[0].scriptWitness = CScriptWitness{}; + mtx.vin[0].scriptWitness.stack.resize(num_witness); + } + return MakeTransactionRef(mtx); +} + +BOOST_AUTO_TEST_CASE(basic) +{ + SetMockTime(Now()); + + PrivateBroadcast pb; + const NodeId recipient1{1}; + + // No transactions initially. + BOOST_CHECK(!pb.PickTxForSend(/*will_send_to_nodeid=*/recipient1).has_value()); + BOOST_CHECK_EQUAL(pb.GetStale().size(), 0); + BOOST_CHECK(!pb.HavePendingTransactions()); + + // Make a transaction and add it. + const auto tx1{MakeDummyTx(/*id=*/1, /*num_witness=*/0)}; + + BOOST_CHECK(pb.Add(tx1)); + BOOST_CHECK(!pb.Add(tx1)); + + // Make another transaction with same txid, different wtxid and add it. + const auto tx2{MakeDummyTx(/*id=*/1, /*num_witness=*/1)}; + BOOST_REQUIRE(tx1->GetHash() == tx2->GetHash()); + BOOST_REQUIRE(tx1->GetWitnessHash() != tx2->GetWitnessHash()); + + BOOST_CHECK(pb.Add(tx2)); + + const auto tx_for_recipient1{pb.PickTxForSend(/*will_send_to_nodeid=*/recipient1).value()}; + BOOST_CHECK(tx_for_recipient1 == tx1 || tx_for_recipient1 == tx2); + + // A second pick must return the other transaction. + const NodeId recipient2{2}; + const auto tx_for_recipient2{pb.PickTxForSend(/*will_send_to_nodeid=*/recipient2).value()}; + BOOST_CHECK(tx_for_recipient2 == tx1 || tx_for_recipient2 == tx2); + BOOST_CHECK_NE(tx_for_recipient1, tx_for_recipient2); + + const NodeId nonexistent_recipient{0}; + + // Confirm transactions <-> recipients mapping is correct. + BOOST_CHECK(!pb.GetTxForNode(nonexistent_recipient).has_value()); + BOOST_CHECK_EQUAL(pb.GetTxForNode(recipient1).value(), tx_for_recipient1); + BOOST_CHECK_EQUAL(pb.GetTxForNode(recipient2).value(), tx_for_recipient2); + + // Confirm none of the transactions' reception have been confirmed. + BOOST_CHECK(!pb.DidNodeConfirmReception(recipient1)); + BOOST_CHECK(!pb.DidNodeConfirmReception(recipient2)); + BOOST_CHECK(!pb.DidNodeConfirmReception(nonexistent_recipient)); + + BOOST_CHECK_EQUAL(pb.GetStale().size(), 2); + + // Confirm reception by recipient1. + pb.NodeConfirmedReception(nonexistent_recipient); // Dummy call. + pb.NodeConfirmedReception(recipient1); + + BOOST_CHECK(pb.DidNodeConfirmReception(recipient1)); + BOOST_CHECK(!pb.DidNodeConfirmReception(recipient2)); + + BOOST_CHECK_EQUAL(pb.GetStale().size(), 1); + BOOST_CHECK_EQUAL(pb.GetStale()[0], tx_for_recipient2); + + SetMockTime(Now() + 10h); + + BOOST_CHECK_EQUAL(pb.GetStale().size(), 2); + + BOOST_CHECK_EQUAL(pb.Remove(tx_for_recipient1).value(), 1); + BOOST_CHECK(!pb.Remove(tx_for_recipient1).has_value()); + BOOST_CHECK_EQUAL(pb.Remove(tx_for_recipient2).value(), 0); + BOOST_CHECK(!pb.Remove(tx_for_recipient2).has_value()); + + BOOST_CHECK(!pb.PickTxForSend(/*will_send_to_nodeid=*/nonexistent_recipient).has_value()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/util/net.h b/src/test/util/net.h index 77954d92a48..605b2fa81a0 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -143,6 +143,7 @@ constexpr ConnectionType ALL_CONNECTION_TYPES[]{ ConnectionType::FEELER, ConnectionType::BLOCK_RELAY, ConnectionType::ADDR_FETCH, + ConnectionType::PRIVATE_BROADCAST, }; constexpr auto ALL_NETWORKS = std::array{ diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 8550298d34b..56c676c189f 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -2013,6 +2013,9 @@ bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, case node::TxBroadcast::MEMPOOL_NO_BROADCAST: what = "to mempool without broadcast"; break; + case node::TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + what = "for private broadcast without adding to the mempool"; + break; } WalletLogPrintf("Submitting wtx %s %s\n", wtx.GetHash().ToString(), what); // We must set TxStateInMempool here. Even though it will also be set later by the diff --git a/test/functional/feature_config_args.py b/test/functional/feature_config_args.py index d2e001db278..cbd68971d6a 100755 --- a/test/functional/feature_config_args.py +++ b/test/functional/feature_config_args.py @@ -411,6 +411,29 @@ class ConfArgsTest(BitcoinTestFramework): self.restart_node(0, extra_args=[connect_arg, '-dnsseed', '-proxy=localhost:1080']) self.stop_node(0) + def test_privatebroadcast(self): + self.log.info("Test that an invalid usage of -privatebroadcast throws an init error") + self.stop_node(0) + # -privatebroadcast init error: Tor/I2P not reachable at startup + self.nodes[0].assert_start_raises_init_error( + extra_args=["-privatebroadcast"], + expected_msg=( + "Error: Private broadcast of own transactions requested (-privatebroadcast), " + "but none of Tor or I2P networks is reachable")) + # -privatebroadcast init error: incompatible with -connect + self.nodes[0].assert_start_raises_init_error( + extra_args=["-privatebroadcast", "-connect=127.0.0.1:8333", "-onion=127.0.0.1:9050"], + expected_msg=( + "Error: Private broadcast of own transactions requested (-privatebroadcast), but -connect is also configured. " + "They are incompatible because the private broadcast needs to open new connections to randomly " + "chosen Tor or I2P peers. Consider using -maxconnections=0 -addnode=... instead")) + # Warning case: private broadcast allowed, but -proxyrandomize=0 triggers a privacy warning + self.start_node(0, extra_args=["-privatebroadcast", "-onion=127.0.0.1:9050", "-proxyrandomize=0"]) + self.stop_node(0, expected_stderr=( + "Warning: Private broadcast of own transactions requested (-privatebroadcast) and " + "-proxyrandomize is disabled. Tor circuits for private broadcast connections may " + "be correlated to other connections over Tor. For maximum privacy set -proxyrandomize=1.")) + def test_ignored_conf(self): self.log.info('Test error is triggered when the datadir in use contains a bitcoin.conf file that would be ignored ' 'because a conflicting -conf file argument is passed.') @@ -496,6 +519,7 @@ class ConfArgsTest(BitcoinTestFramework): self.test_seed_peers() self.test_networkactive() self.test_connect_with_seednode() + self.test_privatebroadcast() self.test_dir_config() self.test_negated_config() diff --git a/test/functional/p2p_private_broadcast.py b/test/functional/p2p_private_broadcast.py new file mode 100755 index 00000000000..4c3739d8a04 --- /dev/null +++ b/test/functional/p2p_private_broadcast.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python3 +# Copyright (c) 2017-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test how locally submitted transactions are sent to the network when private broadcast is used. +""" + +import time +import threading + +from test_framework.p2p import ( + P2PDataStore, + P2PInterface, + P2P_SERVICES, + P2P_VERSION, +) +from test_framework.messages import ( + CAddress, + CInv, + COIN, + MSG_WTX, + malleate_tx_to_invalid_witness, + msg_inv, + msg_tx, +) +from test_framework.netutil import ( + format_addr_port +) +from test_framework.script_util import ValidWitnessMalleatedTx +from test_framework.socks5 import ( + Socks5Configuration, + Socks5Server, +) +from test_framework.test_framework import ( + BitcoinTestFramework, +) +from test_framework.util import ( + MAX_NODES, + assert_equal, + assert_not_equal, + assert_raises_rpc_error, + p2p_port, + tor_port, +) +from test_framework.wallet import ( + MiniWallet, +) + +MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8 +MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2 +NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS +NUM_PRIVATE_BROADCAST_PER_TX = 3 + +# Fill addrman with these addresses. Must have enough Tor addresses, so that even +# if all 10 default connections are opened to a Tor address (!?) there must be more +# for private broadcast. +ADDRMAN_ADDRESSES = [ + "20.0.0.1", + "30.0.0.1", + "40.0.0.1", + "50.0.0.1", + "60.0.0.1", + "70.0.0.1", + "80.0.0.1", + "90.0.0.1", + "100.0.0.1", + "110.0.0.1", + "120.0.0.1", + "130.0.0.1", + "140.0.0.1", + "150.0.0.1", + "160.0.0.1", + "170.0.0.1", + "180.0.0.1", + "190.0.0.1", + "200.0.0.1", + "210.0.0.1", + + "[20::1]", + "[30::1]", + "[40::1]", + "[50::1]", + "[60::1]", + "[70::1]", + "[80::1]", + "[90::1]", + "[100::1]", + "[110::1]", + "[120::1]", + "[130::1]", + "[140::1]", + "[150::1]", + "[160::1]", + "[170::1]", + "[180::1]", + "[190::1]", + "[200::1]", + "[210::1]", + + "testonlyad777777777777777777777777777777777777777775b6qd.onion", + "testonlyah77777777777777777777777777777777777777777z7ayd.onion", + "testonlyal77777777777777777777777777777777777777777vp6qd.onion", + "testonlyap77777777777777777777777777777777777777777r5qad.onion", + "testonlyat77777777777777777777777777777777777777777udsid.onion", + "testonlyax77777777777777777777777777777777777777777yciid.onion", + "testonlya777777777777777777777777777777777777777777rhgyd.onion", + "testonlybd77777777777777777777777777777777777777777rs4ad.onion", + "testonlybp77777777777777777777777777777777777777777zs2ad.onion", + "testonlybt777777777777777777777777777777777777777777x6id.onion", + "testonlybx777777777777777777777777777777777777777775styd.onion", + "testonlyb3777777777777777777777777777777777777777774ckid.onion", + "testonlycd77777777777777777777777777777777777777777733id.onion", + "testonlych77777777777777777777777777777777777777777t6kid.onion", + "testonlycl77777777777777777777777777777777777777777tt3ad.onion", + "testonlyct77777777777777777777777777777777777777777wvhyd.onion", + "testonlycx7777777777777777777777777777777777777777774bad.onion", + "testonlyc377777777777777777777777777777777777777777u6aid.onion", + "testonlydd777777777777777777777777777777777777777777u5ad.onion", + "testonlydh77777777777777777777777777777777777777777wgnyd.onion", + + "testonlyad77777777777777777777777777777777777777777q.b32.i2p", + "testonlyah77777777777777777777777777777777777777777q.b32.i2p", + "testonlyap77777777777777777777777777777777777777777q.b32.i2p", + "testonlyat77777777777777777777777777777777777777777q.b32.i2p", + "testonlyax77777777777777777777777777777777777777777q.b32.i2p", + "testonlya377777777777777777777777777777777777777777q.b32.i2p", + "testonlya777777777777777777777777777777777777777777q.b32.i2p", + "testonlybd77777777777777777777777777777777777777777q.b32.i2p", + "testonlybh77777777777777777777777777777777777777777q.b32.i2p", + "testonlybl77777777777777777777777777777777777777777q.b32.i2p", + "testonlybp77777777777777777777777777777777777777777q.b32.i2p", + "testonlybt77777777777777777777777777777777777777777q.b32.i2p", + "testonlybx77777777777777777777777777777777777777777q.b32.i2p", + "testonlyb777777777777777777777777777777777777777777q.b32.i2p", + "testonlych77777777777777777777777777777777777777777q.b32.i2p", + "testonlycp77777777777777777777777777777777777777777q.b32.i2p", + "testonlyct77777777777777777777777777777777777777777q.b32.i2p", + "testonlycx77777777777777777777777777777777777777777q.b32.i2p", + "testonlyc377777777777777777777777777777777777777777q.b32.i2p", + "testonlyc777777777777777777777777777777777777777777q.b32.i2p", + + "[fc00::1]", + "[fc00::2]", + "[fc00::3]", + "[fc00::5]", + "[fc00::6]", + "[fc00::7]", + "[fc00::8]", + "[fc00::9]", + "[fc00::10]", + "[fc00::11]", + "[fc00::12]", + "[fc00::13]", + "[fc00::15]", + "[fc00::16]", + "[fc00::17]", + "[fc00::18]", + "[fc00::19]", + "[fc00::20]", + "[fc00::22]", + "[fc00::23]", +] + + +class P2PPrivateBroadcast(BitcoinTestFramework): + def set_test_params(self): + self.disable_autoconnect = False + self.num_nodes = 2 + + def setup_nodes(self): + # Start a SOCKS5 proxy server. + socks5_server_config = Socks5Configuration() + # self.nodes[0] listens on p2p_port(0), + # self.nodes[1] listens on p2p_port(1), + # thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes) (self.num_nodes is 2) + socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes)) + socks5_server_config.unauth = True + socks5_server_config.auth = True + + self.socks5_server = Socks5Server(socks5_server_config) + self.socks5_server.start() + + # Tor ports are the highest among p2p/rpc/tor, so this should be the first available port. + ports_base = tor_port(MAX_NODES) + 1 + + self.destinations = [] + + self.destinations_lock = threading.Lock() + + def destinations_factory(requested_to_addr, requested_to_port): + with self.destinations_lock: + i = len(self.destinations) + actual_to_addr = "" + actual_to_port = 0 + listener = None + if i == NUM_INITIAL_CONNECTIONS: + # Instruct the SOCKS5 server to redirect the first private + # broadcast connection from nodes[0] to nodes[1] + actual_to_addr = "127.0.0.1" # nodes[1] listen address + actual_to_port = tor_port(1) # nodes[1] listen port for Tor + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])") + else: + # Create a Python P2P listening node and instruct the SOCKS5 proxy to + # redirect the connection to it. The first outbound connection is used + # later to serve GETDATA, thus make it P2PDataStore(). + listener = P2PDataStore() if i == 0 else P2PInterface() + listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor) + listener.peer_connect_send_version(services=P2P_SERVICES) + + def on_listen_done(addr, port): + nonlocal actual_to_addr + nonlocal actual_to_port + actual_to_addr = addr + actual_to_port = port + + self.network_thread.listen( + addr="127.0.0.1", + port=ports_base + i, + p2p=listener, + callback=on_listen_done) + # Wait until the callback has been called. + self.wait_until(lambda: actual_to_port != 0) + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)") + + self.destinations.append({ + "requested_to": format_addr_port(requested_to_addr, requested_to_port), + "node": listener, + }) + assert_equal(len(self.destinations), i + 1) + + return { + "actual_to_addr": actual_to_addr, + "actual_to_port": actual_to_port, + } + + self.socks5_server.conf.destinations_factory = destinations_factory + + self.extra_args = [ + [ + # Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable). + "-cjdnsreachable", + # Connecting, sending garbage, being disconnected messes up with this test's + # check_broadcasts() which waits for a particular Python node to receive a connection. + "-v2transport=0", + "-test=addrman", + "-privatebroadcast", + f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}", + # To increase coverage, make it think that the I2P network is reachable so that it + # selects such addresses as well. Pick a proxy address where nobody is listening + # and connection attempts fail quickly. + "-i2psam=127.0.0.1:1", + ], + [ + "-connect=0", + f"-bind=127.0.0.1:{tor_port(1)}=onion", + ], + ] + super().setup_nodes() + + def setup_network(self): + self.setup_nodes() + + def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations): + broadcasts_done = 0 + i = skip_destinations - 1 + while broadcasts_done < broadcasts_to_expect: + i += 1 + self.log.debug(f"{label}: waiting for outbound connection i={i}") + # At this point the connection may not yet have been established (A), + # may be active (B), or may have already been closed (C). + self.wait_until(lambda: len(self.destinations) > i) + dest = self.destinations[i] + peer = dest["node"] + peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False) + # Now it is either (B) or (C). + if peer.last_message["version"].nServices != 0: + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)") + continue + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} must be a private broadcast, checking it") + peer.wait_for_disconnect() + # Now it is (C). + assert_equal(peer.message_count, { + "version": 1, + "verack": 1, + "inv": 1, + "tx": 1, + "ping": 1 + }) + dummy_address = CAddress() + dummy_address.nServices = 0 + assert_equal(peer.last_message["version"].nVersion, P2P_VERSION) + assert_equal(peer.last_message["version"].nServices, 0) + assert_equal(peer.last_message["version"].nTime, 0) + assert_equal(peer.last_message["version"].addrTo, dummy_address) + assert_equal(peer.last_message["version"].addrFrom, dummy_address) + assert_equal(peer.last_message["version"].strSubVer, "/pynode:0.0.1/") + assert_equal(peer.last_message["version"].nStartingHeight, 0) + assert_equal(peer.last_message["version"].relay, 0) + assert_equal(peer.last_message["tx"].tx.txid_hex, tx["txid"]) + self.log.info(f"{label}: ok: outbound connection i={i} is private broadcast of txid={tx['txid']}") + broadcasts_done += 1 + + def run_test(self): + tx_originator = self.nodes[0] + tx_receiver = self.nodes[1] + far_observer = tx_receiver.add_p2p_connection(P2PInterface()) + + wallet = MiniWallet(tx_originator) + + # Fill tx_originator's addrman. + for addr in ADDRMAN_ADDRESSES: + res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False) + if not res["success"]: + self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)") + + self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS) + + # The next opened connection by tx_originator should be "private broadcast" + # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver. + + txs = wallet.create_self_transfer_chain(chain_length=3) + self.log.info(f"Created txid={txs[0]['txid']}: for basic test") + self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast") + self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool") + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1) + + self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, " + "must be the first private broadcast connection") + self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0) + far_observer.wait_for_tx(txs[0]["txid"]) + self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: " + "the private broadcast target received and further relayed the transaction") + + # One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts. + self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1) + + self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)") + ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]): + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + + self.log.info("Sending a malleated transaction with an invalid witness via RPC") + malleated_invalid = malleate_tx_to_invalid_witness(txs[0]) + assert_raises_rpc_error(-26, "mempool-script-verify-flag-failed", + tx_originator.sendrawtransaction, + hexstring=malleated_invalid.serialize_with_witness().hex(), + maxfeerate=0.1) + + self.log.info("Checking that the transaction is not in the originator node's mempool") + assert_equal(len(tx_originator.getrawmempool()), 0) + + wtxid_int = int(txs[0]["wtxid"], 16) + inv = CInv(MSG_WTX, wtxid_int) + + self.log.info("Sending INV and waiting for GETDATA from node") + tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator. + tx_returner.tx_store[wtxid_int] = txs[0]["tx"] + assert "getdata" not in tx_returner.last_message + received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network" + with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]): + tx_returner.send_without_ping(msg_inv([inv])) + tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0) + + self.log.info("Waiting for normal broadcast to another peer") + self.destinations[1]["node"].wait_for_inv([inv]) + + self.log.info("Sending a transaction that is already in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + self.check_broadcasts("Broadcast of mempool transaction", txs[0], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1) + self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)") + assert_equal(len(tx_originator.getrawmempool()), 1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0.1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0) + + # Since txs[1] has not been received back by tx_originator, + # it should be re-broadcast after a while. Advance tx_originator's clock + # to trigger a re-broadcast. Should be more than the maximum returned by + # NextTxBroadcast() in net_processing.cpp. + self.log.info("Checking that rebroadcast works") + delta = 20 * 60 # 20min + skip_destinations = len(self.destinations) + rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]): + tx_originator.setmocktime(int(time.time()) + delta) + tx_originator.mockscheduler(delta) + self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations) + tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this). + + self.log.info("Sending a pair of transactions with the same txid but different valid wtxids via RPC") + txgen = ValidWitnessMalleatedTx() + funding = wallet.get_utxo() + fee_sat = 1000 + siblings_parent = txgen.build_parent_tx(funding["txid"], amount=funding["value"] * COIN - fee_sat) + sibling1, sibling2 = txgen.build_malleated_children(siblings_parent.txid_hex, amount=siblings_parent.vout[0].nValue - fee_sat) + self.log.info(f" - sibling1: txid={sibling1.txid_hex}, wtxid={sibling1.wtxid_hex}") + self.log.info(f" - sibling2: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}") + assert_equal(sibling1.txid_hex, sibling2.txid_hex) + assert_not_equal(sibling1.wtxid_hex, sibling2.wtxid_hex) + wallet.sign_tx(siblings_parent) + assert_equal(len(tx_originator.getrawmempool()), 1) + tx_returner.send_without_ping(msg_tx(siblings_parent)) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 1) + self.log.info(" - siblings' parent added to the mempool") + tx_originator.sendrawtransaction(hexstring=sibling1.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sent sibling1: ok") + tx_originator.sendrawtransaction(hexstring=sibling2.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sent sibling2: ok") + + # Stop the SOCKS5 proxy server to avoid it being upset by the bitcoin + # node disconnecting in the middle of the SOCKS5 handshake when we + # restart below. + self.socks5_server.stop() + + self.log.info("Trying to send a transaction when none of Tor or I2P is reachable") + self.restart_node(0, extra_args=[ + "-privatebroadcast", + "-v2transport=0", + # A location where definitely a Tor control is not listening. This would allow + # Bitcoin Core to start, hoping/assuming that the location of the Tor proxy + # may be retrieved after startup from the Tor control, but it will not be, so + # the RPC should throw. + "-torcontrol=127.0.0.1:1", + "-listenonion", + ]) + assert_raises_rpc_error(-1, "none of the Tor or I2P networks is reachable", + tx_originator.sendrawtransaction, hexstring=txs[0]["hex"], maxfeerate=0.1) + + +if __name__ == "__main__": + P2PPrivateBroadcast(__file__).main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index a4af5578468..db406be46cb 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -311,6 +311,7 @@ BASE_SCRIPTS = [ 'rpc_dumptxoutset.py', 'feature_minchainwork.py', 'rpc_estimatefee.py', + 'p2p_private_broadcast.py', 'rpc_getblockstats.py', 'feature_port.py', 'feature_bind_port_externalip.py',