diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 1f1d73415..04a49cfd0 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -602,6 +602,14 @@ async def run_node( ) event_source.set_status(updated_status) + # Wire the responder's view of wall-clock time. + # + # Without this, the responder cannot bound the sliding history window and + # rejects every range request. The block-by-slot and block-by-root lookups + # still need SignedBlock storage; they share the same gap and are owned by + # a follow-up storage refactor. + event_source.set_current_slot_lookup(node.clock.current_slot) + # Connect to bootnodes. # # Best-effort connection: failures don't abort the loop. diff --git a/src/lean_spec/subspecs/networking/client/event_source/live.py b/src/lean_spec/subspecs/networking/client/event_source/live.py index dec41b6af..90bf0fb4d 100644 --- a/src/lean_spec/subspecs/networking/client/event_source/live.py +++ b/src/lean_spec/subspecs/networking/client/event_source/live.py @@ -78,7 +78,9 @@ from lean_spec.subspecs.networking.gossipsub.types import TopicId from lean_spec.subspecs.networking.reqresp.handler import ( REQRESP_PROTOCOL_IDS, + AsyncBlockBySlotLookup, AsyncBlockLookup, + CurrentSlotLookup, ReqRespServer, RequestHandler, ) @@ -313,6 +315,33 @@ def set_block_lookup(self, lookup: AsyncBlockLookup) -> None: """ self._reqresp_handler.block_lookup = lookup + def set_block_by_slot_lookup(self, lookup: AsyncBlockBySlotLookup) -> None: + """ + Set the callback for looking up canonical blocks by slot. + + Used by the inbound ReqResp handler to serve BlocksByRange requests. + + The callback MUST consult fork choice. + It returns the canonical block at that slot, or None for empty slots. + + Args: + lookup: Async function from Slot to SignedBlock or None. + """ + self._reqresp_handler.block_by_slot_lookup = lookup + + def set_current_slot_lookup(self, lookup: CurrentSlotLookup) -> None: + """ + Set the callback returning the node's current slot. + + Used to compute the BlocksByRange sliding history window. + + Without this callback, the responder rejects every range request with SERVER_ERROR. + + Args: + lookup: Function returning the current Slot. + """ + self._reqresp_handler.current_slot_lookup = lookup + def subscribe_gossip_topic(self, topic: TopicId) -> None: """ Subscribe to a gossip topic. diff --git a/src/lean_spec/subspecs/networking/client/reqresp_client.py b/src/lean_spec/subspecs/networking/client/reqresp_client.py index 28df2751e..4b9c3a3d0 100644 --- a/src/lean_spec/subspecs/networking/client/reqresp_client.py +++ b/src/lean_spec/subspecs/networking/client/reqresp_client.py @@ -32,15 +32,18 @@ import logging from dataclasses import dataclass, field -from lean_spec.forks.lstar.containers import SignedBlock +from lean_spec.forks.lstar.containers import SignedBlock, Slot +from lean_spec.subspecs.networking.config import MAX_REQUEST_BLOCKS from lean_spec.subspecs.networking.reqresp.codec import ( CodecError, ResponseCode, encode_request, ) from lean_spec.subspecs.networking.reqresp.message import ( + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, STATUS_PROTOCOL_V1, + BlocksByRangeRequest, BlocksByRootRequest, RequestedBlockRoots, Status, @@ -50,7 +53,8 @@ QuicConnection, QuicConnectionManager, ) -from lean_spec.types import Bytes32 +from lean_spec.subspecs.ssz.hash import hash_tree_root +from lean_spec.types import Bytes32, Uint64 logger = logging.getLogger(__name__) @@ -205,6 +209,187 @@ async def _do_blocks_by_root_request( finally: await stream.close() + async def request_blocks_by_range( + self, + peer_id: PeerId, + start_slot: Slot, + count: Uint64, + ) -> list[SignedBlock]: + """ + Request blocks by range from a peer. + + Implements the NetworkRequester protocol method. + + Returns an empty list on transport-level errors (no connection, + timeout, network failure). + + Raises CodecError on protocol violations by the peer (non-monotonic + slots, out-of-range slots, parent-root mismatch, more than count + chunks). Callers may use this signal for peer downscoring. + + Args: + peer_id: Peer to request from. + start_slot: Start slot of the range. + count: Number of blocks to request. + + Returns: + List of blocks received. May be fewer than requested if peer + does not have all blocks. + """ + # Local validation: matches the responder's bounds so a malformed call + # is rejected before opening a stream. + if count == Uint64(0): + return [] + if count > Uint64(MAX_REQUEST_BLOCKS): + return [] + if int(start_slot) + int(count) > int(Uint64.max_value()): + # Range would overflow Uint64; cannot be satisfied. + return [] + + conn = self._connections.get(peer_id) + if conn is None: + logger.debug("No connection to peer %s for blocks_by_range", peer_id) + return [] + + try: + return await asyncio.wait_for( + self._do_blocks_by_range_request(conn, start_slot, count), + timeout=self.timeout, + ) + except CodecError: + # Protocol violation: propagate so callers can downscore the peer. + raise + except asyncio.TimeoutError: + logger.warning("Timeout requesting blocks from %s", peer_id) + return [] + except Exception as e: + logger.warning("Error requesting blocks from %s: %s", peer_id, e) + return [] + + async def _do_blocks_by_range_request( + self, + conn: QuicConnection, + start_slot: Slot, + count: Uint64, + ) -> list[SignedBlock]: + """ + Execute a BlocksByRange request. + + Opens a stream, negotiates the protocol, sends the request, + and reads all response chunks. + + Args: + conn: QuicConnection to use. + start_slot: Start slot of the range. + count: Number of blocks to request. + + Returns: + List of blocks received. + """ + # Open a new stream and negotiate the protocol. + stream = await conn.open_stream(BLOCKS_BY_RANGE_PROTOCOL_V1) + end_slot_exclusive = int(start_slot) + int(count) + + try: + # Build and send the request. + request = BlocksByRangeRequest(start_slot=start_slot, count=count) + request_bytes = encode_request(request.encode_bytes()) + await stream.write(request_bytes) + + # Half-close to signal we're done sending. + finish_write = getattr(stream, "finish_write", None) + if finish_write is not None: + await finish_write() + + # Read response chunks. + # + # Each block is sent as a separate response chunk. + # We read until the stream closes or we get all blocks. + blocks: list[SignedBlock] = [] + prev_slot: Slot | None = None + prev_root: Bytes32 | None = None + stream_ended = False + + for _ in range(int(count)): + try: + response_data = await stream.read() + if not response_data: + # Stream closed; no more blocks. + stream_ended = True + break + + code, ssz_bytes = ResponseCode.decode(response_data) + + if code == ResponseCode.SUCCESS: + inner = SignedBlock.decode_bytes(ssz_bytes).block + block_slot = inner.slot + + # Slots MUST be strictly increasing across the stream. + if prev_slot is not None and block_slot <= prev_slot: + raise CodecError(f"Non-monotonic slot: {block_slot} <= {prev_slot}") + + # Block MUST fall inside the requested half-open range. + # Use int math so an end_slot near 2**64 cannot wrap. + if int(block_slot) < int(start_slot) or ( + int(block_slot) >= end_slot_exclusive + ): + raise CodecError(f"Block slot {block_slot} outside requested range") + + # Parent-root continuity. + # + # The responder serves canonical blocks only and skips + # empty slots. The next non-empty block's parent_root + # therefore equals the last received block's root, + # regardless of how many empty slots lie between. + if prev_root is not None and inner.parent_root != prev_root: + raise CodecError( + f"Parent root mismatch at slot {block_slot}: " + f"expected {prev_root.hex()}, " + f"got {inner.parent_root.hex()}" + ) + + blocks.append(SignedBlock.decode_bytes(ssz_bytes)) + prev_slot = block_slot + prev_root = hash_tree_root(inner) + + elif code == ResponseCode.RESOURCE_UNAVAILABLE: + # Peer doesn't have this block, continue. + continue + else: + # Other error, stop reading. + logger.debug("BlocksByRange error response: %s", code) + stream_ended = True + break + + except CodecError as e: + # Protocol violation: log with peer id and re-raise. + logger.warning("Protocol violation from %s: %s", conn.peer_id, e) + raise + + # No-more-than-count enforcement. + # + # After we have read count chunks the peer MUST have finished + # writing. Any extra response chunk is a protocol violation. + # Skip when the stream already ended inside the loop. + if not stream_ended: + try: + extra = await stream.read() + except Exception: + extra = b"" + if extra: + msg = "Peer sent more than count BlocksByRange chunks" + logger.warning("Protocol violation from %s: %s", conn.peer_id, msg) + raise CodecError(msg) + + return blocks + + finally: + # Always close the stream. + try: + await stream.close() + except Exception as e: + logger.debug("Error closing stream: %s", e) + async def send_status( self, peer_id: PeerId, diff --git a/src/lean_spec/subspecs/networking/config.py b/src/lean_spec/subspecs/networking/config.py index 258865cb7..8e8e73104 100644 --- a/src/lean_spec/subspecs/networking/config.py +++ b/src/lean_spec/subspecs/networking/config.py @@ -72,3 +72,14 @@ "libp2p" is the Application-Layer Protocol Negotiation (ALPN) value used during the TLS 1.3 handshake to identify libp2p connections. """ + +MIN_SLOTS_FOR_BLOCK_REQUESTS: Final[int] = 3600 +"""History window for BlocksByRange responders, in slots. + +Responders MUST keep this many recent slots available. + +The window slides with the node's current slot. It is never an absolute slot number. + +A request whose start slot falls below the window receives RESOURCE_UNAVAILABLE. +This lets nodes prune state below the window. +""" diff --git a/src/lean_spec/subspecs/networking/reqresp/__init__.py b/src/lean_spec/subspecs/networking/reqresp/__init__.py index 49f0a6c97..ede4aa261 100644 --- a/src/lean_spec/subspecs/networking/reqresp/__init__.py +++ b/src/lean_spec/subspecs/networking/reqresp/__init__.py @@ -8,14 +8,17 @@ ) from .handler import ( REQRESP_PROTOCOL_IDS, + AsyncBlockBySlotLookup, AsyncBlockLookup, ReqRespServer, RequestHandler, StreamResponseAdapter, ) from .message import ( + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, STATUS_PROTOCOL_V1, + BlocksByRangeRequest, BlocksByRootRequest, RequestedBlockRoots, Status, @@ -23,10 +26,12 @@ __all__ = [ # Protocol IDs + "BLOCKS_BY_RANGE_PROTOCOL_V1", "BLOCKS_BY_ROOT_PROTOCOL_V1", "STATUS_PROTOCOL_V1", "REQRESP_PROTOCOL_IDS", # Message types + "BlocksByRangeRequest", "BlocksByRootRequest", "RequestedBlockRoots", "Status", @@ -36,6 +41,7 @@ "encode_request", "decode_request", # Inbound handlers + "AsyncBlockBySlotLookup", "AsyncBlockLookup", "RequestHandler", "ReqRespServer", diff --git a/src/lean_spec/subspecs/networking/reqresp/handler.py b/src/lean_spec/subspecs/networking/reqresp/handler.py index 8bc444389..daabcc3cf 100644 --- a/src/lean_spec/subspecs/networking/reqresp/handler.py +++ b/src/lean_spec/subspecs/networking/reqresp/handler.py @@ -64,18 +64,24 @@ from dataclasses import dataclass from typing import Final -from lean_spec.forks.lstar.containers import SignedBlock +from lean_spec.forks.lstar.containers import SignedBlock, Slot from lean_spec.snappy import SnappyDecompressionError, frame_decompress -from lean_spec.subspecs.networking.config import MAX_ERROR_MESSAGE_SIZE +from lean_spec.subspecs.networking.config import ( + MAX_ERROR_MESSAGE_SIZE, + MAX_REQUEST_BLOCKS, + MIN_SLOTS_FOR_BLOCK_REQUESTS, +) from lean_spec.subspecs.networking.transport.protocols import InboundStreamProtocol from lean_spec.subspecs.networking.types import ProtocolId from lean_spec.subspecs.networking.varint import VarintError, decode_varint -from lean_spec.types import Bytes32 +from lean_spec.types import Bytes32, Uint64 from .codec import ResponseCode from .message import ( + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, STATUS_PROTOCOL_V1, + BlocksByRangeRequest, BlocksByRootRequest, Status, ) @@ -124,6 +130,21 @@ async def finish(self) -> None: Takes a block root and returns the block if available, None otherwise. """ +type AsyncBlockBySlotLookup = Callable[[Slot], Awaitable[SignedBlock | None]] +"""Type alias for block lookup by slot function. + +Takes a slot and returns the canonical block if available, None otherwise. +""" + +type CurrentSlotLookup = Callable[[], Slot] +"""Type alias for current-slot lookup function. + +Returns the node's current wall-clock slot. + +Used to compute the sliding history window for range requests. +The window covers recent slots; older state may be pruned. +""" + @dataclass(slots=True) class RequestHandler: @@ -152,6 +173,18 @@ class RequestHandler: block_lookup: AsyncBlockLookup | None = None """Callback to look up blocks by root.""" + block_by_slot_lookup: AsyncBlockBySlotLookup | None = None + """Callback to look up canonical blocks by slot.""" + + current_slot_lookup: CurrentSlotLookup | None = None + """Callback returning the node's current slot. + + Required to bound the BlocksByRange sliding history window. + + When unset, the handler cannot place the window and conservatively rejects + every range request with SERVER_ERROR. + """ + async def handle_status(self, response: StreamResponseAdapter) -> None: """ Handle incoming Status request. @@ -221,11 +254,76 @@ async def handle_blocks_by_root( # The peer can retry or ask another peer for this specific block. logger.warning("Error looking up block %s: %s", root.hex()[:8], e) + async def handle_blocks_by_range( + self, + request: BlocksByRangeRequest, + response: StreamResponseAdapter, + ) -> None: + """ + Handle incoming BlocksByRange request. + + Looks up and sends each requested block in the range. + + Checks proceed in order: + + 1. Reject when no block lookup is configured (server misconfiguration). + 2. Reject malformed requests: count of zero, or count above the limit. + 3. Reject when the start slot falls below the sliding history window. + 4. Stream canonical blocks; empty slots are silently skipped per spec. + + Args: + request: Block range to look up. + response: Stream for sending blocks. + """ + if self.block_by_slot_lookup is None: + logger.warning("BlocksByRange request received but no block_by_slot_lookup configured") + await response.send_error(ResponseCode.SERVER_ERROR, "Block lookup not available") + return + + # A count of zero is INVALID_REQUEST in modern forks. + # Phase 0 accepted it as an empty response; we follow the stricter rule. + if request.count == Uint64(0) or request.count > Uint64(MAX_REQUEST_BLOCKS): + await response.send_error(ResponseCode.INVALID_REQUEST, "Invalid count") + return + + # Without a current-slot source the window cannot be placed. + # Refuse the request rather than silently misreport available history. + if self.current_slot_lookup is None: + logger.warning("BlocksByRange request received but no current_slot_lookup configured") + await response.send_error(ResponseCode.SERVER_ERROR, "Current slot not available") + return + + # Sliding window: max(0, current_slot - MIN_SLOTS_FOR_BLOCK_REQUESTS) to current_slot. + current_slot = self.current_slot_lookup() + window_floor = ( + current_slot - Slot(MIN_SLOTS_FOR_BLOCK_REQUESTS) + if current_slot >= Slot(MIN_SLOTS_FOR_BLOCK_REQUESTS) + else Slot(0) + ) + if request.start_slot < window_floor: + await response.send_error( + ResponseCode.RESOURCE_UNAVAILABLE, "Requested slot predates history window" + ) + return + + # Stream blocks in slot order. + # The callback returns canonical-only blocks and None for empty slots. + # Empty slots are skipped silently per spec. + for i in range(int(request.count)): + slot = request.start_slot + Slot(i) + try: + block = await self.block_by_slot_lookup(slot) + if block is not None: + await response.send_success(block.encode_bytes()) + except Exception as e: + logger.warning("Error looking up block at slot %s: %s", slot, e) + REQRESP_PROTOCOL_IDS: Final[frozenset[ProtocolId]] = frozenset( { STATUS_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, + BLOCKS_BY_RANGE_PROTOCOL_V1, } ) """Protocol IDs handled by ReqRespServer.""" @@ -450,6 +548,21 @@ async def _dispatch( return await self.handler.handle_blocks_by_root(request, response) + elif protocol_id == BLOCKS_BY_RANGE_PROTOCOL_V1: + # BlocksByRange request: Peer wants blocks by range. + # + # The request is an SSZ object with start_slot and count. + try: + request = BlocksByRangeRequest.decode_bytes(ssz_bytes) + except Exception as e: + # SSZ decode failure: wrong size, malformed offsets, etc. + logger.debug("BlocksByRangeRequest decode error: %s", e) + await response.send_error( + ResponseCode.INVALID_REQUEST, "Invalid BlocksByRangeRequest message" + ) + return + await self.handler.handle_blocks_by_range(request, response) + else: # Unknown protocol ID. # diff --git a/src/lean_spec/subspecs/networking/reqresp/message.py b/src/lean_spec/subspecs/networking/reqresp/message.py index 0dc90ca8e..c4b304632 100644 --- a/src/lean_spec/subspecs/networking/reqresp/message.py +++ b/src/lean_spec/subspecs/networking/reqresp/message.py @@ -7,8 +7,8 @@ from typing import ClassVar, Final -from lean_spec.forks.lstar.containers import Checkpoint -from lean_spec.types import Bytes32, SSZList +from lean_spec.forks.lstar.containers import Checkpoint, Slot +from lean_spec.types import Bytes32, SSZList, Uint64 from lean_spec.types.container import Container from ..config import MAX_REQUEST_BLOCKS @@ -43,6 +43,9 @@ class Status(Container): BLOCKS_BY_ROOT_PROTOCOL_V1: Final = ProtocolId("/leanconsensus/req/blocks_by_root/1/ssz_snappy") """The protocol ID for the BlocksByRoot v1 request/response message.""" +BLOCKS_BY_RANGE_PROTOCOL_V1: Final = ProtocolId("/leanconsensus/req/blocks_by_range/1/ssz_snappy") +"""The protocol ID for the BlocksByRange v1 request/response message.""" + class RequestedBlockRoots(SSZList[Bytes32]): """List of block roots requested from a peer.""" @@ -59,3 +62,23 @@ class BlocksByRootRequest(Container): roots: RequestedBlockRoots """List of block roots requested from a peer.""" + + +class BlocksByRangeRequest(Container): + """ + A request for one or more blocks by their slot numbers. + + This is primarily used to recover recent or missing blocks from a peer. + """ + + start_slot: Slot + """The starting slot of the range (inclusive).""" + + count: Uint64 + """The number of blocks to request (at most MAX_REQUEST_BLOCKS). + + The legacy step field is omitted. + + Early phase 0 BeaconBlocksByRange v1 carried a step parameter that Altair deprecated. + Modern usage is equivalent to step == 1: responders return one block per slot in order. + """ diff --git a/src/lean_spec/subspecs/sync/backfill_sync.py b/src/lean_spec/subspecs/sync/backfill_sync.py index 8b7f222cf..ff7037391 100644 --- a/src/lean_spec/subspecs/sync/backfill_sync.py +++ b/src/lean_spec/subspecs/sync/backfill_sync.py @@ -38,17 +38,45 @@ from __future__ import annotations +import logging from dataclasses import dataclass, field from typing import Protocol -from lean_spec.forks.lstar.containers import SignedBlock +from lean_spec.forks.lstar.containers import SignedBlock, Slot +from lean_spec.subspecs.networking.config import MAX_REQUEST_BLOCKS from lean_spec.subspecs.networking.transport.peer_id import PeerId -from lean_spec.types import Bytes32 +from lean_spec.types import Bytes32, Uint64 from .block_cache import BlockCache from .config import MAX_BACKFILL_DEPTH, MAX_BLOCKS_PER_REQUEST from .peer_manager import PeerManager +logger = logging.getLogger(__name__) + + +class StoreView(Protocol): + """ + Read-only view of forkchoice state used by backfill. + + Used to skip blocks already in the Store and to find the highest known + canonical slot for gap detection. + + Decouples backfill from the concrete Store class. + Lets tests supply a tiny in-memory implementation. + """ + + def has_root(self, root: Bytes32) -> bool: + """Return True if the block root is present in the Store.""" + ... + + def finalized_slot(self) -> Slot: + """Return the slot of the latest finalized checkpoint.""" + ... + + def head_slot(self) -> Slot: + """Return the slot of the current canonical head.""" + ... + class NetworkRequester(Protocol): """ @@ -82,6 +110,27 @@ async def request_blocks_by_root( """ ... + async def request_blocks_by_range( + self, + peer_id: PeerId, + start_slot: Slot, + count: Uint64, + ) -> list[SignedBlock]: + """ + Request blocks by slot range from a specific peer. + + Args: + peer_id: Peer to request from. + start_slot: Start slot of the range (inclusive). + count: Number of blocks to request (up to MAX_REQUEST_BLOCKS). + + Returns: + List of blocks the peer returned. May be fewer than requested + if the peer does not have all blocks (empty slots are skipped). + Empty on error. + """ + ... + @dataclass(slots=True) class BackfillSync: @@ -122,9 +171,29 @@ class BackfillSync: network: NetworkRequester """Network interface for block requests.""" + store_view: StoreView | None = field(default=None) + """Read-only window into the Store. + + Used for known-root and gap checks. + + When None, backfill recurses by root only. + No short-circuiting for blocks already in the Store. + No range fills on detected gaps. + Production callers must supply a view. + """ + _pending: set[Bytes32] = field(default_factory=set) """Roots currently being fetched (to avoid duplicate requests).""" + _max_range_slot: Slot = field(default_factory=lambda: Slot(0)) + """Highest slot covered by a successfully completed range request. + + Advanced only when the peer answers without raising — success or empty result. + + On exception, the watermark is left untouched. + A subsequent call can then retry the same range, possibly via a different peer. + """ + async def fill_missing( self, roots: list[Bytes32], @@ -172,6 +241,99 @@ async def fill_missing( # Always clear pending status, even on error. self._pending.difference_update(roots_to_fetch) + async def fill_range( + self, + start_slot: Slot, + count: Uint64, + depth: int = 0, + ) -> None: + """ + Fetch missing blocks by slot range. + + This is a more efficient alternative to fill_missing when a large + contiguous gap is detected. + + Args: + start_slot: Start slot of the range. + count: Number of blocks to request. + depth: Current backfill depth. + """ + if depth >= MAX_BACKFILL_DEPTH: + return + + if count == Uint64(0): + return + + # Skip slots already covered by a previous successful range fetch. + # The watermark tracks the highest answered slot; earlier slots need not + # be re-requested. Failed fetches do not advance the watermark, so a + # retry can still re-cover the range. + watermark = self._max_range_slot + actual_start = start_slot if start_slot > watermark else watermark + Slot(1) + end_slot = start_slot + Slot(int(count) - 1) + + if end_slot < actual_start: + logger.debug( + "Skipping range fetch [%s, %s]: already covered (watermark=%s)", + start_slot, + end_slot, + watermark, + ) + return + + # Split into per-message batches. + # + # Each network request is bounded by MAX_REQUEST_BLOCKS, so a wider + # range becomes several round-trips here. + current_slot = actual_start + remaining = int(end_slot) - int(current_slot) + 1 + + while remaining > 0: + batch_count = min(remaining, MAX_REQUEST_BLOCKS) + await self._fetch_range(current_slot, Uint64(batch_count), depth) + current_slot = current_slot + Slot(batch_count) + remaining -= batch_count + + async def _fetch_range( + self, + start_slot: Slot, + count: Uint64, + depth: int, + ) -> None: + """Fetch a range of blocks from a peer.""" + last_slot = start_slot + Slot(int(count) - 1) + peer = self.peer_manager.select_peer_for_request(min_slot=last_slot) + if peer is None: + # Fallback to any peer if no one reports having the whole range. + peer = self.peer_manager.select_peer_for_request() + + if peer is None: + return + + peer.on_request_start() + try: + blocks = await self.network.request_blocks_by_range( + peer_id=peer.peer_id, + start_slot=start_slot, + count=count, + ) + except Exception as e: + # Peer-side failure. + # + # Do not advance the watermark. + # A retry against another peer can still re-cover the range. + logger.warning("Range fetch failed from %s: %s", peer.peer_id, e) + self.peer_manager.on_request_failure(peer.peer_id) + return + + # The peer answered: success or empty. + # Mark the range as covered so future range fills can skip it. + self._max_range_slot = max(self._max_range_slot, last_slot) + self.peer_manager.on_request_success(peer.peer_id) + + if blocks: + await self._process_received_blocks(blocks, peer.peer_id, depth) + async def _fetch_batch( self, roots: list[Bytes32], @@ -249,21 +411,50 @@ async def _process_received_blocks( backfill_depth=depth + 1, ) - # Check if this block's parent is known. - # - # A block is orphan if its parent is not in the cache. - # (We cannot check the Store here; that is the SyncService's job.) + # A block is an orphan if its parent is not in the cache or in the Store. parent_root = pending.parent_root - if parent_root not in self.block_cache: + parent_known = parent_root in self.block_cache or ( + self.store_view.has_root(parent_root) if self.store_view is not None else False + ) + + if not parent_known: # Parent unknown. Mark as orphan and queue for fetch. self.block_cache.mark_orphan(pending.root) if parent_root not in self._pending: new_orphan_parents.append(parent_root) - # Recursively fetch orphan parents. - if new_orphan_parents: - await self.fill_missing(new_orphan_parents, depth=depth + 1) + if not new_orphan_parents: + return + + # When the earliest received block still has a missing parent far above + # the highest slot we already know, fetching the gap as a contiguous + # range is cheaper than recursing parent-by-parent. + # + # Floor the range at the head slot (highest known canonical slot), not + # at the finalized slot. Slots above finalized but at or below head are + # already in the Store and should not be re-downloaded. + if self.store_view is not None and blocks: + earliest_block = min(blocks, key=lambda b: b.block.slot) + head_slot = self.store_view.head_slot() + if earliest_block.block.slot > head_slot: + gap_floor = head_slot + Slot(1) + gap_size = int(earliest_block.block.slot - gap_floor) + if gap_size > 0: + logger.debug( + "Backfill gap (%d slots) at slot %s; range-fetching from %s.", + gap_size, + earliest_block.block.slot, + gap_floor, + ) + await self.fill_range( + start_slot=gap_floor, + count=Uint64(gap_size), + depth=depth + 1, + ) + + await self.fill_missing(new_orphan_parents, depth=depth + 1) def reset(self) -> None: """Clear all pending state.""" self._pending.clear() + self._max_range_slot = Slot(0) diff --git a/src/lean_spec/subspecs/sync/head_sync.py b/src/lean_spec/subspecs/sync/head_sync.py index 22b0c979d..0da4b91d8 100644 --- a/src/lean_spec/subspecs/sync/head_sync.py +++ b/src/lean_spec/subspecs/sync/head_sync.py @@ -49,10 +49,10 @@ from dataclasses import dataclass, field from lean_spec.forks import Store -from lean_spec.forks.lstar.containers import SignedBlock +from lean_spec.forks.lstar.containers import SignedBlock, Slot from lean_spec.subspecs.networking.transport.peer_id import PeerId from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.types import Bytes32 +from lean_spec.types import Bytes32, Uint64 from .backfill_sync import BackfillSync from .block_cache import BlockCache @@ -369,14 +369,62 @@ async def _cache_and_backfill( block_inner = block.block parent_root = block_inner.parent_root + # Reject blocks at or below the finalized slot. + # + # Anything at or below finalized cannot be canonical and is most likely + # a stale or replayed gossip from a misbehaving peer. Dropping it here + # also prevents the gap math below from underflowing. + finalized_slot = store.latest_finalized.slot + if block_inner.slot <= finalized_slot: + logger.debug( + "Ignoring gossip block at slot %s: at or below finalized (%s)", + block_inner.slot, + finalized_slot, + ) + return HeadSyncResult( + processed=False, + cached=False, + backfill_triggered=False, + descendants_processed=0, + ), store + # Add to cache. pending = self.block_cache.add(block=block, peer=peer_id) # Mark as orphan. self.block_cache.mark_orphan(pending.root) - # Trigger backfill for the missing parent. - await self.backfill.fill_missing([parent_root]) + # Trigger backfill for the missing parent(s). + # + # When the new block sits several slots above the current head, fetch + # the gap as a contiguous range rather than recursing parent-by-parent. + # The floor is the head slot; slots above finalized but at or below + # head are already in the Store. + # + # Alt-fork gossip at a slot at or below head still goes through plain + # parent-by-root recursion: those blocks share an ancestor with the + # canonical chain and a range request would just refetch known slots. + head_slot = store.blocks[store.head].slot + if block_inner.slot > head_slot: + gap_floor = head_slot + Slot(1) + gap_size = int(block_inner.slot - gap_floor) + if gap_size > 0: + logger.debug( + "Backfill gap (%d slots) above head %s; range-fetching from %s.", + gap_size, + head_slot, + gap_floor, + ) + await self.backfill.fill_range( + start_slot=gap_floor, + count=Uint64(gap_size), + ) + else: + # Direct parent missing (single-slot gap above head). + await self.backfill.fill_missing([parent_root]) + else: + # Alt-fork gossip at or below head: recurse by parent root only. + await self.backfill.fill_missing([parent_root]) return HeadSyncResult( processed=False, diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 7a4a1ae52..ecc661547 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -68,6 +68,30 @@ logger = logging.getLogger(__name__) +@dataclass(slots=True) +class _SyncStoreView: + """StoreView adapter delegating to the live SyncService.store reference. + + Wraps a getter so updates to ``SyncService.store`` (assigned after each + block is processed) are observed by backfill without re-wiring. + """ + + _get_store: Callable[[], Store] + + def has_root(self, root: Bytes32) -> bool: + """Return True if the block root is present in the Store.""" + return root in self._get_store().blocks + + def finalized_slot(self) -> Slot: + """Return the slot of the latest finalized checkpoint.""" + return self._get_store().latest_finalized.slot + + def head_slot(self) -> Slot: + """Return the slot of the current canonical head.""" + store = self._get_store() + return store.blocks[store.head].slot + + def _ancestor_set(blocks: BlockLookup, head: Bytes32) -> set[Bytes32]: """Walk parent links from head and collect every reachable block root.""" seen: set[Bytes32] = set() @@ -270,10 +294,13 @@ def _init_components(self) -> None: # BackfillSync handles fetching missing parent blocks from peers. # # It needs network access to request blocks and the cache to store them. + # The store view is a thin adapter that always reads the current + # store reference, since we replace `self.store` after each block. self._backfill = BackfillSync( peer_manager=self.peer_manager, block_cache=self.block_cache, network=self.network, + store_view=_SyncStoreView(_get_store=lambda: self.store), ) # HeadSync processes incoming gossip blocks and coordinates backfill. diff --git a/tests/lean_spec/helpers/mocks.py b/tests/lean_spec/helpers/mocks.py index 8255260f0..b404b6e19 100644 --- a/tests/lean_spec/helpers/mocks.py +++ b/tests/lean_spec/helpers/mocks.py @@ -18,16 +18,18 @@ from lean_spec.subspecs.networking import PeerId from lean_spec.subspecs.networking.service.events import NetworkEvent from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.types import Bytes32 +from lean_spec.types import Bytes32, Uint64 class MockNetworkRequester: """Mock network that returns pre-configured blocks and tracks requests.""" def __init__(self) -> None: - """Initialize with empty block store and request log.""" + """Initialize with empty block store and request logs.""" self.blocks_by_root: dict[Bytes32, SignedBlock] = {} - self.request_log: list[tuple[PeerId, list[Bytes32]]] = [] + self.blocks_by_slot: dict[Slot, SignedBlock] = {} + self.root_request_log: list[tuple[PeerId, list[Bytes32]]] = [] + self.range_request_log: list[tuple[PeerId, Slot, Uint64]] = [] self.should_fail: bool = False async def request_blocks_by_root( @@ -36,7 +38,7 @@ async def request_blocks_by_root( roots: list[Bytes32], ) -> list[SignedBlock]: """Return blocks for requested roots.""" - self.request_log.append((peer_id, roots)) + self.root_request_log.append((peer_id, roots)) if self.should_fail: raise ConnectionError("Network failed") return [self.blocks_by_root[r] for r in roots if r in self.blocks_by_root] @@ -49,10 +51,29 @@ async def request_block_by_root( """Return a single block by root.""" return self.blocks_by_root.get(root) + async def request_blocks_by_range( + self, + peer_id: PeerId, + start_slot: Slot, + count: Uint64, + ) -> list[SignedBlock]: + """Return blocks for requested slot range.""" + self.range_request_log.append((peer_id, start_slot, count)) + if self.should_fail: + raise ConnectionError("Network failed") + + blocks: list[SignedBlock] = [] + for i in range(int(count)): + slot = start_slot + Slot(i) + if slot in self.blocks_by_slot: + blocks.append(self.blocks_by_slot[slot]) + return blocks + def add_block(self, block: SignedBlock) -> Bytes32: """Add a block to the mock network. Returns its root.""" root = hash_tree_root(block.block) self.blocks_by_root[root] = block + self.blocks_by_slot[block.block.slot] = block return root diff --git a/tests/lean_spec/subspecs/networking/client/test_reqresp_client_range.py b/tests/lean_spec/subspecs/networking/client/test_reqresp_client_range.py new file mode 100644 index 000000000..15c1ac9ed --- /dev/null +++ b/tests/lean_spec/subspecs/networking/client/test_reqresp_client_range.py @@ -0,0 +1,372 @@ +"""Tests for the outbound BlocksByRange protocol on the ReqResp client.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field + +import pytest + +from lean_spec.forks.lstar.containers import Block, BlockBody, SignedBlock +from lean_spec.forks.lstar.containers.block import BlockSignatures +from lean_spec.forks.lstar.containers.block.types import ( + AggregatedAttestations, + AttestationSignatures, +) +from lean_spec.forks.lstar.containers.slot import Slot +from lean_spec.forks.lstar.containers.validator import ValidatorIndex +from lean_spec.subspecs.networking.client.reqresp_client import ReqRespClient +from lean_spec.subspecs.networking.config import MAX_REQUEST_BLOCKS +from lean_spec.subspecs.networking.reqresp.codec import ( + CodecError, + ResponseCode, +) +from lean_spec.subspecs.networking.reqresp.message import ( + BLOCKS_BY_RANGE_PROTOCOL_V1, +) +from lean_spec.subspecs.networking.transport import PeerId +from lean_spec.subspecs.ssz.hash import hash_tree_root +from lean_spec.types import Bytes32, Uint64 +from tests.lean_spec.helpers import make_mock_signature + + +@dataclass +class MockRangeStream: + """Mock stream feeding a queue of pre-encoded response chunks.""" + + stream_id: int = 0 + """Mock stream ID.""" + + protocol_id: str = BLOCKS_BY_RANGE_PROTOCOL_V1 + """The negotiated protocol ID.""" + + response_chunks: list[bytes] = field(default_factory=list) + """Response chunks to return on successive read() calls.""" + + written: list[bytes] = field(default_factory=list) + """Data written to the stream.""" + + closed: bool = False + """Whether close() has been called.""" + + finish_write_called: bool = False + """Whether finish_write() has been called.""" + + _read_index: int = 0 + """Index into response_chunks for next read().""" + + async def read(self) -> bytes: + """Return the next response chunk, or empty bytes when exhausted.""" + if self._read_index >= len(self.response_chunks): + return b"" + chunk = self.response_chunks[self._read_index] + self._read_index += 1 + return chunk + + async def write(self, data: bytes) -> None: + """Accumulate written request bytes.""" + self.written.append(data) + + async def finish_write(self) -> None: + """Signal half-close.""" + self.finish_write_called = True + + async def close(self) -> None: + """Mark the stream as closed.""" + self.closed = True + + +@dataclass +class MockRangeConnection: + """Mock QUIC connection that exposes a peer_id and a single canned stream.""" + + peer_id: PeerId + """Identity reported on protocol-violation logs.""" + + streams: list[MockRangeStream] = field(default_factory=list) + """Pre-configured streams to return on successive open_stream() calls.""" + + opened_protocols: list[str] = field(default_factory=list) + """Protocols requested via open_stream().""" + + _stream_index: int = 0 + """Index into streams for next open_stream().""" + + async def open_stream(self, protocol: str) -> MockRangeStream: + """Return the next preconfigured stream, recording the protocol.""" + self.opened_protocols.append(protocol) + if self._stream_index >= len(self.streams): + return MockRangeStream(protocol_id=protocol) + stream = self.streams[self._stream_index] + stream.protocol_id = protocol + self._stream_index += 1 + return stream + + +def make_client() -> ReqRespClient: + """Create a ReqRespClient that bypasses the connection manager.""" + return ReqRespClient(connection_manager=None) # type: ignore[arg-type] + + +def empty_signed_block(slot: Slot, parent_root: Bytes32, state_seed: int) -> SignedBlock: + """Build a SignedBlock with the requested slot and parent_root.""" + block = Block( + slot=slot, + proposer_index=ValidatorIndex(0), + parent_root=parent_root, + state_root=Bytes32(bytes([state_seed]) * 32), + body=BlockBody(attestations=AggregatedAttestations(data=[])), + ) + return SignedBlock( + block=block, + signature=BlockSignatures( + attestation_signatures=AttestationSignatures(data=[]), + proposer_signature=make_mock_signature(), + ), + ) + + +def build_chain(start_slot: int, count: int, root_seed: int = 0xAA) -> list[SignedBlock]: + """Return a chain of strictly-increasing-slot blocks starting at start_slot. + + Each child links to its predecessor via the previous block's tree-hash root. + The root of the first slot is derived from the seed parameter. + """ + parent_root = Bytes32(bytes([root_seed]) * 32) + blocks: list[SignedBlock] = [] + for i in range(count): + block = empty_signed_block( + slot=Slot(start_slot + i), + parent_root=parent_root, + state_seed=(root_seed + i + 1) & 0xFF, + ) + blocks.append(block) + parent_root = hash_tree_root(block.block) + return blocks + + +def encode_success(block: SignedBlock) -> bytes: + """Encode a SignedBlock as a SUCCESS response chunk.""" + return ResponseCode.SUCCESS.encode(block.encode_bytes()) + + +class TestReqRespClientBlocksByRange: + """Tests for the outbound blocks-by-range request flow.""" + + async def test_zero_count_returns_empty_without_opening_stream(self, peer_id: PeerId) -> None: + """A count of zero short-circuits without opening a stream.""" + client = make_client() + conn = MockRangeConnection(peer_id=peer_id) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(0), Uint64(0)) + + assert blocks == [] + assert conn.opened_protocols == [] + + async def test_count_above_max_returns_empty_without_opening_stream( + self, peer_id: PeerId + ) -> None: + """A count strictly larger than MAX_REQUEST_BLOCKS is rejected locally.""" + client = make_client() + conn = MockRangeConnection(peer_id=peer_id) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range( + peer_id, Slot(0), Uint64(MAX_REQUEST_BLOCKS + 1) + ) + + assert blocks == [] + assert conn.opened_protocols == [] + + async def test_overflow_range_returns_empty_without_opening_stream( + self, peer_id: PeerId + ) -> None: + """A start_slot+count overflow above 2**64-1 is rejected locally.""" + client = make_client() + conn = MockRangeConnection(peer_id=peer_id) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + max_slot = int(Uint64.max_value()) + blocks = await client.request_blocks_by_range(peer_id, Slot(max_slot - 4), Uint64(10)) + + assert blocks == [] + assert conn.opened_protocols == [] + + async def test_no_connection_returns_empty(self, peer_id: PeerId) -> None: + """A request with no registered connection returns an empty list.""" + client = make_client() + + blocks = await client.request_blocks_by_range(peer_id, Slot(1), Uint64(3)) + + assert blocks == [] + + async def test_full_range_success(self, peer_id: PeerId) -> None: + """A clean response of count blocks is returned in order.""" + client = make_client() + chain = build_chain(start_slot=10, count=4) + + stream = MockRangeStream(response_chunks=[encode_success(b) for b in chain]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(10), Uint64(4)) + + assert blocks == chain + assert conn.opened_protocols == [BLOCKS_BY_RANGE_PROTOCOL_V1] + assert stream.closed is True + assert stream.finish_write_called is True + + async def test_partial_response_when_stream_closes_early(self, peer_id: PeerId) -> None: + """Stream closing before count is reached returns the partial list.""" + client = make_client() + chain = build_chain(start_slot=20, count=2) + + stream = MockRangeStream(response_chunks=[encode_success(b) for b in chain]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(20), Uint64(5)) + + assert blocks == chain + + async def test_resource_unavailable_chunks_are_skipped(self, peer_id: PeerId) -> None: + """RESOURCE_UNAVAILABLE chunks do not raise and the remaining blocks are returned.""" + client = make_client() + chain = build_chain(start_slot=30, count=2) + unavailable = ResponseCode.RESOURCE_UNAVAILABLE.encode(b"missing") + + stream = MockRangeStream( + response_chunks=[ + unavailable, + encode_success(chain[0]), + unavailable, + encode_success(chain[1]), + ], + ) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(30), Uint64(4)) + + assert blocks == chain + + async def test_non_monotonic_slots_raise_codec_error(self, peer_id: PeerId) -> None: + """A response with two blocks at the same slot is rejected as a protocol violation.""" + client = make_client() + first = empty_signed_block(Slot(40), Bytes32(b"\xaa" * 32), state_seed=1) + # Reuses the same slot as the first block. Parent root is irrelevant because + # the slot check fires before the parent-root check. + duplicate = empty_signed_block(Slot(40), hash_tree_root(first.block), state_seed=2) + + stream = MockRangeStream(response_chunks=[encode_success(first), encode_success(duplicate)]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + with pytest.raises(CodecError, match=r"Non-monotonic slot"): + await client.request_blocks_by_range(peer_id, Slot(40), Uint64(2)) + + async def test_out_of_range_slot_raises_codec_error(self, peer_id: PeerId) -> None: + """A block whose slot falls outside the requested range is rejected.""" + client = make_client() + # Request [50, 53). Peer responds with a block at slot 60. + out_of_range = empty_signed_block(Slot(60), Bytes32(b"\xaa" * 32), state_seed=1) + + stream = MockRangeStream(response_chunks=[encode_success(out_of_range)]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + with pytest.raises(CodecError, match=r"outside requested range"): + await client.request_blocks_by_range(peer_id, Slot(50), Uint64(3)) + + async def test_parent_root_continuity_violation_across_skipped_slot( + self, peer_id: PeerId + ) -> None: + """A wrong parent root after a skipped empty slot is rejected as a protocol violation. + + The responder serves canonical blocks only and skips empty slots. + So a block following an empty slot must still chain off the previous + non-empty block's root. + """ + client = make_client() + # Request [70, 75). Peer responds with slot 70 then slot 73. + # The slot 73 block's parent_root is wrong (zero root) instead of slot 70's root. + first = empty_signed_block(Slot(70), Bytes32(b"\xaa" * 32), state_seed=1) + # Wrong parent: should equal the tree-hash root of the first block. + bad = empty_signed_block(Slot(73), Bytes32.zero(), state_seed=2) + + stream = MockRangeStream(response_chunks=[encode_success(first), encode_success(bad)]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + with pytest.raises(CodecError, match=r"Parent root mismatch"): + await client.request_blocks_by_range(peer_id, Slot(70), Uint64(5)) + + async def test_parent_root_continuity_holds_across_skipped_slots(self, peer_id: PeerId) -> None: + """A correct parent_root linkage across empty slots is accepted.""" + client = make_client() + # Request [80, 90). Peer responds with slot 80 and slot 85, where the + # slot 85 block chains correctly off the slot 80 block. + first = empty_signed_block(Slot(80), Bytes32(b"\xaa" * 32), state_seed=1) + second = empty_signed_block(Slot(85), parent_root=hash_tree_root(first.block), state_seed=2) + + stream = MockRangeStream(response_chunks=[encode_success(first), encode_success(second)]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(80), Uint64(10)) + + assert blocks == [first, second] + + async def test_more_than_count_chunks_raises_codec_error(self, peer_id: PeerId) -> None: + """An extra chunk past the requested count is rejected as a protocol violation.""" + client = make_client() + chain = build_chain(start_slot=100, count=2) + # An extra third chunk that the peer is not allowed to send. + extra = empty_signed_block( + Slot(102), parent_root=hash_tree_root(chain[1].block), state_seed=99 + ) + + stream = MockRangeStream( + response_chunks=[ + encode_success(chain[0]), + encode_success(chain[1]), + encode_success(extra), + ], + ) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + with pytest.raises(CodecError, match=r"more than count"): + await client.request_blocks_by_range(peer_id, Slot(100), Uint64(2)) + + async def test_timeout_returns_empty_list(self, peer_id: PeerId) -> None: + """A request that times out returns an empty list rather than raising.""" + client = make_client() + client.timeout = 0.01 + conn = MockRangeConnection(peer_id=peer_id, streams=[MockRangeStream()]) + + async def slow_read() -> bytes: + await asyncio.sleep(1.0) + return b"" + + conn.streams[0].read = slow_read # type: ignore[method-assign] + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(0), Uint64(3)) + + assert blocks == [] + + async def test_server_error_stops_reading_and_returns_partial(self, peer_id: PeerId) -> None: + """A SERVER_ERROR chunk halts reading and returns blocks received so far.""" + client = make_client() + chain = build_chain(start_slot=200, count=1) + error_chunk = ResponseCode.SERVER_ERROR.encode(b"db boom") + + stream = MockRangeStream(response_chunks=[encode_success(chain[0]), error_chunk]) + conn = MockRangeConnection(peer_id=peer_id, streams=[stream]) + client.register_connection(peer_id, conn) # type: ignore[arg-type] + + blocks = await client.request_blocks_by_range(peer_id, Slot(200), Uint64(3)) + + assert blocks == chain diff --git a/tests/lean_spec/subspecs/networking/reqresp/test_handler.py b/tests/lean_spec/subspecs/networking/reqresp/test_handler.py index ce27a4401..d3607c865 100644 --- a/tests/lean_spec/subspecs/networking/reqresp/test_handler.py +++ b/tests/lean_spec/subspecs/networking/reqresp/test_handler.py @@ -4,30 +4,38 @@ import asyncio from dataclasses import dataclass, field +from typing import Final + +import pytest from lean_spec.forks.lstar.containers import Checkpoint, SignedBlock from lean_spec.forks.lstar.containers.slot import Slot -from lean_spec.subspecs.networking.config import MAX_ERROR_MESSAGE_SIZE +from lean_spec.subspecs.networking.config import MAX_ERROR_MESSAGE_SIZE, MAX_REQUEST_BLOCKS from lean_spec.subspecs.networking.reqresp.codec import ( ResponseCode, encode_request, ) from lean_spec.subspecs.networking.reqresp.handler import ( REQRESP_PROTOCOL_IDS, + AsyncBlockBySlotLookup, ReqRespServer, RequestHandler, StreamResponseAdapter, ) from lean_spec.subspecs.networking.reqresp.message import ( + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, STATUS_PROTOCOL_V1, + BlocksByRangeRequest, BlocksByRootRequest, RequestedBlockRoots, Status, ) from lean_spec.subspecs.networking.types import ProtocolId from lean_spec.subspecs.networking.varint import encode_varint -from lean_spec.types import Bytes32 +from lean_spec.subspecs.ssz import hash_tree_root +from lean_spec.types import Bytes32, Uint64 +from lean_spec.types.exceptions import SSZSerializationError from tests.lean_spec.helpers import make_test_block, make_test_status @@ -1193,3 +1201,430 @@ async def test_read_request_rejects_oversized_compressed_data(self) -> None: code, _ = ResponseCode.decode(stream.written[0]) assert code == ResponseCode.INVALID_REQUEST + + +class TestBlocksByRangeRequestRoundTrip: + """SSZ encode/decode round-trip tests.""" + + def test_basic_roundtrip(self) -> None: + """Encode then decode yields identical container.""" + req = BlocksByRangeRequest(start_slot=Slot(100), count=Uint64(10)) + encoded = req.encode_bytes() + assert len(encoded) == 16 # 8 (Slot) + 8 (Uint64) + + decoded = BlocksByRangeRequest.decode_bytes(encoded) + assert decoded.start_slot == Slot(100) + assert decoded.count == Uint64(10) + + def test_roundtrip_preserves_equality(self) -> None: + """Decoded request equals original.""" + original = BlocksByRangeRequest(start_slot=Slot(42), count=Uint64(1024)) + decoded = BlocksByRangeRequest.decode_bytes(original.encode_bytes()) + assert original == decoded + + def test_roundtrip_various_values(self) -> None: + """Round-trip works for diverse value combinations.""" + cases = [ + (Slot(0), Uint64(1)), + (Slot(1), Uint64(1)), + (Slot(0), Uint64(MAX_REQUEST_BLOCKS)), + (Slot(999_999), Uint64(512)), + ] + for start_slot, count in cases: + req = BlocksByRangeRequest(start_slot=start_slot, count=count) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.start_slot == start_slot + assert decoded.count == count + + +class TestBlocksByRangeRequestHashTreeRoot: + """hash_tree_root stability tests.""" + + def test_hash_tree_root_stable_across_reencodings(self) -> None: + """hash_tree_root is identical for equal requests re-encoded.""" + req1 = BlocksByRangeRequest(start_slot=Slot(100), count=Uint64(10)) + req2 = BlocksByRangeRequest(start_slot=Slot(100), count=Uint64(10)) + + root1 = hash_tree_root(req1) + root2 = hash_tree_root(req2) + assert root1 == root2 + + def test_hash_tree_root_after_decode(self) -> None: + """hash_tree_root matches between original and decoded copy.""" + original = BlocksByRangeRequest(start_slot=Slot(500), count=Uint64(64)) + decoded = BlocksByRangeRequest.decode_bytes(original.encode_bytes()) + + assert hash_tree_root(original) == hash_tree_root(decoded) + + def test_hash_tree_root_differs_for_different_values(self) -> None: + """Different requests produce different roots.""" + req_a = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(10)) + req_b = BlocksByRangeRequest(start_slot=Slot(1), count=Uint64(10)) + req_c = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(11)) + + assert hash_tree_root(req_a) != hash_tree_root(req_b) + assert hash_tree_root(req_a) != hash_tree_root(req_c) + + +class TestBlocksByRangeRequestBoundaryValues: + """Boundary and edge-case value tests.""" + + def test_start_slot_zero_decodes_cleanly(self) -> None: + """start_slot at Slot(0) encodes and decodes correctly.""" + req = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(1)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.start_slot == Slot(0) + + def test_large_start_slot_decodes_cleanly(self) -> None: + """Large start_slot values encode and decode correctly.""" + large_slot = Slot(2**63 - 1) # Very large but valid + req = BlocksByRangeRequest(start_slot=large_slot, count=Uint64(1)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.start_slot == large_slot + + def test_max_uint64_start_slot_decodes_cleanly(self) -> None: + """start_slot at Uint64 max boundary decodes cleanly.""" + max_slot = Slot(2**64 - 1) + req = BlocksByRangeRequest(start_slot=max_slot, count=Uint64(1)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.start_slot == max_slot + + def test_count_one(self) -> None: + """Minimum meaningful count (1) round-trips correctly.""" + req = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(1)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.count == Uint64(1) + + def test_count_max_request_blocks(self) -> None: + """count == MAX_REQUEST_BLOCKS round-trips correctly.""" + req = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(MAX_REQUEST_BLOCKS)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.count == Uint64(MAX_REQUEST_BLOCKS) + + def test_count_zero_encodes_and_decodes(self) -> None: + """count == 0 encodes/decodes at SSZ layer (validation is handler-level).""" + req = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(0)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.count == Uint64(0) + + def test_count_above_max_encodes_and_decodes(self) -> None: + """count > MAX_REQUEST_BLOCKS encodes/decodes at SSZ layer. + + Enforcement of MAX_REQUEST_BLOCKS is the handler's job, not SSZ's. + """ + req = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(MAX_REQUEST_BLOCKS + 1)) + decoded = BlocksByRangeRequest.decode_bytes(req.encode_bytes()) + assert decoded.count == Uint64(MAX_REQUEST_BLOCKS + 1) + + +class TestBlocksByRangeRequestMalformedPayloads: + """Truncated and oversized payload rejection tests.""" + + def test_truncated_payload_rejected(self) -> None: + """Payload shorter than 16 bytes is rejected.""" + truncated = b"\x01" * 15 # 15 bytes, need 16 + with pytest.raises(SSZSerializationError): + BlocksByRangeRequest.decode_bytes(truncated) + + def test_empty_payload_rejected(self) -> None: + """Zero-length payload is rejected.""" + with pytest.raises(SSZSerializationError): + BlocksByRangeRequest.decode_bytes(b"") + + def test_single_byte_rejected(self) -> None: + """Single byte payload is rejected.""" + with pytest.raises(SSZSerializationError): + BlocksByRangeRequest.decode_bytes(b"\x00") + + def test_eight_byte_payload_rejected(self) -> None: + """8 bytes (half-payload, single field) is rejected.""" + partial = (100).to_bytes(8, "little") + with pytest.raises(SSZSerializationError): + BlocksByRangeRequest.decode_bytes(partial) + + +class TestBlocksByRangeProtocolId: + """Protocol ID format tests.""" + + def test_protocol_id_format(self) -> None: + """Protocol ID follows lean namespace convention.""" + assert BLOCKS_BY_RANGE_PROTOCOL_V1.startswith("/leanconsensus/req/") + assert BLOCKS_BY_RANGE_PROTOCOL_V1.endswith("/ssz_snappy") + assert "blocks_by_range" in BLOCKS_BY_RANGE_PROTOCOL_V1 + + def test_protocol_id_version_is_v1(self) -> None: + """Protocol ID is version 1.""" + assert "/1/" in BLOCKS_BY_RANGE_PROTOCOL_V1 + + def test_protocol_id_distinct_from_blocks_by_root(self) -> None: + """BlocksByRange and BlocksByRoot have distinct protocol IDs.""" + assert BLOCKS_BY_RANGE_PROTOCOL_V1 != BLOCKS_BY_ROOT_PROTOCOL_V1 + + +def _slot_lookup_from(blocks: dict[int, SignedBlock]) -> AsyncBlockBySlotLookup: + """Build an AsyncBlockBySlotLookup that returns blocks from the given dict.""" + + async def _lookup(slot: Slot) -> SignedBlock | None: + return blocks.get(int(slot)) + + return _lookup + + +_DEFAULT_CURRENT_SLOT: Final[Slot] = Slot(5000) +"""Default current slot used by `_make_range_handler`. + +With MIN_SLOTS_FOR_BLOCK_REQUESTS=3600 the window floor is 1400. +Tests with start_slot below 1400 must override the current slot explicitly. +""" + + +def _make_range_handler( + blocks: dict[int, SignedBlock] | None = None, + current_slot: Slot = _DEFAULT_CURRENT_SLOT, + block_by_slot_lookup: AsyncBlockBySlotLookup | None = None, +) -> RequestHandler: + """Build a RequestHandler wired for BlocksByRange tests.""" + lookup = block_by_slot_lookup or _slot_lookup_from(blocks or {}) + return RequestHandler( + block_by_slot_lookup=lookup, + current_slot_lookup=lambda: current_slot, + ) + + +class TestRequestHandlerBlocksByRange: + """Tests for RequestHandler.handle_blocks_by_range.""" + + async def test_returns_exactly_count_consecutive_blocks(self) -> None: + """Returns exactly `count` consecutive blocks from start_slot when all retained.""" + blocks_db = {4000 + i: make_test_block(slot=4000 + i, seed=40 + i) for i in range(5)} + handler = _make_range_handler(blocks=blocks_db) + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(5)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + decoded_slots = [SignedBlock.decode_bytes(s).block.slot for s in response.successes] + assert response.errors == [] + assert decoded_slots == [Slot(4000 + i) for i in range(5)] + + async def test_returns_fewer_when_range_overruns_head(self) -> None: + """Returns fewer than count when range overruns head, no error.""" + blocks_db = {4000 + i: make_test_block(slot=4000 + i, seed=40 + i) for i in range(3)} + handler = _make_range_handler(blocks=blocks_db) + response = MockResponseStream() + + # Request 10 blocks but only 3 exist + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(10)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + decoded_slots = [SignedBlock.decode_bytes(s).block.slot for s in response.successes] + assert response.errors == [] + assert decoded_slots == [Slot(4000), Slot(4001), Slot(4002)] + + async def test_skips_empty_slots_preserves_monotonicity(self) -> None: + """Skips empty slots, preserves slot monotonicity.""" + # Blocks at slots 4000, 4002, 4004 (4001 and 4003 are empty) + blocks_db = { + 4000: make_test_block(slot=4000, seed=40), + 4002: make_test_block(slot=4002, seed=42), + 4004: make_test_block(slot=4004, seed=44), + } + handler = _make_range_handler(blocks=blocks_db) + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(5)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + decoded_slots = [SignedBlock.decode_bytes(s).block.slot for s in response.successes] + assert response.errors == [] + assert decoded_slots == [Slot(4000), Slot(4002), Slot(4004)] + + async def test_resource_unavailable_when_start_slot_predates_window(self) -> None: + """RESOURCE_UNAVAILABLE when start_slot is older than the sliding window. + + With current_slot=5000 and MIN_SLOTS_FOR_BLOCK_REQUESTS=3600 the floor is + slot 1400; start_slot=0 falls outside it. + """ + handler = _make_range_handler(current_slot=Slot(5000)) + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(10)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.successes == [] + assert len(response.errors) == 1 + assert response.errors[0][0] == ResponseCode.RESOURCE_UNAVAILABLE + + async def test_genesis_request_allowed_before_window_extends_below_zero(self) -> None: + """At genesis (current_slot < MIN_SLOTS_FOR_BLOCK_REQUESTS) start_slot=0 is in window.""" + block_zero = make_test_block(slot=0, seed=0) + handler = _make_range_handler(blocks={0: block_zero}, current_slot=Slot(100)) + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(0), count=Uint64(1)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + decoded_slots = [SignedBlock.decode_bytes(s).block.slot for s in response.successes] + assert response.errors == [] + assert decoded_slots == [Slot(0)] + + async def test_invalid_request_on_count_zero(self) -> None: + """INVALID_REQUEST when count == 0.""" + handler = _make_range_handler() + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(0)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.successes == [] + assert len(response.errors) == 1 + assert response.errors[0][0] == ResponseCode.INVALID_REQUEST + + async def test_invalid_request_on_count_exceeds_max(self) -> None: + """INVALID_REQUEST when count > MAX_REQUEST_BLOCKS.""" + handler = _make_range_handler() + response = MockResponseStream() + + request = BlocksByRangeRequest( + start_slot=Slot(4000), + count=Uint64(MAX_REQUEST_BLOCKS + 1), + ) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.successes == [] + assert len(response.errors) == 1 + assert response.errors[0][0] == ResponseCode.INVALID_REQUEST + + async def test_count_at_max_boundary_succeeds(self) -> None: + """count == MAX_REQUEST_BLOCKS is valid (boundary case).""" + handler = _make_range_handler() + response = MockResponseStream() + + request = BlocksByRangeRequest( + start_slot=Slot(4000), + count=Uint64(MAX_REQUEST_BLOCKS), + ) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.errors == [] + assert response.successes == [] + + async def test_no_block_by_slot_lookup_returns_error(self) -> None: + """SERVER_ERROR when no block_by_slot_lookup callback is configured.""" + handler = RequestHandler() # no lookups + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(5)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.successes == [] + assert len(response.errors) == 1 + assert response.errors[0][0] == ResponseCode.SERVER_ERROR + assert "Block lookup not available" in response.errors[0][1] + + async def test_no_current_slot_lookup_returns_error(self) -> None: + """SERVER_ERROR when current_slot_lookup is missing — window cannot be bounded.""" + handler = RequestHandler(block_by_slot_lookup=_slot_lookup_from({})) + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(5)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.successes == [] + assert len(response.errors) == 1 + assert response.errors[0][0] == ResponseCode.SERVER_ERROR + assert "Current slot not available" in response.errors[0][1] + + async def test_lookup_error_continues(self) -> None: + """Lookup exceptions are caught and processing continues.""" + block_at_4001 = make_test_block(slot=4001, seed=41) + + async def slot_lookup(slot: Slot) -> SignedBlock | None: + if int(slot) == 4000: + raise RuntimeError("Database error") + if int(slot) == 4001: + return block_at_4001 + return None + + handler = _make_range_handler(block_by_slot_lookup=slot_lookup) + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(3)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + decoded_slots = [SignedBlock.decode_bytes(s).block.slot for s in response.successes] + assert response.errors == [] + assert decoded_slots == [Slot(4001)] + + async def test_empty_range_returns_no_blocks(self) -> None: + """Range with no blocks at all returns empty (no error).""" + handler = _make_range_handler() + response = MockResponseStream() + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(5)) + await handler.handle_blocks_by_range(request, response) # type: ignore[arg-type] + + assert response.errors == [] + assert response.successes == [] + + +class TestReqRespServerBlocksByRange: + """Full ReqRespServer integration tests for BlocksByRange.""" + + async def test_handle_blocks_by_range_request(self) -> None: + """Full BlocksByRange request/response flow through ReqRespServer.""" + block = make_test_block(slot=4000, seed=40) + handler = _make_range_handler(blocks={4000: block}) + server = ReqRespServer(handler=handler) + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(1)) + request_bytes = encode_request(request.encode_bytes()) + + stream = MockStream(request_data=request_bytes) + await server.handle_stream(stream, BLOCKS_BY_RANGE_PROTOCOL_V1) + + assert stream.closed is True + success_blocks = [] + for chunk in stream.written: + code, ssz_bytes = ResponseCode.decode(chunk) + if code == ResponseCode.SUCCESS: + success_blocks.append(SignedBlock.decode_bytes(ssz_bytes)) + assert [b.block.slot for b in success_blocks] == [Slot(4000)] + + async def test_invalid_ssz_returns_invalid_request(self) -> None: + """Invalid SSZ for BlocksByRange returns INVALID_REQUEST.""" + handler = _make_range_handler() + server = ReqRespServer(handler=handler) + + invalid_ssz = b"\xff" * 10 + request_bytes = encode_request(invalid_ssz) + stream = MockStream(request_data=request_bytes) + + await server.handle_stream(stream, BLOCKS_BY_RANGE_PROTOCOL_V1) + + assert stream.closed is True + codes = [ResponseCode.decode(chunk)[0] for chunk in stream.written] + assert codes == [ResponseCode.INVALID_REQUEST] + + async def test_protocol_id_in_reqresp_set(self) -> None: + """BlocksByRange protocol ID is in REQRESP_PROTOCOL_IDS.""" + assert BLOCKS_BY_RANGE_PROTOCOL_V1 in REQRESP_PROTOCOL_IDS + + async def test_roundtrip_blocks_by_range_multiple(self) -> None: + """Full encode -> server -> decode roundtrip for multiple blocks.""" + blocks_db = {4000 + i: make_test_block(slot=4000 + i, seed=40 + i) for i in range(3)} + handler = _make_range_handler(blocks=blocks_db) + server = ReqRespServer(handler=handler) + + request = BlocksByRangeRequest(start_slot=Slot(4000), count=Uint64(3)) + request_wire = encode_request(request.encode_bytes()) + stream = MockStream(request_data=request_wire) + + await server.handle_stream(stream, BLOCKS_BY_RANGE_PROTOCOL_V1) + + success_blocks = [] + for response_wire in stream.written: + code, ssz_bytes = ResponseCode.decode(response_wire) + if code == ResponseCode.SUCCESS: + success_blocks.append(SignedBlock.decode_bytes(ssz_bytes)) + assert [b.block.slot for b in success_blocks] == [Slot(4000 + i) for i in range(3)] diff --git a/tests/lean_spec/subspecs/sync/test_backfill_sync.py b/tests/lean_spec/subspecs/sync/test_backfill_sync.py index d4fe76df2..37750c289 100644 --- a/tests/lean_spec/subspecs/sync/test_backfill_sync.py +++ b/tests/lean_spec/subspecs/sync/test_backfill_sync.py @@ -2,6 +2,8 @@ from __future__ import annotations +from dataclasses import dataclass, field + import pytest from lean_spec.forks.lstar.containers.slot import Slot @@ -14,14 +16,40 @@ from lean_spec.subspecs.sync.config import MAX_BACKFILL_DEPTH, MAX_BLOCKS_PER_REQUEST from lean_spec.subspecs.sync.peer_manager import ( INITIAL_PEER_SCORE, + SCORE_FAILURE_PENALTY, SCORE_SUCCESS_BONUS, PeerManager, SyncPeer, ) -from lean_spec.types import Bytes32 +from lean_spec.types import Bytes32, Uint64 from tests.lean_spec.helpers import MockNetworkRequester, make_signed_block +@dataclass +class FakeStoreView: + """In-memory StoreView used to drive backfill tests. + + Concrete implementation. Avoids MagicMock so tests fail loudly when + fields drift. Tests mutate `known_roots`, `head`, and `finalized` directly. + """ + + known_roots: set[Bytes32] = field(default_factory=set) + head: Slot = field(default_factory=lambda: Slot(0)) + finalized: Slot = field(default_factory=lambda: Slot(0)) + + def has_root(self, root: Bytes32) -> bool: + """Return True if the root has been registered with this view.""" + return root in self.known_roots + + def head_slot(self) -> Slot: + """Return the head slot stored on this view.""" + return self.head + + def finalized_slot(self) -> Slot: + """Return the finalized slot stored on this view.""" + return self.finalized + + @pytest.fixture def network() -> MockNetworkRequester: """Provide mock network.""" @@ -29,14 +57,23 @@ def network() -> MockNetworkRequester: @pytest.fixture -def backfill_system(peer_id: PeerId, network: MockNetworkRequester) -> BackfillSync: - """Provide a complete BackfillSync with connected peer.""" +def store_view() -> FakeStoreView: + """Provide a fresh in-memory StoreView.""" + return FakeStoreView() + + +@pytest.fixture +def backfill_system( + peer_id: PeerId, network: MockNetworkRequester, store_view: FakeStoreView +) -> BackfillSync: + """Provide a complete BackfillSync with connected peer and StoreView.""" manager = PeerManager() manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)) return BackfillSync( peer_manager=manager, block_cache=BlockCache(), network=network, + store_view=store_view, ) @@ -74,11 +111,22 @@ async def test_fetch_single_missing_block( async def test_recursive_parent_chain_resolution( self, - backfill_system: BackfillSync, - network: MockNetworkRequester, peer_id: PeerId, + network: MockNetworkRequester, ) -> None: - """Backfill recursively fetches missing parents up the chain.""" + """Backfill recursively fetches missing parents up the chain. + + No store view is supplied, so backfill resolves missing parents purely + by root recursion (the gap-detection path requires a view). + """ + manager = PeerManager() + manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)) + backfill = BackfillSync( + peer_manager=manager, + block_cache=BlockCache(), + network=network, + ) + grandparent = make_signed_block( slot=Slot(1), proposer_index=ValidatorIndex(0), @@ -103,11 +151,11 @@ async def test_recursive_parent_chain_resolution( ) child_root = network.add_block(child) - await backfill_system.fill_missing([child_root]) + await backfill.fill_missing([child_root]) - child_cached = backfill_system.block_cache.get(child_root) - parent_cached = backfill_system.block_cache.get(parent_root) - grandparent_cached = backfill_system.block_cache.get(grandparent_root) + child_cached = backfill.block_cache.get(child_root) + parent_cached = backfill.block_cache.get(parent_root) + grandparent_cached = backfill.block_cache.get(grandparent_root) assert child_cached is not None assert child_cached == PendingBlock( @@ -152,7 +200,8 @@ async def test_depth_limit_stops_infinite_recursion( await backfill_system.fill_missing([root], depth=MAX_BACKFILL_DEPTH) - assert network.request_log == [] + assert network.root_request_log == [] + assert network.range_request_log == [] assert root not in backfill_system.block_cache async def test_skips_already_cached_blocks( @@ -174,7 +223,8 @@ async def test_skips_already_cached_blocks( await backfill_system.fill_missing([block_root]) - assert network.request_log == [] + assert network.root_request_log == [] + assert network.range_request_log == [] class TestBatchingAndPeerManagement: @@ -192,7 +242,7 @@ async def test_large_request_split_into_batches( await backfill_system.fill_missing(roots) - assert network.request_log == [ + assert network.root_request_log == [ (peer_id, roots[:MAX_BLOCKS_PER_REQUEST]), (peer_id, roots[MAX_BLOCKS_PER_REQUEST:]), ] @@ -211,7 +261,8 @@ async def test_no_requests_without_available_peer( await backfill.fill_missing([Bytes32(b"\x01" * 32)]) - assert network.request_log == [] + assert network.root_request_log == [] + assert network.range_request_log == [] async def test_network_failure_handled_gracefully( self, @@ -272,12 +323,12 @@ async def test_in_flight_deduplication( # First call fetches the block (may also try to fetch its parent). await backfill_system.fill_missing([root]) - requests_after_first = len(network.request_log) + requests_after_first = len(network.root_request_log) assert requests_after_first >= 1 # Second call: root is now in cache, so no new request. await backfill_system.fill_missing([root]) - assert len(network.request_log) == requests_after_first + assert len(network.root_request_log) == requests_after_first async def test_retry_after_failure_clears_pending( self, @@ -310,3 +361,154 @@ async def test_retry_after_failure_clears_pending( network.should_fail = False await backfill.fill_missing([root]) assert root in backfill.block_cache + + +class TestBackfillOptimizations: + """Tests for range sync and store awareness in BackfillSync.""" + + async def test_store_awareness_skips_known_parents( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + store_view: FakeStoreView, + peer_id: PeerId, + ) -> None: + """Backfill does not request parents that are already in the Store.""" + # Parent is in the store; head is at the parent slot so no range gap fires. + parent_root = Bytes32(b"\x01" * 32) + store_view.known_roots.add(parent_root) + store_view.head = Slot(10) + store_view.finalized = Slot(10) + + # Child is received above the head. + child = make_signed_block( + slot=Slot(11), + parent_root=parent_root, + proposer_index=ValidatorIndex(0), + state_root=Bytes32.zero(), + ) + child_root = network.add_block(child) + + await backfill_system.fill_missing([child_root]) + + assert child_root in backfill_system.block_cache + # Only the initial child request is made; parent is in Store. + assert network.root_request_log == [(peer_id, [child_root])] + assert network.range_request_log == [] + + async def test_range_sync_triggered_by_gap_above_head( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + store_view: FakeStoreView, + peer_id: PeerId, + ) -> None: + """A large gap above head triggers a single range request to fill it. + + Floor is the head slot, not the finalized slot: slots above finalized + but at or below head are already canonical for us and are not refetched. + """ + # Store head is at slot 49, finalized is older at slot 10. + store_view.head = Slot(49) + store_view.finalized = Slot(10) + store_view.known_roots.add(Bytes32.zero()) + + # Pre-fill the parent in the network at slot 50. + block_50 = make_signed_block( + slot=Slot(50), + parent_root=Bytes32.zero(), + proposer_index=ValidatorIndex(0), + state_root=Bytes32.zero(), + ) + parent_root = network.add_block(block_50) + + # Receive a block at slot 100 via fill_missing. + block_100 = make_signed_block( + slot=Slot(100), + parent_root=parent_root, + proposer_index=ValidatorIndex(0), + state_root=Bytes32.zero(), + ) + root_100 = network.add_block(block_100) + + await backfill_system.fill_missing([root_100]) + + # First call fetches root_100 by root. + # That returns block_100; its parent (block_50) is unknown, triggering + # the gap path: range fetch (head+1=50, count=100-50=50) covers block_50. + # block_50's parent is ZERO_HASH which IS in the store, so recursion stops. + assert network.root_request_log == [(peer_id, [root_100])] + assert network.range_request_log == [(peer_id, Slot(50), Uint64(50))] + + async def test_range_deduplication( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + peer_id: PeerId, + ) -> None: + """Overlapping range requests skip slots already covered by a prior fetch.""" + # First range: 1-10 (count=10 means slots 1..10). + await backfill_system.fill_range(start_slot=Slot(1), count=Uint64(10)) + assert backfill_system._max_range_slot == Slot(10) + assert network.range_request_log == [(peer_id, Slot(1), Uint64(10))] + + # Overlapping range 5-15: only slots 11..15 should be re-requested. + await backfill_system.fill_range(start_slot=Slot(5), count=Uint64(11)) + + assert backfill_system._max_range_slot == Slot(15) + assert network.range_request_log == [ + (peer_id, Slot(1), Uint64(10)), + (peer_id, Slot(11), Uint64(5)), + ] + + async def test_full_range_skip_if_already_covered( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + peer_id: PeerId, + ) -> None: + """Range requests fully covered by previous successful ones are skipped.""" + await backfill_system.fill_range(start_slot=Slot(1), count=Uint64(100)) + assert network.range_request_log == [(peer_id, Slot(1), Uint64(100))] + + # Sub-range: no new request. + await backfill_system.fill_range(start_slot=Slot(10), count=Uint64(20)) + + assert network.range_request_log == [(peer_id, Slot(1), Uint64(100))] + + async def test_failed_range_does_not_advance_watermark( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + peer_id: PeerId, + ) -> None: + """A failed range fetch leaves the watermark untouched so retries are honored.""" + network.should_fail = True + + await backfill_system.fill_range(start_slot=Slot(1), count=Uint64(10)) + + # Watermark unchanged: a future call covering the same range can retry. + assert backfill_system._max_range_slot == Slot(0) + # The peer recorded a failure (score decreased). + peer = backfill_system.peer_manager.get_peer(peer_id) + assert peer is not None + assert peer.score == INITIAL_PEER_SCORE - SCORE_FAILURE_PENALTY + + # Retry succeeds. + network.should_fail = False + await backfill_system.fill_range(start_slot=Slot(1), count=Uint64(10)) + assert backfill_system._max_range_slot == Slot(10) + + async def test_reset_clears_watermark( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + ) -> None: + """reset() restores the watermark so post-reset range fetches are honored.""" + await backfill_system.fill_range(start_slot=Slot(1), count=Uint64(10)) + assert backfill_system._max_range_slot == Slot(10) + + backfill_system.reset() + + assert backfill_system._max_range_slot == Slot(0) + assert backfill_system._pending == set() diff --git a/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py b/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py new file mode 100644 index 000000000..434d38d42 --- /dev/null +++ b/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py @@ -0,0 +1,232 @@ +"""Tests for backfill routing decisions taken by head sync on gossip blocks. + +These cover the gap-detection branch in cache-and-backfill: + +- Reject blocks at or below the finalized slot (no cache, no backfill). +- Accept blocks above head with a single-slot gap and route to root recursion. +- Accept blocks above head with a multi-slot gap and route to range fetch. +- Accept alt-fork blocks at or below head and route to root recursion. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, cast + +from lean_spec.forks.lstar import Store +from lean_spec.forks.lstar.containers import SignedBlock +from lean_spec.forks.lstar.containers.slot import Slot +from lean_spec.forks.lstar.containers.validator import ValidatorIndex +from lean_spec.subspecs.networking import PeerId +from lean_spec.subspecs.ssz.hash import hash_tree_root +from lean_spec.subspecs.sync.backfill_sync import BackfillSync +from lean_spec.subspecs.sync.block_cache import BlockCache +from lean_spec.subspecs.sync.head_sync import HeadSync, HeadSyncResult +from lean_spec.types import Bytes32, Uint64 +from tests.lean_spec.helpers import MockForkchoiceStore, make_signed_block + + +@dataclass +class _RecordingBackfill: + """Backfill stub recording range and root-recursion requests. + + Used as the I/O boundary mock for head sync's backfill dependency. + Each method appends its arguments verbatim and performs no real work. + """ + + range_calls: list[tuple[Slot, Uint64]] = field(default_factory=list) + """Recorded range fetch requests as (start_slot, count).""" + + missing_calls: list[list[Bytes32]] = field(default_factory=list) + """Recorded root recursion requests, each entry the full roots list.""" + + async def fill_range(self, start_slot: Slot, count: Uint64) -> None: + """Record a range fetch request.""" + self.range_calls.append((start_slot, count)) + + async def fill_missing(self, roots: list[Bytes32]) -> None: + """Record a root recursion request.""" + self.missing_calls.append(roots) + + +def _backfill() -> tuple[_RecordingBackfill, BackfillSync]: + """Build a recording backfill double and the BackfillSync-typed view.""" + recorder = _RecordingBackfill() + return recorder, cast(BackfillSync, recorder) + + +def _store_with_head( + *, + finalized_slot: int, + head_slot: int, +) -> tuple[Store, Bytes32]: + """Build a mock forkchoice store with a finalized slot and a head block. + + Returns the typed store and the head root. The head block carries the + requested head slot so subsequent gap math sees the configured value. + """ + store = MockForkchoiceStore() + store.latest_finalized.slot = Slot(finalized_slot) + + head_root = Bytes32(b"\x77" * 32) + head_block = make_signed_block( + slot=Slot(head_slot), + proposer_index=ValidatorIndex(0), + parent_root=Bytes32.zero(), + state_root=Bytes32(b"\x88" * 32), + ) + store.blocks[head_root] = head_block.block + store.head = head_root + store.head_slot = Slot(head_slot) + return cast(Store, store), head_root + + +def _make_head_sync(backfill: BackfillSync) -> HeadSync: + """Create a HeadSync wired to the supplied backfill double.""" + + def never_called(_s: Any, _b: SignedBlock) -> Any: + # The cache-and-backfill path must never invoke block processing. + raise AssertionError("process_block must not be called for cached blocks") + + return HeadSync( + block_cache=BlockCache(), + backfill=backfill, + process_block=never_called, + ) + + +def _gossip_block(slot: int, parent_root: Bytes32, state_seed: int) -> SignedBlock: + """Construct a gossip block with the requested slot and parent_root.""" + return make_signed_block( + slot=Slot(slot), + proposer_index=ValidatorIndex(0), + parent_root=parent_root, + state_root=Bytes32(bytes([state_seed]) * 32), + ) + + +class TestRejectionBelowFinalized: + """Tests for the finalized-slot floor on incoming gossip blocks.""" + + async def test_block_at_finalized_slot_is_silently_rejected(self, peer_id: PeerId) -> None: + """A gossip block at slot equal to finalized is dropped without effect.""" + store, _head_root = _store_with_head(finalized_slot=10, head_slot=10) + recorder, backfill = _backfill() + head_sync = _make_head_sync(backfill) + + unknown_parent = Bytes32(b"\x33" * 32) + block = _gossip_block(slot=10, parent_root=unknown_parent, state_seed=1) + block_root = hash_tree_root(block.block) + + result, returned_store = await head_sync.on_gossip_block(block, peer_id, store) + + assert result == HeadSyncResult( + processed=False, + cached=False, + backfill_triggered=False, + descendants_processed=0, + ) + assert returned_store is store + assert recorder.range_calls == [] + assert recorder.missing_calls == [] + assert block_root not in head_sync.block_cache + + async def test_block_below_finalized_slot_is_silently_rejected(self, peer_id: PeerId) -> None: + """A gossip block at slot below finalized is dropped without effect.""" + store, _head_root = _store_with_head(finalized_slot=10, head_slot=20) + recorder, backfill = _backfill() + head_sync = _make_head_sync(backfill) + + unknown_parent = Bytes32(b"\x33" * 32) + block = _gossip_block(slot=5, parent_root=unknown_parent, state_seed=1) + block_root = hash_tree_root(block.block) + + result, returned_store = await head_sync.on_gossip_block(block, peer_id, store) + + assert result == HeadSyncResult( + processed=False, + cached=False, + backfill_triggered=False, + descendants_processed=0, + ) + assert returned_store is store + assert recorder.range_calls == [] + assert recorder.missing_calls == [] + assert block_root not in head_sync.block_cache + + +class TestBackfillRoutingAboveHead: + """Tests for routing between range and root backfill above the head slot.""" + + async def test_single_slot_gap_above_head_uses_root_recursion(self, peer_id: PeerId) -> None: + """A gossip block at exactly head+1 with unknown parent recurses by root.""" + store, _head_root = _store_with_head(finalized_slot=5, head_slot=10) + recorder, backfill = _backfill() + head_sync = _make_head_sync(backfill) + + unknown_parent = Bytes32(b"\x44" * 32) + block = _gossip_block(slot=11, parent_root=unknown_parent, state_seed=1) + block_root = hash_tree_root(block.block) + + result, _ = await head_sync.on_gossip_block(block, peer_id, store) + + assert result == HeadSyncResult( + processed=False, + cached=True, + backfill_triggered=True, + descendants_processed=0, + ) + assert recorder.range_calls == [] + assert recorder.missing_calls == [[unknown_parent]] + assert block_root in head_sync.block_cache + + async def test_multi_slot_gap_above_head_uses_range_fetch(self, peer_id: PeerId) -> None: + """A gossip block far above head triggers a contiguous range fetch.""" + store, _head_root = _store_with_head(finalized_slot=10, head_slot=20) + recorder, backfill = _backfill() + head_sync = _make_head_sync(backfill) + + unknown_parent = Bytes32(b"\x55" * 32) + block = _gossip_block(slot=100, parent_root=unknown_parent, state_seed=1) + block_root = hash_tree_root(block.block) + + result, _ = await head_sync.on_gossip_block(block, peer_id, store) + + assert result == HeadSyncResult( + processed=False, + cached=True, + backfill_triggered=True, + descendants_processed=0, + ) + # gap_floor = head+1 = 21, gap_size = 100 - 21 = 79. + assert recorder.range_calls == [(Slot(21), Uint64(79))] + assert recorder.missing_calls == [] + assert block_root in head_sync.block_cache + + +class TestAltForkRoutingAtOrBelowHead: + """Tests for alt-fork gossip handling when the slot is at or below head.""" + + async def test_alt_fork_block_below_head_above_finalized_uses_root_recursion( + self, peer_id: PeerId + ) -> None: + """An alt-fork block at slot below head and above finalized recurses by root.""" + store, _head_root = _store_with_head(finalized_slot=10, head_slot=20) + recorder, backfill = _backfill() + head_sync = _make_head_sync(backfill) + + unknown_parent = Bytes32(b"\x66" * 32) + block = _gossip_block(slot=15, parent_root=unknown_parent, state_seed=1) + block_root = hash_tree_root(block.block) + + result, _ = await head_sync.on_gossip_block(block, peer_id, store) + + assert result == HeadSyncResult( + processed=False, + cached=True, + backfill_triggered=True, + descendants_processed=0, + ) + assert recorder.range_calls == [] + assert recorder.missing_calls == [[unknown_parent]] + assert block_root in head_sync.block_cache