This commit is contained in:
Matias Furszyfer 2024-04-29 04:31:48 +02:00 committed by GitHub
commit 54846c9840
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 331 additions and 10 deletions

View File

@ -202,6 +202,7 @@ BITCOIN_CORE_H = \
merkleblock.h \
net.h \
net_permissions.h \
blockrequesttracker.h \
net_processing.h \
net_types.h \
netaddress.h \
@ -410,6 +411,7 @@ libbitcoin_node_a_SOURCES = \
kernel/mempool_removal_reason.cpp \
mapport.cpp \
net.cpp \
blockrequesttracker.cpp \
net_processing.cpp \
netgroup.cpp \
node/abort.cpp \

View File

@ -0,0 +1,90 @@
// Copyright (c) 2023 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
#include <blockrequesttracker.h>
template <typename T, typename V>
static bool Contain(const T& t, const V& value) { return std::find(t.begin(), t.end(), value) != t.end(); }
bool BlockRequestTracker::track(const uint256& block_hash, std::optional<NodeId> peer_id)
{
LOCK(cs_block_tracker);
auto it = m_tracked_blocks.emplace(block_hash, BlockRequest{});
if (!it.second) return false; // block already tracked
if (peer_id) {
// Add in-flight request
BlockRequest& req = it.first->second;
if (std::find(req.in_flight.begin(), req.in_flight.end(), *peer_id) != req.in_flight.end()) {
return false; // Already existent request
}
req.in_flight.emplace_back(*peer_id);
} else {
// Add block to pending list
m_pending.emplace_back(it.first);
}
return true;
}
void BlockRequestTracker::untrack_internal(const uint256& block_hash)
{
auto it = m_tracked_blocks.find(block_hash);
if (it == m_tracked_blocks.end()) return;
auto pending_it = std::remove(m_pending.begin(), m_pending.end(), it);
if (pending_it != m_pending.end()) m_pending.erase(pending_it);
m_tracked_blocks.erase(it);
}
void BlockRequestTracker::untrack(const uint256& block_hash)
{
LOCK(cs_block_tracker);
untrack_internal(block_hash);
}
void BlockRequestTracker::untrack_request(NodeId peer_id, const uint256& block_hash)
{
LOCK(cs_block_tracker);
// Get tracked block request
const auto& it = m_tracked_blocks.find(block_hash);
if (it == m_tracked_blocks.end()) return;
// Check if we were waiting for this peer
BlockRequest& request = it->second;
auto remove_it = std::remove(request.in_flight.begin(), request.in_flight.end(), peer_id);
if (remove_it == request.in_flight.end()) return;
// Clear request
request.in_flight.erase(remove_it);
// And add it to the pending vector if there are no other inflight request for this block
if (request.in_flight.empty() && !Contain(m_pending, it)) {
m_pending.emplace_back(it);
}
}
void BlockRequestTracker::for_pending(NodeId peer_id, const std::function<ForPendingStatus(const uint256&)>& check)
{
LOCK(cs_block_tracker);
for (auto it = m_pending.begin(); it != m_pending.end();) {
const auto& pending = *it;
switch(check(pending->first)) {
case ForPendingStatus::POP: {
BlockRequest& request = pending->second;
request.in_flight.emplace_back(peer_id);
it = m_pending.erase(it);
break;
}
case ForPendingStatus::SKIP:
++it;
break;
case ForPendingStatus::STOP:
return;
case ForPendingStatus::CANCEL:
untrack_internal(pending->first);
++it;
break;
}
}
}

71
src/blockrequesttracker.h Normal file
View File

