Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/runtime-guard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }}
CLOUD_RUN_SERVICES: ${{ vars.CLOUD_RUN_SERVICES }}
CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }}
CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }}
GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }}
TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
TELEGRAM_TOKEN_SECRET_NAME: ${{ vars.TELEGRAM_TOKEN_SECRET_NAME }}
Expand Down
156 changes: 155 additions & 1 deletion scripts/cloud_run_runtime_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ def _run_gcloud(args: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(args, text=True, capture_output=True, check=False)


def _run_gcloud_json(args: list[str], context: str) -> Any:
    """Run a gcloud command and decode its standard output as JSON.

    Args:
        args: Full command line handed to ``_run_gcloud``.
        context: Short description of the command, used only in error text.

    Returns:
        The decoded JSON value, or ``None`` when the command printed nothing
        but whitespace.

    Raises:
        RuntimeError: If the command exits non-zero (the message is the
            command's stderr, else stdout, else a generic text) or if its
            output is not valid JSON.
    """
    completed = _run_gcloud(args)
    if completed.returncode != 0:
        message = (completed.stderr or completed.stdout or "").strip()
        if not message:
            message = f"gcloud {context} failed"
        raise RuntimeError(message)

    output = completed.stdout
    if not output.strip():
        return None
    try:
        decoded = json.loads(output)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"gcloud {context} returned invalid JSON: {exc}") from exc
    return decoded


