diff --git a/scripts/collect-devnet-logs.sh b/scripts/collect-devnet-logs.sh new file mode 100755 index 0000000..bf73705 --- /dev/null +++ b/scripts/collect-devnet-logs.sh @@ -0,0 +1,331 @@ +#!/usr/bin/env bash +# Collect Docker logs and runtime metadata from every devnet validator host, +# fetch them in parallel to the local machine, and bundle the result into a +# single timestamped tar.gz under ./tmp/. +# +# Host list defaults to ansible-devnet/genesis/validator-config.yaml (each +# validator's enrFields.ip). Use --inventory to use Ansible inventory instead. +# +# Usage: +# scripts/collect-devnet-logs.sh [options] +# +# Options: +# --validator-config PATH +# Validator config YAML (default: ansible-devnet/genesis/validator-config.yaml) +# --inventory PATH Use Ansible inventory for node list and ansible_host instead +# of validator-config (disables validator-config default) +# --nodes LIST Comma-separated subset of node names +# (default: all validators / all inventory nodes) +# --since DURATION Only include log entries newer than DURATION +# (e.g. "2h", "30m", "2026-04-23T00:00:00"; default: unset = full log) +# --tail N Only include the last N log lines per node +# (default: unset = full log) +# --output DIR Directory to place the bundle and staging data in +# (default: ./tmp) +# --keep-staging Do not delete the per-node staging directory after bundling +# --jobs N Parallel SSH fan-out (default: 8) +# --ssh-key PATH Private key used to authenticate over SSH +# (default: ~/.ssh/id_ed25519_github) +# --ssh-user USER Remote login user +# (default: root) +# -h | --help Show this help +# +# The bundle layout is: +# devnet-logs-/ +# manifest.txt # nodes + hosts + capture time +# / +# docker.log # `docker logs` stdout+stderr, with timestamps +# docker-inspect.json # `docker inspect ` +# docker-ps.txt # `docker ps -a` on the host +# ssh.log # any SSH-level errors for this node +# +# Requires locally: bash, ssh, tar, python3, PyYAML (to parse YAML). 
# Requires on each remote host: docker.

set -u
set -o pipefail

SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
REPO_ROOT="$(cd -- "${SCRIPT_DIR}/.." &>/dev/null && pwd)"

# Defaults; every one of these can be overridden by a command-line option.
USE_INVENTORY=false
VALIDATOR_CONFIG="${REPO_ROOT}/ansible-devnet/genesis/validator-config.yaml"
INVENTORY="${REPO_ROOT}/ansible/inventory/hosts.yml"
NODES_FILTER=""
SINCE=""
TAIL=""
OUTPUT_DIR="${REPO_ROOT}/tmp"
KEEP_STAGING=false
JOBS=8
SSH_KEY_DEFAULT="${HOME}/.ssh/id_ed25519_github"
SSH_USER_DEFAULT="root"

# Print the usage text: the header comment of this script (from line 2 up to,
# but excluding, the first line that starts with `set -u`), with the leading
# "# " stripped. The `set -u` line above must therefore stay at column 0.
print_help() {
  sed -n '2,/^set -u/p' "${BASH_SOURCE[0]}" | sed -E 's/^# ?//' | sed -e '/^set -u/,$d'
}

# require_value OPTION ARGC
# Exit with a usage error when OPTION is the last word on the command line.
# Without this check, `set -u` aborts on "$2" with an "unbound variable"
# message that does not tell the user which option is incomplete.
require_value() {
  if (( $2 < 2 )); then
    echo "option $1 requires a value" >&2
    print_help >&2
    exit 2
  fi
}

while [[ $# -gt 0 ]]; do
  case "$1" in
    --validator-config)
      require_value "$1" "$#"
      VALIDATOR_CONFIG="$2"; USE_INVENTORY=false; shift 2 ;;
    --inventory)
      require_value "$1" "$#"
      INVENTORY="$2"; USE_INVENTORY=true; shift 2 ;;
    --nodes)
      require_value "$1" "$#"
      NODES_FILTER="$2"; shift 2 ;;
    --since)
      require_value "$1" "$#"
      SINCE="$2"; shift 2 ;;
    --tail)
      require_value "$1" "$#"
      TAIL="$2"; shift 2 ;;
    --output)
      require_value "$1" "$#"
      OUTPUT_DIR="$2"; shift 2 ;;
    --keep-staging)
      KEEP_STAGING=true; shift ;;
    --jobs)
      require_value "$1" "$#"
      JOBS="$2"; shift 2 ;;
    --ssh-key)
      require_value "$1" "$#"
      SSH_KEY_DEFAULT="$2"; shift 2 ;;
    --ssh-user)
      require_value "$1" "$#"
      SSH_USER_DEFAULT="$2"; shift 2 ;;
    -h|--help)
      print_help; exit 0 ;;
    *)
      echo "unknown argument: $1" >&2; print_help >&2; exit 2 ;;
  esac
done

# JOBS is used in an arithmetic comparison later on; reject anything that is
# not a positive decimal integer here, where the error can name the option.
if [[ ! "${JOBS}" =~ ^[1-9][0-9]*$ ]]; then
  echo "--jobs must be a positive integer: ${JOBS}" >&2
  exit 2
fi

if ${USE_INVENTORY}; then
  if [[ ! -f "${INVENTORY}" ]]; then
    echo "inventory not found: ${INVENTORY}" >&2
    exit 1
  fi
else
  if [[ ! -f "${VALIDATOR_CONFIG}" ]]; then
    echo "validator config not found: ${VALIDATOR_CONFIG}" >&2
    exit 1
  fi
fi

if ! command -v python3 >/dev/null 2>&1; then
  echo "python3 is required (used to parse YAML)" >&2
  exit 1
fi

if [[ ! -f "${SSH_KEY_DEFAULT}" ]]; then
  echo "ssh key not found: ${SSH_KEY_DEFAULT}" >&2
  echo "pass --ssh-key PATH to override" >&2
  exit 1
