p2p: Cache inbound reconciling peers count

It helps to avoid recomputing every time we consider
a transaction for fanout/reconciliation.
This commit is contained in:
Gleb Naumenko 2024-01-17 11:28:02 +02:00
parent 07f3ad4d94
commit a14dfd9c87
5 changed files with 36 additions and 23 deletions

View File

@ -32,7 +32,7 @@ static void ShouldFanoutTo(benchmark::Bench& bench)
bench.run([&] {
for (NodeId peer = 0; peer < num_peers; ++peer) {
tracker.ShouldFanoutTo(txs[rand() % txs.size()], peer, /*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0);
tracker.ShouldFanoutTo(txs[rand() % txs.size()], peer, /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
}
});
}

View File

@ -1610,7 +1610,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
}
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid, node.IsInboundConn());
m_num_preferred_download_peers -= state->fPreferredDownload;
m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
assert(m_peers_downloading_from >= 0);
@ -3623,7 +3623,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// We could have optimistically pre-registered/registered the peer. In that case,
// we should forget about the reconciliation state here if this wasn't followed
// by WTXIDRELAY (since WTXIDRELAY can't be announced later).
m_txreconciliation->ForgetPeer(pfrom.GetId());
m_txreconciliation->ForgetPeer(pfrom.GetId(), pfrom.IsInboundConn());
}
}

View File

@ -95,6 +95,12 @@ private:
*/
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);
/*
* Keeps track of how many of the registered peers are inbound. Updated on registering or
* forgetting peers.
*/
size_t m_inbounds_count GUARDED_BY(m_txreconciliation_mutex){0};
/*
* A least-recently-added cache tracking which peers we should fanout a transaction to.
*
@ -170,6 +176,9 @@ public:
m_states.erase(recon_state);
m_states.emplace(peer_id, std::move(new_state));
if (is_peer_inbound && m_inbounds_count < std::numeric_limits<size_t>::max()) {
++m_inbounds_count;
}
return ReconciliationRegisterResult::SUCCESS;
}
@ -214,13 +223,20 @@ public:
return peer_state->m_local_set.erase(wtxid_to_remove) > 0;
}
void ForgetPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
void ForgetPeer(NodeId peer_id, bool is_peer_inbound) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
if (m_states.erase(peer_id)) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Forget txreconciliation state of peer=%d\n", peer_id);
const auto peer = m_states.find(peer_id);
if (peer == m_states.end()) return;
const bool registered = std::holds_alternative<TxReconciliationState>(peer->second);
m_states.erase(peer);
if (registered && is_peer_inbound) {
Assert(m_inbounds_count > 0);
--m_inbounds_count;
}
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Forget txreconciliation state of peer=%d\n", peer_id);
}
/**
@ -315,16 +331,9 @@ public:
if (peer_state->m_we_initiate) {
destinations = OUTBOUND_FANOUT_DESTINATIONS - outbounds_fanout_tx_relay;
} else {
const size_t inbound_rcncl_peers = std::count_if(m_states.begin(), m_states.end(),
[](const auto& indexed_state) {
const auto* cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
if (cur_state) return !cur_state->m_we_initiate;
return false;
});
// Since we use the fraction for inbound peers, we first need to compute the total
// number of inbound targets.
const double inbound_targets = (inbounds_fanout_tx_relay + inbound_rcncl_peers) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
const double inbound_targets = (inbounds_fanout_tx_relay + m_inbounds_count) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
destinations = inbound_targets - inbounds_fanout_tx_relay;
}
@ -363,9 +372,9 @@ bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wt
return m_impl->TryRemovingFromSet(peer_id, wtxid_to_remove);
}
void TxReconciliationTracker::ForgetPeer(NodeId peer_id)
void TxReconciliationTracker::ForgetPeer(NodeId peer_id, bool is_peer_inbound)
{
m_impl->ForgetPeer(peer_id);
m_impl->ForgetPeer(peer_id, is_peer_inbound);
}
bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const

View File

@ -92,7 +92,7 @@ public:
* Attempts to forget txreconciliation-related state of the peer (if we previously stored any).
* After this, we won't be able to reconcile transactions with the peer.
*/
void ForgetPeer(NodeId peer_id);
void ForgetPeer(NodeId peer_id, bool is_peer_inbound);
/**
* Check if a peer is registered to reconcile transactions with us.

View File

@ -53,9 +53,13 @@ BOOST_AUTO_TEST_CASE(ForgetPeerTest)
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
// Removing peer which is not there works.
tracker.ForgetPeer(peer_id0, true);
tracker.ForgetPeer(peer_id0, false);
// Removing peer after pre-registring works and does not let to register the peer.
tracker.PreRegisterPeer(peer_id0);
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_CHECK_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::NOT_FOUND);
// Removing peer after it is registered works.
@ -63,7 +67,7 @@ BOOST_AUTO_TEST_CASE(ForgetPeerTest)
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
}
@ -80,7 +84,7 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
}
@ -102,7 +106,7 @@ BOOST_AUTO_TEST_CASE(AddToSetTest)
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())};
BOOST_REQUIRE(!tracker.AddToSet(peer_id0, wtxid2));
@ -138,7 +142,7 @@ BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest)
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
}
@ -178,7 +182,7 @@ BOOST_AUTO_TEST_CASE(ShouldFanoutToTest)
/*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/1));
}
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, false);
// A forgotten (reconciliation-wise) peer should be always selected for fanout again.
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(tracker.ShouldFanoutTo(Wtxid::FromUint256(frc.rand256()), peer_id0,