@ -0,0 +1,71 @@
// Copyright (c) 2023 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_BLOCKREQUESTTRACKER_H
#define BITCOIN_BLOCKREQUESTTRACKER_H
#include <chain.h>
#include <net.h>
#include <serialize.h>
#include <util/hasher.h>
#include <list>
/**
* Structure in charge of tracking single blocks download progress.
* For now, only used to track blocks that aren't part of the automatic download process.
*/
class BlockRequestTracker
{
private:
struct BlockRequest {
// The in-flight request vector (preserving insertion order).
// Could be empty if:
// 1) peer failed to provide us the block and got disconnected.
// 2) if there are no available peers to request the block from.
// And, only when this is empty, 'm_pending' will contain an entry pointing
// to this structure. Which can be assigned to a peer by calling 'pop_pending()'.
std::vector<NodeId> in_flight;
explicit BlockRequest() {}
};
Mutex cs_block_tracker;
// The blocks we are tracking
using BlockTrackMap = std::unordered_map<uint256, BlockRequest, BlockHasher>;
BlockTrackMap m_tracked_blocks GUARDED_BY(cs_block_tracker);
// Block requests waiting for getdata submission
std::list<BlockTrackMap::iterator> m_pending;
void untrack_internal(const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_block_tracker);
public:
/**
* Add block to the tracking list.
* Returns false when block is already tracked or when request is
* already in-flight for 'peer_id'.
*/
bool track(const uint256& block_hash, std::optional<NodeId> peer_id=std::nullopt) EXCLUSIVE_LOCKS_REQUIRED(!cs_block_tracker);
/** Stop tracking the block */
void untrack(const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_block_tracker);
/** Remove request made to peer_id */
void untrack_request(NodeId peer_id, const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_block_tracker);
/**
* Try to pop pending requests and assign them to a certain peer.
* The node must send the getdata request after acquiring the pending request.
*/
enum class ForPendingStatus {
POP, // Mark the current pending request as in-flight.
SKIP, // Skip the current pending request.
STOP, // Stop traversing the pending list.
CANCEL // Untrack the request. Remove it from the tracking list.
};
void for_pending(NodeId peer_id, const std::function<ForPendingStatus(const uint256&)>& check) EXCLUSIVE_LOCKS_REQUIRED(!cs_block_tracker);
};
#endif // BITCOIN_BLOCKREQUESTTRACKER_H

View File