fi

# Extract "NODE\tUSER\tHOST\tKEY" lines (user/key columns are ignored
# for SSH; collect_one uses --ssh-user / --ssh-key). Source: validator-config
# or Ansible inventory depending on USE_INVENTORY.
# The embedded Python prints one tab-separated row per node. Its exit status
# is checked so that a parse failure (missing PyYAML, malformed YAML) stops
# the run with Python's own message instead of a misleading "no nodes matched".
NODES_TSV="$(
  INVENTORY_PATH="${INVENTORY}" \
  VALIDATOR_CONFIG_PATH="${VALIDATOR_CONFIG}" \
  USE_INVENTORY="${USE_INVENTORY}" \
  NODES_FILTER="${NODES_FILTER}" \
  python3 - <<'PY'
import os, sys
try:
    import yaml
except ImportError as exc:
    sys.stderr.write(f"PyYAML not available ({exc}); pip install pyyaml\n")
    sys.exit(1)

nodes_filter = {n.strip() for n in os.environ.get("NODES_FILTER", "").split(",") if n.strip()}
use_inventory = os.environ.get("USE_INVENTORY", "false").lower() in ("1", "true", "yes")
rows = []
matched = set()


def wanted(name):
    """Apply the --nodes filter and remember which filter entries were hit."""
    if nodes_filter and name not in nodes_filter:
        return False
    matched.add(name)
    return True


if use_inventory:
    inventory_path = os.environ["INVENTORY_PATH"]
    with open(inventory_path, "r") as f:
        data = yaml.safe_load(f) or {}
    # Every level may be present but null in YAML, hence the `or {}` guards.
    children = (data.get("all") or {}).get("children") or {}
    for group_name, group in children.items():
        if group_name in ("local", "bootnodes"):
            continue
        for node_name, node_vars in ((group or {}).get("hosts") or {}).items():
            node_vars = node_vars or {}
            host = node_vars.get("ansible_host")
            user = node_vars.get("ansible_user") or os.environ.get("USER", "root")
            key = node_vars.get("ansible_ssh_private_key_file") or ""
            if not host:
                continue
            if not wanted(str(node_name)):
                continue
            rows.append("\t".join([str(node_name), str(user), str(host), str(key)]))
else:
    vc_path = os.environ["VALIDATOR_CONFIG_PATH"]
    with open(vc_path, "r") as f:
        data = yaml.safe_load(f) or {}
    for v in data.get("validators") or []:
        v = v or {}
        node_name = v.get("name")
        enr = v.get("enrFields") or {}
        host = enr.get("ip")
        if not node_name or not host:
            continue
        if not wanted(str(node_name)):
            continue
        # "_" keeps the user column non-empty: the shell side splits on tabs,
        # and bash collapses consecutive tabs when reading fields.
        rows.append("\t".join([str(node_name), "_", str(host), ""]))

for missing in sorted(nodes_filter - matched):
    sys.stderr.write(f"warning: --nodes entry matched no usable node: {missing}\n")

for row in rows:
    print(row)
PY
)" || exit 1

if [[ -z "${NODES_TSV}" ]]; then
  echo "no nodes matched (filter='${NODES_FILTER}')" >&2
  exit 1
fi

TIMESTAMP="$(date -u +"%Y%m%dT%H%M%SZ")"
BUNDLE_NAME="devnet-logs-${TIMESTAMP}"
STAGING_DIR="${OUTPUT_DIR}/${BUNDLE_NAME}"
BUNDLE_PATH="${OUTPUT_DIR}/${BUNDLE_NAME}.tar.gz"

if ! mkdir -p "${STAGING_DIR}"; then
  echo "cannot create staging directory: ${STAGING_DIR}" >&2
  exit 1
fi

# Build a compact manifest up front so we have context even if the run aborts.
# The USER column shows the login that is actually used for SSH
# (SSH_USER_DEFAULT), not the per-node value from the node source, because
# collect_one always logs in with --ssh-user.
{
  echo "bundle: ${BUNDLE_NAME}"
  echo "captured_at_utc: ${TIMESTAMP}"
  echo "captured_from: $(hostname)"
  if ${USE_INVENTORY}; then
    echo "node_source: inventory"
    echo "inventory: ${INVENTORY}"
  else
    echo "node_source: validator-config"
    echo "validator_config: ${VALIDATOR_CONFIG}"
  fi
  echo "since: ${SINCE:-}"
  echo "tail: ${TAIL:-}"
  echo "parallel_jobs: ${JOBS}"
  echo ""
  printf "%-14s %-6s %s\n" "NODE" "USER" "HOST"
  printf "%-14s %-6s %s\n" "----" "----" "----"
  while IFS=$'\t' read -r node _user host _key; do
    [[ -z "${node}" ]] && continue
    printf "%-14s %-6s %s\n" "${node}" "${SSH_USER_DEFAULT}" "${host}"
  done <<<"${NODES_TSV}"
} >"${STAGING_DIR}/manifest.txt"

# Assemble the remote command. We always stream everything to stdout on the
# remote, wrapped in `--- BEGIN/END
# NAME ---` markers; the local side
# splits it back into per-file artifacts. This keeps the SSH invocation to a
# single round-trip per node.
DOCKER_LOGS_CMD="docker logs --timestamps"
[[ -n "${SINCE}" ]] && DOCKER_LOGS_CMD+=" --since '${SINCE}'"
[[ -n "${TAIL}" ]] && DOCKER_LOGS_CMD+=" --tail '${TAIL}'"

