feat(agent): SSE streaming for LLM completions with per-provider accumulators#944
feat(agent): SSE streaming for LLM completions with per-provider accumulators#944wpfleger96 wants to merge 6 commits into
Conversation
|
Pushed Thufir's two review fixes (HEAD now Fix 1 — connect_timeout restored to 10s. Fix 2 — mid-body read errors retry like stalls. The Test:
Note: branch was force-updated from a dedicated worktree ( |
edbe0e4 to
f268646
Compare
…mulators Replace buffered LLM requests with Server-Sent Events streaming. Each text delta emits an agent_message_chunk session update as it arrives, providing a natural keepalive that resets the ACP idle clock without relying solely on the 30s ticker. Design decisions: - Two reqwest::Client instances: `http` (with global timeout for summarize) and `http_stream` (no global timeout, enforces first-byte and inter-chunk timeouts via tokio::time::timeout) - Two-phase timeout: llm_timeout (120s) until first content delta, then stream_chunk_timeout (30s) between subsequent events - Anthropic: index-keyed HashMap<usize, (String, String)> for parallel tool-call accumulation via content_block index - OpenAI Chat: Vec-indexed accumulation by tool_calls[].index - Responses API: routes on JSON `type` field inside data payload, uses response.output_text.delta and response.function_call_arguments.delta - MAX_LLM_RESPONSE_BYTES caps accumulated semantic content (text + tool args), not raw SSE wire bytes - SSE parser handles : comments, multi-line data:, id:/retry: fields - Keepalive ticker (G) preserved as fallback for reasoning models that pause before producing content Co-authored-by: Will Pfleger <pfleger.will@gmail.com> Signed-off-by: Will Pfleger <pfleger.will@gmail.com>
output_item.added may arrive after function_call_arguments.delta when the Responses API reorders events. The delta handler creates the HashMap entry with an empty name, and the previous or_insert_with no-oped on existing entries. Use and_modify to backfill the name regardless of insertion order. Co-authored-by: Will Pfleger <pfleger.will@gmail.com> Signed-off-by: Will Pfleger <pfleger.will@gmail.com>
…am retry, ordered tool calls The SSE reader split events only on "\n\n", so providers or proxies emitting CRLF or bare-CR line terminators broke event framing. It also let SseReader.buf grow without bound when a stream never produced an event boundary. Streaming requests had no retry on the initial send, and the Responses-API tool-call accumulator used a HashMap, yielding non-deterministic tool-call order. - Normalize CR and CRLF to LF on chunk append (SSE spec equivalence) - Cap SseReader.buf at MAX_LLM_RESPONSE_BYTES, erroring before any unbounded growth - Add send_stream_with_retry() applying the buffered path's transport/ 5xx/429 backoff to the initial request only, stopping before any chunk is emitted to avoid duplicate output - Replace the HashMap tool-call accumulator with a first-seen-ordered Vec to match the buffered parser's ordering Co-authored-by: Will Pfleger <pfleger.will@gmail.com> Signed-off-by: Will Pfleger <pfleger.will@gmail.com>
f1599f2 to
06378f4
Compare
The buffered-body inter-chunk timeout was named llm_stream_chunk_timeout, confusingly similar to stream_chunk_timeout (the SSE inter-chunk timeout). Operators tuning "stream chunk timeout" would set the wrong env var. Rename to llm_body_chunk_timeout / SPROUT_AGENT_LLM_BODY_CHUNK_TIMEOUT_SECS to clearly distinguish the buffered path from the streaming path. Extract the openai_to_sse_events test helper (duplicated in fake_llm.rs, golden_transcripts.rs, and regressions.rs) into tests/common/mod.rs. Co-authored-by: Will Pfleger <pfleger.will@gmail.com> Signed-off-by: Will Pfleger <pfleger.will@gmail.com>
… retry Cover three high-priority gaps identified in the PR #944 test plan: Gap 3 (dual-timeout switchover): Verify first_byte_timeout governs pre-content and stream_chunk_timeout governs post-content. Three tests exercise the transition using sub-second timeouts against a delayed-write TCP server. Gap 1 (split-boundary framing): Verify SseReader handles event boundaries split across TCP chunks, data: prefix split mid-keyword, and trailing data without a final \n\n boundary before EOF. Gap 6 (send_stream_with_retry): Verify 5xx triggers retry and succeeds on second attempt, all-503 exhausts retries with clear error, and 401 surfaces LlmAuth immediately without retrying. Co-authored-by: Will Pfleger <pfleger.will@gmail.com> Signed-off-by: Will Pfleger <pfleger.will@gmail.com>
…alidation tests StreamEmitter::test_channel() helper: returns a live mpsc receiver so tests can assert which agent_message_chunk messages were emitted, unlike noop() which drops the receiver. Chunk-emission (2 tests): verify one chunk per text delta with correct content, and that empty/absent text emits nothing. Auto→Responses upgrade (2 tests): verify that Auto mode retries on /responses when /chat/completions returns a 400 with an is_responses_required body, and that the sticky auto_upgraded flag causes subsequent calls to skip /chat/completions entirely. Both tests assert which paths the fake server saw — the discriminating assertion per Thufir's review. Config validation (2 tests): stream_chunk_timeout=0 and llm_body_chunk_timeout=0 both rejected with the documented error message. Co-authored-by: Will Pfleger <pfleger.will@gmail.com> Signed-off-by: Will Pfleger <pfleger.will@gmail.com>
Summary
Replace buffered
stream: falseLLM requests with Server-Sent Events streaming. Each text delta emits anagent_message_chunksession update as it arrives, providing a natural keepalive that resets the ACP idle clock without relying solely on the 30s ticker.Stack: #935 (Phase 1: base) → this PR (Phase 2: streaming)
Design Decisions
reqwest::Clientinstances:http(with global timeout forsummarize()) andhttp_stream(no global timeout — first-byte and inter-chunk timeouts enforced viatokio::time::timeout)llm_timeout(120s) until first content delta, thenstream_chunk_timeout(30s) between subsequent chunksagent_message_chunksession update, resetting ACP idle clock; 30s ticker preserved as fallback for reasoning modelsMAX_LLM_RESPONSE_BYTEScaps accumulated semantic content (text + tool arg strings), not raw SSE wire bytesPer-Provider Accumulators
HashMap<usize, (String, String)>for parallel tool-call blocks viacontent_blockindexVec-indexed accumulation bytool_calls[].indextypefield insidedata:payload —response.output_text.deltafor text,response.function_call_arguments.deltafor tool args,response.output_item.donefor item completionSSE Parser
Custom inline parser (~70 lines) handling:
:comment lines (discarded)data:fields (joined with\n)id:andretry:fields (ignored)[DONE]sentinel detectionChanges
crates/sprout-agent/src/llm.rshttp_streamclient field (no global timeout)post_stream()function with SSE line parserllm_timeoutfor pre-content,stream_chunk_timeoutfor post-contentMAX_LLM_RESPONSE_BYTESenforcement on semantic contentcrates/sprout-agent/src/config.rsstream_chunk_timeoutfieldSPROUT_AGENT_LLM_STREAM_CHUNK_TIMEOUT_SECSenv var (default 30s)crates/sprout-agent/src/agent.rsWireSender+session_idintocomplete()for delta emissionTests
MAX_LLM_RESPONSE_BYTESenforcement during streamingAcceptance Criteria
cargo testpasses (existing + new streaming tests)cargo clippycleanagent_message_chunk, tool calls accumulate via index-keyed HashMapsummarize()still works viahttpclient (non-streaming path preserved)MAX_LLM_RESPONSE_BYTESenforced during streaming (semantic content)llm_timeoutpre-content,stream_chunk_timeoutpost-content