Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

litep2p: Provide partial results to speedup GetRecord queries #7099

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.8.4", features = ["websocket"] }
litep2p = { git = "https://github.com/paritytech/litep2p.git", branch = "lexnv/kad-found-records", features = ["websocket"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, will do one soon, wanted to double-check first on our Kusama validator that this indeed solves the problem 🙏

log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_7099.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Provide partial results to speedup GetRecord queries

doc:
- audience: Node Dev
description: |
This PR provides the partial results of the GetRecord kademlia query.

This significantly improves the authority discovery records, from ~37 minutes to ~2/3 minutes.
In contrast, libp2p discovers authority records in around ~10 minutes.

The authority discovery was slow because litep2p provided the records only after the Kademlia query was completed. A normal Kademlia query completes in around 40 seconds to a few minutes.
In this PR, partial records are provided as soon as they are discovered from the network.

crates:
- name: sc-network
bump: patch
33 changes: 25 additions & 8 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use litep2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
Quorum, Record, RecordKey,
},
ping::{Config as PingConfig, PingEvent},
},
Expand Down Expand Up @@ -129,13 +129,19 @@ pub enum DiscoveryEvent {
address: Multiaddr,
},

/// Record was found from the DHT.
/// `GetRecord` query succeeded.
GetRecordSuccess {
/// Query ID.
query_id: QueryId,
},

/// Records.
records: RecordsType,
/// Record was found from the DHT.
GetRecordPartialResult {
/// Query ID.
query_id: QueryId,

/// Record.
record: PeerRecord,
},

/// Record was successfully stored on the DHT.
Expand Down Expand Up @@ -573,13 +579,24 @@ impl Stream for Discovery {
peers: peers.into_iter().collect(),
}))
},
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
"`GET_RECORD` succeeded for {query_id:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
},
Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
query_id,
record,
}));
},
Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Expand Down
87 changes: 35 additions & 52 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use litep2p::{
protocol::{
libp2p::{
bitswap::Config as BitswapConfig,
kademlia::{QueryId, Record, RecordsType},
kademlia::{QueryId, Record},
},
request_response::ConfigBuilder as RequestResponseConfigBuilder,
},
Expand Down Expand Up @@ -836,23 +836,45 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.peerstore_handle.add_known_peer(peer.into());
}
}
Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
if !self.pending_queries.contains_key(&query_id) {
log::error!(
target: LOG_TARGET,
"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
);

continue
}

let peer_id: sc_network_types::PeerId = record.peer.into();
let record = PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
};

self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record.into()
)
)
);
}
Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
match self.pending_queries.remove(&query_id) {
Some(KadQuery::GetValue(key, started)) => {
log::trace!(
target: LOG_TARGET,
"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
key,
"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
);
for record in litep2p_to_libp2p_peer_record(records) {
self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record.into()
)
)
);
}

if let Some(ref metrics) = self.metrics {
metrics
Expand Down Expand Up @@ -1165,42 +1187,3 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
}

// Glue code to convert from a litep2p records type to a libp2p2 PeerRecord.
fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
match records {
litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
vec![PeerRecord {
record: P2PRecord {
key: record.key.to_vec().into(),
value: record.value,
publisher: record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.expires,
},
peer: None,
}]
},
litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
.into_iter()
.map(|record| {
let peer_id: sc_network_types::PeerId = record.peer.into();

PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
}
})
.collect::<Vec<_>>(),
}
}
Loading