#######################################
# Per-node fetcher; the fan-out loop below starts it as a background job.
# Globals:   STAGING_DIR, SSH_USER_DEFAULT, SSH_KEY_DEFAULT, DOCKER_LOGS_CMD
# Arguments: $1 - one "node<TAB>user<TAB>host<TAB>key" row of NODES_TSV
# Outputs:   writes docker-ps.txt, docker-inspect.json, docker.log and ssh.log
#            under ${STAGING_DIR}/<node>/; one status line on stdout
# Returns:   0 (collection is best-effort; SSH failures are reported on
#            stderr and leave empty artifacts)
#######################################
collect_one() {
  local tsv_line="$1"
  local node user host key
  IFS=$'\t' read -r node user host key <<<"${tsv_line}"
  [[ -z "${node}" ]] && return 0

  # Always log in with the --ssh-user / --ssh-key values, regardless of
  # inventory overrides or the caller's SSH agent state.
  user="${SSH_USER_DEFAULT}"
  key="${SSH_KEY_DEFAULT}"

  local node_dir="${STAGING_DIR}/${node}"
  mkdir -p "${node_dir}"

  # NOTE(review): UserKnownHostsFile=/dev/null means no host key is ever
  # remembered, so accept-new accepts whatever key the host presents on every
  # run — confirm that is intended for these hosts.
  local ssh_opts=(
    -o BatchMode=yes
    -o ConnectTimeout=10
    -o StrictHostKeyChecking=accept-new
    -o UserKnownHostsFile=/dev/null
    -o LogLevel=ERROR
    -o ServerAliveInterval=30
    -o IdentitiesOnly=yes
  )
  [[ -n "${key}" ]] && ssh_opts+=(-i "${key}")

  # The here-doc is unquoted on purpose: ${node} and ${DOCKER_LOGS_CMD} are
  # expanded locally, everything else runs on the remote host. `|| true`
  # keeps the later sections flowing when one docker command fails.
  local remote_script
  remote_script=$(cat <<REMOTE
echo "--- BEGIN docker-ps ---"
docker ps -a 2>&1 || true
echo "--- END docker-ps ---"
echo "--- BEGIN docker-inspect ---"
docker inspect '${node}' 2>&1 || true
echo "--- END docker-inspect ---"
echo "--- BEGIN docker-log ---"
${DOCKER_LOGS_CMD} '${node}' 2>&1 || true
echo "--- END docker-log ---"
REMOTE
)

  local raw="${node_dir}/.raw.out"
  if ! ssh "${ssh_opts[@]}" "${user}@${host}" "${remote_script}" \
    >"${raw}" 2>"${node_dir}/ssh.log"; then
    echo "[${node}] ssh to ${user}@${host} failed (see ssh.log)" >&2
  fi

  # Split the single stream back into per-section files, stripping markers.
  # A section that is missing from the stream yields an empty file.
  python3 - "${raw}" "${node_dir}" <<'SPLIT'
import sys, os, re
raw_path, out_dir = sys.argv[1], sys.argv[2]
sections = {
    "docker-ps": "docker-ps.txt",
    "docker-inspect": "docker-inspect.json",
    "docker-log": "docker.log",
}
try:
    with open(raw_path, "r", errors="replace") as f:
        text = f.read()
except FileNotFoundError:
    sys.exit(0)
for tag, fname in sections.items():
    m = re.search(
        rf"^--- BEGIN {re.escape(tag)} ---\n(.*?)^--- END {re.escape(tag)} ---\n",
        text,
        flags=re.MULTILINE | re.DOTALL,
    )
    out_path = os.path.join(out_dir, fname)
    with open(out_path, "w") as f:
        f.write(m.group(1) if m else "")
os.remove(raw_path)
SPLIT

  local log_size="?"
  if [[ -f "${node_dir}/docker.log" ]]; then
    log_size=$(wc -c <"${node_dir}/docker.log" | tr -d ' ')
  fi
  echo "[${node}] captured (log bytes=${log_size})"
}

NODE_COUNT="$(printf '%s\n' "${NODES_TSV}" | wc -l | tr -d ' ')"
echo "Collecting logs for ${NODE_COUNT} node(s) -> ${STAGING_DIR}"

# Fan out across nodes with bounded parallelism using background jobs. On
# macOS's bash 3.2 there is no `wait -n`, so we run in fixed-size waves of
# ${JOBS} and `wait` for each wave to drain before starting the next one.
wave=()

# Wait for every pid recorded in `wave`, then reset the list. A failed job
# does not abort the run; collect_one reports its own errors.
flush_wave() {
  # Return before expanding "${wave[@]}": under `set -u`, bash 3.2 treats the
  # expansion of an empty array as an unbound variable.
  if [[ "${#wave[@]}" -eq 0 ]]; then
    return 0
  fi
  local pid
  for pid in "${wave[@]}"; do
    wait "${pid}" || true
  done
  wave=()
}

