mirror of
https://github.com/dogecoin/dogecoin.git
synced 2026-01-31 10:30:52 +00:00
net: improve upon transaction scheduling rework
- Add an explicit memory bound to m_tx_process_time
Previously there was an implicit bound based on the handling
of m_tx_announced, but that approach is error-prone
(particularly if we start automatically removing things from
that set).
- Remove NOTFOUND transactions from in-flight data structures
This prevents a bug where the in-flight queue for our peers
will not be drained, resulting in not downloading any new
transactions from our peers.
- Expire old entries from the in-flight tx map
If a peer hasn't responded to a getdata request, eventually
time out the request and remove it from the in-flight data
structures. This is to prevent any bugs in our handling of
those in-flight data structures from filling up the
in-flight map and preventing us from requesting more
transactions (such as the NOTFOUND bug mentioned above).
- Fix bug around transaction requests
If a transaction is already in-flight when a peer announces
a new tx to us, we schedule a time in the future to
reconsider whether to download. At that future time, there
was a bug that would prevent transactions from being
rescheduled for potential download again (ie if the
transaction was still in-flight at the time of
reconsideration, such as from some other peer). This fixes
that bug.
- Improve NOTFOUND comment
Cherry-picked from: 218697b6, 23163b75, e32e0840, f635a3ba
and 308b7673
Co-authored-by: Anthony Towns <aj@erisian.com.au>
This commit is contained in:
parent
596041ccb7
commit
1e596ed731
@ -52,11 +52,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
|
||||
/** Maximum number of announced transactions from a peer */
|
||||
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
|
||||
/** How many microseconds to delay requesting transactions from inbound peers */
|
||||
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000;
|
||||
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds
|
||||
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
|
||||
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000;
|
||||
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute
|
||||
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
|
||||
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000;
|
||||
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds
|
||||
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
|
||||
static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL;
|
||||
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
|
||||
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
|
||||
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
|
||||
@ -223,9 +225,9 @@ struct CNodeState {
|
||||
* CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
|
||||
* isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
|
||||
*
|
||||
* The process_time for a transaction is set to nNow for outbound peers,
|
||||
* nNow + 2 seconds for inbound peers. This is the time at which we'll
|
||||
* consider trying to request the transaction from the peer in
|
||||
* The process_time for a transaction is set to current_time for outbound
|
||||
* peers, current_time + 2 seconds for inbound peers. This is the time at
|
||||
* which we'll consider trying to request the transaction from the peer in
|
||||
* SendMessages(). The delay for inbound peers is to allow outbound peers
|
||||
* a chance to announce before we request from inbound peers, to prevent
|
||||
* an adversary from using inbound connections to blind us to a
|
||||
@ -233,8 +235,8 @@ struct CNodeState {
|
||||
*
|
||||
* When we call SendMessages() for a given peer,
|
||||
* we will loop over the transactions in m_tx_process_time, looking
|
||||
* at the transactions whose process_time <= nNow. We'll request each
|
||||
* such transaction that we don't have already and that hasn't been
|
||||
* at the transactions whose process_time <= current_time. We'll request
|
||||
* each such transaction that we don't have already and that hasn't been
|
||||
* requested from another peer recently, up until we hit the
|
||||
* MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
|
||||
* g_already_asked_for for each requested txid, storing the time of the
|
||||
@ -269,8 +271,11 @@ struct CNodeState {
|
||||
//! Store all the transactions a peer has recently announced
|
||||
std::set<uint256> m_tx_announced;
|
||||
|
||||
//! Store transactions which were requested by us
|
||||
std::set<uint256> m_tx_in_flight;
|
||||
//! Store transactions which were requested by us, with timestamp
|
||||
std::map<uint256, int64_t> m_tx_in_flight;
|
||||
|
||||
//! Periodically check for stuck getdata requests
|
||||
int64_t m_check_expiry_timer{0};
|
||||
};
|
||||
|
||||
TxDownloadState m_tx_download;
|
||||
@ -661,30 +666,40 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO
|
||||
}
|
||||
}
|
||||
|
||||
void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
{
|
||||
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
||||
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) {
|
||||
// Too many queued announcements from this peer, or we already have
|
||||
// this announcement
|
||||
return;
|
||||
}
|
||||
peer_download_state.m_tx_announced.insert(txid);
|
||||
|
||||
int64_t process_time;
|
||||
int64_t last_request_time = GetTxRequestTime(txid);
|
||||
|
||||
// First time requesting this tx
|
||||
if (last_request_time == 0) {
|
||||
process_time = nNow;
|
||||
process_time = current_time;
|
||||
} else {
|
||||
// Randomize the delay to avoid biasing some peers over others (such as due to
|
||||
// fixed ordering of peer processing in ThreadMessageHandler)
|
||||
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
|
||||
}
|
||||
|
||||
// We delay processing announcements from non-preferred (eg inbound) peers
|
||||
if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY;
|
||||
// We delay processing announcements from inbound peers
|
||||
if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY;
|
||||
|
||||
return process_time;
|
||||
}
|
||||
|
||||
void RequestTx(CNodeState* state, const uint256& txid, int64_t current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
{
|
||||
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
||||
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
||||
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
||||
peer_download_state.m_tx_announced.count(txid)) {
|
||||
// Too many queued announcements from this peer, or we already have
|
||||
// this announcement
|
||||
return;
|
||||
}
|
||||
peer_download_state.m_tx_announced.insert(txid);
|
||||
|
||||
// Calculate the time to try requesting this transaction. Use
|
||||
// fPreferredDownload as a proxy for outbound peers.
|
||||
int64_t process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload);
|
||||
|
||||
peer_download_state.m_tx_process_time.emplace(process_time, txid);
|
||||
}
|
||||
@ -1282,12 +1297,19 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
||||
|
||||
if (!vNotFound.empty()) {
|
||||
// Let the peer know that we didn't find what it asked for, so it doesn't
|
||||
// have to wait around forever. Currently only SPV clients actually care
|
||||
// about this message: it's needed when they are recursively walking the
|
||||
// dependencies of relevant unconfirmed transactions. SPV clients want to
|
||||
// do that because they want to know about (and store and rebroadcast and
|
||||
// risk analyze) the dependencies of transactions relevant to them, without
|
||||
// having to download the entire memory pool.
|
||||
// have to wait around forever.
|
||||
// SPV clients care about this message: it's needed when they are
|
||||
// recursively walking the dependencies of relevant unconfirmed
|
||||
// transactions. SPV clients want to do that because they want to know
|
||||
// about (and store and rebroadcast and risk analyze) the dependencies
|
||||
// of transactions relevant to them, without having to download the
|
||||
// entire memory pool.
|
||||
// Also, other nodes can use these messages to automatically request a
|
||||
// transaction from some other peer that annnounced it, and stop
|
||||
// waiting for us to respond.
|
||||
// In normal operation, we often send NOTFOUND messages for parents of
|
||||
// transactions that we relay; if a peer is missing a parent, they may
|
||||
// assume we have them and request the parents from us.
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound));
|
||||
}
|
||||
}
|
||||
@ -1675,7 +1697,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||
LOCK(cs_main);
|
||||
|
||||
uint32_t nFetchFlags = GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus(chainActive.Height()));
|
||||
int64_t nNow = GetTimeMicros();
|
||||
int64_t current_time = GetTimeMicros();
|
||||
|
||||
std::vector<CInv> vToFetch;
|
||||
|
||||
@ -1712,7 +1734,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||
if (fBlocksOnly)
|
||||
LogPrint("net", "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->id);
|
||||
else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload())
|
||||
RequestTx(State(pfrom->GetId()), inv.hash, nNow);
|
||||
RequestTx(State(pfrom->GetId()), inv.hash, current_time);
|
||||
}
|
||||
|
||||
}
|
||||
@ -2041,12 +2063,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||
}
|
||||
if (!fRejectedParents) {
|
||||
uint32_t nFetchFlags = GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus(chainActive.Height()));
|
||||
int64_t nNow = GetMockableTimeMicros();
|
||||
int64_t current_time = GetMockableTimeMicros();
|
||||
|
||||
BOOST_FOREACH(const CTxIn& txin, tx.vin) {
|
||||
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
|
||||
pfrom->AddInventoryKnown(_inv);
|
||||
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow);
|
||||
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time);
|
||||
}
|
||||
AddOrphanTx(ptx, pfrom->GetId());
|
||||
|
||||
@ -2761,8 +2783,28 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||
}
|
||||
|
||||
else if (strCommand == NetMsgType::NOTFOUND) {
|
||||
// We do not care about the NOTFOUND message, but logging an Unknown Command
|
||||
// message would be undesirable as we transmit it ourselves.
|
||||
// Remove the NOTFOUND transactions from the peer
|
||||
LOCK(cs_main);
|
||||
CNodeState *state = State(pfrom->GetId());
|
||||
std::vector<CInv> vInv;
|
||||
vRecv >> vInv;
|
||||
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
for (CInv &inv : vInv) {
|
||||
if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) {
|
||||
// If we receive a NOTFOUND message for a txid we requested, erase
|
||||
// it from our data structures for this peer.
|
||||
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
|
||||
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
|
||||
// Skip any further work if this is a spurious NOTFOUND
|
||||
// message.
|
||||
continue;
|
||||
}
|
||||
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
|
||||
state->m_tx_download.m_tx_announced.erase(inv.hash);
|
||||
LogPrint("net", "received: notfound tx %s from peer=%d\n", inv.hash.ToString(), pfrom->id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else {
|
||||
@ -3425,9 +3467,33 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic<bool>& interr
|
||||
//
|
||||
// Message: getdata (non-blocks)
|
||||
//
|
||||
|
||||
// For robustness, expire old requests after a long timeout, so that
|
||||
// we can resume downloading transactions from a peer even if they
|
||||
// were unresponsive in the past.
|
||||
// Eventually we should consider disconnecting peers, but this is
|
||||
// conservative.
|
||||
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
|
||||
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
|
||||
if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
|
||||
LogPrint("net", "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
|
||||
state.m_tx_download.m_tx_announced.erase(it->first);
|
||||
state.m_tx_download.m_tx_in_flight.erase(it++);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
|
||||
// so that we're not doing this for all peers at the same time.
|
||||
state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL);
|
||||
}
|
||||
|
||||
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
||||
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
||||
const uint256& txid = tx_process_time.begin()->second;
|
||||
const uint256 txid = tx_process_time.begin()->second;
|
||||
// Erase this entry from tx_process_time (it may be added back for
|
||||
// processing at a later time, see below)
|
||||
tx_process_time.erase(tx_process_time.begin());
|
||||
CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txid);
|
||||
if (!AlreadyHave(inv)) {
|
||||
// If this transaction was last requested more than 1 minute ago,
|
||||
@ -3441,20 +3507,20 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic<bool>& interr
|
||||
vGetData.clear();
|
||||
}
|
||||
UpdateTxRequestTime(inv.hash, current_time);
|
||||
state.m_tx_download.m_tx_in_flight.insert(inv.hash);
|
||||
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, current_time);
|
||||
} else {
|
||||
// This transaction is in flight from someone else; queue
|
||||
// up processing to happen after the download times out
|
||||
// (with a slight delay for inbound peers, to prefer
|
||||
// requests to outbound peers).
|
||||
RequestTx(&state, txid, current_time);
|
||||
int64_t next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload);
|
||||
tx_process_time.emplace(next_process_time, txid);
|
||||
}
|
||||
} else {
|
||||
// We have already seen this transaction, no need to download.
|
||||
state.m_tx_download.m_tx_announced.erase(inv.hash);
|
||||
state.m_tx_download.m_tx_in_flight.erase(inv.hash);
|
||||
}
|
||||
tx_process_time.erase(tx_process_time.begin());
|
||||
}
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user