diff --git a/qa/rpc-tests/p2p-tx-download.py b/qa/rpc-tests/p2p-tx-download.py index 8c8a951f8..34333ef2f 100644 --- a/qa/rpc-tests/p2p-tx-download.py +++ b/qa/rpc-tests/p2p-tx-download.py @@ -16,10 +16,12 @@ TxDownload -- test transaction download logic GETDATA_TX_INTERVAL = 30 # seconds TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL # 5 minutes INBOUND_PEER_TX_DELAY = 2 # seconds -TXID_RELAY_DELAY = 2 # seconds +OVERLOADED_PEER_DELAY = 2 # seconds MAX_GETDATA_IN_FLIGHT = 100 MAX_PEER_TX_ANNOUNCEMENTS = 5000 +MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + class TxDownloadTestNode(SingleNodeConnCB): def __init__(self): SingleNodeConnCB.__init__(self) @@ -87,6 +89,8 @@ class TxDownloadTest(BitcoinTestFramework): self.test_disconnect_fallback() self.test_notfound_fallback() self.test_max_announcements() + self.test_inflight_throttling() + self.test_expiry_fallback() def setup_network(self): # set up full nodes @@ -100,14 +104,10 @@ class TxDownloadTest(BitcoinTestFramework): connect_nodes(self.nodes[0], 1) self.sync_all() - # set up incoming (non-honest) peers - self.incoming_peers = [] - for i in range(8): - self.incoming_peers.append(self.create_testnode()) - + # create a single control peer that is only used for ping sync + self.control_peer = self.create_testnode() NetworkThread().start() - for peer in self.incoming_peers: - peer.wait_for_verack() + self.control_peer.wait_for_verack() def create_testnode(self, node_idx=0): node = TxDownloadTestNode() @@ -115,6 +115,20 @@ class TxDownloadTest(BitcoinTestFramework): node.add_connection(conn) return node + def connect_incoming_peers(self, num): + peers = [] + for _ in range(num): + peer = self.create_testnode() + peer.wait_for_verack() + peers.append(peer) + return peers + + def disconnect_incoming_peers(self, peers): + for peer in peers: + if not peer.disconnect(): + return False + return True + def any_received_getdata(self, hash, peers): for peer in peers: if hash in peer.tx_getdata_received: @@ -135,6 +149,11 @@ class TxDownloadTest(BitcoinTestFramework): return True return wait_until(getdata_received, timeout=10) + def wait_for_mocktime(self, node): + def mocktime_is_good(): + return node.getmocktime() >= self.mocktime + return wait_until(mocktime_is_good, timeout=10) + def find_winning_peer(self, peers, hash): # detect which peer won a race for getting a getdata hash selected = None @@ -154,15 +173,20 @@ class TxDownloadTest(BitcoinTestFramework): self.mocktime += delta_time for node in self.nodes: node.setmocktime(self.mocktime) - # give the nodes some time to process the new mocktime - # can be removed when we have getmocktime - time.sleep(0.1) + if not self.wait_for_mocktime(node): + return False + # sync the control peer with ping so that we're 100% sure we have + # entered a new message handling loop + self.control_peer.sync_with_ping() + return True def forward_mocktime_step2(self, iterations): # forward mocktime in steps of 2 seconds to allow the nodes # time to recognize they have to do something for i in range(iterations): - self.forward_mocktime(2) + if not self.forward_mocktime(2): + return False + return True def next_fake_txid(self): self.fake_txid += 1 @@ -170,24 +194,32 @@ class TxDownloadTest(BitcoinTestFramework): def test_tx_request(self): txid = self.next_fake_txid() - self.forward_mocktime(0) + assert self.forward_mocktime(1) - # use incoming peers 0 and 1 - peerset = self.incoming_peers[0:4] - for peer in peerset: + # use 4 peers + peers = self.connect_incoming_peers(4) + for peer in peers: peer.send_tx_inv([txid]) - # To make sure we eventually ask the tx from all 4 nodes that announced + # use 2 more peers that do not send invs + otherpeers = self.connect_incoming_peers(2) + + # To make sure we eventually ask the tx from all 4 peers that announced # to us, we're now jumping 4 * (2 + 2 + 30) = 136 seconds to the future - warp = 4 * (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + GETDATA_TX_INTERVAL) + warp = 4 * MAX_GETDATA_INBOUND_WAIT self.forward_mocktime_step2(warp//2) # All peers that sent the inv should now have received a getdata request - assert self.wait_for_getdata([txid], peerset) + assert self.wait_for_getdata([txid], peers) # Make sure the other peers did not receive the getdata because they # didn't indicate they have the tx - assert not self.any_received_getdata(txid, self.incoming_peers[4:8]) + assert not self.any_received_getdata(txid, otherpeers) + + # cleanup + assert self.disconnect_incoming_peers(peers) + assert self.disconnect_incoming_peers(otherpeers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) def test_invblock_resolution(self): inputs = [self.nodes[1].listunspent()[0]] @@ -198,93 +230,187 @@ class TxDownloadTest(BitcoinTestFramework): tx.rehash() txid = int(tx.hash, 16) - self.forward_mocktime(0) + assert self.forward_mocktime(1) # make sure that node 1 is outbound for node 0 assert self.nodes[0].getpeerinfo()[0]['inbound'] == False - # use all peers that only inv but never respond to getdata - for peer in self.incoming_peers: + # use 8 peers that only inv but never respond to getdata + peers = self.connect_incoming_peers(8) + for peer in peers: peer.send_tx_inv([txid]) # send from our honest node last self.nodes[1].sendrawtransaction(tx_hex) - # We jump forward 2x (2 + 2) + 30 + 2 (margin) = 40 seconds to make sure + # We jump forward 2x max inbound wait time to make sure # that we get to the point where we re-evaluate the transaction in 2 # second steps - warp = 2 * (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY) + GETDATA_TX_INTERVAL + 2 - self.forward_mocktime_step2(warp//2) + warp = 2 * MAX_GETDATA_INBOUND_WAIT + assert self.forward_mocktime_step2(warp//2) assert tx.hash in self.nodes[0].getrawmempool() - def test_notfound_fallback(self): - # use peer 4 and 5 to concurrently send 2 invs - peers = self.incoming_peers[4:6] + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + + def test_inflight_throttling(self): + # First, forward time by 2x inflight timeout, so that we have clean + # registers for each peer + self.forward_mocktime(2 * TX_EXPIRY_INTERVAL) + + # now send MAX_GETDATA_IN_FLIGHT (=100) invs with 1 peer + peer = self.connect_incoming_peers(1)[0] + invs = [] + for i in range(MAX_GETDATA_IN_FLIGHT): + txid = self.next_fake_txid() + invs.append(txid) + + peer.send_tx_inv(invs) + + # warp forward 3 seconds in steps of 1 second + warp = INBOUND_PEER_TX_DELAY + 1 + for _ in range(warp): + assert self.forward_mocktime(1) + peer.sync_with_ping() + + # test that we got all the getdata + assert peer.wait_until_numgetdata(MAX_GETDATA_IN_FLIGHT) + + peer.send_tx_inv([self.next_fake_txid()]) + + # warp forward 3 seconds again + warp = INBOUND_PEER_TX_DELAY + 1 + for _ in range(warp): + assert self.forward_mocktime(1) + peer.sync_with_ping() + + # test that we haven't received the getdata request yet + assert len(peer.tx_getdata_received) == MAX_GETDATA_IN_FLIGHT + + # additionally warp the overloaded peer delay time second margin + warp = OVERLOADED_PEER_DELAY + for _ in range(warp): + assert self.forward_mocktime(1) + peer.sync_with_ping() + + # test that we now received the getdata + assert peer.wait_until_numgetdata(MAX_GETDATA_IN_FLIGHT + 1) + + # cleanup + assert self.disconnect_incoming_peers([peer]) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + + def test_expiry_fallback(self): + # create 2 new peers + peers = self.connect_incoming_peers(2) + txid = self.next_fake_txid() - self.forward_mocktime(1) + assert self.forward_mocktime(1) for peer in peers: peer.send_tx_inv([txid]) - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward 2 + 2 (margin) = 4 seconds in steps of 2 + warp = INBOUND_PEER_TX_DELAY + 2 + assert self.forward_mocktime_step2(warp//2) + + winner, loser = self.find_winning_peer(peers, txid) + + # expire the request from the winning peer by doing nothing + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) + + # the losing peer is now the fallback and received a getdata message + assert self.wait_for_getdata([txid], [loser]) + + #cleanup + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + + def test_notfound_fallback(self): + # use 2 peers to concurrently send 2 invs + peers = self.connect_incoming_peers(2) + txid = self.next_fake_txid() + assert self.forward_mocktime(1) + + for peer in peers: + peer.send_tx_inv([txid]) + + # warp forward 2 + 2 (margin) = 4 seconds in steps of 2 + warp = INBOUND_PEER_TX_DELAY + 2 + assert self.forward_mocktime_step2(warp//2) winner, loser = self.find_winning_peer(peers, txid) # send a reject message from the peer that won the race winner.send_tx_notfound([txid]) - # warp forward 30 + 2 + 2 + 2 (margin) = 36 seconds in steps of 2 - warp = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward the max wait time in steps of 2 + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) # the losing peer is now the fallback and received a getdata message assert self.wait_for_getdata([txid], [loser]) + #cleanup + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + def test_disconnect_fallback(self): - # use peer 6 and 7 to concurrently send 2 invs - peers = self.incoming_peers[6:8] + # use 2 peers to concurrently send 2 invs + peers = self.connect_incoming_peers(2) txid = self.next_fake_txid() - self.forward_mocktime(1) + assert self.forward_mocktime(1) for peer in peers: peer.send_tx_inv([txid]) - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward 2 + 2 (margin) = 4 seconds in steps of 2 + warp = INBOUND_PEER_TX_DELAY + 2 + assert self.forward_mocktime_step2(warp//2) winner, loser = self.find_winning_peer(peers, txid) # drop connection from the peer that won the race assert winner.disconnect() - # warp forward 30 + 2 + 2 + 2 (margin) = 36 seconds in steps of 2 - warp = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward the max wait time in steps of 2 + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) # the losing peer is now the fallback and received a getdata message assert self.wait_for_getdata([txid], [loser]) + #cleanup + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + def test_max_announcements(self): # create a test node - peer = self.create_testnode() - peer.wait_for_verack() + peer = self.connect_incoming_peers(1)[0] + + assert self.forward_mocktime(1) hashes = [] for _ in range(MAX_PEER_TX_ANNOUNCEMENTS): hashes.append(self.next_fake_txid()) peer.send_tx_inv(hashes) - peer.wait_until_numgetdata(MAX_PEER_TX_ANNOUNCEMENTS) + + # wait the maximum time before expiry minus 2 seconds to receive all + # getdata requests with this peer + warp = MAX_GETDATA_INBOUND_WAIT - 2 + assert self.forward_mocktime_step2(warp//2) + assert peer.wait_until_numgetdata(MAX_PEER_TX_ANNOUNCEMENTS) peer.sync_with_ping() - # send one more - this should never come back. + # send one more and wait the maximum time - this should never come back. extratx = self.next_fake_txid() peer.send_tx_inv([extratx]) + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) assert not self.any_received_getdata(extratx, [peer]) + #cleanup + assert self.disconnect_incoming_peers([peer]) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + if __name__ == '__main__': TxDownloadTest().main()