while IFS= read -r tsv_line; do
  [[ -z "${tsv_line}" ]] && continue
  collect_one "${tsv_line}" &
  wave+=("$!")
  if (( ${#wave[@]} >= JOBS )); then
    flush_wave
  fi
done <<<"${NODES_TSV}"
flush_wave

# Build the tarball. Working directly in ${OUTPUT_DIR} keeps the leading path
# component inside the archive equal to the bundle name.
+( cd "${OUTPUT_DIR}" && tar -czf "${BUNDLE_NAME}.tar.gz" "${BUNDLE_NAME}" ) + +if ! ${KEEP_STAGING}; then + rm -rf "${STAGING_DIR}" +fi + +BUNDLE_SIZE=$(du -h "${BUNDLE_PATH}" | awk '{print $1}') +echo "" +echo "Bundle: ${BUNDLE_PATH}" +echo "Size: ${BUNDLE_SIZE}" +if ${KEEP_STAGING}; then + echo "Staging: ${STAGING_DIR}" +fi diff --git a/scripts/measure-aggregation-metrics.sh b/scripts/measure-aggregation-metrics.sh new file mode 100755 index 0000000..41c39d0 --- /dev/null +++ b/scripts/measure-aggregation-metrics.sh @@ -0,0 +1,267 @@ +#!/usr/bin/env bash +# Scrape zeam aggregator Prometheus metrics and summarize aggregation performance. +# +# Usage: +# scripts/measure-aggregation-metrics.sh [options] +# +# Options: +# --validator-config PATH (default: ansible-devnet/genesis/validator-config.yaml) +# --nodes LIST Comma-separated zeam node names (default: zeam aggregators) +# --ssh-key PATH (default: ~/.ssh/id_ed25519_github) +# --ssh-user USER (default: root) +# --log-since DURATION Docker log grep window per node (default: 30m) +# --output FILE Write JSON summary here (default: stdout only) +# -h | --help + +set -u +set -o pipefail + +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +REPO_ROOT="$(cd -- "${SCRIPT_DIR}/.." &>/dev/null && pwd)" + +VALIDATOR_CONFIG="${REPO_ROOT}/ansible-devnet/genesis/validator-config.yaml" +NODES_FILTER="" +SSH_KEY="${HOME}/.ssh/id_ed25519_github" +SSH_USER="root" +LOG_SINCE="30m" +OUTPUT_FILE="" + +print_help() { + sed -n '2,/^set -u/p' "${BASH_SOURCE[0]}" | sed -E 's/^# ?//' | sed -e '/^set -u/,$d' +} + +while [[ $# -gt 0 ]]; do + case "$1" in + --validator-config) VALIDATOR_CONFIG="$2"; shift 2 ;; + --nodes) NODES_FILTER="$2"; shift 2 ;; + --ssh-key) SSH_KEY="$2"; shift 2 ;; + --ssh-user) SSH_USER="$2"; shift 2 ;; + --log-since) LOG_SINCE="$2"; shift 2 ;; + --output) OUTPUT_FILE="$2"; shift 2 ;; + -h|--help) print_help; exit 0 ;; + *) echo "unknown argument: $1" >&2; exit 2 ;; + esac +done + +if [[ ! 
#######################################
# Estimate a quantile of a Prometheus histogram from exposition text.
# Arguments: $1 - quantile in [0, 1]
#            $2 - metric name prefix (without _bucket/_sum/_count)
#            $3 - metrics text
# Outputs:   the estimate with three decimals, or "n/a" when the histogram
#            has no observations
#######################################
histogram_quantile() {
  local quantile="$1"
  local metric_prefix="$2"
  local body="$3"
  # The program is handed to python3 with -c so that stdin stays free for the
  # metrics text. Feeding both the program and the data through stdin does
  # not work: only the last redirection takes effect.
  local program
  program=$(cat <<'PY'
import sys

q = float(sys.argv[1])
prefix = sys.argv[2]
buckets = {}
count = 0
sum_v = 0.0
for line in sys.stdin.read().splitlines():
    if line.startswith(prefix + "_bucket"):
        le_part, val_s = line.rsplit(" ", 1)
        le = le_part.split("le=\"")[-1].rstrip("\"}")
        buckets[le] = float(val_s)
    elif line.startswith(prefix + "_count"):
        count = int(float(line.rsplit(" ", 1)[-1]))
    elif line.startswith(prefix + "_sum"):
        sum_v = float(line.rsplit(" ", 1)[-1])

if count == 0:
    print("n/a")
    sys.exit(0)
mean = sum_v / count
if not buckets:
    print(f"{mean:.3f}")
    sys.exit(0)


def bound(le_s):
    return float("inf") if le_s == "+Inf" else float(le_s)


target = q * count
prev_le = 0.0
prev_count = 0.0
seen_finite = False
estimate = mean
for le_s, cum in sorted(buckets.items(), key=lambda kv: bound(kv[0])):
    le = bound(le_s)
    if cum >= target:
        if le == float("inf"):
            # The quantile lies beyond the last finite bucket: report that
            # bucket's upper bound rather than interpolating towards infinity.
            estimate = prev_le if seen_finite else mean
        elif cum == prev_count:
            estimate = le
        else:
            frac = (target - prev_count) / (cum - prev_count)
            estimate = prev_le + frac * (le - prev_le)
        break
    prev_le, prev_count, seen_finite = le, cum, True
print(f"{estimate:.3f}")
PY
)
  python3 -c "${program}" "${quantile}" "${metric_prefix}" <<<"${body}"
}

# Sum the values of all samples whose metric name (first field, labels
# included) starts with the literal prefix $1 in metrics text $2.
# Prints an integer; 0 when nothing matches.
counter_sum() {
  local prefix="$1"
  local body="$2"
  awk -v p="${prefix}" 'index($1, p) == 1 { s += $NF } END { printf "%.0f", s + 0 }' <<<"${body}"
}

# Print, sorted, every sample line of metrics text $2 whose first field
# starts with "<prefix>{", i.e. the labelled series of metric $1.
labeled_counter() {
  local prefix="$1"
  local body="$2"
  awk -v p="${prefix}" 'index($1, p "{") == 1 { print }' <<<"${body}" | sort
}

# Print the value of the first sample in metrics text $2 whose first field
# matches the regular expression $1.
gauge_value() {
  local pattern="$1"
  local body="$2"
  # The pattern travels through the environment, not `awk -v`: -v interprets
  # backslash escapes, which mangles patterns such as 'name\{label="x"\}'.
  GAUGE_PATTERN="${pattern}" awk '$1 ~ ENVIRON["GAUGE_PATTERN"] { print $NF; exit }' <<<"${body}"
}

# Fetch http://<ip>:<port>/metrics; prints nothing when the endpoint is
# unreachable (best-effort, the caller checks for empty output).
fetch_metrics() {
  local ip="$1"
  local port="$2"
  curl -sf --max-time 8 "http://${ip}:${port}/metrics" 2>/dev/null || true
}

# Print the docker logs of container zeam-<node> (falling back to <node>) on
# host $1 for the last LOG_SINCE; prints nothing when SSH fails.
# Globals: SSH_KEY, SSH_USER, LOG_SINCE
fetch_remote_logs() {
  local ip="$1"
  local node="$2"
  # -n detaches ssh from stdin. These helpers are called inside a
  # `while read` loop, and without -n ssh would swallow the remaining rows.
  ssh -n -i "$SSH_KEY" -o BatchMode=yes -o ConnectTimeout=8 -o StrictHostKeyChecking=accept-new \
    "${SSH_USER}@${ip}" \
    "docker logs --since ${LOG_SINCE} zeam-${node} 2>&1 || docker logs --since ${LOG_SINCE} ${node} 2>&1 || true" 2>/dev/null || true
}

# Print "<container name> <image>" for the first running container on host $1
# whose name matches node $2; prints nothing when none is found or SSH fails.
# Globals: SSH_KEY, SSH_USER
verify_image() {
  local ip="$1"
  local node="$2"
  # -n: see fetch_remote_logs.
  ssh -n -i "$SSH_KEY" -o BatchMode=yes -o ConnectTimeout=8 -o StrictHostKeyChecking=accept-new \
    "${SSH_USER}@${ip}" \
    "docker ps --format '{{.Names}} {{.Image}}' | grep -E '${node}|zeam-${node}' | head -1" 2>/dev/null || true
}
IFS=$'\t' read -r node ip port is_agg; do + [[ -z "$node" ]] && continue + echo "--- ${node} (${ip}:${port}) aggregator=${is_agg} ---" + + image_line="$(verify_image "$ip" "$node")" + if [[ -n "$image_line" ]]; then + echo "container: ${image_line}" + else + echo "container: (not running or unreachable via SSH)" + fi + + body="$(fetch_metrics "$ip" "$port")" + if [[ -z "$body" ]]; then + echo "metrics: UNREACHABLE" + echo + continue + fi + + worker_p50="$(histogram_quantile 0.5 zeam_aggregate_worker_duration_seconds "$body")" + worker_p95="$(histogram_quantile 0.95 zeam_aggregate_worker_duration_seconds "$body")" + worker_count="$(counter_sum zeam_aggregate_worker_duration_seconds_count "$body")" + worker_sum="$(echo "$body" | awk '/^zeam_aggregate_worker_duration_seconds_sum /{print $2; exit}')" + build_p50="$(histogram_quantile 0.5 lean_pq_sig_aggregated_signatures_building_time_seconds "$body")" + build_count="$(counter_sum lean_pq_sig_aggregated_signatures_building_time_seconds_count "$body")" + publish_total="$(counter_sum zeam_aggregator_publish_aggregations_total "$body")" + timely_cov="$(gauge_value 'lean_attestation_aggregate_coverage_validators\{section="timely",subnet="combined"\}' "$body")" + late_cov="$(gauge_value 'lean_attestation_aggregate_coverage_validators\{section="late",subnet="combined"\}' "$body")" + combined_cov="$(gauge_value 'lean_attestation_aggregate_coverage_validators\{section="combined",subnet="combined"\}' "$body")" + + echo "worker_duration_seconds: count=${worker_count} sum=${worker_sum:-0} p50=${worker_p50} p95=${worker_p95}" + echo "building_time_seconds (wrap only): count=${build_count} p50=${build_p50}" + echo "publish_aggregations_total: ${publish_total}" + echo "coverage_validators (latest gauge): timely=${timely_cov:-?} late=${late_cov:-?} combined=${combined_cov:-?}" + echo "aggregate_skip_total:" + labeled_counter zeam_aggregate_skip_total "$body" | sed 's/^/ /' + echo "building_phase_seconds (sum/count):" + echo "$body" | 
awk '/^zeam_pq_sig_aggregated_signatures_building_phase_seconds_sum\{/{print " "$0}' | sort + echo "$body" | awk '/^zeam_pq_sig_aggregated_signatures_building_phase_seconds_count\{/{print " "$0}' | sort + + logs="$(fetch_remote_logs "$ip" "$node")" + agg_skips="$(echo "$logs" | grep -c 'skipping aggregation for slot=' || true)" + att_skips="$(echo "$logs" | grep -c 'skipping attestation production for slot=' || true)" + in_flight="$(echo "$logs" | grep -c 'already in flight' || true)" + agg_starts="$(echo "$logs" | grep -c 'agg start slot=' || true)" + trivial_drop="$(echo "$logs" | grep -c 'aggregator pre-filter: dropped' || true)" + echo "logs (since ${LOG_SINCE}): agg_skip_lines=${agg_skips} in_flight=${in_flight} agg_start=${agg_starts} attestation_skip=${att_skips} trivial_pre_filter=${trivial_drop}" + + if [[ "$first_node" == false ]]; then + JSON_LINES+=(",") + fi + first_node=false + skip_json="$(labeled_counter zeam_aggregate_skip_total "$body" | python3 -c 'import sys,json; d={}; +for line in sys.stdin: + line=line.strip() + if not line: continue + lbl=line.split("{reason=\"",1)[1].split("\"}",1)[0] + d[lbl]=int(float(line.rsplit(" ",1)[-1])) +print(json.dumps(d))')" + JSON_LINES+=(" {\"name\": \"${node}\", \"ip\": \"${ip}\", \"metrics_port\": ${port}, \"is_aggregator\": ${is_agg}, \"worker_count\": ${worker_count:-0}, \"worker_p50_s\": \"${worker_p50}\", \"worker_p95_s\": \"${worker_p95}\", \"publish_total\": ${publish_total:-0}, \"timely_coverage\": \"${timely_cov:-}\", \"late_coverage\": \"${late_cov:-}\", \"combined_coverage\": \"${combined_cov:-}\", \"log_agg_skips\": ${agg_skips}, \"log_in_flight\": ${in_flight}, \"log_agg_starts\": ${agg_starts}, \"log_att_skips\": ${att_skips}, \"aggregate_skip\": ${skip_json}}") + echo +done <<<"$NODE_TABLE" + +JSON_LINES+=("]") +JSON_LINES+=("}") + +if [[ -n "$OUTPUT_FILE" ]]; then + printf '%s\n' "${JSON_LINES[@]}" >"$OUTPUT_FILE" + echo "Wrote ${OUTPUT_FILE}" +fi diff --git 
a/scripts/monitor-subnet-aggregations.py b/scripts/monitor-subnet-aggregations.py new file mode 100755 index 0000000..28b3fe5 --- /dev/null +++ b/scripts/monitor-subnet-aggregations.py @@ -0,0 +1,493 @@ +#!/usr/bin/env python3 +"""Monitor all 8 subnet aggregators: timely coverage, publish lag, real aggregation time.""" + +from __future__ import annotations + +import argparse +import re +import subprocess +import sys +import urllib.request +from dataclasses import dataclass, field +from typing import Any + +try: + import yaml +except ImportError: + sys.stderr.write("pip install pyyaml\n") + sys.exit(1) + +SUBNET_IPS = ( + "77.42.121.211", # 0 + "89.167.41.98", # 1 + "89.167.114.168", # 2 + "89.167.120.1", # 3 + "89.167.112.241", # 4 + "95.217.153.36", # 5 + "89.167.3.22", # 6 + "89.167.120.224", # 7 +) + + +@dataclass +class AggInfo: + subnet: int + name: str + client: str + ip: str + metrics_port: int + + +@dataclass +class AggReport: + info: AggInfo + metrics_ok: bool = False + worker_metric: str | None = None + worker_count: int = 0 + worker_mean_s: float | None = None + worker_p50_s: float | None = None + build_mean_s: float | None = None + build_count: int = 0 + compute_ffi_mean_s: float | None = None + skip_in_flight: int = 0 + coalesced_total: int = 0 + publish_total: int = 0 + timely_combined: int | None = None + late_combined: int | None = None + combined_combined: int | None = None + timely_by_subnet: dict[int, int] = field(default_factory=dict) + late_by_subnet: dict[int, int] = field(default_factory=dict) + combined_by_subnet: dict[int, int] = field(default_factory=dict) + log_agg_starts: int = 0 + log_in_flight: int = 0 + log_coalesced: int = 0 + log_publishes: int = 0 + publish_lag_p50: int | None = None + publish_lag_mean: float | None = None + publish_lag_n: int = 0 + publish_on_time_pct: float | None = None # lag <= 2 slots + container_image: str = "" + errors: list[str] = field(default_factory=list) + + +def load_aggregators(cfg_path: str) -> 
def fetch_metrics(ip: str, port: int, timeout: float = 8.0) -> str:
    """Return the body of ``http://<ip>:<port>/metrics`` as text.

    Undecodable bytes are replaced. Errors from ``urlopen`` propagate to the
    caller.
    """
    url = f"http://{ip}:{port}/metrics"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


def ssh_cmd(
    ssh_key: str,
    ip: str,
    remote: str,
    timeout: int = 20,
    ssh_user: str = "root",
) -> str:
    """Run ``remote`` on ``ip`` over SSH and return its stdout.

    Returns an empty string when the command times out, ssh cannot be started,
    or it exits non-zero with something on stderr.

    ``ssh_user`` defaults to ``root``, the login that was previously
    hard-coded.
    """
    cmd = [
        "ssh",
        "-i",
        ssh_key,
        "-o",
        "BatchMode=yes",
        "-o",
        "ConnectTimeout=8",
        "-o",
        "StrictHostKeyChecking=accept-new",
        f"{ssh_user}@{ip}",
        remote,
    ]
    try:
        # stdin is detached so ssh can never consume or block on this
        # process's own standard input.
        r = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            stdin=subprocess.DEVNULL,
        )
        if r.returncode != 0 and r.stderr.strip():
            return ""
        return r.stdout
    except (subprocess.TimeoutExpired, OSError):
        return ""


def hist_stats(text: str, prefix: str) -> tuple[int, float, float | None, float | None]:
    """Summarize a Prometheus histogram found in exposition ``text``.

    Returns ``(count, mean, p50, p95)``; ``(0, 0.0, None, None)`` when the
    histogram has no observations. Quantiles are linearly interpolated inside
    the bucket that contains them. A quantile that falls into the ``+Inf``
    bucket is reported as the upper bound of the last finite bucket (or the
    mean when there is none) instead of an infinite or NaN value.

    Bucket lines are matched by name prefix only, so the series of a labelled
    histogram are merged by their ``le`` value (the last one seen wins).
    """
    buckets: dict[str, float] = {}
    count = 0
    total = 0.0
    for line in text.splitlines():
        if line.startswith(prefix + "_bucket"):
            le = line.split('le="')[1].split('"')[0]
            buckets[le] = float(line.rsplit(" ", 1)[-1])
        elif line.startswith(prefix + "_count "):
            count = int(float(line.rsplit(" ", 1)[-1]))
        elif line.startswith(prefix + "_sum "):
            total = float(line.rsplit(" ", 1)[-1])

    if count == 0:
        return 0, 0.0, None, None

    mean = total / count
    infinity = float("inf")
    ordered = sorted(
        (infinity if le_s == "+Inf" else float(le_s), cum) for le_s, cum in buckets.items()
    )

    def quantile(q: float) -> float:
        target = q * count
        prev_le = 0.0
        prev_c = 0.0
        seen_finite = False
        for le, cum in ordered:
            if cum >= target:
                if le == infinity:
                    # Beyond the last finite bucket: no upper bound to
                    # interpolate towards.
                    return prev_le if seen_finite else mean
                if cum == prev_c:
                    return le
                frac = (target - prev_c) / (cum - prev_c)
                return prev_le + frac * (le - prev_le)
            prev_le, prev_c, seen_finite = le, cum, True
        return mean

    return count, mean, quantile(0.5), quantile(0.95)


def parse_gauge(text: str, pattern: str) -> int | None:
    """Return the integer value of the first non-comment line containing
    ``pattern``, or ``None`` when no such line has a numeric value."""
    for line in text.splitlines():
        if pattern in line and not line.startswith("#"):
            try:
                return int(float(line.rsplit(" ", 1)[-1]))
            except ValueError:
                # Not a sample line after all; keep looking.
                pass
    return None


def parse_subnet_gauges(text: str, section: str) -> dict[int, int]:
    """Map subnet index to the coverage gauge value for ``section``.

    Only ``lean_attestation_aggregate_coverage_validators`` samples whose
    ``subnet`` label has the form ``subnet_<N>`` are considered.
    """
    out: dict[int, int] = {}
    pat = f'section="{section}",subnet="subnet_'
    for line in text.splitlines():
        if not line.startswith("lean_attestation_aggregate_coverage_validators"):
            continue
        if pat not in line:
            continue
        m = re.search(r'subnet="subnet_(\d+)"\}\s+(\S+)', line)
        if m:
            out[int(m.group(1))] = int(float(m.group(2)))
    return out


def parse_labeled_counter(text: str, prefix: str, label: str) -> int:
    """Return the value of ``<prefix>{reason="<label>"}``, or 0 if absent."""
    pat = f'{prefix}{{reason="{label}"}}'
    for line in text.splitlines():
        if line.startswith(pat):
            return int(float(line.rsplit(" ", 1)[-1]))
    return 0


def parse_counter(text: str, name: str) -> int:
    """Return the value of the unlabelled sample named exactly ``name``
    (the name must be followed by whitespace), or 0 if absent."""
    for line in text.splitlines():
        if not line.startswith(name):
            continue
        rest = line[len(name) :]
        if rest.startswith(" ") or rest.startswith("\t"):
            return int(float(line.rsplit(" ", 1)[-1]))
    return 0
def analyze_logs(log_text: str) -> dict[str, Any]:
    """Count aggregation events in node logs and measure publish lag.

    The most recent ``[s=<slot> i=<interval>]`` marker seen so far is taken as
    the current slot. For every publish line that carries a slot number, the
    lag is ``current slot - published slot``; publish lines seen before any
    marker are ignored.

    The result always has ``agg_starts``, ``in_flight``, ``coalesced`` and
    ``publishes``. When at least one lag was measured it also has ``lag_n``,
    ``lag_p50``, ``lag_mean``, ``lag_max`` and ``on_time_pct`` (share of lags
    of at most 2 slots).
    """
    slot_marker = re.compile(r"\[s=(\d+) i=(\d+)\]")
    publish_line = re.compile(
        r"published aggregation to network: slot=(\d+)|publish.*aggregat.*slot[= ](\d+)",
        re.I,
    )
    in_flight_tokens = ("already in flight", "skipping aggregation for slot=")

    summary: dict[str, Any] = {
        "agg_starts": 0,
        "in_flight": 0,
        "coalesced": 0,
        "publishes": 0,
    }
    current: int | None = None
    lag_samples: list[int] = []

    for entry in log_text.splitlines():
        marker = slot_marker.search(entry)
        if marker is not None:
            current = int(marker.group(1))

        if "agg start slot=" in entry:
            summary["agg_starts"] += 1
        if any(token in entry for token in in_flight_tokens):
            summary["in_flight"] += 1
        if "coalescing aggregation for slot=" in entry:
            summary["coalesced"] += 1

        published = publish_line.search(entry)
        if published is None or current is None:
            continue
        # Exactly one of the two alternatives captured the slot number.
        reported = published.group(1) or published.group(2)
        lag_samples.append(current - int(reported))
        summary["publishes"] += 1

    if not lag_samples:
        return summary

    ordered = sorted(lag_samples)
    sample_count = len(ordered)
    within_two = len([lag for lag in ordered if lag <= 2])
    summary["lag_n"] = sample_count
    summary["lag_p50"] = ordered[sample_count // 2]
    summary["lag_mean"] = sum(ordered) / sample_count
    summary["lag_max"] = ordered[-1]
    summary["on_time_pct"] = 100.0 * within_two / sample_count
    return summary
zeam-specific first, then generic building time as proxy + for metric, label in [ + ("zeam_aggregate_worker_duration_seconds", "zeam_worker"), + ("lean_pq_sig_aggregated_signatures_building_time_seconds", "build_per_att_data"), + ]: + count, mean, p50, _ = hist_stats(body, metric) + if count > 0 and rep.worker_mean_s is None and metric.startswith("zeam_aggregate"): + rep.worker_metric = metric + rep.worker_count = count + rep.worker_mean_s = mean + rep.worker_p50_s = p50 + if metric.endswith("building_time_seconds") and count > 0: + rep.build_count = count + rep.build_mean_s = mean + + if rep.worker_mean_s is None: + count, mean, p50, _ = hist_stats( + body, "lean_pq_sig_aggregated_signatures_building_time_seconds" + ) + if count > 0: + rep.worker_metric = "lean_pq_sig_aggregated_signatures_building_time_seconds (proxy)" + rep.worker_count = count + rep.worker_mean_s = mean + rep.worker_p50_s = p50 + + _, ffi_mean, _, _ = hist_stats( + body, "zeam_pq_sig_aggregated_signatures_building_phase_seconds" + ) + # phase histogram has label — parse compute_ffi specifically + ffi_buckets: dict[str, float] = {} + ffi_count = 0 + ffi_sum = 0.0 + for line in body.splitlines(): + if 'phase="compute_ffi"' not in line: + continue + if "_bucket" in line and line.startswith( + "zeam_pq_sig_aggregated_signatures_building_phase_seconds_bucket" + ): + le = line.split('le="')[1].split('"')[0] + ffi_buckets[le] = float(line.rsplit(" ", 1)[-1]) + elif line.startswith( + "zeam_pq_sig_aggregated_signatures_building_phase_seconds_count{phase=\"compute_ffi\"}" + ): + ffi_count = int(float(line.rsplit(" ", 1)[-1])) + elif line.startswith( + "zeam_pq_sig_aggregated_signatures_building_phase_seconds_sum{phase=\"compute_ffi\"}" + ): + ffi_sum = float(line.rsplit(" ", 1)[-1]) + if ffi_count > 0: + rep.compute_ffi_mean_s = ffi_sum / ffi_count + + rep.skip_in_flight = parse_labeled_counter(body, "zeam_aggregate_skip_total", "in_flight") + rep.coalesced_total = parse_counter(body, 
"zeam_aggregate_coalesced_total") + rep.publish_total = parse_publish_counter(body, info.client) + + rep.timely_combined = parse_gauge( + body, 'section="timely",subnet="combined"' + ) + rep.late_combined = parse_gauge(body, 'section="late",subnet="combined"') + rep.combined_combined = parse_gauge(body, 'section="combined",subnet="combined"') + rep.timely_by_subnet = parse_subnet_gauges(body, "timely") + rep.late_by_subnet = parse_subnet_gauges(body, "late") + rep.combined_by_subnet = parse_subnet_gauges(body, "combined") + + log = ssh_cmd( + ssh_key, + info.ip, + f"docker logs --since {log_since} {info.name} 2>&1 || docker logs --since {log_since} zeam-{info.name} 2>&1 || true", + timeout=45, + ) + if log: + la = analyze_logs(log) + rep.log_agg_starts = la.get("agg_starts", 0) + rep.log_in_flight = la.get("in_flight", 0) + rep.log_coalesced = la.get("coalesced", 0) + rep.log_publishes = la.get("publishes", 0) + if "lag_n" in la: + rep.publish_lag_n = la["lag_n"] + rep.publish_lag_p50 = la["lag_p50"] + rep.publish_lag_mean = la["lag_mean"] + rep.publish_on_time_pct = la["on_time_pct"] + + return rep + + +def fmt_s(v: float | None) -> str: + return f"{v:.2f}s" if v is not None else "n/a" + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument( + "--validator-config", + default="ansible-devnet/genesis/validator-config.yaml", + ) + ap.add_argument("--ssh-key", default=f"{__import__('os').environ.get('HOME', '')}/.ssh/id_ed25519_github") + ap.add_argument("--log-since", default="30m") + args = ap.parse_args() + + aggs = load_aggregators(args.validator_config) + reports = [analyze_agg(a, args.ssh_key, args.log_since) for a in aggs] + + print(f"# Subnet aggregation report (log window: {args.log_since})") + print() + + # Table 1: aggregator performance + print("## Aggregator prove/publish performance") + print( + "| Subnet | Aggregator | Client | Worker/prove mean | p50 | compute_ffi (zeam) | coalesced | publishes (logs) | publish lag p50 | on-time 
(lag≤2) |" + ) + print("|---:|---|---|---:|---:|---:|---:|---:|---:|---:|") + for r in reports: + i = r.info + coalesce_rate = "" + denom = r.log_agg_starts + r.log_coalesced + if denom > 0: + coalesce_rate = f" ({100*r.log_coalesced/denom:.0f}%)" + print( + f"| {i.subnet} | {i.name} | {i.client} | {fmt_s(r.worker_mean_s)} | {fmt_s(r.worker_p50_s)} | {fmt_s(r.compute_ffi_mean_s)} | {r.coalesced_total or r.log_coalesced}{coalesce_rate} | {r.log_publishes or r.publish_total} | {r.publish_lag_p50 if r.publish_lag_p50 is not None else 'n/a'} | {f'{r.publish_on_time_pct:.0f}%' if r.publish_on_time_pct is not None else 'n/a'} |" + ) + + print() + print("## Subnet receive timeliness (latest coverage gauge on each aggregator)") + print("Timely = peer aggregate in merge buffer before i=0/i=4. Late = arrived after merge window.") + print() + print("| Subnet | Aggregator | timely (own obs) | late | combined | receives on-time? |") + print("|---:|---|---:|---:|---:|---|") + for r in reports: + i = r.info + timely = r.timely_combined + late = r.late_combined + comb = r.combined_combined + if timely is None and not r.metrics_ok: + status = "no metrics" + elif timely and timely > 0: + status = "YES (some timely)" + elif comb and comb > 0 and (timely or 0) == 0: + status = "NO (only late)" + elif comb == 0 or comb is None: + status = "NO (none seen)" + else: + status = "marginal" + print( + f"| {i.subnet} | {i.name} | {timely if timely is not None else '?'} | {late if late is not None else '?'} | {comb if comb is not None else '?'} | {status} |" + ) + + # Cross-subnet view from zeam_8 (subnet 0 aggregator sees peer subnets via gossip aggregates) + zeam8 = next((r for r in reports if r.info.subnet == 0), None) + if zeam8 and (zeam8.timely_by_subnet or zeam8.late_by_subnet): + print() + print("## Cross-subnet visibility from zeam_8 (subnet 0 aggregator)") + print("| Subnet | timely validators | late validators | combined | on-time? 
|") + print("|---:|---:|---:|---:|---|") + for sn in range(8): + t = zeam8.timely_by_subnet.get(sn, 0) + l = zeam8.late_by_subnet.get(sn, 0) + c = zeam8.combined_by_subnet.get(sn, 0) + if t > 0: + st = "YES" + elif c > 0: + st = "NO (late only)" + else: + st = "none seen" + print(f"| {sn} | {t} | {l} | {c} | {st} |") + + print() + print("## Summary") + on_time_aggs = [ + r for r in reports if r.publish_on_time_pct is not None and r.publish_on_time_pct >= 50 + ] + slow_aggs = [r for r in reports if r.worker_mean_s is not None and r.worker_mean_s > 6] + print( + f"- Aggregators publishing ≥50% within 2 slots: {', '.join(r.info.name for r in on_time_aggs) or 'none'}" + ) + print( + f"- Aggregators with mean prove/worker time >6s (slot budget): {', '.join(f'{r.info.name} ({r.worker_mean_s:.1f}s)' for r in slow_aggs) or 'none'}" + ) + timely_subnets = [r.info.subnet for r in reports if (r.timely_combined or 0) > 0] + print( + f"- Subnets with any timely peer coverage at aggregator: {timely_subnets or 'none'}" + ) + + +if __name__ == "__main__": + main()