net: use txrequest for transaction request logic

The major changes are:

* Announcements from outbound (and whitelisted) peers are now
  always preferred over those from inbound peers. Previously this
  was only guaranteed for the first request (by delaying the first
  request from inbound peers), with merely a bias afterwards. The
  2s delay for requests from inbound peers still exists, but after
  that, if viable outbound peers remain for any given transaction,
  they will always be tried first.

* No more hard cap of 100 in-flight transactions per peer, as
  there is less need for it (memory usage is linear in the number
  of announcements, but independent of the number in flight, and
  CPU usage isn't affected by it). Furthermore, if only one peer
  announces a transaction, and that peer already has over 100
  transactions in flight and requestable, we still want to request
  it from them. The cap is replaced with an additional 2s delay
  (possibly combined with the existing 2s delay for inbound
  connections).

Backported from: 242d1647
                 173a1d2d
Original Author: Pieter Wuille <pieter@wuille.net>

Conflicts:
  - replaced GenTx with uint256 because there is no segwit
  - removed the additional 2s penalty for non-segwit peers
  - used int64_t instead of std::chrono::microseconds per utiltime
  - implemented TxRequestTracker as the global g_txrequest instead
    of as a member of PeerManager, which we don't have
  - removed the Dogecoin-specific strict max inflight test
  - made exceptions for whitelisted nodes as there is no fine-
    grained permission system
This commit is contained in:
Patrick Lodder 2024-07-05 19:15:41 -04:00
parent 4bc8099741
commit 3a700cde38
No known key found for this signature in database
GPG Key ID: 7C523F5FBABE80E7
2 changed files with 68 additions and 249 deletions

View File

@ -78,7 +78,6 @@ class TxDownloadTest(BitcoinTestFramework):
self.test_tx_request()
self.test_invblock_resolution()
self.test_max_inflight()
self.test_disconnect_fallback()
self.test_notfound_fallback()
@ -212,58 +211,6 @@ class TxDownloadTest(BitcoinTestFramework):
assert tx.hash in self.nodes[0].getrawmempool()
def test_max_inflight(self):
    """Exercise the per-peer limit on in-flight transaction requests.

    Peer 0 announces MAX_GETDATA_IN_FLIGHT transactions, all of which must
    be requested. A further announcement from that peer must then not be
    requested, while an announcement from peer 1 is. After the outstanding
    requests have expired, peer 0 must be served again.
    """
    # Every waiting phase below warps forward 2 + 2 + 2 (margin) = 6
    # seconds in steps of 2.
    warp_steps = (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2) // 2

    # Move past 2x the in-flight timeout first, so that every peer starts
    # with clean registers.
    self.forward_mocktime(2 * TX_EXPIRY_INTERVAL)

    # Announce MAX_GETDATA_IN_FLIGHT (=100) transactions through peer 0,
    # one inv message per transaction.
    busy_peer = self.incoming_peers[0]
    announced = []
    for _ in range(MAX_GETDATA_IN_FLIGHT):
        fake_txid = self.next_fake_txid()
        busy_peer.send_tx_inv([fake_txid])
        announced.append(fake_txid)

    self.forward_mocktime_step2(warp_steps)

    # Every one of those announcements must have produced a getdata.
    assert self.wait_for_getdata(announced, [busy_peer])

    # One more announcement from the maxed out peer ...
    txid_failed = self.next_fake_txid()
    busy_peer.send_tx_inv([txid_failed])

    # ... and one announcement from a different peer.
    txid_success = self.next_fake_txid()
    self.incoming_peers[1].send_tx_inv([txid_success])

    self.forward_mocktime_step2(warp_steps)

    # Peer 1 is asked for its transaction, peer 0 is not asked for the
    # extra one.
    assert self.wait_for_getdata([txid_success], [self.incoming_peers[1]])
    assert not self.any_received_getdata(txid_failed, [busy_peer])

    # Expire all requests to clear out the in-flight register.
    self.forward_mocktime(TX_EXPIRY_INTERVAL)

    # A single inv carrying 4 transactions from peer 0 ...
    txids = [self.next_fake_txid() for _ in range(4)]
    busy_peer.send_tx_inv(txids)

    self.forward_mocktime_step2(warp_steps)

    # ... must be requested from peer 0 again.
    assert self.wait_for_getdata(txids, [busy_peer])
def test_notfound_fallback(self):
# use peer 4 and 5 to concurrently send 2 invs
peers = self.incoming_peers[4:6]

View File

@ -25,6 +25,7 @@
#include "random.h"
#include "tinyformat.h"
#include "txmempool.h"
#include "txrequest.h"
#include "ui_interface.h"
#include "util.h"
#include "utilmoneystr.h"
@ -54,15 +55,11 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
/** Maximum number of announced transactions from a peer */
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
/** How many microseconds to delay requesting transactions from inbound peers */
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds
static constexpr int64_t NONPREF_PEER_TX_DELAY = 2 * 1000000; // 2 seconds
/** How many microseconds to delay requesting transactions from overloaded peers */
static constexpr int64_t OVERLOADED_PEER_TX_DELAY = 2 * 1000000;
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr int64_t GETDATA_TX_INTERVAL = 30 * 1000000; // 30 seconds
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL;
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
static const unsigned int MAX_GETDATA_SZ = 1000;
@ -79,6 +76,8 @@ void EraseOrphansFor(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
static size_t vExtraTxnForCompactIt = 0;
static std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(cs_main);
static TxRequestTracker g_txrequest GUARDED_BY(cs_main);
static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8]
// Internal stuff
@ -218,70 +217,6 @@ struct CNodeState {
*/
bool fSupportsDesiredCmpctVersion;
/*
* State associated with transaction download.
*
* Tx download algorithm:
*
* When inv comes in, queue up (process_time, txid) inside the peer's
* CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
* isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
*
* The process_time for a transaction is set to current_time for outbound
* peers, current_time + 2 seconds for inbound peers. This is the time at
* which we'll consider trying to request the transaction from the peer in
* SendMessages(). The delay for inbound peers is to allow outbound peers
* a chance to announce before we request from inbound peers, to prevent
* an adversary from using inbound connections to blind us to a
* transaction (InvBlock).
*
* When we call SendMessages() for a given peer,
* we will loop over the transactions in m_tx_process_time, looking
* at the transactions whose process_time <= current_time. We'll request
* each such transaction that we don't have already and that hasn't been
* requested from another peer recently, up until we hit the
* MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
* g_already_asked_for for each requested txid, storing the time of the
* GETDATA request. We use g_already_asked_for to coordinate transaction
* requests amongst our peers.
*
* For transactions that we still need but we have already recently
* requested from some other peer, we'll reinsert (process_time, txid)
* back into the peer's m_tx_process_time at the point in the future at
* which the most recent GETDATA request would time out (ie
* GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
* We add an additional delay for inbound peers, again to prefer
* attempting download from outbound peers first.
* We also add an extra small random delay up to 2 seconds
* to avoid biasing some peers over others. (e.g., due to fixed ordering
* of peer processing in ThreadMessageHandler).
*
* When we receive a transaction from a peer, we remove the txid from the
* peer's m_tx_in_flight set and from their recently announced set
* (m_tx_announced). We also clear g_already_asked_for for that entry, so
* that if somehow the transaction is not accepted but also not added to
* the reject filter, then we will eventually redownload from other
* peers.
*/
struct TxDownloadState {
    /* Schedule of download attempts for announced transactions, keyed by
     * process time in microseconds (process time -> txid).
     */
    std::multimap<int64_t, uint256> m_tx_process_time;

    //! All transactions this peer has recently announced
    std::set<uint256> m_tx_announced;

    //! Transactions we requested, each with its timestamp
    std::map<uint256, int64_t> m_tx_in_flight;

    //! Timer for the periodic check for stuck getdata requests
    int64_t m_check_expiry_timer = 0;
};
TxDownloadState m_tx_download;
CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
fCurrentlyConnected = false;
nMisbehavior = 0;
@ -305,10 +240,8 @@ struct CNodeState {
fWantsCmpctWitness = false;
fSupportsDesiredCmpctVersion = false;
}
};
// Keeps track of the time (in microseconds) when transactions were requested last time
limitedmap<uint256, int64_t> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
};
/** Map maintaining per-node state. Requires cs_main. */
std::map<NodeId, CNodeState> mapNodeState;
@ -358,6 +291,7 @@ void InitializeNode(CNode *pnode, CConnman& connman) {
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
assert(g_txrequest.Count(nodeid) == 0);
}
if(!pnode->fInbound)
PushNodeVersion(pnode, connman, GetTime());
@ -379,6 +313,8 @@ void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
mapBlocksInFlight.erase(entry.hash);
}
EraseOrphansFor(nodeid);
g_txrequest.DisconnectedPeer(nodeid);
nPreferredDownload -= state->fPreferredDownload;
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
assert(nPeersWithValidatedDownloads >= 0);
@ -390,6 +326,7 @@ void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
assert(mapBlocksInFlight.empty());
assert(nPreferredDownload == 0);
assert(nPeersWithValidatedDownloads == 0);
assert(g_txrequest.Size() == 0);
}
}
@ -643,71 +580,35 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<con
}
}
void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
} // anon namespace
void AddTxAnnouncement(CNode* node, const uint256& txhash, int64_t current_time)
{
g_already_asked_for.erase(txid);
}
AssertLockHeld(cs_main); // For g_txrequest
NodeId nodeid = node->GetId();
/**
 * Look up when a transaction was last requested.
 *
 * @param txid  hash of the transaction to look up in g_already_asked_for
 * @return the time stored for txid, or 0 when no entry exists
 */