def _run_gcloud_logging(project: str, log_filter: str, limit: int) -> list[dict[str, Any]]:
command = [
"gcloud",
Expand All @@ -126,6 +139,144 @@ def _run_gcloud_logging(project: str, log_filter: str, limit: int) -> list[dict[
return payload if isinstance(payload, list) else []


def _parse_timestamp(value: Any) -> dt.datetime | None:
if not value:
return None
text = str(value).strip()
if not text:
return None
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
try:
parsed = dt.datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc)


def _format_timestamp(value: dt.datetime) -> str:
    """Render a datetime as an ISO-8601 UTC string with a ``Z`` suffix.

    The value is first converted to UTC; the resulting ``+00:00`` offset is
    then replaced by ``Z``.
    """
    utc_text = value.astimezone(dt.timezone.utc).isoformat()
    return utc_text.replace("+00:00", "Z")


def _target_payloads() -> list[dict[str, Any]]:
    """Return the target dicts configured in ``CLOUD_RUN_SERVICE_TARGETS_JSON``.

    The variable may hold either a JSON list of targets or a JSON object
    with a ``"targets"`` list. Entries that are not JSON objects are
    dropped. An unset, blank, malformed or differently shaped value yields
    an empty list rather than an error.
    """
    raw = os.environ.get("CLOUD_RUN_SERVICE_TARGETS_JSON") or ""
    raw = raw.strip()
    if not raw:
        return []

    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        return []

    if isinstance(decoded, dict):
        candidates = decoded.get("targets")
    else:
        candidates = decoded

    if isinstance(candidates, list):
        return [entry for entry in candidates if isinstance(entry, dict)]
    return []


def _runtime_target(target: dict[str, Any]) -> dict[str, Any]:
    """Return the nested runtime-target mapping of a target, if it has one.

    The mapping is read from ``runtime_target`` (or ``runtime_target_json``
    when the former is missing or falsy). A string value is decoded as JSON.
    Anything that does not end up as a dict, including undecodable JSON,
    yields an empty dict.
    """
    candidate = target.get("runtime_target") or target.get("runtime_target_json")
    if isinstance(candidate, str):
        try:
            candidate = json.loads(candidate)
        except json.JSONDecodeError:
            return {}
    if isinstance(candidate, dict):
        return candidate
    return {}


def _target_service_names(target: dict[str, Any]) -> list[str]:
    """Return the service names declared by a target.

    The keys ``service``, ``service_name`` and ``cloud_run_service`` are
    tried in that order, each first on the target itself and then on its
    nested runtime target. The first truthy value found is passed through
    ``_split_values``; when none is found the result is an empty list.
    """
    nested = _runtime_target(target)
    # Lazy generator: lookups happen in key order and stop at the first hit.
    candidates = (
        target.get(name) or nested.get(name)
        for name in ("service", "service_name", "cloud_run_service")
    )
    first = next((candidate for candidate in candidates if candidate), None)
    if first is None:
        return []
    return _split_values(str(first))


def _region_for_service(service: str) -> str:
    """Resolve the Cloud Run region to use for a service.

    Configured targets that list the service are consulted first: the keys
    ``region``, ``cloud_run_region`` and ``location`` are tried on the
    target and then on its runtime target. If no matching target supplies a
    region, the first non-empty environment variable among
    ``RUNTIME_GUARD_CLOUD_RUN_REGION``, ``CLOUD_RUN_REGION``,
    ``CLOUD_RUN_LOCATION`` and ``GOOGLE_CLOUD_REGION`` is used.

    Returns:
        The region with surrounding whitespace removed, or ``""`` when
        nothing is configured.
    """
    region_keys = ("region", "cloud_run_region", "location")
    for entry in _target_payloads():
        if service in _target_service_names(entry):
            nested = _runtime_target(entry)
            for name in region_keys:
                found = entry.get(name) or nested.get(name)
                if found:
                    return str(found).strip()

    env_names = (
        "RUNTIME_GUARD_CLOUD_RUN_REGION",
        "CLOUD_RUN_REGION",
        "CLOUD_RUN_LOCATION",
        "GOOGLE_CLOUD_REGION",
    )
    for variable in env_names:
        configured = os.environ.get(variable)
        if configured:
            return configured.strip()
    return ""


def _serving_revision_names(status: dict[str, Any]) -> set[str]:
    """Return the revision names that ``status.traffic`` gives a non-zero share."""
    traffic = status.get("traffic")
    if not isinstance(traffic, list):
        return set()
    names: set[str] = set()
    for entry in traffic:
        if not isinstance(entry, dict):
            continue
        try:
            percent = int(entry.get("percent") or 0)
        except (TypeError, ValueError):
            percent = 0
        name = str(entry.get("revisionName") or "").strip()
        if percent > 0 and name:
            names.add(name)
    return names


def _revision_ready_transition_time(revision_payload: dict[str, Any]) -> dt.datetime | None:
    """Return when the revision's ``Ready`` condition last became true, if known."""
    status = revision_payload.get("status") or {}
    conditions = status.get("conditions") if isinstance(status, dict) else None
    if not isinstance(conditions, list):
        return None
    for condition in conditions:
        if not isinstance(condition, dict):
            continue
        if condition.get("type") == "Ready" and str(condition.get("status")) == "True":
            return _parse_timestamp(condition.get("lastTransitionTime"))
    return None


def _latest_ready_revision_started_at(project: str, service: str) -> dt.datetime | None:
    """Return the time from which the service's latest ready revision was ready.

    The service is described with gcloud to find ``latestReadyRevisionName``,
    then that revision is described. The revision's ``Ready`` condition
    transition time is preferred, because logs emitted between creation and
    readiness (startup and readiness-probe noise) would otherwise still fall
    inside the window. ``metadata.creationTimestamp`` is the fallback when no
    usable ``Ready`` condition is present.

    Args:
        project: Google Cloud project passed to ``--project``.
        service: Cloud Run service name.

    Returns:
        An aware UTC datetime, or ``None`` when no region is configured, the
        payloads lack the expected fields, or ``status.traffic`` shows that a
        revision other than the latest ready one is receiving traffic. In the
        last case the caller should keep its full lookback window so that
        failures from the revision actually serving requests are not hidden.

    Raises:
        RuntimeError: Propagated from ``_run_gcloud_json`` when a gcloud
            command fails or prints invalid JSON.
    """
    region = _region_for_service(service)
    if not region:
        return None

    service_payload = _run_gcloud_json(
        [
            "gcloud",
            "run",
            "services",
            "describe",
            service,
            "--project",
            project,
            "--region",
            region,
            "--format=json",
        ],
        f"run services describe {service}",
    )
    if not isinstance(service_payload, dict):
        return None
    status = service_payload.get("status") or {}
    if not isinstance(status, dict):
        return None
    revision = str(status.get("latestReadyRevisionName") or "").strip()
    if not revision:
        return None

    # With a traffic split or a no-traffic deploy, older revisions are still
    # serving. Narrowing the log window to this revision would drop their
    # earlier failures, so report "unknown" and let the caller fall back.
    serving = _serving_revision_names(status)
    if serving and serving != {revision}:
        return None

    revision_payload = _run_gcloud_json(
        [
            "gcloud",
            "run",
            "revisions",
            "describe",
            revision,
            "--project",
            project,
            "--region",
            region,
            "--format=json",
        ],
        f"run revisions describe {revision}",
    )
    if not isinstance(revision_payload, dict):
        return None

    ready_at = _revision_ready_transition_time(revision_payload)
    if ready_at is not None:
        return ready_at

    metadata = revision_payload.get("metadata") or {}
    if not isinstance(metadata, dict):
        return None
    return _parse_timestamp(metadata.get("creationTimestamp"))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Cut off at the ready transition time

When the new revision takes time to become Ready, metadata.creationTimestamp is before the Ready/ContainerHealthy transition, so startup or readiness-check errors emitted after creation still pass the timestamp filter. With RUNTIME_GUARD_IGNORE_PRE_READY_REVISION_LOGS enabled this can still alert on transient pre-ready logs for the revision that eventually became ready; use the ready condition transition time as the lower bound, falling back to creation only if that timestamp is unavailable.

Useful? React with 👍 / 👎.



def _cloud_run_log_since(project: str, service: str, fallback: dt.datetime) -> dt.datetime:
    """Pick the lower time bound for a service's log query.

    Args:
        project: Google Cloud project that owns the service.
        service: Cloud Run service name.
        fallback: Start of the configured lookback window.

    Returns:
        The latest ready revision's start time when it is known and later
        than ``fallback``; otherwise ``fallback``. A failure to resolve the
        revision is reported on stderr and also yields ``fallback``.
    """
    # NOTE(review): the returned cutoff is applied to every log of the
    # service, not only to the resolved revision. If an older revision is
    # still serving (traffic split or no-traffic deploy), its earlier
    # failures fall outside the narrowed window. TODO: confirm the revision
    # lookup accounts for serving revisions, or filter by revision_name.
    try:
        ready_at = _latest_ready_revision_started_at(project, service)
    except RuntimeError as exc:
        message = f"Unable to resolve latest ready revision for {service}; using lookback window: {exc}"
        print(message, file=sys.stderr)
        return fallback
    return ready_at if (ready_at and ready_at > fallback) else fallback


def _status(entry: dict[str, Any]) -> int | None:
value = (entry.get("httpRequest") or {}).get("status")
try:
Expand Down Expand Up @@ -267,6 +418,7 @@ def main() -> int:
require_success = _env_bool("RUNTIME_GUARD_REQUIRE_SUCCESS", False)
fail_workflow = _env_bool("RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT", True)
check_scheduler = _env_bool("RUNTIME_GUARD_CHECK_SCHEDULER", True)
ignore_pre_ready_logs = _env_bool("RUNTIME_GUARD_IGNORE_PRE_READY_REVISION_LOGS", True)

since = (
dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=lookback_minutes)
Expand All @@ -288,10 +440,12 @@ def main() -> int:
)

for service in services:
service_since = _cloud_run_log_since(project, service, since) if ignore_pre_ready_logs else since

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep the success check on the configured lookback

When RUNTIME_GUARD_REQUIRE_SUCCESS=true and a new revision became ready inside the lookback window but has not received traffic yet, this same shortened service_since window is used for counting successes. That excludes successful requests earlier in the configured lookback and emits a no successful Cloud Run request ... in the last N minutes alert even though the requirement was satisfied; keep the success query on since or change the alert semantics/message to require a post-ready success.

Useful? React with 👍 / 👎.

service_since_text = _format_timestamp(service_since)
log_filter = (
'resource.type="cloud_run_revision" '
f'AND resource.labels.service_name="{service}" '
f'AND timestamp >= "{since_text}"'
f'AND timestamp >= "{service_since_text}"'
)
try:
entries = _run_gcloud_logging(project, log_filter, limit)
Expand Down
53 changes: 52 additions & 1 deletion tests/test_cloud_run_runtime_guard.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import subprocess
import datetime as dt
import json
import re
import subprocess

from scripts import cloud_run_runtime_guard as guard

Expand All @@ -13,6 +15,7 @@ def test_scheduler_job_pattern_includes_service_alias():
assert re.search(pattern, "longbridge-quant-hk-scheduler")
assert not re.search(pattern, "longbridge-quant-sg-scheduler")


def test_telegram_token_falls_back_to_secret_manager(monkeypatch):
monkeypatch.delenv("TELEGRAM_TOKEN", raising=False)
monkeypatch.delenv("TG_TOKEN", raising=False)
Expand All @@ -39,3 +42,51 @@ def fake_run_gcloud(command):
"longbridgequant",
]


