Skip to content
139 changes: 123 additions & 16 deletions bin/ethlambda/src/checkpoint_sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::time::Duration;

use ethlambda_types::block::SignedBlock;
use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::state::{State, Validator};
use ethlambda_types::state::{State, Validator, anchor_pair_is_consistent};
use libssz::{DecodeError, SszDecode};
use reqwest::Client;

Expand All @@ -13,6 +14,12 @@ const CHECKPOINT_CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
/// This is an inactivity timeout - it resets on each successful read.
const CHECKPOINT_READ_TIMEOUT: Duration = Duration::from_secs(15);

/// Path of the finalized-state endpoint (relative to the peer's API base URL).
const FINALIZED_STATE_PATH: &str = "/lean/v0/states/finalized";

/// Path of the finalized-block endpoint (relative to the peer's API base URL).
const FINALIZED_BLOCK_PATH: &str = "/lean/v0/blocks/finalized";

#[derive(Debug, thiserror::Error)]
pub enum CheckpointSyncError {
#[error("HTTP request failed: {0}")]
Expand Down Expand Up @@ -49,9 +56,11 @@ pub enum CheckpointSyncError {
BlockHeaderFinalizedRootMismatch,
#[error("block header at justified slot must match justified root")]
BlockHeaderJustifiedRootMismatch,
#[error("anchor block does not match anchor state")]
AnchorPairingMismatch,
}

/// Fetch finalized state from checkpoint sync URL.
/// Build the HTTP client used for checkpoint sync fetches.
///
/// Uses two-phase timeout strategy:
/// - Connect timeout (15s): Fails quickly if peer is unreachable
Expand All @@ -63,33 +72,101 @@ pub enum CheckpointSyncError {
/// failing fast if the connection stalls. A plain total timeout would
/// disconnect even for valid downloads if the state is simply too large to
/// transfer within the time limit.
pub async fn fetch_checkpoint_state(
url: &str,
expected_genesis_time: u64,
expected_validators: &[Validator],
) -> Result<State, CheckpointSyncError> {
// Use .read_timeout() to detect stalled downloads (inactivity timer).
// This allows large states to complete as long as data keeps flowing.
let client = Client::builder()
fn build_client() -> Result<Client, CheckpointSyncError> {
Ok(Client::builder()
.connect_timeout(CHECKPOINT_CONNECT_TIMEOUT)
.read_timeout(CHECKPOINT_READ_TIMEOUT)
.build()?;
.build()?)
}

let response = client
/// Fetch and SSZ-decode an `application/octet-stream` body from `url`.
async fn fetch_ssz<T: SszDecode>(client: &Client, url: &str) -> Result<T, CheckpointSyncError> {
let bytes = client
.get(url)
.header("Accept", "application/octet-stream")
.send()
.await?
.error_for_status()?;
.error_for_status()?
.bytes()
.await?;

let bytes = response.bytes().await?;
let state = State::from_ssz_bytes(&bytes).map_err(CheckpointSyncError::SszDecode)?;
T::from_ssz_bytes(&bytes).map_err(CheckpointSyncError::SszDecode)
}

verify_checkpoint_state(&state, expected_genesis_time, expected_validators)?;
/// Normalize a checkpoint-sync URL to a base URL.
///
/// Operators historically pass the full state URL (e.g.
/// `http://peer:5052/lean/v0/states/finalized`) via `--checkpoint-sync-url`.
/// The new contract is a base URL (`http://peer:5052`) so we can derive both
/// the state and block endpoints. To avoid breaking existing devnet scripts,
/// strip a trailing legacy path if present and also trim any trailing slash.
// TODO: remove this and use the full URL
fn normalize_base_url(url: &str) -> &str {
// Trim trailing slashes FIRST so that the legacy-suffix strip succeeds on
// inputs like `…/lean/v0/states/finalized/`; otherwise we'd leave the
// state path embedded in the "base URL" and double-prefix every request.
let trimmed = url.trim_end_matches('/');
trimmed
.strip_suffix(FINALIZED_STATE_PATH)
.unwrap_or(trimmed)
}
Comment thread
MegaRedHand marked this conversation as resolved.

/// Fetch the finalized state from a checkpoint peer and verify it
/// against the local genesis configuration.
async fn fetch_finalized_state(
client: &Client,
base_url: &str,
expected_genesis_time: u64,
expected_validators: &[Validator],
) -> Result<State, CheckpointSyncError> {
let url = format!("{base_url}{FINALIZED_STATE_PATH}");
let state: State = fetch_ssz(client, &url).await?;
verify_checkpoint_state(&state, expected_genesis_time, expected_validators)?;
Ok(state)
}

/// Fetch the finalized signed block from a checkpoint peer.
async fn fetch_finalized_block(
client: &Client,
base_url: &str,
) -> Result<SignedBlock, CheckpointSyncError> {
let url = format!("{base_url}{FINALIZED_BLOCK_PATH}");
fetch_ssz(client, &url).await
}

/// Fetch the finalized state and signed block in parallel and verify they pair.
///
/// If the peer advances finalization between the two requests the pairing will
/// not hold; the caller is expected to retry.
pub async fn fetch_finalized_anchor(
url: &str,
expected_genesis_time: u64,
expected_validators: &[Validator],
) -> Result<(State, SignedBlock), CheckpointSyncError> {
let base_url = normalize_base_url(url);
let client = build_client()?;

// Issue both fetches concurrently; either failure cancels the pair.
let (mut state, signed_block) = tokio::try_join!(
fetch_finalized_state(
&client,
base_url,
expected_genesis_time,
expected_validators
),
fetch_finalized_block(&client, base_url),
)?;

// Strictly mirrors the invariants `Store::get_forkchoice_store` asserts —
// header equality, state self-consistency, and `block.state_root` equal
// to the canonical tree-hash root of the state.
if !anchor_pair_is_consistent(&mut state, &signed_block.message) {
return Err(CheckpointSyncError::AnchorPairingMismatch);
}

Ok((state, signed_block))
}

/// Verify checkpoint state is structurally valid.
///
/// Arguments:
Expand Down Expand Up @@ -417,4 +494,34 @@ mod tests {
state.latest_justified.root = H256::from([99u8; 32]); // Wrong root
assert!(verify_checkpoint_state(&state, 1000, &validators).is_err());
}

// --- normalize_base_url ---

#[test]
fn normalize_strips_legacy_state_path() {
assert_eq!(
normalize_base_url("http://peer:5052/lean/v0/states/finalized"),
"http://peer:5052"
);
}

#[test]
fn normalize_passes_through_base_url() {
assert_eq!(normalize_base_url("http://peer:5052"), "http://peer:5052");
}

#[test]
fn normalize_strips_trailing_slash() {
assert_eq!(normalize_base_url("http://peer:5052/"), "http://peer:5052");
}

#[test]
fn normalize_strips_legacy_state_path_with_trailing_slash() {
// Regression: a trailing slash on the legacy path used to defeat
// strip_suffix, leaving the path embedded in the "base URL".
assert_eq!(
normalize_base_url("http://peer:5052/lean/v0/states/finalized/"),
"http://peer:5052"
);
}
}
67 changes: 55 additions & 12 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use clap::Parser;
use ethlambda_blockchain::key_manager::ValidatorKeyPair;
use ethlambda_network_api::{InitBlockChain, InitP2P, ToBlockChainToP2PRef, ToP2PToBlockChainRef};
use ethlambda_p2p::{Bootnode, P2P, PeerId, SwarmConfig, build_swarm, parse_enrs};
use ethlambda_types::primitives::H256;
use ethlambda_types::primitives::{H256, HashTreeRoot as _};
use ethlambda_types::{
aggregator::AggregatorController,
genesis::GenesisConfig,
Expand Down Expand Up @@ -76,8 +76,12 @@ struct CliOptions {
/// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0")
#[arg(long)]
node_id: String,
/// URL to download checkpoint state from (e.g., http://peer:5052/lean/v0/states/finalized)
/// When set, skips genesis initialization and syncs from checkpoint.
/// Base URL of a checkpoint-sync peer's API server (e.g., http://peer:5052).
/// When set, skips genesis initialization and fetches the finalized state
/// and block from the peer's `/lean/v0/states/finalized` and
/// `/lean/v0/blocks/finalized` endpoints. For backward compatibility, a
/// URL ending in `/lean/v0/states/finalized` is accepted and the trailing
/// path is stripped.
#[arg(long)]
checkpoint_sync_url: Option<String>,
/// Whether this node acts as a committee aggregator.
Expand Down Expand Up @@ -557,14 +561,19 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
/// Fetch the initial state for the node.
///
/// If `checkpoint_url` is provided, performs checkpoint sync by downloading
/// and verifying the finalized state from a remote peer. Otherwise, creates
/// a genesis state from the local genesis configuration.
/// and verifying the finalized state AND signed block in parallel from a
/// remote peer. Otherwise, creates a genesis state from the local genesis
/// configuration.
///
/// Fetching the matching signed block lets the local store serve a valid
/// anchor via the `BlocksByRoot` req-resp protocol; without it, peers
/// requesting the anchor would receive a synthetic block whose hash differs
/// from `latest_finalized.root` and would score-penalize us.
///
/// # Arguments
///
/// * `checkpoint_url` - Optional URL to fetch checkpoint state from
/// * `checkpoint_url` - Optional base URL to a peer's API server
/// * `genesis` - Genesis configuration (for genesis_time verification and genesis state creation)
/// * `validators` - Validator set (moved for genesis state creation)
/// * `backend` - Storage backend for Store creation
///
/// # Returns
Expand All @@ -587,19 +596,53 @@ async fn fetch_initial_state(
// Checkpoint sync path
info!(%checkpoint_url, "Starting checkpoint sync");

let state =
checkpoint_sync::fetch_checkpoint_state(checkpoint_url, genesis.genesis_time, &validators)
.await?;
// The state and block are fetched in parallel; if the peer advances
// finalization between the two requests the pair won't match. Retry a
// small number of times so this transient race doesn't fail node startup.
const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

let mut attempt = 1;
let (state, signed_block) = loop {
match checkpoint_sync::fetch_finalized_anchor(
checkpoint_url,
genesis.genesis_time,
&validators,
)
.await
{
Ok(pair) => break pair,
Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
{
warn!(
attempt,
max = MAX_ANCHOR_FETCH_ATTEMPTS,
"Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
);
tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
attempt += 1;
}
Err(err) => return Err(err),
}
};

info!(
slot = state.slot,
validators = state.validators.len(),
finalized_slot = state.latest_finalized.slot,
anchor_block_slot = signed_block.message.slot,
"Checkpoint sync complete"
);

// Store the anchor state and header, without body
Ok(Store::from_anchor_state(backend, state))
// Initialize the store from state + anchor block body, then persist the
// signatures so we can serve the anchor on BlocksByRoot. `insert_signed_block`
// overlaps with what `get_forkchoice_store` already wrote, but it's
// idempotent and the only path that also stores `BlockSignatures`.
let anchor_root = signed_block.message.header().hash_tree_root();
let mut store = Store::get_forkchoice_store(backend, state, signed_block.message.clone());
store.insert_signed_block(anchor_root, signed_block);
Ok(store)
}

#[cfg(test)]
Expand Down
14 changes: 4 additions & 10 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1610,17 +1610,11 @@ mod tests {
use std::sync::Arc;

let genesis_state = State::from_genesis(1000, vec![]);
let genesis_block = Block {
slot: 0,
proposer_index: 0,
parent_root: H256::ZERO,
state_root: H256::ZERO,
body: BlockBody {
attestations: AggregatedAttestations::default(),
},
};
let backend = Arc::new(InMemoryBackend::new());
let mut store = Store::get_forkchoice_store(backend, genesis_state, genesis_block);
// Use `from_anchor_state` here rather than `get_forkchoice_store`:
// the latter now enforces `block.state_root == hash_tree_root(state)`,
// which a synthetic genesis block with zero state_root cannot satisfy.
let mut store = Store::from_anchor_state(backend, genesis_state);

let head_root = store.head();
let att_data = AttestationData {
Expand Down
32 changes: 29 additions & 3 deletions crates/blockchain/tests/forkchoice_spectests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ethlambda_types::{
attestation::{AttestationData, SignedAggregatedAttestation, SignedAttestation},
block::{AggregatedSignatureProof, Block},
primitives::{ByteList, H256, HashTreeRoot as _},
state::State,
state::{State, anchor_pair_is_consistent},
};

use ethlambda_test_fixtures::fork_choice::{AttestationCheck, ForkChoiceTestVector, StoreChecks};
Expand Down Expand Up @@ -39,10 +39,36 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
}
println!("Running test: {}", name);

// Initialize store from anchor state/block
let anchor_state: State = test.anchor_state.into();
// Initialize store from anchor state/block.
//
// Fixtures whose `steps` is empty are "anchor rejection" cases (e.g.
// `test_store_from_anchor_rejects_mismatched_state_root`): they assert
// that init refuses an inconsistent (state, block) pair. We detect that
// up front with the non-panicking helper instead of letting
// `get_forkchoice_store`'s assert! panic out of the test harness.
let mut anchor_state: State = test.anchor_state.into();
let anchor_block: Block = test.anchor_block.into();
let genesis_time = anchor_state.config.genesis_time;

let pair_ok = anchor_pair_is_consistent(&mut anchor_state, &anchor_block);
if test.steps.is_empty() {
if pair_ok {
return Err(format!(
"Fixture '{name}' has no steps (expects anchor rejection) \
but the (state, block) pair is consistent"
)
.into());
}
continue;
}
if !pair_ok {
return Err(format!(
"Fixture '{name}' has steps (expects anchor acceptance) \
but the (state, block) pair is inconsistent"
)
.into());
}

let backend = Arc::new(InMemoryBackend::new());
let mut store = Store::get_forkchoice_store(backend, anchor_state, anchor_block);

Expand Down
Loading