feat: Add Logfire error tracking to fleet env #6
Structured observability for fleet env errors (init failures, tool call failures, MCP timeouts, verifier errors). Adds telemetry.py wrapper and 15 instrumentation sites across task_env.py, client.py, mcp_tools.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…, modality
- Add set_task_context() to establish base attributes for all events
- All telemetry events now inherit env_key, env_version, task_key, modality
- Parse env_key:version in client.py to log separately
- Add fleet_rollout_started and fleet_rollout_completed events
- Default environment changed to "training_rollouts"
- Update README with new schema and example SQL query
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Tracks MCP server errors (returned in tool results) separately from
Python exceptions:
- fleet_tool_call_failed: Python exception during call_tool()
- fleet_mcp_tool_error: MCP server returned {"error": ...}, {"status": "failed"}, or {"isError": true}
This aligns telemetry with WandB tool_error counting which tracks both
exception-based errors and error patterns in tool results.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
… full context
- Move set_task_context() before from_fleet() call in task_env.py
- Remove explicit env_key/env_version from client.py telemetry calls (now from context)
- This ensures fleet_make_failed events include task_key and modality
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove unused fleet_error import from task_env.py
- Fix _is_tool_error to check truthy values (avoid {"error": null} false positives)
- Make close() exception-safe with try/finally for cleanup
- Emit fleet_rollout_completed on ALL paths (not just verifier success)
- Remove unused _env_name/_env_version parsing in client.py
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
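For reference, a minimal sketch of the truthy-value check described above. The function name `is_tool_error` and its exact shape are assumptions for illustration; only the three error patterns (`error`, `status: failed`, `isError`) come from the commit messages.

```python
from typing import Any


def is_tool_error(result: Any) -> bool:
    """Return True if an MCP tool result looks like a server-side error.

    Hypothetical stand-in for _is_tool_error; the real helper may differ.
    """
    if not isinstance(result, dict):
        return False
    # Truthy check: {"error": None} or {"error": ""} is not treated as an error.
    if result.get("error"):
        return True
    if result.get("status") == "failed":
        return True
    # isError must be truthy, so {"isError": False} is not an error either.
    return bool(result.get("isError"))
```

A key-presence check (`"error" in result`) would flag `{"error": null}` as a failure, which is the false positive the fix avoids.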
The eval step was hanging for 6+ hours because MCP list_tools/call_tool were not enforcing timeouts during connection establishment.
Changes:
- Add OPERATION_TIMEOUT_S = 60s hard limit using asyncio.wait_for()
- Reduce internal timeouts (30s connect, 60s SSE read)
- Raise TimeoutError with clear message when operations hang
This prevents a single slow/stuck Fleet env from blocking the entire training run.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
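A rough sketch of the hard-limit pattern, assuming a generic wrapper around any awaitable. The helper name `with_hard_timeout` is hypothetical; `OPERATION_TIMEOUT_S` and `asyncio.wait_for()` are the pieces named in the commit.

```python
import asyncio

OPERATION_TIMEOUT_S = 60.0  # hard limit named in the commit message


async def with_hard_timeout(coro, operation: str, timeout_s: float = OPERATION_TIMEOUT_S):
    """Await `coro`, raising TimeoutError with a clear message if it hangs."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        # Re-raise as the builtin TimeoutError with the operation name attached.
        raise TimeoutError(f"{operation} timed out after {timeout_s:.0f}s") from None
```

Because `wait_for()` cancels the inner awaitable on timeout, a connection that stalls during establishment is cut off rather than left pending.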
| env_version=self.env_version, | ||
| task_key=self.task_key, | ||
| modality=self.modality, | ||
| ) |
Shared ContextVar causes wrong telemetry context across instances
Medium Severity
set_task_context is called only in __init__, writing to a thread-level ContextVar. When multiple FleetTaskEnv instances exist in the same thread (e.g., via from_json_file_all), each constructor overwrites the shared _session_context. All subsequent telemetry events from earlier instances (during reset_async, step_async, _compute_reward) will carry the last-created instance's env_key/task_key/etc., producing silently incorrect monitoring data. The context needs to be re-established before telemetry-emitting methods, not just once at construction.
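The failure mode can be reproduced in a few lines. This is a sketch only: `Env` is a hypothetical stand-in for FleetTaskEnv, and the `set_in_init` / `refresh` flags exist purely to contrast the two behaviours. The names `_session_context` and `set_task_context` are taken from the PR.

```python
from contextvars import ContextVar

# Stand-in for the module-level variable in telemetry.py.
_session_context: ContextVar[dict] = ContextVar("fleet_session_context", default={})


def set_task_context(**attrs) -> None:
    """Replace the shared context with the given attributes."""
    _session_context.set(dict(attrs))


class Env:
    """Hypothetical stand-in for FleetTaskEnv."""

    def __init__(self, task_key: str, set_in_init: bool):
        self.task_key = task_key
        if set_in_init:
            # Reported behaviour: each constructor overwrites the shared context.
            set_task_context(task_key=task_key)

    def emit(self, refresh: bool) -> dict:
        """Return the attributes a telemetry event would carry."""
        if refresh:
            # Suggested direction: re-establish context before emitting.
            set_task_context(task_key=self.task_key)
        return dict(_session_context.get())
```

With two instances built in the same context, `Env("task-a", True)` followed by `Env("task-b", True)`, an event from the first instance carries `task-b` unless it refreshes the context first.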
Fleet.make() is synchronous and can block for ~10 minutes per attempt when an env has health check failures (e.g., fostgres). With 3 retries and time.sleep(), this blocks the entire event loop for ~30 minutes, freezing ALL other async trajectories in the batch.
Changes:
- Add FleetEnvClient.from_fleet_async() using AsyncFleet.make() and asyncio.sleep() for retries, which yields to the event loop while waiting
- Defer provisioning from FleetTaskEnv.__init__() to reset_async() via _ensure_provisioned(), so fleet.make() runs in async context
- After async provisioning, get sync env handle via Fleet.instance() for close() and verify_detailed() compatibility (fast GET, ~100ms)
- Update tests to match new deferred provisioning pattern
No SkyRL changes needed: it already calls __init__() then reset_async().
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add console-visible logger.warning() alongside logfire events for fleet_mcp_tool_error and fleet_tool_call_failed. Includes:
- env_key:env_version (e.g., amazon:v0.0.12)
- step N/max_steps (e.g., step 3/50)
- tool_name and error message
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- fleet_env_reset_failed: use fleet_warning (no traceback) instead of
fleet_exception — reset 502s are expected, one-line warning is enough
- fleet_env_close_failed: silence entirely — "Instance already terminated"
is expected when TTL expires before cleanup
- fleet_env_created: remove logfire console print — logger.info already
prints the useful "Fleet instance ready in Xs: {id}" line
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…teps
- Move fleet_rollout_started to fire before provisioning so init failures (e.g., fostgres health check) are counted in total_rollouts
- Emit fleet_rollout_completed with failure_reason="init_error" on init failure, ensuring completed <= total_rollouts invariant
- Add total_steps (SUM of step_count) and init_errors columns to SQL query
- Clarify verifier_errors = code exceptions, not model failures
- Add test for init failure telemetry path
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…me_s
Tracks per-instance provisioning latency in Logfire to diagnose Fleet queue serialization (96 concurrent make() calls get processed at ~1/10s, causing 10+ min queue delays for the last instance).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Route MCP by modality: computer_use → /api/v1/mcp (aggregator, port 8081), tool_use → /mcp (per-env server, port 3003). Eliminates partial failure where aggregator timeout silently dropped the computer tool.
- Make data_key, data_version, image_type required args on from_fleet/from_fleet_async.
- Emit fleet_rollout_completed on post-provisioning failures (tools_error, computer_tool_missing), closing the telemetry gap where rollouts started but never completed in Logfire.
- Match harness retry config: 8s initial wait, 8 retries, exponential backoff capped at 5s.
- Fatal failures: list_tools fail/empty and missing computer tool now raise instead of being swallowed as warnings.
- Update README with sequence chart, endpoint routing table, failure reasons.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| env_key: str, | ||
| data_key: str, | ||
| data_version: str, | ||
| image_type: str, |
Breaking API change leaves existing callers without required args
High Severity
from_fleet changed data_key, data_version, and image_type from Optional[str] = None to required positional arguments, but existing callers in test_fleet_env.py (lines 83, 92, 105) and examples/fleet_env_example.py still call it with only api_key and env_key. These raise TypeError at runtime, breaking previously passing tests and the documented example.
call_tool() referenced self.retry_base_delay which was removed when matching harness retry config. Would crash with AttributeError on first retry. Now uses min(2**attempt, self.max_backoff) like list_tools(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
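As an illustration of the capped backoff expression `min(2**attempt, self.max_backoff)`, the delay schedule can be tabulated as below. The function name is hypothetical, and counting attempts from 0 is an assumption; the 8 retries and 5s cap are the harness values quoted earlier.

```python
from typing import List


def backoff_delays(max_retries: int = 8, max_backoff: float = 5.0) -> List[float]:
    """Delay in seconds before each retry, assuming attempts are numbered from 0."""
    return [float(min(2 ** attempt, max_backoff)) for attempt in range(max_retries)]
```

Under these assumptions the cap is reached on the fourth retry, so the worst-case total wait across all retries stays well under a minute.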
| # Wait for MCP services to initialize (matches harness initial_wait=8) | ||
| if self.initial_wait > 0: | ||
| logger.info(f"Waiting {self.initial_wait:.0f}s for MCP services to initialize...") | ||
| await asyncio.sleep(self.initial_wait) |
Unconditional 8-second wait on every list_tools call
Medium Severity
The initial_wait sleep in list_tools() fires on every invocation, not just the first one after provisioning. The FleetMCPTools instance is created once in _ensure_provisioned() and kept alive across reset_async() calls (without close()). Each subsequent call to reset_async() re-invokes self._tools.list_tools(), which unconditionally sleeps for initial_wait=8.0 seconds. In a standard multi-episode RL training loop (where reset() is called once per episode on the same env instance), every episode after the first incurs an unnecessary 8-second penalty.
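One possible way to address this, sketched with a hypothetical `ToolsClient` standing in for FleetMCPTools: guard the sleep with a flag so it runs only on the first call. The `_initial_wait_done` attribute and the `sleeps` counter are illustrative and not part of the PR.

```python
import asyncio


class ToolsClient:
    """Hypothetical stand-in for FleetMCPTools."""

    def __init__(self, initial_wait: float = 8.0):
        self.initial_wait = initial_wait
        self._initial_wait_done = False  # assumed flag, not in the PR
        self.sleeps = 0  # counts how often the warm-up sleep actually ran

    async def list_tools(self):
        # Sleep only once per client, on the first call after provisioning.
        if self.initial_wait > 0 and not self._initial_wait_done:
            await asyncio.sleep(self.initial_wait)
            self._initial_wait_done = True
            self.sleeps += 1
        return ["search"]  # placeholder tool list
```

Since the client is created once in `_ensure_provisioned()`, a per-instance flag would confine the 8-second wait to the first episode.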
computer_use: 1800s (30 min), browser + inference is slow
tool_use: 600s (10 min), API calls are fast
Can be overridden by passing ttl_seconds explicitly.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
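A small sketch of how such per-modality defaults might be resolved. The names `DEFAULT_TTL_SECONDS` and `resolve_ttl` are hypothetical, and raising `KeyError` for an unknown modality is an assumption; the PR does not say what happens in that case.

```python
from typing import Optional

# Defaults quoted in the commit message.
DEFAULT_TTL_SECONDS = {"computer_use": 1800, "tool_use": 600}


def resolve_ttl(modality: str, ttl_seconds: Optional[int] = None) -> int:
    """Return the explicit TTL if given, else the default for the modality."""
    if ttl_seconds is not None:
        return ttl_seconds
    # Assumption: an unknown modality raises KeyError rather than falling back.
    return DEFAULT_TTL_SECONDS[modality]
```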
Every fleet_rollout_started now gets a matching fleet_rollout_completed. Previously, rollouts that were stopped by the caller (max_turns, context overflow, job cancellation) never emitted the completed event, creating gaps in the Logfire dashboard (e.g., 132 started but only 9 completed).
close() now infers stop_reason from state:
- max_steps: step_count >= max_steps
- tool_error: last tool call failed (likely TTL expiry)
- caller_stopped: steps were taken but caller stopped early
- cancelled: rollout started but no steps taken
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
tool_error is transitory (rollout continues after a failed tool call), so it's misleading as a stop reason. Simplified to just two:
- max_steps: step_count >= max_steps
- abandoned: everything else (caller stopped, cancelled, ctx overflow)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
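The simplified two-way classification amounts to a single comparison. The standalone function below is a sketch with a hypothetical name; in the PR the same expression appears inline in close() and close_async().

```python
def infer_stop_reason(step_count: int, max_steps: int) -> str:
    """Classify a rollout that ended without an explicit completion event."""
    return "max_steps" if step_count >= max_steps else "abandoned"
```

A rollout that never took a step is classified as `abandoned` too, which is how the earlier `cancelled` case folds into the simplified scheme.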
Documents the rollout lifecycle accounting:
started = completed + init_err + tools_err + no_computer + max_steps + abandoned
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…expiry Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| # it's still a rollout attempt (e.g., fostgres health check failures). | ||
| fleet_info("fleet_rollout_started") | ||
| self._rollout_started = True | ||
| self._rollout_completed_emitted = False |
Consecutive resets silently lose rollout tracking events
Medium Severity
If reset_async() is called a second time without the first rollout completing (e.g., the caller decides to restart the episode), a new fleet_rollout_started is emitted and _rollout_completed_emitted is reset to False, but no fleet_rollout_completed is ever emitted for the first rollout. This breaks the invariant documented in the README that "every fleet_rollout_started gets a matching fleet_rollout_completed." The close() handler only emits one compensating fleet_rollout_completed, leaving the first rollout permanently unmatched.
AsyncFleet.make() is not truly non-blocking — diagnostics confirmed the event loop stays healthy while make() blocks internally. Switch to running sync Fleet.make() in a thread pool via asyncio.to_thread() to guarantee non-blocking behavior. Also removes the separate sync Fleet.instance() call since we already have a sync env handle from make(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| _logger.info(f"Fleet instance ready in {time.time() - start:.1f}s: {env.instance_id}") | ||
| elapsed = time.time() - start | ||
| instance_id = getattr(env, "instance_id", "unknown") | ||
| _logger.info(f"Fleet instance ready in {elapsed:.1f}s: {instance_id}") |
Sync from_fleet missing fleet_provisioning_completed telemetry event
Low Severity
The async from_fleet_async() emits a fleet_provisioning_completed telemetry event with provisioning_time_s and instance_id, but the sync from_fleet() does not. This means provisioning latency data is silently missing for any code path using the sync method (including the example at examples/fleet_env_example.py), breaking the Logfire SQL queries documented in the README.
… loop
Three sync calls were blocking the event loop during concurrent trajectories:
1. self._orch.reset(): sync HTTP POST to manager API, now uses reset_async() which runs in a thread pool
2. fleet_task.verify_detailed(): sync verifier execution, now wrapped in asyncio.to_thread()
3. self._fleet_env.close(): sync instance termination, now has close_async()
Also reverts to AsyncFleet.make() (it was correctly async all along; diagnostics confirmed the event loop stays healthy during make() calls).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
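To illustrate the `asyncio.to_thread()` approach, here is a self-contained sketch in which a blocking call runs in a worker thread while another coroutine keeps making progress. `blocking_verify` is a hypothetical stand-in for a sync call such as verify_detailed(), and the timings are arbitrary.

```python
import asyncio
import time


def blocking_verify() -> float:
    """Hypothetical stand-in for a sync call such as verify_detailed()."""
    time.sleep(0.05)  # simulates blocking I/O or verifier execution
    return 1.0


async def heartbeat(ticks: list) -> None:
    """Another trajectory that needs the event loop while the verifier runs."""
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.005)


async def main():
    ticks: list = []
    # to_thread() runs the sync function in a worker thread, so heartbeat()
    # is scheduled concurrently instead of waiting for the verifier.
    score, _ = await asyncio.gather(asyncio.to_thread(blocking_verify), heartbeat(ticks))
    return score, len(ticks)
```

Calling `blocking_verify()` directly inside a coroutine would stall every other task on the loop for the full duration of the call.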
Adds create_trace_job() and upload_trace() to fleet_env module. These allow SkyRL eval to send conversation traces (with screenshots) to the Fleet API for viewing in the Fleet UI. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r UI rendering
Images were being passed as OpenAI format (type: "image_url") but Fleet's ingest API expects (type: "image", mime_type, data). The API then uploads base64 to S3 and the UI renders them full-size via OpinionatedImage.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
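A sketch of the conversion, assuming the OpenAI-style part carries a base64 data URL under `image_url.url`. The function name `to_fleet_image` is hypothetical, and the exact payload the ingest API accepts is not shown in this PR; only the field names `type`, `mime_type`, and `data` come from the commit message.

```python
def to_fleet_image(part: dict) -> dict:
    """Convert an OpenAI-style image part to the (type, mime_type, data) shape."""
    if part.get("type") != "image_url":
        return part  # text and other parts pass through unchanged
    url = part["image_url"]["url"]
    header, _, data = url.partition(",")
    if not header.startswith("data:") or not data:
        return part  # not a data URL; left as-is in this sketch
    # header looks like "data:image/png;base64"
    mime_type = header[len("data:"):].split(";")[0]
    return {"type": "image", "mime_type": mime_type, "data": data}
```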
The ingest API determines session status from score presence, not the status field. Without score, all sessions stay as in_progress. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2289906 to c99c1e5
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Telemetry context overwritten by concurrent FleetTaskEnv instances
- FleetTaskEnv now reapplies its own telemetry context at the start of runtime lifecycle methods instead of setting shared context in init, preventing cross-instance context bleed in concurrent usage.
Preview (5469588568)
diff --git a/src/envs/fleet_env/task_env.py b/src/envs/fleet_env/task_env.py
--- a/src/envs/fleet_env/task_env.py
+++ b/src/envs/fleet_env/task_env.py
@@ -125,14 +125,6 @@
         self._rollout_started = False
         self._tools_cache: Optional[List[Dict]] = None
-        # Set telemetry context so init failures are tracked with full context
-        set_task_context(
-            env_key=self.env_key,
-            env_version=self.env_version,
-            task_key=self.task_key,
-            modality=self.modality,
-        )
-
         # Provisioning is deferred to _ensure_provisioned() (called from reset_async)
         # to avoid blocking the event loop with sync Fleet.make() calls.
         self._orch = None
@@ -191,6 +183,15 @@
         """
         return self.task.get("env_variables")
+    def _set_telemetry_context(self):
+        """Set telemetry context for this environment instance."""
+        set_task_context(
+            env_key=self.env_key,
+            env_version=self.env_version,
+            task_key=self.task_key,
+            modality=self.modality,
+        )
+
     def reset(self, seed: Optional[int] = None) -> Dict[str, Any]:
         """Reset the environment and return initial observation (sync wrapper).
@@ -253,6 +254,7 @@
         import logging
         logger = logging.getLogger(__name__)
+        self._set_telemetry_context()
         # Count this rollout attempt immediately — even if provisioning fails,
         # it's still a rollout attempt (e.g., fostgres health check failures).
@@ -437,6 +439,8 @@
         Returns:
             Tuple of (observation, reward, done, info)
         """
+        self._set_telemetry_context()
+
         if self._done:
             raise RuntimeError("Episode is done. Call reset() to start a new episode.")
@@ -526,6 +530,8 @@
         Returns:
             1.0 if verifier passes, 0.0 otherwise
         """
+        self._set_telemetry_context()
+
         # Support both field names: verifier_code (OpenEnv) and verifier_func (Fleet SDK)
         verifier_code = self.task.get("verifier_code") or self.task.get("verifier_func")
         score = 0.0
@@ -617,6 +623,8 @@
         context overflow, job cancellation, TTL expiry).
         """
         try:
+            self._set_telemetry_context()
+
             # Emit rollout_completed for orphaned rollouts (started but never completed).
             # This happens when the caller (SkyRL) stops without telling us why:
             # max_turns hit, context overflow, job cancellation, etc.
@@ -649,6 +657,8 @@
     async def close_async(self):
         """Async close — avoids blocking the event loop on Fleet instance termination."""
         try:
+            self._set_telemetry_context()
+
             if self._rollout_started and not self._rollout_completed_emitted:
                 stop_reason = "max_steps" if self._step_count >= self.max_steps else "abandoned"
                 fleet_info(
diff --git a/tests/envs/test_fleet_task_env.py b/tests/envs/test_fleet_task_env.py
--- a/tests/envs/test_fleet_task_env.py
+++ b/tests/envs/test_fleet_task_env.py
@@ -82,7 +82,51 @@
         with pytest.raises(ValueError, match="Fleet API key required"):
             FleetTaskEnv(sample_task_config)
+    @pytest.mark.anyio
+    async def test_reset_uses_instance_telemetry_context(self, mock_fleet_env_client):
+        """Telemetry should match the env instance being reset."""
+        from envs.fleet_env.task_env import FleetTaskEnv
+        task_a = {
+            "task_key": "task-a",
+            "prompt": "Task A",
+            "env_key": "env-a",
+            "env_version": "v1",
+            "task_modality": "tool_use",
+        }
+        task_b = {
+            "task_key": "task-b",
+            "prompt": "Task B",
+            "env_key": "env-b",
+            "env_version": "v2",
+            "task_modality": "tool_use",
+        }
+
+        env_a = FleetTaskEnv(task_a, api_key="test")
+        env_b = FleetTaskEnv(task_b, api_key="test")
+
+        for env in (env_a, env_b):
+            env._orch = MagicMock()
+            env._tools = MagicMock()
+            env._tools.list_tools = AsyncMock(
+                return_value=MagicMock(tools=[{"name": "search"}])
+            )
+
+        with patch("envs.fleet_env.telemetry.logfire.info") as mock_info:
+            await env_a.reset_async()
+            await env_b.reset_async()
+
+        started_calls = [
+            call
+            for call in mock_info.call_args_list
+            if call.args and call.args[0] == "fleet_rollout_started"
+        ]
+        assert started_calls[0].kwargs["task_key"] == "task-a"
+        assert started_calls[0].kwargs["env_key"] == "env-a"
+        assert started_calls[1].kwargs["task_key"] == "task-b"
+        assert started_calls[1].kwargs["env_key"] == "env-b"
+
+
 class TestFleetTaskEnvSpecs:
     """Tests for env/data spec building."""
| from typing import Optional | ||
|
|
||
| # Session context - set once per rollout/task execution | ||
| _session_context: ContextVar[dict] = ContextVar("fleet_session_context", default={}) |
Telemetry context overwritten by concurrent FleetTaskEnv instances
Medium Severity
_session_context is a single ContextVar shared across all FleetTaskEnv instances. set_task_context() is called in __init__, so creating multiple instances in the same async task causes later instances to overwrite earlier ones' telemetry context. Subsequent telemetry events (e.g., from reset_async()) for earlier instances will carry the wrong env_key, task_key, etc.
New HintGenerator module that produces concise hints from failed rollout data to rescue GRPO signal on hard tasks. Three modes:
- Option B: LLM call with verifier code + tool errors + chat_history
- Option C: LLM call with verifier code only (cacheable per-task)
- Option D: LLM call synthesizing tool errors + verifier failure
FleetTaskEnv changes:
- Accumulate tool errors during step_async()
- Capture verifier error details in _compute_reward()
- New properties: verifier_code, tool_errors, verifier_error
- New reset_for_hint_async(hint) for hinted rollouts (reuses provisioned instance, resets DB to seed, appends hint to prompt)
- compute_hint_reward() utility: R = (1 - raw_score) * hint_score
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
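The hint reward formula can be written out directly. The signature below is an assumption (the commit gives only the formula), and scores are taken to lie in [0, 1].

```python
def compute_hint_reward(raw_score: float, hint_score: float) -> float:
    """R = (1 - raw_score) * hint_score, assuming both scores lie in [0, 1]."""
    return (1.0 - raw_score) * hint_score
```

The `(1 - raw_score)` factor means a hint earns nothing on a task the unhinted rollout already solved, and earns the full hinted score when the unhinted rollout scored zero.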
This reverts commit fc0508f.



Summary
- telemetry.py wrapper with 4 functions (fleet_info, fleet_warning, fleet_error, fleet_exception)
- task_env.py, client.py, mcp_tools.py
- configure_fleet_telemetry() is never called (logfire silently drops events)
Test plan
- configure_fleet_telemetry(send_to_logfire=False) to verify structured output
🤖 Generated with Claude Code