@ -9,6 +9,7 @@
#include <banman.h>
#include <blockencodings.h>
#include <blockfilter.h>
#include <blockrequesttracker.h>
#include <chainparams.h>
#include <consensus/amount.h>
#include <consensus/validation.h>
@ -510,7 +511,7 @@ public:
/** Implement PeerManager */
void StartScheduledTasks(CScheduler& scheduler) override;
void CheckForStaleTipAndEvictPeers() override;
std::optional<std::string> FetchBlock(NodeId peer_id, const CBlockIndex& block_index) override
std::optional<std::string> FetchBlock(std::optional<NodeId> op_peer_id, const CBlockIndex& block_index, bool retry) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
bool IgnoresIncomingTxs() override { return m_opts.ignore_incoming_txs; }
@ -794,6 +795,13 @@ private:
*/
std::map<uint256, std::pair<NodeId, bool>> mapBlockSource GUARDED_BY(cs_main);
/**
* Follow block download progress of manually requested blocks.
* Enabling the "re-try download from another peer" functionality
* when requests fail.
*/
BlockRequestTracker m_block_tracker;
/** Number of peers with wtxid relay. */
std::atomic<int> m_wtxid_relay_peers{0};
@ -1406,6 +1414,33 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co
// Make sure pindexBestKnownBlock is up to date, we'll need it.
ProcessBlockAvailability(peer.m_id);
// Go through the manually requested blocks first
if (!IsLimitedPeer(peer)) {
m_block_tracker.for_pending(peer.m_id, [this, &state, count, &vBlocks](const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) {
// Don't exceed the amount of blocks that we can request at time
if (vBlocks.size() >= count) return BlockRequestTracker::ForPendingStatus::CANCEL;
const CBlockIndex* index = m_chainman.m_blockman.LookupBlockIndex(block_hash);
if (!index) {
// We only request blocks we know about but, just in case, log the error and stop tracking block.
LogError("Pending block %s not found in block index\n", block_hash.ToString());
return BlockRequestTracker::ForPendingStatus::CANCEL;
}
// Check if this peer will be able to provide the block
if (state->pindexBestKnownBlock && state->pindexBestKnownBlock->nHeight >= index->nHeight) {
vBlocks.emplace_back(index);
return BlockRequestTracker::ForPendingStatus::POP;
}
// Continue traversing the requests pending list
return BlockRequestTracker::ForPendingStatus::SKIP;
});
// Don't exceed the amount of blocks that we can request at time
if (vBlocks.size() >= count) return;
}
if (state->pindexBestKnownBlock == nullptr || state->pindexBestKnownBlock->nChainWork < m_chainman.ActiveChain().Tip()->nChainWork || state->pindexBestKnownBlock->nChainWork < m_chainman.MinimumChainWork()) {
// This peer has nothing interesting.
return;
@ -1671,6 +1706,9 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
range.first = mapBlocksInFlight.erase(range.first);
}
}
// Untrack request
m_block_tracker.untrack_request(nodeid, entry.pindex->GetBlockHash());
}
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
@ -1919,10 +1957,21 @@ bool PeerManagerImpl::BlockRequestAllowed(const CBlockIndex* pindex)
(GetBlockProofEquivalentTime(*m_chainman.m_best_header, *pindex, *m_chainman.m_best_header, m_chainparams.GetConsensus()) < STALE_RELAY_AGE_LIMIT);
}
std::optional<std::string> PeerManagerImpl::FetchBlock(NodeId peer_id, const CBlockIndex& block_index)
std::optional<std::string> PeerManagerImpl::FetchBlock(std::optional<NodeId> op_peer_id, const CBlockIndex& block_index, bool retry)
{
if (m_chainman.m_blockman.LoadingBlocks()) return "Loading blocks ...";
// If no peer id was specified, track block. The internal block sync process will
// be in charge of downloading the block.
if (!op_peer_id) {
if (!retry) return "'retry' disabled, cannot perform single block request"; // future: enable one-try requests.
if (!m_block_tracker.track(block_index.GetBlockHash())) return "Already tracked block";
LogPrint(BCLog::NET, "Block added to the tracking list, hash %s\n", block_index.GetBlockHash().ToString());
return std::nullopt;
}
NodeId peer_id = *op_peer_id;
// Ensure this peer exists and hasn't been disconnected
PeerRef peer = GetPeerRef(peer_id);
if (peer == nullptr) return "Peer does not exist";
@ -1930,14 +1979,17 @@ std::optional<std::string> PeerManagerImpl::FetchBlock(NodeId peer_id, const CBl
// Ignore pre-segwit peers
if (!CanServeWitnesses(*peer)) return "Pre-SegWit peer";
LOCK(cs_main);
// Ignore limited peers
if (IsLimitedPeer(*peer) && m_best_height - block_index.nHeight > int{NODE_NETWORK_LIMITED_MIN_BLOCKS}) return "Cannot fetch block from a limited peer";
// Forget about all prior requests
RemoveBlockRequest(block_index.GetBlockHash(), std::nullopt);
LOCK(cs_main);
// Mark block as in-flight
if (!BlockRequested(peer_id, block_index)) return "Already requested from this peer";
// Track block request (only if requested)
if (retry) m_block_tracker.track(block_index.GetBlockHash(), peer_id);
// Construct message to request the block
const uint256& hash{block_index.GetBlockHash()};
std::vector<CInv> invs{CInv(MSG_BLOCK | MSG_WITNESS_FLAG, hash)};
@ -4762,6 +4814,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// need it even when it's not a candidate for a new best tip.
forceProcessing = IsBlockRequested(hash);
RemoveBlockRequest(hash, pfrom.GetId());
m_block_tracker.untrack(hash);
// mapBlockSource is only used for punishing peers and setting
// which peers send us compact blocks, so the race between here and
// cs_main in ProcessNewBlock is fine.

View File

@ -71,11 +71,11 @@ public:
/**
* Attempt to manually fetch block from a given peer. We must already have the header.
*
* @param[in] peer_id The peer id
* @param[in] op_peer_id The peer id (std::nullopt is equivalent to "any peer")
* @param[in] block_index The blockindex
* @returns std::nullopt if a request was successfully made, otherwise an error message
*/
virtual std::optional<std::string> FetchBlock(NodeId peer_id, const CBlockIndex& block_index) = 0;
virtual std::optional<std::string> FetchBlock(std::optional<NodeId> op_peer_id, const CBlockIndex& block_index, bool retry) = 0;
/** Begin running background tasks, should only be called once */
virtual void StartScheduledTasks(CScheduler& scheduler) = 0;

View File

@ -436,7 +436,10 @@ static RPCHelpMan getblockfrompeer()
"Returns an empty JSON object if the request was successfully scheduled.",
{
{"blockhash", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The block hash to try to fetch"},
{"peer_id", RPCArg::Type::NUM, RPCArg::Optional::NO, "The peer to fetch it from (see getpeerinfo for peer IDs)"},
{"peer_id", RPCArg::Type::NUM, RPCArg::Optional::OMITTED, "The peer to fetch it from (see getpeerinfo for peer IDs). "
"If omitted, the node will fetch the block from any available peer."},
{"retry", RPCArg::Type::BOOL, RPCArg::Optional::OMITTED, "Whether to automatically retry to download the block from different peers if the initial request fails or not. "
"If omitted, the node will continuously attempt to download the block from various peers until it succeeds."},
},
RPCResult{RPCResult::Type::OBJ, "", /*optional=*/false, "", {}},
RPCExamples{
@ -450,7 +453,9 @@ static RPCHelpMan getblockfrompeer()
PeerManager& peerman = EnsurePeerman(node);
const uint256& block_hash{ParseHashV(request.params[0], "blockhash")};
const NodeId peer_id{request.params[1].getInt<int64_t>()};
const std::optional<NodeId> peer_id = request.params[1].isNull() ? std::nullopt : std::make_optional(request.params[1].getInt<int64_t>());
const bool retry = !request.params[2].isNull() ? request.params[2].get_bool() : true; // default true
if (!retry && !peer_id) throw JSONRPCError(RPC_INVALID_PARAMETER, "Cannot disable the 'retry' process without specifying a peer from which the block will be downloaded ('peer_id')");
const CBlockIndex* const index = WITH_LOCK(cs_main, return chainman.m_blockman.LookupBlockIndex(block_hash););
@ -469,7 +474,7 @@ static RPCHelpMan getblockfrompeer()
throw JSONRPCError(RPC_MISC_ERROR, "Block already downloaded");
}
if (const auto err{peerman.FetchBlock(peer_id, *index)}) {
if (const auto err{peerman.FetchBlock(peer_id, *index, retry)}) {
throw JSONRPCError(RPC_MISC_ERROR, err.value());
}
return UniValue::VOBJ;

View File

@ -65,6 +65,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "getbalance", 2, "include_watchonly" },
{ "getbalance", 3, "avoid_reuse" },
{ "getblockfrompeer", 1, "peer_id" },
{ "getblockfrompeer", 2, "retry" },
{ "getblockhash", 0, "height" },
{ "waitforblockheight", 0, "height" },
{ "waitforblockheight", 1, "timeout" },

View File

@ -10,6 +10,7 @@ from test_framework.messages import (
from_hex,
msg_headers,
NODE_WITNESS,
NODE_NETWORK_LIMITED
)
from test_framework.p2p import (
P2P_SERVICES,
@ -20,6 +21,7 @@ from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
import time
class GetBlockFromPeerTest(BitcoinTestFramework):
@ -152,6 +154,103 @@ class GetBlockFromPeerTest(BitcoinTestFramework):
assert_equal(pruned_node.pruneblockchain(1000), pruneheight)
assert_raises_rpc_error(-1, "Block not available (pruned data)", pruned_node.getblock, pruned_block)
######################################################
# Test reject fetching old block from a limited peer #
######################################################
self.log.info("Test reject fetching old block from limited peer")
pruned_node.add_p2p_connection(P2PInterface(), services=NODE_NETWORK_LIMITED | NODE_WITNESS)
limited_peer_id = next(peer for peer in pruned_node.getpeerinfo() if peer["servicesnames"] == ['WITNESS', 'NETWORK_LIMITED'])["id"]
pruned_block_10 = self.nodes[0].getblockhash(10)
assert_raises_rpc_error(-1, "Cannot fetch block from a limited peer", pruned_node.getblockfrompeer, pruned_block_10, limited_peer_id)
##############################################################################################
# Verify node managing to fetch block from another peer after first peer fails to deliver it #
##############################################################################################
self.log.info("Try to fetch block from a peer, fail and verify that the node can fetch it from somewhere else")
# Disconnect only "good" peer that can serve blocks
self.disconnect_nodes(0, 2)
# Set mock time and connect two "bad" peers
current_time = int(time.time())
for node in self.nodes:
node.setmocktime(current_time)
# Connect a limited peer
pruned_node.add_p2p_connection(P2PInterface(), services=NODE_NETWORK_LIMITED | NODE_WITNESS)
# Connect second peer that can serve blocks but will not answer to the getdata requests
pruned_node.add_p2p_connection(P2PInterface())
not_responding_peer_id = next(peer for peer in pruned_node.getpeerinfo() if peer["servicesnames"] == ['NETWORK', 'WITNESS'])["id"]
# Try to fetch block
pruned_block_15 = self.nodes[0].getblockhash(15)
result = pruned_node.getblockfrompeer(pruned_block_15, not_responding_peer_id)
assert_equal(result, {})
# Connect other full-nodes nodes
self.connect_nodes(2, 0)
self.connect_nodes(1, 2)
# Move clock above the block request timeout and assert the initial block fetching failed
current_time = current_time + 610
with pruned_node.assert_debug_log([f"Timeout downloading block {pruned_block_15} from peer={not_responding_peer_id}"], timeout=5):
for node in self.nodes:
node.setmocktime(current_time)
# Now verify that the block was requested and received from another peer after the initial failure
self.wait_until(lambda: self.check_for_block(node=2, hash=pruned_block_15), timeout=3)
#######################################
# Test fetching block from "any" peer #
#######################################
self.log.info("Fetch block from \"any\" peer")
# Disconnect only connection that can provide the block
self.disconnect_nodes(0, 2)
self.disconnect_nodes(1, 2)
# Try to fetch the block from "any" peer. When there is no available peer
result = pruned_node.getblockfrompeer(pruned_block_10)
assert_equal(result, {})
# Now connect the full node. The node should automatically request the missing block
self.connect_nodes(0, 2)
self.wait_until(lambda: self.check_for_block(node=2, hash=pruned_block_10), timeout=5)
##############################################################################
# Try to fetch block from certain peer only once, no automatic retry process #
##############################################################################
self.log.info("Try to fetch block from certain peer only once")
# Advance peers time so every peer stay responsive
current_time = current_time + 60
for node in self.nodes:
node.setmocktime(current_time)
# Connect peer that can serve blocks but will not answer to the getdata requests
pruned_node.add_p2p_connection(P2PInterface())
not_responding_peer_id = next(peer for peer in pruned_node.getpeerinfo() if peer["servicesnames"] == ['NETWORK', 'WITNESS'])["id"]
# Also connect full node that can serve the block
self.connect_nodes(1, 2)
# Request block with 'retry=false'
pruned_block_9 = self.nodes[0].getblockhash(9)
result = pruned_node.getblockfrompeer(pruned_block_9, not_responding_peer_id, retry=False)
assert_equal(result, {})
# Move clock above the block request timeout and assert the initial block fetching failed
with pruned_node.assert_debug_log([f"Timeout downloading block {pruned_block_9} from peer={not_responding_peer_id}"]):
for node in self.nodes:
node.setmocktime(current_time + 610)
# Sleep for a bit and verify that the block was not requested to any other peer
time.sleep(3)
self.wait_until(lambda: not self.check_for_block(node=2, hash=pruned_block_9), timeout=3)
if __name__ == '__main__':
GetBlockFromPeerTest().main()