int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
    const auto entry = g_already_asked_for.find(txid);
    return (entry == g_already_asked_for.end()) ? 0 : entry->second;
}
/**
 * Store request_time as the request time of txid in g_already_asked_for.
 *
 * @param txid          hash of the transaction that was requested
 * @param request_time  time to record for this transaction
 */
void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
    auto existing = g_already_asked_for.find(txid);
    if (existing != g_already_asked_for.end()) {
        // Already tracked: overwrite the stored time in place.
        g_already_asked_for.update(existing, request_time);
        return;
    }
    // Not tracked yet: add a fresh entry.
    g_already_asked_for.insert(std::make_pair(txid, request_time));
}
/**
 * Compute the time at which a transaction should next be considered for a
 * GETDATA request.
 *
 * @param txid               hash of the transaction
 * @param current_time       current time, used when the transaction has no
 *                           recorded request time
 * @param use_inbound_delay  when true, INBOUND_PEER_TX_DELAY is added to the
 *                           result
 * @return the computed process time
 */
int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
    // Announcements from inbound peers are processed with an extra delay.
    const int64_t inbound_penalty = use_inbound_delay ? INBOUND_PEER_TX_DELAY : 0;

    const int64_t previous_request = GetTxRequestTime(txid);
    if (previous_request == 0) {
        // No request recorded for this tx yet: start from the current time.
        return current_time + inbound_penalty;
    }

    // A request was made before: wait out GETDATA_TX_INTERVAL from that
    // request, plus a random delay so that no peer is systematically
    // favoured (such as due to fixed ordering of peer processing in
    // ThreadMessageHandler).
    const int64_t retry_time = previous_request + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
    return retry_time + inbound_penalty;
}
void RequestTx(CNodeState* state, const uint256& txid, int64_t current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
peer_download_state.m_tx_announced.count(txid)) {
// Too many queued announcements from this peer, or we already have
// this announcement
if (!node->fWhitelisted && g_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) {
// Too many queued announcements from this peer
return;
}
peer_download_state.m_tx_announced.insert(txid);
// Calculate the time to try requesting this transaction. Use
// fPreferredDownload as a proxy for outbound peers.
int64_t process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload);
// Decide the TxRequestTracker parameters for this announcement:
// - "preferred": if fPreferredDownload is set (= outbound, or whitelisted)
// - "reqtime": current time plus delays for:
// - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections
// - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least
// MAX_PEER_TX_IN_FLIGHT requests in flight and aren't whitelisted.
const CNodeState* state = State(nodeid);
const bool preferred = state->fPreferredDownload;
const bool overloaded = (!node->fWhitelisted && g_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_IN_FLIGHT);
peer_download_state.m_tx_process_time.emplace(process_time, txid);
int64_t delay = 0;
if (!preferred) delay += NONPREF_PEER_TX_DELAY;
if (overloaded) delay += OVERLOADED_PEER_TX_DELAY;
g_txrequest.ReceivedInv(nodeid, txhash, preferred, current_time + delay);
}
} // anon namespace
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
LOCK(cs_main);
CNodeState *state = State(nodeid);
@ -941,6 +842,9 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn
}
LogPrint("mempool", "Erased %d orphan tx included or conflicted by block\n", nErased);
}
// Forget tracked announcements for transactions included in a block.
g_txrequest.ForgetTxHash(tx.GetHash());
}
static CCriticalSection cs_most_recent_block;
@ -1848,7 +1752,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (fBlocksOnly)
LogPrint("net", "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->id);
else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload())
RequestTx(State(pfrom->GetId()), inv.hash, current_time);
AddTxAnnouncement(pfrom, inv.hash, current_time);
}
}
@ -2089,15 +1993,18 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
bool fMissingInputs = false;
CValidationState state;
CNodeState* nodestate = State(pfrom->GetId());
nodestate->m_tx_download.m_tx_announced.erase(inv.hash);
nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash);
EraseTxRequest(inv.hash);
// Mark the tx as received
g_txrequest.ReceivedResponse(pfrom->GetId(), inv.hash);
std::list<CTransactionRef> lRemovedTxn;
if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, true, &fMissingInputs, &lRemovedTxn)) {
mempool.check(pcoinsTip);
// As this version of the transaction was acceptable, we can forget
// about any requests for it.
g_txrequest.ForgetTxHash(tx.GetHash());
RelayTransaction(tx, connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
@ -2134,10 +2041,16 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
BOOST_FOREACH(const CTxIn& txin, tx.vin) {
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv);
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time);
if (!AlreadyHave(_inv)) {
AddTxAnnouncement(pfrom, _inv.hash, current_time);
}
}
AddOrphanTx(ptx, pfrom->GetId());
// Once added to the orphan pool, a tx is considered
// AlreadyHave, and we shouldn't request it anymore.
g_txrequest.ForgetTxHash(tx.GetHash());
// DoS prevention: do not allow mapOrphanTransactions to grow unbounded
unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS));
unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx);
@ -2148,6 +2061,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
// We will continue to reject this tx since it has rejected
// parents so avoid re-requesting it from other peers.
recentRejects->insert(tx.GetHash());
g_txrequest.ForgetTxHash(tx.GetHash());
}
} else {
if (!tx.HasWitness() && !state.CorruptionPossible()) {
@ -2156,6 +2070,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
// See https://github.com/bitcoin/bitcoin/issues/8279 for details.
assert(recentRejects);
recentRejects->insert(tx.GetHash());
g_txrequest.ForgetTxHash(tx.GetHash());
if (RecursiveDynamicUsage(*ptx) < 100000) {
AddToCompactExtraTransactions(ptx);
}
@ -2851,23 +2766,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
else if (strCommand == NetMsgType::NOTFOUND) {
// Remove the NOTFOUND transactions from the peer
LOCK(cs_main);
CNodeState *state = State(pfrom->GetId());
std::vector<CInv> vInv;
vRecv >> vInv;
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
for (CInv &inv : vInv) {
if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) {
// If we receive a NOTFOUND message for a txid we requested, erase
// it from our data structures for this peer.
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
// Skip any further work if this is a spurious NOTFOUND
// message.
continue;
}
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
state->m_tx_download.m_tx_announced.erase(inv.hash);
LogPrint("net", "received: notfound tx %s from peer=%d\n", inv.hash.ToString(), pfrom->id);
// If we receive a NOTFOUND message for a txid we requested, we
// mark the announcement as completed in TxRequestTracker.
g_txrequest.ReceivedResponse(pfrom->GetId(), inv.hash);
}
}
}
@ -3543,59 +3449,25 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic<bool>& interr
//
// Message: getdata (non-blocks)
//
// For robustness, expire old requests after a long timeout, so that
// we can resume downloading transactions from a peer even if they
// were unresponsive in the past.
// Eventually we should consider disconnecting peers, but this is
// conservative.
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
LogPrint("net", "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
state.m_tx_download.m_tx_announced.erase(it->first);
state.m_tx_download.m_tx_in_flight.erase(it++);
} else {
++it;
}
}
// On average, we do this check every TX_EXPIRY_INTERVAL/3.75. Randomize
// so that we're not doing this for all peers at the same time.
state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL/5 + GetRand(TX_EXPIRY_INTERVAL/5);
std::vector<std::pair<NodeId, uint256>> expired;
auto requestable = g_txrequest.GetRequestable(pto->GetId(), current_time, &expired);
for (const auto& entry : expired) {
LogPrint("net", "timeout of inflight tx %s from peer=%d\n", entry.second.ToString(), entry.first);
}
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
const uint256 txid = tx_process_time.begin()->second;
// Erase this entry from tx_process_time (it may be added back for
// processing at a later time, see below)
tx_process_time.erase(tx_process_time.begin());
CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txid);
for (const uint256& txhash : requestable) {
CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txhash);
if (!AlreadyHave(inv)) {
// If this transaction was last requested more than 1 minute ago,
// then request.
int64_t last_request_time = GetTxRequestTime(inv.hash);
if (last_request_time <= current_time - GETDATA_TX_INTERVAL) {
LogPrint("net", "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
vGetData.push_back(inv);
if (vGetData.size() >= MAX_GETDATA_SZ) {
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
vGetData.clear();
}
UpdateTxRequestTime(inv.hash, current_time);
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, current_time);
} else {
// This transaction is in flight from someone else; queue
// up processing to happen after the download times out
// (with a slight delay for inbound peers, to prefer
// requests to outbound peers).
int64_t next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload);
tx_process_time.emplace(next_process_time, txid);
LogPrint("net", "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
vGetData.emplace_back(inv);
if (vGetData.size() >= MAX_GETDATA_SZ) {
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
vGetData.clear();
}
g_txrequest.RequestedTx(pto->GetId(), txhash, current_time + GETDATA_TX_INTERVAL);
} else {
// We have already seen this transaction, no need to download.
state.m_tx_download.m_tx_announced.erase(inv.hash);
state.m_tx_download.m_tx_in_flight.erase(inv.hash);
g_txrequest.ForgetTxHash(txhash);
}
}