[Merged by Bors] - Reduce bandwidth over the VC<>BN API using dependant roots #4170

Closed (11 commits)
209 changes: 140 additions & 69 deletions validator_client/src/duties_service.rs
@@ -16,12 +16,15 @@ use crate::{
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore},
};
use environment::RuntimeContext;
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::ArithError;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// flag in the cli to enable collection of per validator metrics.
const VALIDATOR_METRICS_MIN_COUNT: usize = 64;

/// The number of validators to request duty information for in the initial request.
/// The initial request is used to determine if further requests are required, so that it
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;

#[derive(Debug)]
pub enum Error {
UnableToReadSlotClock,
@@ -674,84 +682,74 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
&[metrics::UPDATE_ATTESTERS_FETCH],
);

let response = duties_service
.beacon_nodes
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, local_indices)
.await
},
)
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?;

drop(fetch_timer);
let _store_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::UPDATE_ATTESTERS_STORE],
);
// Request duties for either `INITIAL_DUTIES_QUERY_SIZE` validators or the count of validators for which we
// don't already know their duties for that epoch, whichever subset is larger. We use the `dependent_root`
// in the response to determine whether validator duties need to be updated. This is to ensure that we don't
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let indices_to_request = if uninitialized_validators.len() >= INITIAL_DUTIES_QUERY_SIZE {
Review comment (Member):

In the (hypothetical) case where INITIAL_DUTIES_QUERY_SIZE == 2 and uninitialized_validators.len() == 1, I think we could fail to initialize that one uninitialized validator in the first call.

To avoid this, I think we'd want something like:

if !uninitialized_validators.is_empty() {
  uninitialized_validators
} else {
  &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
}

My suggestion removes the guarantee that we'll always request INITIAL_DUTIES_QUERY_SIZE validators, but it does ensure that we always query for all uninitialized validators. I think it's probably OK to sometimes query for fewer than INITIAL_DUTIES_QUERY_SIZE, especially since we currently have it set to 1.
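
The edge case raised in this comment can be reproduced in isolation. The following is a minimal sketch, not code from the PR: the function names `select_as_in_diff` and `select_as_suggested` are hypothetical, and the query size is passed as a parameter (rather than read from `INITIAL_DUTIES_QUERY_SIZE`) so that the hypothetical value of 2 can be tried.

```rust
use std::cmp::min;

/// Selection as written in the diff: the uninitialized validators are only
/// requested when there are at least `query_size` of them.
/// (Hypothetical helper name; `query_size` stands in for INITIAL_DUTIES_QUERY_SIZE.)
fn select_as_in_diff<'a>(
    uninitialized: &'a [u64],
    local_indices: &'a [u64],
    query_size: usize,
) -> &'a [u64] {
    if uninitialized.len() >= query_size {
        uninitialized
    } else {
        &local_indices[0..min(query_size, local_indices.len())]
    }
}

/// Selection as suggested in the review: always request every uninitialized
/// validator, and fall back to a small probe only when there are none.
/// (Hypothetical helper name.)
fn select_as_suggested<'a>(
    uninitialized: &'a [u64],
    local_indices: &'a [u64],
    query_size: usize,
) -> &'a [u64] {
    if !uninitialized.is_empty() {
        uninitialized
    } else {
        &local_indices[0..min(query_size, local_indices.len())]
    }
}
```

With `local_indices = [10, 11, 12]`, `uninitialized = [12]` and a query size of 2, the first function returns `[10, 11]` and so skips validator 12 in the initial request, while the second returns `[12]`. With a query size of 1 the two functions agree.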

uninitialized_validators.as_slice()
} else {
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
};

let response =
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
let dependent_root = response.dependent_root;

// Filter any duties that are not relevant or already known.
let new_duties = {
// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
let validators_to_update: Vec<_> = {
// Avoid holding the read-lock for any longer than required.
let attesters = duties_service.attesters.read();
response
local_pubkeys
.iter()
.filter(|pubkey| {
attesters.get(pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
})
.collect::<Vec<_>>()
};

if validators_to_update.is_empty() {
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
drop(fetch_timer);
return Ok(());
Review comment (Member):

I think we need to make sure we run update_per_validator_duty_metrics so that we start reporting the next-epoch duties as the current_slot progresses past the current-epoch slot.

Since we've changed update_per_validator_duty_metrics, perhaps it makes sense to hoist it up into poll_beacon_attesters (perhaps after each call to poll_beacon_attesters_for_epoch in here)? That way we can exit this whenever we like and still be confident that update_per_validator_duty_metrics is being called.

Reply (Member Author):

Nice catch 🙏, I've moved this to where you suggested!
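
The hoisting discussed in this thread can be sketched roughly as follows. This is a simplified, synchronous illustration under stated assumptions: `Service`, its fields, and the boolean `duties_changed` flag are hypothetical stand-ins, not the real `DutiesService` API. It only shows why calling the metrics update from the outer poll function makes it independent of early returns in the per-epoch function.

```rust
/// Hypothetical, simplified stand-in for the duties service.
#[derive(Default)]
struct Service {
    /// Number of times new duties were written to the store.
    duties_stored: usize,
    /// Number of times the per-validator metrics were refreshed.
    metric_updates: usize,
}

impl Service {
    /// Stand-in for `poll_beacon_attesters_for_epoch`: exits early when the
    /// dependent root shows that nothing has changed.
    fn poll_for_epoch(&mut self, duties_changed: bool) -> Result<(), String> {
        if !duties_changed {
            // Early return: no metrics call lives in here any more.
            return Ok(());
        }
        self.duties_stored += 1;
        Ok(())
    }

    /// Stand-in for `update_per_validator_duty_metrics`.
    fn update_metrics(&mut self) {
        self.metric_updates += 1;
    }

    /// Stand-in for `poll_beacon_attesters`: the metrics update runs after
    /// each per-epoch poll, whether or not that poll returned early.
    fn poll(&mut self, current_changed: bool, next_changed: bool) -> Result<(), String> {
        self.poll_for_epoch(current_changed)?;
        self.update_metrics();
        self.poll_for_epoch(next_changed)?;
        self.update_metrics();
        Ok(())
    }
}
```

In this sketch, calling `poll(false, false)` stores no duties but still refreshes the metrics twice, which is the property the reviewer asked for.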

}

// Filter out validators which have already been requested.
let attesters_in_response: Vec<_> = response.data.iter().map(|duty| duty.pubkey).collect();
let indices_to_request = validators_to_update
.iter()
.filter(|pubkey| !attesters_in_response.contains(pubkey))
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>();

let new_duties = if !indices_to_request.is_empty() {
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
.await?
.data
.into_iter()
.filter(|duty| {
if duties_service.per_validator_metrics() {
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());

// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
}

local_pubkeys.contains(&duty.pubkey) && {
// Only update the duties if either is true:
//
// - There were no known duties for this epoch.
// - The dependent root has changed, signalling a re-org.
attesters.get(&duty.pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
}
})
.chain(response.data)
.collect::<Vec<_>>()
} else {
response.data
};

drop(fetch_timer);

let _store_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::UPDATE_ATTESTERS_STORE],
);

if duties_service.per_validator_metrics() {
update_per_validator_duty_metrics::<E>(current_slot, &new_duties);
Review comment (Member):

Previously we were running this function for all duties and now we're only running it for new duties. I think that might pose a problem where we don't update the metrics to show the next epoch duties, once that next epoch arrives (assuming they haven't changed since the previous epoch).

I think we could solve this by iterating over duties_service.attesters instead.

Reply (Member Author):

Thanks, didn't think about this scenario! Will push a fix.
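
To make the scenario in this thread concrete, here is a minimal sketch of refreshing the gauges from all stored duties rather than only from newly downloaded ones. It is not the fix that was pushed: the maps, the `refresh_gauges` and `updated_gauge` names, and the fixed `SLOTS_PER_EPOCH` of 32 are assumptions made for illustration, with plain integers in place of `Slot`, `Epoch` and the Prometheus gauge. The update condition itself mirrors the one in `update_per_validator_duty_metrics`.

```rust
use std::collections::BTreeMap;

/// Assumed value for illustration; the real code uses `E::slots_per_epoch()`.
const SLOTS_PER_EPOCH: u64 = 32;

/// Returns the value the gauge should hold after considering one duty.
/// Mirrors the condition in the diff:
/// - switch once the currently reported duty slot has passed, or
/// - accept a different, still-upcoming duty from the same or an earlier epoch.
fn updated_gauge(existing: Option<u64>, duty_slot: u64, current_slot: u64) -> u64 {
    match existing {
        None => duty_slot,
        Some(existing_slot) => {
            let existing_epoch = existing_slot / SLOTS_PER_EPOCH;
            let duty_epoch = duty_slot / SLOTS_PER_EPOCH;
            if existing_slot < current_slot
                || (duty_epoch <= existing_epoch
                    && duty_slot > current_slot
                    && duty_slot != existing_slot)
            {
                duty_slot
            } else {
                existing_slot
            }
        }
    }
}

/// Refreshes every gauge from the stored duties (hypothetical layout:
/// validator index -> epoch -> duty slot), not just from new downloads.
fn refresh_gauges(
    stored: &BTreeMap<u64, BTreeMap<u64, u64>>,
    gauges: &mut BTreeMap<u64, u64>,
    current_slot: u64,
) {
    for (validator_index, duties) in stored {
        for duty_slot in duties.values() {
            let existing = gauges.get(validator_index).copied();
            gauges.insert(*validator_index, updated_gauge(existing, *duty_slot, current_slot));
        }
    }
}
```

For a validator with stored duties at slot 40 (epoch 1) and slot 70 (epoch 2), a refresh at slot 35 leaves the gauge at 40, and a later refresh at slot 41 moves it to 70 even though no new duties were downloaded in between.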

}

debug!(
log,
"Downloaded attester duties";
@@ -799,6 +797,79 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
Ok(())
}
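
The dependent-root check that decides whether a follow-up request is needed can be illustrated on its own. The sketch below is a simplification and not the PR's code: `validators_to_update` is a hypothetical free function, public keys are string literals, and roots and epochs are plain integers. It keeps the same nested `map_or(true, ...)` shape as the filter in `poll_beacon_attesters_for_epoch`.

```rust
use std::collections::HashMap;

// Simplified stand-ins for `PublicKeyBytes`, `Epoch` and `Hash256`.
type Pubkey = &'static str;
type Epoch = u64;
type Root = u64;

/// Returns the local validators whose duties for `epoch` are either missing
/// or were stored under a different dependent root (for example after a re-org).
fn validators_to_update(
    known: &HashMap<Pubkey, HashMap<Epoch, Root>>,
    local_pubkeys: &[Pubkey],
    epoch: Epoch,
    dependent_root: Root,
) -> Vec<Pubkey> {
    local_pubkeys
        .iter()
        .copied()
        .filter(|pubkey| {
            known.get(pubkey).map_or(true, |duties| {
                duties
                    .get(&epoch)
                    .map_or(true, |prior| *prior != dependent_root)
            })
        })
        .collect()
}
```

If the returned list is empty, the single-validator probe was the only request needed for that epoch; otherwise only the listed validators need to be requested.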

/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.map_or(true, |duties| !duties.contains_key(epoch))
})
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>()
}

fn update_per_validator_duty_metrics<E: EthSpec>(current_slot: Slot, new_duties: &[AttesterData]) {
new_duties.iter().for_each(|duty| {
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());

// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
});
}

async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
duties_service
.beacon_nodes
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, validator_indices)
.await
},
)
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
}

/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will