diff --git a/CLAUDE.md b/CLAUDE.md index c1c58d95..49d8409c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -344,6 +344,13 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads - The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks - **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip +### Runtime Aggregator Toggle (Hot-Standby Model) +- `POST /lean/v0/admin/aggregator` with `{"enabled": bool}` toggles the aggregator role at runtime without restart (ported from leanSpec PR #636) +- `GET /lean/v0/admin/aggregator` returns `{"is_aggregator": bool}` +- The CLI `--is-aggregator` flag **seeds** the initial value; runtime toggles are in-process only (not persisted across restarts) +- Runtime toggles do NOT resubscribe gossip subnets — those are frozen at startup by `build_swarm`. Toggling ON at runtime only activates aggregation logic for subnets the node was already subscribed to +- **Operational model**: standby aggregators should boot with `--is-aggregator=true` (so subscriptions are in place), then use the admin endpoint to rotate duties. A node booted with `--is-aggregator=false` and toggled ON later will have no extra subnets to aggregate + ### Signature Verification - Fork choice tests use `on_block_without_verification()` to skip signature checks - Signature spec tests use `on_block()` which always verifies diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index e89b6990..3c3f816c 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -23,6 +23,7 @@ use ethlambda_network_api::{InitBlockChain, InitP2P, ToBlockChainToP2PRef, ToP2P use ethlambda_p2p::{Bootnode, P2P, SwarmConfig, build_swarm, parse_enrs}; use ethlambda_types::primitives::H256; use ethlambda_types::{ + aggregator::AggregatorController, genesis::GenesisConfig, signature::ValidatorSecretKey, state::{State, ValidatorPubkeyBytes}, @@ -64,7 +65,15 @@ struct CliOptions { /// When set, skips genesis initialization and syncs from checkpoint. #[arg(long)] checkpoint_sync_url: Option, - /// Whether this node acts as a committee aggregator + /// Whether this node acts as a committee aggregator. + /// + /// Seeds the initial value of the live aggregator flag shared by the + /// blockchain actor and the admin API. The flag can be toggled at + /// runtime via `POST /lean/v0/admin/aggregator`. Runtime toggles do + /// NOT persist across restarts and do NOT update gossip subnet + /// subscriptions, which are frozen at startup — standby aggregators + /// should boot with this flag enabled to establish subscriptions, then + /// use the admin endpoint to rotate duties (hot-standby model). #[arg(long, default_value = "false")] is_aggregator: bool, /// Number of attestation committees (subnets) per slot @@ -154,8 +163,19 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Failed to initialize state"))?; let validator_ids: Vec = validator_keys.keys().copied().collect(); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator); + // Shared, runtime-mutable aggregator flag. Seeded from the CLI and + // threaded into both the blockchain actor (which reads on every tick) + // and the API server (which exposes GET/POST admin endpoints). + let aggregator = AggregatorController::new(options.is_aggregator); + + let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + + // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the + // AggregatorController — subnet subscriptions are decided once here and + // are not re-evaluated at runtime. Toggling via the admin API affects + // aggregation logic but not the gossip mesh. See crates/net/p2p/src/lib.rs + // for the invariant. let built = build_swarm(SwarmConfig { node_key: node_p2p_key, bootnodes, @@ -190,7 +210,7 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Metrics server failed")); }); tokio::spawn(async move { - let _ = ethlambda_rpc::start_api_server(api_socket, store) + let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator) .await .inspect_err(|err| error!(%err, "API server failed")); }); diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 6423e513..77c98826 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -6,6 +6,7 @@ use ethlambda_state_transition::is_proposer; use ethlambda_storage::{ALL_TABLES, Store}; use ethlambda_types::{ ShortRoot, + aggregator::AggregatorController, attestation::{SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, SignedBlock}, primitives::{H256, HashTreeRoot as _}, @@ -50,9 +51,9 @@ impl BlockChain { pub fn spawn( store: Store, validator_keys: HashMap, - is_aggregator: bool, + aggregator: AggregatorController, ) -> BlockChain { - metrics::set_is_aggregator(is_aggregator); + metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); @@ -61,7 +62,7 @@ impl BlockChain { p2p: None, key_manager, pending_blocks: HashMap::new(), - is_aggregator, + aggregator, pending_block_parents: HashMap::new(), current_aggregation: None, } @@ -104,7 +105,11 @@ pub struct BlockChainServer { pending_block_parents: HashMap, /// Whether this node acts as a committee aggregator. - is_aggregator: bool, + /// + /// Read fresh on every tick and gossip event so runtime toggles via the + /// admin API take effect without a restart. Seeded from the CLI + /// `--is-aggregator` flag at spawn. + aggregator: AggregatorController, /// In-flight committee-signature aggregation, if any. Present only while a /// worker started at the most recent interval 2 is still running or until @@ -131,6 +136,13 @@ impl BlockChainServer { // Update current slot metric metrics::update_current_slot(slot); + // Snapshot the aggregator flag once per tick so all read sites within + // the tick see a consistent value even if the admin API toggles it + // mid-tick. Mirror it to the gauge from the actor side so + // `lean_is_aggregator` reflects the value the actor is acting on. + let is_aggregator = self.aggregator.is_enabled(); + metrics::set_is_aggregator(is_aggregator); + // At interval 0, check if we will propose (but don't build the block yet). // Tick forkchoice first to accept attestations, then build the block // using the freshly-accepted attestations. @@ -145,7 +157,7 @@ impl BlockChainServer { proposer_validator_id.is_some(), ); - if interval == 2 && self.is_aggregator { + if interval == 2 && is_aggregator { self.start_aggregation_session(slot, ctx).await; } @@ -154,9 +166,11 @@ impl BlockChainServer { self.propose_block(slot, validator_id); } - // Produce attestations at interval 1 (all validators including proposer) + // Produce attestations at interval 1 (all validators including proposer). + // Reuse the same snapshot so self-delivery decisions match the rest + // of the tick. if interval == 1 { - self.produce_attestations(slot); + self.produce_attestations(slot, is_aggregator); } // Update safe target slot metric (updated by store.on_tick at interval 3) @@ -232,7 +246,7 @@ impl BlockChainServer { .find(|&vid| is_proposer(vid, slot, num_validators)) } - fn produce_attestations(&mut self, slot: u64) { + fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) { let _timing = metrics::time_attestations_production(); // Produce attestation data once for all validators @@ -262,7 +276,7 @@ impl BlockChainServer { // Gossipsub does not deliver messages back to the sender, so without // this the aggregator never sees its own validator's signature in // gossip_signatures and it is excluded from aggregated proofs. - if self.is_aggregator { + if is_aggregator { let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true) .inspect_err(|err| { warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed") @@ -536,7 +550,11 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) { - let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator) + // Read fresh here too: a gossip event can arrive between ticks, and + // if the admin API just toggled, the first gossip after the toggle + // should already use the new value. + let is_aggregator = self.aggregator.is_enabled(); + let _ = store::on_gossip_attestation(&mut self.store, attestation, is_aggregator) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } diff --git a/crates/common/types/src/aggregator.rs b/crates/common/types/src/aggregator.rs new file mode 100644 index 00000000..b003fb7c --- /dev/null +++ b/crates/common/types/src/aggregator.rs @@ -0,0 +1,26 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Shared, runtime-mutable aggregator role flag. +#[derive(Clone, Debug)] +pub struct AggregatorController { + flag: Arc, +} + +impl AggregatorController { + /// Construct a controller seeded with the CLI `--is-aggregator` value. + pub fn new(initial: bool) -> Self { + Self { + flag: Arc::new(AtomicBool::new(initial)), + } + } + + pub fn is_enabled(&self) -> bool { + self.flag.load(Ordering::Relaxed) + } + + /// Update the role and return the previous value. + pub fn set_enabled(&self, enabled: bool) -> bool { + self.flag.swap(enabled, Ordering::Relaxed) + } +} diff --git a/crates/common/types/src/lib.rs b/crates/common/types/src/lib.rs index 6f9b28b9..aa180c98 100644 --- a/crates/common/types/src/lib.rs +++ b/crates/common/types/src/lib.rs @@ -1,3 +1,4 @@ +pub mod aggregator; pub mod attestation; pub mod block; pub mod checkpoint; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index e5dcff84..8f2e4a6f 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -74,6 +74,15 @@ pub(crate) struct Behaviour { } /// Configuration for building the libp2p swarm. +/// +/// INVARIANT: `is_aggregator` is consumed once during [`build_swarm`] to decide +/// subnet subscriptions and is NOT stored on [`P2PServer`]. Runtime toggles +/// of the aggregator role via the admin API (see +/// [`ethlambda_types::aggregator::AggregatorController`]) intentionally do +/// not resubscribe gossip subnets — this is the leanSpec PR #636 scope +/// limitation ("hot-standby model"). If a runtime reader is ever added on +/// the P2P side, it must consult the shared `AggregatorController` instead +/// of a bool captured here, or the runtime toggle will silently diverge. pub struct SwarmConfig { pub node_key: Vec, pub bootnodes: Vec, diff --git a/crates/net/rpc/src/admin.rs b/crates/net/rpc/src/admin.rs new file mode 100644 index 00000000..43804d02 --- /dev/null +++ b/crates/net/rpc/src/admin.rs @@ -0,0 +1,251 @@ +//! Admin endpoints for runtime-toggleable node roles. +//! +//! Ported from leanSpec PR #636. The POST handler strictly rejects non-boolean +//! values (including JSON integers 0/1) to match the spec's semantics. +//! +//! # Scope +//! +//! Toggling the aggregator flag at runtime does **not** change gossip subnet +//! subscriptions, which are frozen at startup. For full parity with the CLI +//! `--is-aggregator` flag, a standby node must boot with the flag enabled so +//! that subscriptions are in place, then use this endpoint to disable/enable +//! the role (hot-standby model). See leanSpec PR #636 for the full rationale. + +use axum::{ + Extension, Json, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use ethlambda_types::aggregator::AggregatorController; +use serde::Serialize; +use serde_json::Value; +use tracing::info; + +use crate::json_response; + +#[derive(Serialize)] +struct StatusResponse { + is_aggregator: bool, +} + +#[derive(Serialize)] +struct ToggleResponse { + is_aggregator: bool, + previous: bool, +} + +/// GET /lean/v0/admin/aggregator — returns current aggregator role. +/// +/// Returns 503 when the controller is not wired. Kept for spec parity with +/// leanSpec, even though in ethlambda the controller is always wired when +/// the API server is started via `main.rs`. +/// +/// The `Option>` wrapping makes the extractor infallible: a bare +/// `Extension` would cause axum to short-circuit with a 500 when the +/// extension is missing, whereas `Option` yields `None` and lets us return +/// a clean 503 with a useful message. +pub async fn get_aggregator(controller: Option>) -> Response { + match controller { + Some(Extension(controller)) => json_response(StatusResponse { + is_aggregator: controller.is_enabled(), + }), + None => service_unavailable("Aggregator controller not available"), + } +} + +/// POST /lean/v0/admin/aggregator — toggles aggregator role at runtime. +/// +/// Body: `{"enabled": bool}`. Returns `{"is_aggregator": , "previous": }`. +/// 400 on missing/invalid body, 503 when the controller is not wired. +/// +/// The `Option>` wrapping makes the extractor infallible: a bare +/// `Extension` would cause axum to short-circuit with a 500 when the +/// extension is missing, whereas `Option` yields `None` and lets us return +/// a clean 503 with a useful message. +pub async fn post_aggregator( + controller: Option>, + body: Option>, +) -> Response { + let Some(Extension(controller)) = controller else { + return service_unavailable("Aggregator controller not available"); + }; + + // Parsing happens through `Option>` so we can distinguish + // "no body / malformed JSON" (None) from "valid JSON with wrong shape". + let Some(Json(payload)) = body else { + return bad_request("Invalid or missing JSON body"); + }; + + let Some(enabled_value) = payload.get("enabled") else { + return bad_request("Missing 'enabled' field in body"); + }; + + let Some(enabled) = enabled_value.as_bool() else { + return bad_request("'enabled' must be a boolean"); + }; + + let previous = controller.set_enabled(enabled); + if previous != enabled { + info!(enabled, previous, "Aggregator role toggled via admin API"); + } + + json_response(ToggleResponse { + is_aggregator: enabled, + previous, + }) +} + +fn bad_request(reason: &'static str) -> Response { + (StatusCode::BAD_REQUEST, reason).into_response() +} + +fn service_unavailable(reason: &'static str) -> Response { + (StatusCode::SERVICE_UNAVAILABLE, reason).into_response() +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::Body; + use axum::http::{Method, Request, StatusCode}; + use axum::routing::get; + use axum::{Extension, Router}; + use http_body_util::BodyExt; + use tower::ServiceExt; + + fn router(controller: Option) -> Router { + let mut router = Router::new().route( + "/lean/v0/admin/aggregator", + get(get_aggregator).post(post_aggregator), + ); + if let Some(controller) = controller { + router = router.layer(Extension(controller)); + } + router + } + + async fn body_json(resp: Response) -> Value { + let body = resp.into_body().collect().await.unwrap().to_bytes(); + serde_json::from_slice(&body).unwrap() + } + + #[tokio::test] + async fn get_returns_current_state() { + let controller = AggregatorController::new(true); + let resp = router(Some(controller)) + .oneshot( + Request::builder() + .uri("/lean/v0/admin/aggregator") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": true}) + ); + } + + #[tokio::test] + async fn get_returns_503_without_controller() { + let resp = router(None) + .oneshot( + Request::builder() + .uri("/lean/v0/admin/aggregator") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); + } + + async fn post(controller: Option, body: &str) -> Response { + router(controller) + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/lean/v0/admin/aggregator") + .header("content-type", "application/json") + .body(Body::from(body.to_string())) + .unwrap(), + ) + .await + .unwrap() + } + + #[tokio::test] + async fn post_activates_and_returns_previous() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller.clone()), r#"{"enabled": true}"#).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": true, "previous": false}), + ); + assert!(controller.is_enabled()); + } + + #[tokio::test] + async fn post_deactivates_and_returns_previous() { + let controller = AggregatorController::new(true); + let resp = post(Some(controller.clone()), r#"{"enabled": false}"#).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": false, "previous": true}), + ); + assert!(!controller.is_enabled()); + } + + #[tokio::test] + async fn post_noop_when_value_matches_state() { + let controller = AggregatorController::new(true); + let _ = post(Some(controller.clone()), r#"{"enabled": true}"#).await; + let resp = post(Some(controller.clone()), r#"{"enabled": true}"#).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": true, "previous": true}), + ); + } + + #[tokio::test] + async fn post_rejects_missing_enabled_field() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller), r#"{"other": true}"#).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_rejects_integer_in_place_of_bool() { + // JSON parsers in other languages sometimes coerce 0/1 → bool; the + // spec explicitly rejects this, so we do too. + let controller = AggregatorController::new(false); + let resp = post(Some(controller), r#"{"enabled": 1}"#).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_rejects_string_in_place_of_bool() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller), r#"{"enabled": "true"}"#).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_rejects_malformed_json() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller), "not json").await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_returns_503_without_controller() { + let resp = post(None, r#"{"enabled": true}"#).await; + assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); + } +} diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 5973dc62..acec7fa1 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,19 +1,27 @@ use std::net::SocketAddr; -use axum::{Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get}; +use axum::{ + Extension, Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get, +}; use ethlambda_storage::Store; +use ethlambda_types::aggregator::AggregatorController; use ethlambda_types::primitives::H256; use libssz::SszEncode; pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream"; +mod admin; mod fork_choice; mod heap_profiling; pub mod metrics; -pub async fn start_api_server(address: SocketAddr, store: Store) -> Result<(), std::io::Error> { - let api_router = build_api_router(store); +pub async fn start_api_server( + address: SocketAddr, + store: Store, + aggregator: AggregatorController, +) -> Result<(), std::io::Error> { + let api_router = build_api_router(store).layer(Extension(aggregator)); let listener = tokio::net::TcpListener::bind(address).await?; axum::serve(listener, api_router).await?; @@ -34,6 +42,10 @@ pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Er } /// Build the API router with the given store. +/// +/// The aggregator controller is threaded in via `Extension` by the caller +/// (see `start_api_server`) so existing store-backed handlers don't need to +/// know about it and admin handlers extract it independently. fn build_api_router(store: Store) -> Router { Router::new() .route("/lean/v0/health", get(metrics::get_health)) @@ -47,6 +59,10 @@ fn build_api_router(store: Store) -> Router { "/lean/v0/fork_choice/ui", get(fork_choice::get_fork_choice_ui), ) + .route( + "/lean/v0/admin/aggregator", + get(admin::get_aggregator).post(admin::post_aggregator), + ) .with_state(store) }