diff --git a/qa/rpc-tests/p2p-tx-download.py b/qa/rpc-tests/p2p-tx-download.py index 381a143c2..5768634e7 100644 --- a/qa/rpc-tests/p2p-tx-download.py +++ b/qa/rpc-tests/p2p-tx-download.py @@ -78,7 +78,6 @@ class TxDownloadTest(BitcoinTestFramework): self.test_tx_request() self.test_invblock_resolution() - self.test_max_inflight() self.test_disconnect_fallback() self.test_notfound_fallback() @@ -212,58 +211,6 @@ class TxDownloadTest(BitcoinTestFramework): assert tx.hash in self.nodes[0].getrawmempool() - def test_max_inflight(self): - # First, forward time by 2x inflight timeout, so that we have clean - # registers for each peer - self.forward_mocktime(2 * TX_EXPIRY_INTERVAL) - - # now send MAX_GETDATA_IN_FLIGHT (=100) invs with peer 0 - peer = self.incoming_peers[0] - invd = [] - for i in range(MAX_GETDATA_IN_FLIGHT): - txid = self.next_fake_txid() - peer.send_tx_inv([txid]) - invd.append(txid) - - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) - - # test that we got all the getdatas - assert self.wait_for_getdata(invd, [peer]) - - # send one more inv with our now maxed out peer - txid_failed = self.next_fake_txid() - peer.send_tx_inv([txid_failed]) - # and send one inv with another peer - txid_success = self.next_fake_txid() - self.incoming_peers[1].send_tx_inv([txid_success]) - - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) - - # test that we got a getdata for the successful tx with peer 1 - assert self.wait_for_getdata([txid_success], [self.incoming_peers[1]]) - # test that we did not get a getdata for the failed txid with peer 0 - assert not self.any_received_getdata(txid_failed, [peer]) - - # clear out the inflight register by expiring all requests - self.forward_mocktime(TX_EXPIRY_INTERVAL) - - # send one inv with 4 txs - txids = [] - for i in range(4): - txids.append(self.next_fake_txid()) - peer.send_tx_inv(txids) - - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) - - # test that we got a getdata for the final inv with peer 0 - assert self.wait_for_getdata(txids, [peer]) - def test_notfound_fallback(self): # use peer 4 and 5 to concurrently send 2 invs peers = self.incoming_peers[4:6] diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e5378584a..e2a839523 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -25,6 +25,7 @@ #include "random.h" #include "tinyformat.h" #include "txmempool.h" +#include "txrequest.h" #include "ui_interface.h" #include "util.h" #include "utilmoneystr.h" @@ -54,15 +55,11 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; /** Maximum number of announced transactions from a peer */ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; /** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds +static constexpr int64_t NONPREF_PEER_TX_DELAY = 2 * 1000000; // 2 seconds +/** How many microseconds to delay requesting transactions from overloaded peers */ +static constexpr int64_t OVERLOADED_PEER_TX_DELAY = 2 * 1000000; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ static constexpr int64_t GETDATA_TX_INTERVAL = 30 * 1000000; // 30 seconds -/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds -/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ -static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL; -static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, -"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ static const unsigned int MAX_GETDATA_SZ = 1000; @@ -79,6 +76,8 @@ void EraseOrphansFor(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main); static size_t vExtraTxnForCompactIt = 0; static std::vector> vExtraTxnForCompact GUARDED_BY(cs_main); +static TxRequestTracker g_txrequest GUARDED_BY(cs_main); + static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8] // Internal stuff @@ -218,70 +217,6 @@ struct CNodeState { */ bool fSupportsDesiredCmpctVersion; - /* - * State associated with transaction download. - * - * Tx download algorithm: - * - * When inv comes in, queue up (process_time, txid) inside the peer's - * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer - * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). - * - * The process_time for a transaction is set to current_time for outbound - * peers, current_time + 2 seconds for inbound peers. This is the time at - * which we'll consider trying to request the transaction from the peer in - * SendMessages(). The delay for inbound peers is to allow outbound peers - * a chance to announce before we request from inbound peers, to prevent - * an adversary from using inbound connections to blind us to a - * transaction (InvBlock). - * - * When we call SendMessages() for a given peer, - * we will loop over the transactions in m_tx_process_time, looking - * at the transactions whose process_time <= current_time. We'll request - * each such transaction that we don't have already and that hasn't been - * requested from another peer recently, up until we hit the - * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested txid, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate transaction - * requests amongst our peers. - * - * For transactions that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, txid) - * back into the peer's m_tx_process_time at the point in the future at - * which the most recent GETDATA request would time out (ie - * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). - * We add an additional delay for inbound peers, again to prefer - * attempting download from outbound peers first. - * We also add an extra small random delay up to 2 seconds - * to avoid biasing some peers over others. (e.g., due to fixed ordering - * of peer processing in ThreadMessageHandler). - * - * When we receive a transaction from a peer, we remove the txid from the - * peer's m_tx_in_flight set and from their recently announced set - * (m_tx_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the transaction is not accepted but also not added to - * the reject filter, then we will eventually redownload from other - * peers. - */ - - struct TxDownloadState { - /* Track when to attempt download of announced transactions (process - * time in micros -> txid) - */ - std::multimap m_tx_process_time; - - //! Store all the transactions a peer has recently announced - std::set m_tx_announced; - - //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; - - //! Periodically check for stuck getdata requests - int64_t m_check_expiry_timer{0}; - }; - - TxDownloadState m_tx_download; - CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; nMisbehavior = 0; @@ -305,10 +240,8 @@ struct CNodeState { fWantsCmpctWitness = false; fSupportsDesiredCmpctVersion = false; } -}; -// Keeps track of the time (in microseconds) when transactions were requested last time -limitedmap g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); +}; /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -358,6 +291,7 @@ void InitializeNode(CNode *pnode, CConnman& connman) { { LOCK(cs_main); mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); + assert(g_txrequest.Count(nodeid) == 0); } if(!pnode->fInbound) PushNodeVersion(pnode, connman, GetTime()); @@ -379,6 +313,8 @@ void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { mapBlocksInFlight.erase(entry.hash); } EraseOrphansFor(nodeid); + g_txrequest.DisconnectedPeer(nodeid); + nPreferredDownload -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); @@ -390,6 +326,7 @@ void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { assert(mapBlocksInFlight.empty()); assert(nPreferredDownload == 0); assert(nPeersWithValidatedDownloads == 0); + assert(g_txrequest.Size() == 0); } } @@ -643,71 +580,35 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectorGetId(); -int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(txid); - if (it != g_already_asked_for.end()) { - return it->second; - } - return 0; -} - -void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(txid); - - if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(txid, request_time)); - } else { - g_already_asked_for.update(it, request_time); - } -} - -int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - int64_t process_time; - int64_t last_request_time = GetTxRequestTime(txid); - // First time requesting this tx - if (last_request_time == 0) { - process_time = current_time; - } else { - // Randomize the delay to avoid biasing some peers over others (such as due to - // fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); - } - - // We delay processing announcements from inbound peers - if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; - - return process_time; -} - -void RequestTx(CNodeState* state, const uint256& txid, int64_t current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(txid)) { - // Too many queued announcements from this peer, or we already have - // this announcement + if (!node->fWhitelisted && g_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) { + // Too many queued announcements from this peer return; } - peer_download_state.m_tx_announced.insert(txid); - // Calculate the time to try requesting this transaction. Use - // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload); + // Decide the TxRequestTracker parameters for this announcement: + // - "preferred": if fPreferredDownload is set (= outbound, or whitelisted) + // - "reqtime": current time plus delays for: + // - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections + // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least + // MAX_PEER_TX_IN_FLIGHT requests in flight and aren't whitelisted. + const CNodeState* state = State(nodeid); + const bool preferred = state->fPreferredDownload; + const bool overloaded = (!node->fWhitelisted && g_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_IN_FLIGHT); - peer_download_state.m_tx_process_time.emplace(process_time, txid); + int64_t delay = 0; + if (!preferred) delay += NONPREF_PEER_TX_DELAY; + if (overloaded) delay += OVERLOADED_PEER_TX_DELAY; + + g_txrequest.ReceivedInv(nodeid, txhash, preferred, current_time + delay); } -} // anon namespace - bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { LOCK(cs_main); CNodeState *state = State(nodeid); @@ -941,6 +842,9 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn } LogPrint("mempool", "Erased %d orphan tx included or conflicted by block\n", nErased); } + + // Forget tracked announcements for transactions included in a block. + g_txrequest.ForgetTxHash(tx.GetHash()); } static CCriticalSection cs_most_recent_block; @@ -1848,7 +1752,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (fBlocksOnly) LogPrint("net", "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->id); else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload()) - RequestTx(State(pfrom->GetId()), inv.hash, current_time); + AddTxAnnouncement(pfrom, inv.hash, current_time); } } @@ -2089,15 +1993,18 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fMissingInputs = false; CValidationState state; - CNodeState* nodestate = State(pfrom->GetId()); - nodestate->m_tx_download.m_tx_announced.erase(inv.hash); - nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); - EraseTxRequest(inv.hash); + // Mark the tx as received + g_txrequest.ReceivedResponse(pfrom->GetId(), inv.hash); std::list lRemovedTxn; if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, true, &fMissingInputs, &lRemovedTxn)) { mempool.check(pcoinsTip); + + // As this version of the transaction was acceptable, we can forget + // about any requests for it. + g_txrequest.ForgetTxHash(tx.GetHash()); + RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); @@ -2134,10 +2041,16 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr BOOST_FOREACH(const CTxIn& txin, tx.vin) { CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time); + if (!AlreadyHave(_inv)) { + AddTxAnnouncement(pfrom, _inv.hash, current_time); + } } AddOrphanTx(ptx, pfrom->GetId()); + // Once added to the orphan pool, a tx is considered + // AlreadyHave, and we shouldn't request it anymore. + g_txrequest.ForgetTxHash(tx.GetHash()); + // DoS prevention: do not allow mapOrphanTransactions to grow unbounded unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS)); unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx); @@ -2148,6 +2061,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // We will continue to reject this tx since it has rejected // parents so avoid re-requesting it from other peers. recentRejects->insert(tx.GetHash()); + g_txrequest.ForgetTxHash(tx.GetHash()); } } else { if (!tx.HasWitness() && !state.CorruptionPossible()) { @@ -2156,6 +2070,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // See https://github.com/bitcoin/bitcoin/issues/8279 for details. assert(recentRejects); recentRejects->insert(tx.GetHash()); + g_txrequest.ForgetTxHash(tx.GetHash()); if (RecursiveDynamicUsage(*ptx) < 100000) { AddToCompactExtraTransactions(ptx); } @@ -2851,23 +2766,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr else if (strCommand == NetMsgType::NOTFOUND) { // Remove the NOTFOUND transactions from the peer LOCK(cs_main); - CNodeState *state = State(pfrom->GetId()); std::vector vInv; vRecv >> vInv; if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { for (CInv &inv : vInv) { if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) { - // If we receive a NOTFOUND message for a txid we requested, erase - // it from our data structures for this peer. - auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); - if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { - // Skip any further work if this is a spurious NOTFOUND - // message. - continue; - } - state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(inv.hash); - LogPrint("net", "received: notfound tx %s from peer=%d\n", inv.hash.ToString(), pfrom->id); + // If we receive a NOTFOUND message for a txid we requested, we + // mark the announcement as completed in TxRequestTracker. + g_txrequest.ReceivedResponse(pfrom->GetId(), inv.hash); } } } @@ -3543,59 +3449,25 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr // // Message: getdata (non-blocks) // - - // For robustness, expire old requests after a long timeout, so that - // we can resume downloading transactions from a peer even if they - // were unresponsive in the past. - // Eventually we should consider disconnecting peers, but this is - // conservative. - if (state.m_tx_download.m_check_expiry_timer <= current_time) { - for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= current_time - TX_EXPIRY_INTERVAL) { - LogPrint("net", "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(it->first); - state.m_tx_download.m_tx_in_flight.erase(it++); - } else { - ++it; - } - } - // On average, we do this check every TX_EXPIRY_INTERVAL/3.75. Randomize - // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL/5 + GetRand(TX_EXPIRY_INTERVAL/5); + std::vector> expired; + auto requestable = g_txrequest.GetRequestable(pto->GetId(), current_time, &expired); + for (const auto& entry : expired) { + LogPrint("net", "timeout of inflight tx %s from peer=%d\n", entry.second.ToString(), entry.first); } - auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const uint256 txid = tx_process_time.begin()->second; - // Erase this entry from tx_process_time (it may be added back for - // processing at a later time, see below) - tx_process_time.erase(tx_process_time.begin()); - CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txid); + for (const uint256& txhash : requestable) { + CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txhash); if (!AlreadyHave(inv)) { - // If this transaction was last requested more than 1 minute ago, - // then request. - int64_t last_request_time = GetTxRequestTime(inv.hash); - if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { - LogPrint("net", "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= MAX_GETDATA_SZ) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); - } - UpdateTxRequestTime(inv.hash, current_time); - state.m_tx_download.m_tx_in_flight.emplace(inv.hash, current_time); - } else { - // This transaction is in flight from someone else; queue - // up processing to happen after the download times out - // (with a slight delay for inbound peers, to prefer - // requests to outbound peers). - int64_t next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload); - tx_process_time.emplace(next_process_time, txid); + LogPrint("net", "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.emplace_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); } + g_txrequest.RequestedTx(pto->GetId(), txhash, current_time + GETDATA_TX_INTERVAL); } else { // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(inv.hash); - state.m_tx_download.m_tx_in_flight.erase(inv.hash); + g_txrequest.ForgetTxHash(txhash); } }