def test_cloud_run_log_since_uses_latest_ready_revision(monkeypatch):
    """The log window starts at the latest ready revision when it is newer.

    gcloud is replaced by a fake that answers the service describe call with
    a ``latestReadyRevisionName`` and any other call with a revision creation
    timestamp. The test checks the returned cutoff and the exact commands.
    """
    # Region lookup consults these before CLOUD_RUN_REGION, so clear them to
    # keep the exact-command assertion independent of the ambient environment.
    monkeypatch.delenv("CLOUD_RUN_SERVICE_TARGETS_JSON", raising=False)
    monkeypatch.delenv("RUNTIME_GUARD_CLOUD_RUN_REGION", raising=False)
    monkeypatch.setenv("CLOUD_RUN_REGION", "us-central1")
    observed = []

    def fake_run_gcloud(command):
        observed.append(command)
        if command[1:4] == ["run", "services", "describe"]:
            payload = {"status": {"latestReadyRevisionName": "longbridge-quant-hk-service-00002"}}
        else:
            payload = {"metadata": {"creationTimestamp": "2026-07-01T06:50:04.123Z"}}
        return subprocess.CompletedProcess(command, 0, stdout=json.dumps(payload), stderr="")

    monkeypatch.setattr(guard, "_run_gcloud", fake_run_gcloud)

    fallback = dt.datetime(2026, 7, 1, 6, 0, tzinfo=dt.timezone.utc)
    result = guard._cloud_run_log_since("longbridgequant", "longbridge-quant-hk-service", fallback)

    assert result == dt.datetime(2026, 7, 1, 6, 50, 4, 123000, tzinfo=dt.timezone.utc)
    assert observed[0] == [
        "gcloud",
        "run",
        "services",
        "describe",
        "longbridge-quant-hk-service",
        "--project",
        "longbridgequant",
        "--region",
        "us-central1",
        "--format=json",
    ]
    assert observed[1][1:5] == ["run", "revisions", "describe", "longbridge-quant-hk-service-00002"]


def test_region_for_service_prefers_target_region(monkeypatch):
    """A region declared on a matching target wins over ``CLOUD_RUN_REGION``."""
    targets_config = {
        "targets": [
            {"service": "longbridge-quant-hk-service", "region": "asia-east1"},
        ]
    }
    monkeypatch.setenv("CLOUD_RUN_REGION", "us-central1")
    monkeypatch.setenv("CLOUD_RUN_SERVICE_TARGETS_JSON", json.dumps(targets_config))

    resolved = guard._region_for_service("longbridge-quant-hk-service")

    assert resolved == "asia-east1"
Loading