diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index f45490134..601db15a6 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -4,16 +4,19 @@ import logging import re from json import JSONDecodeError -from typing import Any, AsyncIterator +from typing import Any, AsyncIterator, Iterator from cachetools import TTLCache # type: ignore from llama_stack_client import APIConnectionError from llama_stack_client.lib.agents.agent import AsyncAgent # type: ignore from llama_stack_client import AsyncLlamaStackClient # type: ignore -from llama_stack_client.types.shared.interleaved_content_item import TextContentItem from llama_stack_client.types import UserMessage # type: ignore +from llama_stack_client.lib.agents.event_logger import interleaved_content_as_str +from llama_stack_client.types.shared import ToolCall +from llama_stack_client.types.shared.interleaved_content_item import TextContentItem + from fastapi import APIRouter, HTTPException, Request, Depends, status from fastapi.responses import StreamingResponse @@ -46,7 +49,8 @@ _agent_cache: TTLCache[str, AsyncAgent] = TTLCache(maxsize=1000, ttl=3600) -async def get_agent( # pylint: disable=too-many-arguments,too-many-positional-arguments +# # pylint: disable=R0913,R0917 +async def get_agent( client: AsyncLlamaStackClient, model_id: str, system_prompt: str, @@ -127,7 +131,7 @@ def stream_end_event(metadata_map: dict) -> str: ) -def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> str | None: +def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> Iterator[str]: """Build a streaming event from a chunk response. This function processes chunks from the LLama Stack streaming response and formats @@ -142,58 +146,261 @@ def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> str | N chunk_id: The current chunk ID counter (gets incremented for each token) Returns: - str | None: A formatted SSE data string with event information, or None if - the chunk doesn't contain processable event data + Iterator[str]: An iterable list of formatted SSE data strings with event information """ - # pylint: disable=R1702 - if hasattr(chunk.event, "payload"): - if chunk.event.payload.event_type == "step_progress": - if hasattr(chunk.event.payload.delta, "text"): - text = chunk.event.payload.delta.text - return format_stream_data( + if hasattr(chunk, "error"): + yield from _handle_error_event(chunk, chunk_id) + return + + event_type = chunk.event.payload.event_type + step_type = getattr(chunk.event.payload, "step_type", None) + + if event_type in {"turn_start", "turn_awaiting_input"}: + yield from _handle_turn_start_event(chunk_id) + elif event_type == "turn_complete": + yield from _handle_turn_complete_event(chunk, chunk_id) + elif step_type == "shield_call": + yield from _handle_shield_event(chunk, chunk_id) + elif step_type == "inference": + yield from _handle_inference_event(chunk, chunk_id) + elif step_type == "tool_execution": + yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map) + else: + yield from _handle_heartbeat_event(chunk_id) + + +# ----------------------------------- +# Error handling +# ----------------------------------- +def _handle_error_event(chunk: Any, chunk_id: int) -> Iterator[str]: + yield format_stream_data( + { + "event": "error", + "data": { + "id": chunk_id, + "token": chunk.error["message"], + }, + } + ) + + +# ----------------------------------- +# Turn handling +# ----------------------------------- +def _handle_turn_start_event(chunk_id: int) -> Iterator[str]: + yield format_stream_data( + { + "event": "token", + "data": { + "id": chunk_id, + "token": "", + }, + } + ) + + +def _handle_turn_complete_event(chunk: Any, chunk_id: int) -> Iterator[str]: + yield format_stream_data( + { + "event": "turn_complete", + "data": { + "id": chunk_id, + "token": chunk.event.payload.turn.output_message.content, + }, + } + ) + + +# ----------------------------------- +# Shield handling +# ----------------------------------- +def _handle_shield_event(chunk: Any, chunk_id: int) -> Iterator[str]: + if chunk.event.payload.event_type == "step_complete": + violation = chunk.event.payload.step_details.violation + if not violation: + yield format_stream_data( + { + "event": "token", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": "No Violation", + }, + } + ) + else: + violation = ( + f"Violation: {violation.user_message} (Metadata: {violation.metadata})" + ) + yield format_stream_data( + { + "event": "token", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": violation, + }, + } + ) + + +# ----------------------------------- +# Inference handling +# ----------------------------------- +def _handle_inference_event(chunk: Any, chunk_id: int) -> Iterator[str]: + if chunk.event.payload.event_type == "step_start": + yield format_stream_data( + { + "event": "token", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": "", + }, + } + ) + + elif chunk.event.payload.event_type == "step_progress": + if chunk.event.payload.delta.type == "tool_call": + if isinstance(chunk.event.payload.delta.tool_call, str): + yield format_stream_data( { - "event": "token", + "event": "tool_call", "data": { "id": chunk_id, "role": chunk.event.payload.step_type, - "token": text, + "token": chunk.event.payload.delta.tool_call, }, } ) - if ( - chunk.event.payload.event_type == "step_complete" - and chunk.event.payload.step_details.step_type == "tool_execution" - ): - for r in chunk.event.payload.step_details.tool_responses: - if r.tool_name == "knowledge_search" and r.content: - for text_content_item in r.content: - if isinstance(text_content_item, TextContentItem): - for match in METADATA_PATTERN.findall( - text_content_item.text - ): - try: - meta = json.loads(match.replace("'", '"')) + elif isinstance(chunk.event.payload.delta.tool_call, ToolCall): + yield format_stream_data( + { + "event": "tool_call", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": chunk.event.payload.delta.tool_call.tool_name, + }, + } + ) + + elif chunk.event.payload.delta.type == "text": + yield format_stream_data( + { + "event": "token", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": chunk.event.payload.delta.text, + }, + } + ) + + +# ----------------------------------- +# Tool Execution handling +# ----------------------------------- +# pylint: disable=R1702,R0912 +def _handle_tool_execution_event( + chunk: Any, chunk_id: int, metadata_map: dict +) -> Iterator[str]: + if chunk.event.payload.event_type == "step_start": + yield format_stream_data( + { + "event": "tool_call", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": "", + }, + } + ) + + elif chunk.event.payload.event_type == "step_complete": + for t in chunk.event.payload.step_details.tool_calls: + yield format_stream_data( + { + "event": "tool_call", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": f"Tool:{t.tool_name} arguments:{t.arguments}", + }, + } + ) + + for r in chunk.event.payload.step_details.tool_responses: + if r.tool_name == "query_from_memory": + inserted_context = interleaved_content_as_str(r.content) + yield format_stream_data( + { + "event": "tool_call", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": f"Fetched {len(inserted_context)} bytes from memory", + }, + } + ) + + elif r.tool_name == "knowledge_search" and r.content: + summary = "" + for i, text_content_item in enumerate(r.content): + if isinstance(text_content_item, TextContentItem): + if i == 0: + summary = text_content_item.text + newline_pos = summary.find("\n") + if newline_pos > 0: + summary = summary[:newline_pos] + for match in METADATA_PATTERN.findall(text_content_item.text): + try: + meta = json.loads(match.replace("'", '"')) + if "document_id" in meta: metadata_map[meta["document_id"]] = meta - except JSONDecodeError: - logger.debug( - "JSONDecodeError was thrown in processing %s", - match, - ) - if chunk.event.payload.step_details.tool_calls: - tool_name = str( - chunk.event.payload.step_details.tool_calls[0].tool_name + except JSONDecodeError: + logger.debug( + "JSONDecodeError was thrown in processing %s", + match, + ) + + yield format_stream_data( + { + "event": "tool_call", + "data": { + "id": chunk_id, + "role": chunk.event.payload.step_type, + "token": f"Tool:{r.tool_name} summary:{summary}", + }, + } ) - return format_stream_data( + + else: + yield format_stream_data( { - "event": "token", + "event": "tool_call", "data": { "id": chunk_id, "role": chunk.event.payload.step_type, - "token": tool_name, + "token": f"Tool:{r.tool_name} response:{r.content}", }, } ) - return None + + +# ----------------------------------- +# Catch-all for everything else +# ----------------------------------- +def _handle_heartbeat_event(chunk_id: int) -> Iterator[str]: + yield format_stream_data( + { + "event": "heartbeat", + "data": { + "id": chunk_id, + "token": "heartbeat", + }, + } + ) @router.post("/streaming_query") @@ -233,7 +440,7 @@ async def response_generator(turn_response: Any) -> AsyncIterator[str]: yield stream_start_event(conversation_id) async for chunk in turn_response: - if event := stream_build_event(chunk, chunk_id, metadata_map): + for event in stream_build_event(chunk, chunk_id, metadata_map): complete_response += json.loads(event.replace("data: ", ""))[ "data" ]["token"] diff --git a/tests/unit/app/endpoints/test_streaming_query.py b/tests/unit/app/endpoints/test_streaming_query.py index 3b29e2b48..305a28615 100644 --- a/tests/unit/app/endpoints/test_streaming_query.py +++ b/tests/unit/app/endpoints/test_streaming_query.py @@ -1,5 +1,7 @@ """Unit tests for the /streaming-query REST API endpoint.""" +from datetime import datetime + # pylint: disable=too-many-lines import json @@ -11,7 +13,26 @@ from llama_stack_client import APIConnectionError from llama_stack_client.types import UserMessage # type: ignore +from llama_stack_client.types.agents import Turn +from llama_stack_client.types.shared.completion_message import CompletionMessage from llama_stack_client.types.shared.interleaved_content_item import TextContentItem +from llama_stack_client.types.shared.safety_violation import SafetyViolation +from llama_stack_client.types.shield_call_step import ShieldCallStep +from llama_stack_client.types.shared.tool_call import ToolCall +from llama_stack_client.types.shared.content_delta import TextDelta, ToolCallDelta +from llama_stack_client.types.agents.turn_response_event import TurnResponseEvent +from llama_stack_client.types.agents.agent_turn_response_stream_chunk import ( + AgentTurnResponseStreamChunk, +) +from llama_stack_client.types.agents.turn_response_event_payload import ( + AgentTurnResponseStepProgressPayload, + AgentTurnResponseStepCompletePayload, + AgentTurnResponseTurnStartPayload, + AgentTurnResponseTurnAwaitingInputPayload, + AgentTurnResponseTurnCompletePayload, +) +from llama_stack_client.types.tool_execution_step import ToolExecutionStep +from llama_stack_client.types.tool_response import ToolResponse from configuration import AppConfig from app.endpoints.query import get_rag_toolgroups @@ -149,27 +170,34 @@ async def _test_streaming_query_endpoint_handler(mocker, store_transcript=False) mocker.Mock(identifier="model2", model_type="llm", provider_id="provider2"), ] - # Mock the streaming response from LLama Stack + # Construct the streaming response from LLama Stack. + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. mock_streaming_response = mocker.AsyncMock() mock_streaming_response.__aiter__.return_value = [ - mocker.Mock( - event=mocker.Mock( - payload=mocker.Mock( + AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepProgressPayload( event_type="step_progress", - delta=mocker.Mock(text="LLM answer"), step_type="inference", + delta=TextDelta(text="LLM answer", type="text"), + step_id="s1", ) ) ), - mocker.Mock( - event=mocker.Mock( - payload=mocker.Mock( + AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepCompletePayload( event_type="step_complete", + step_id="s1", step_type="tool_execution", - step_details=mocker.Mock( + step_details=ToolExecutionStep( + turn_id="t1", + step_id="s2", step_type="tool_execution", tool_responses=[ - mocker.Mock( + ToolResponse( + call_id="c1", tool_name="knowledge_search", content=[ TextContentItem(text=s, type="text") @@ -178,8 +206,8 @@ async def _test_streaming_query_endpoint_handler(mocker, store_transcript=False) ) ], tool_calls=[ - mocker.Mock( - tool_name="knowledge_search", + ToolCall( + call_id="t1", tool_name="knowledge_search", arguments={} ) ], ), @@ -233,8 +261,8 @@ async def _test_streaming_query_endpoint_handler(mocker, store_transcript=False) assert "LLM answer" in full_content # Assert referenced documents - assert len(streaming_content) == 4 - d = json.loads(streaming_content[3][5:]) + assert len(streaming_content) == 5 + d = json.loads(streaming_content[4][5:]) referenced_documents = d["data"]["referenced_documents"] assert len(referenced_documents) == 2 assert referenced_documents[1]["doc_title"] == "Doc2" @@ -247,7 +275,8 @@ async def _test_streaming_query_endpoint_handler(mocker, store_transcript=False) query_is_valid=True, query=query, query_request=query_request, - response="LLM answerknowledge_search", + response="LLM answerTool:knowledge_search arguments:{}Tool:knowledge_search " + "summary:knowledge_search tool found 2 chunks:", attachments=[], rag_chunks=[], truncated=False, @@ -612,19 +641,182 @@ async def test_retrieve_response_with_two_attachments(prepare_agent_mocks, mocke ) -def test_stream_build_event_step_progress(mocker): +def test_stream_build_event_turn_start(): + """Test stream_build_event function with turn_start event type.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseTurnStartPayload( + event_type="turn_start", + turn_id="t1", + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "token"' in result + assert '"token": ""' in result + assert '"id": 0' in result + + +def test_stream_build_event_turn_awaiting_input(): + """Test stream_build_event function with turn_awaiting_input event type.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseTurnAwaitingInputPayload( + event_type="turn_awaiting_input", + turn=Turn( + input_messages=[], + output_message=CompletionMessage( + content="content", + role="assistant", + stop_reason="end_of_turn", + ), + session_id="session-1", + started_at=datetime.now(), + steps=[], + turn_id="t1", + ), + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "token"' in result + assert '"token": ""' in result + assert '"id": 0' in result + + +def test_stream_build_event_turn_complete(): + """Test stream_build_event function with turn_complete event type.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseTurnCompletePayload( + event_type="turn_complete", + turn=Turn( + input_messages=[], + output_message=CompletionMessage( + content="content", + role="assistant", + stop_reason="end_of_turn", + ), + session_id="session-1", + started_at=datetime.now(), + steps=[], + turn_id="t1", + ), + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "turn_complete"' in result + assert '"token": "content"' in result + assert '"id": 0' in result + + +def test_stream_build_event_shield_call_step_complete_no_violation(): + """Test stream_build_event function with shield_call_step_complete event type.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepCompletePayload( + event_type="step_complete", + step_type="shield_call", + step_details=ShieldCallStep( + step_id="s1", + step_type="shield_call", + turn_id="t1", + ), + step_id="s1", + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "token"' in result + assert '"token": "No Violation"' in result + assert '"role": "shield_call"' in result + assert '"id": 0' in result + + +def test_stream_build_event_shield_call_step_complete_with_violation(): + """Test stream_build_event function with shield_call_step_complete event type with violation.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepCompletePayload( + event_type="step_complete", + step_type="shield_call", + step_details=ShieldCallStep( + step_id="s1", + step_type="shield_call", + turn_id="t1", + violation=SafetyViolation( + metadata={}, + violation_level="info", + user_message="I don't like the cut of your jib", + ), + ), + step_id="s1", + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "token"' in result + assert ( + '"token": "Violation: I don\'t like the cut of your jib (Metadata: {})"' + in result + ) + assert '"role": "shield_call"' in result + assert '"id": 0' in result + + +def test_stream_build_event_step_progress(): """Test stream_build_event function with step_progress event type.""" - # Create a properly nested mock chunk structure - mock_chunk = mocker.Mock() - mock_chunk.event = mocker.Mock() - mock_chunk.event.payload = mocker.Mock() - mock_chunk.event.payload.event_type = "step_progress" - mock_chunk.event.payload.step_type = "inference" - mock_chunk.event.payload.delta = mocker.Mock() - mock_chunk.event.payload.delta.text = "This is a test response" - - chunk_id = 0 - result = stream_build_event(mock_chunk, chunk_id, {}) + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepProgressPayload( + event_type="step_progress", + step_type="inference", + delta=TextDelta(text="This is a test response", type="text"), + step_id="s1", + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) assert result is not None assert "data: " in result @@ -634,18 +826,84 @@ def test_stream_build_event_step_progress(mocker): assert '"id": 0' in result -def test_stream_build_event_step_complete(mocker): +def test_stream_build_event_step_progress_tool_call_str(): + """Test stream_build_event function with step_progress_tool_call event type with a string.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepProgressPayload( + event_type="step_progress", + step_type="inference", + delta=ToolCallDelta( + parse_status="succeeded", tool_call="tool-called", type="tool_call" + ), + step_id="s1", + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "tool_call"' in result + assert '"token": "tool-called"' in result + assert '"role": "inference"' in result + assert '"id": 0' in result + + +def test_stream_build_event_step_progress_tool_call_tool_call(): + """Test stream_build_event function with step_progress_tool_call event type with a ToolCall.""" + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepProgressPayload( + event_type="step_progress", + step_type="inference", + delta=ToolCallDelta( + parse_status="succeeded", + tool_call=ToolCall( + arguments={}, call_id="tc1", tool_name="my-tool" + ), + type="tool_call", + ), + step_id="s1", + ) + ) + ) + + result = next(stream_build_event(chunk, 0, {})) + + assert result is not None + assert "data: " in result + assert '"event": "tool_call"' in result + assert '"token": "my-tool"' in result + assert '"role": "inference"' in result + assert '"id": 0' in result + + +def test_stream_build_event_step_complete(): """Test stream_build_event function with step_complete event type.""" - # Create a properly nested mock chunk structure - mock_chunk = mocker.Mock( - event=mocker.Mock( - payload=mocker.Mock( + # Create a properly nested chunk structure + # We cannot use 'mock' as 'hasattr(mock, "xxx")' adds the missing + # attribute and therefore makes checks to see whether it is missing fail. + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepCompletePayload( event_type="step_complete", + step_id="s1", step_type="tool_execution", - step_details=mocker.Mock( + step_details=ToolExecutionStep( + turn_id="t1", + step_id="s2", step_type="tool_execution", tool_responses=[ - mocker.Mock( + ToolResponse( + call_id="c1", tool_name="knowledge_search", content=[ TextContentItem(text=s, type="text") @@ -654,8 +912,8 @@ def test_stream_build_event_step_complete(mocker): ) ], tool_calls=[ - mocker.Mock( - tool_name="knowledge_search", + ToolCall( + call_id="t1", tool_name="knowledge_search", arguments={} ) ], ), @@ -663,28 +921,61 @@ def test_stream_build_event_step_complete(mocker): ) ) - chunk_id = 0 - result = stream_build_event(mock_chunk, chunk_id, {}) + itr = stream_build_event(chunk, 0, {}) + result = next(itr) assert result is not None assert "data: " in result - assert '"event": "token"' in result - assert '"token": "knowledge_search"' in result + assert '"event": "tool_call"' in result + assert '"token": "Tool:knowledge_search arguments:' in result + + result = next(itr) + assert ( + '"token": "Tool:knowledge_search summary:knowledge_search tool found 2 chunks:"' + in result + ) assert '"role": "tool_execution"' in result assert '"id": 0' in result -def test_stream_build_event_returns_none(mocker): - """Test stream_build_event function returns None when chunk doesn't have expected structure.""" - # Create a mock chunk without the expected payload structure - mock_chunk = mocker.Mock() - mock_chunk.event = mocker.Mock() - # Deliberately not setting payload attribute +def test_stream_build_event_error(): + """Test stream_build_event function returns a 'error' when chunk contains error information.""" + # Create a mock chunk without an expected payload structure + + # pylint: disable=R0903 + class MockError: + """Dummy class to mock an exception.""" + + error = {"message": "Something went wrong"} + + result = next(stream_build_event(MockError(), 0, {})) + + assert result is not None + assert '"id": 0' in result + assert '"event": "error"' in result + assert '"token": "Something went wrong"' in result + + +def test_stream_build_event_returns_heartbeat(): + """Test stream_build_event function returns a 'heartbeat' when chunk is unrecognised.""" + # Create a mock chunk without an expected payload structure + chunk = AgentTurnResponseStreamChunk( + event=TurnResponseEvent( + payload=AgentTurnResponseStepProgressPayload( + event_type="step_progress", + step_type="memory_retrieval", + delta=TextDelta(text="", type="text"), + step_id="s1", + ) + ) + ) - chunk_id = 0 - result = stream_build_event(mock_chunk, chunk_id, {}) + result = next(stream_build_event(chunk, 0, {})) - assert result is None + assert result is not None + assert '"id": 0' in result + assert '"event": "heartbeat"' in result + assert '"token": "heartbeat"' in result async def test_retrieve_response_with_mcp_servers(prepare_agent_mocks, mocker):