From a81349be57334ada43af351078646021eaef4514 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Tue, 3 Mar 2026 10:13:58 +0000 Subject: [PATCH 1/4] test: isolate astream_incremental tests from CI Fixes #562 --- test/core/test_astream_incremental.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/core/test_astream_incremental.py b/test/core/test_astream_incremental.py index d34b8e70d..af0dca903 100644 --- a/test/core/test_astream_incremental.py +++ b/test/core/test_astream_incremental.py @@ -14,6 +14,7 @@ @pytest.mark.ollama @pytest.mark.llm +@pytest.mark.qualitative async def test_astream_returns_incremental_chunks(): """Test that astream() returns only new content, not accumulated content. @@ -68,6 +69,7 @@ async def test_astream_returns_incremental_chunks(): @pytest.mark.ollama @pytest.mark.llm +@pytest.mark.qualitative async def test_astream_multiple_calls_accumulate_correctly(): """Test that multiple astream() calls accumulate to the final value. @@ -111,6 +113,7 @@ async def test_astream_multiple_calls_accumulate_correctly(): @pytest.mark.ollama @pytest.mark.llm +@pytest.mark.qualitative async def test_astream_beginning_length_tracking(): """Test that beginning_length is correctly tracked across astream calls. @@ -141,6 +144,7 @@ async def test_astream_beginning_length_tracking(): @pytest.mark.ollama @pytest.mark.llm +@pytest.mark.qualitative async def test_astream_empty_beginning(): """Test astream when _underlying_value starts as None.""" session = start_session() @@ -167,7 +171,6 @@ async def test_astream_empty_beginning(): @pytest.mark.llm async def test_computed_mot_raises_error_for_astream(): """Test that computed mot raises an error for astream() calls.""" - # Create a pre-computed thunk mot = ModelOutputThunk(value="Hello, world!") mot._computed = True From fa0209ae98c1ba55c7e29345bfbe6ed14637b8d1 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Tue, 3 Mar 2026 10:38:18 +0000 Subject: [PATCH 2/4] test: add deterministic mock tests for astream incremental logic Introduces `test_astream_mock.py` to test `ModelOutputThunk`'s async queue incremental streaming logic deterministically without relying on highly-variable LLM backends. --- test/core/test_astream_mock.py | 162 +++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 test/core/test_astream_mock.py diff --git a/test/core/test_astream_mock.py b/test/core/test_astream_mock.py new file mode 100644 index 000000000..e3c58c697 --- /dev/null +++ b/test/core/test_astream_mock.py @@ -0,0 +1,162 @@ +"""Deterministic Mock Tests for ModelOutputThunk.astream() incremental return behavior. + +Tests that astream() returns only new content added since the beginning of +each astream() call, not the entire accumulated value. Uses manual queue +injection to bypass LLM calls and network operations, guaranteeing determinism. +""" + +import asyncio +from typing import Any + +import pytest + +from mellea.core.base import CBlock, GenerateType, ModelOutputThunk + + +async def mock_process(mot: ModelOutputThunk, chunk: Any) -> None: + """Mock process function that simply appends the chunk to the underlying value.""" + if mot._underlying_value is None: + mot._underlying_value = "" + if chunk is not None: + mot._underlying_value += chunk + + +async def mock_post_process(mot: ModelOutputThunk) -> None: + """Mock post-process function (does nothing).""" + + +def create_manual_mock_thunk() -> ModelOutputThunk: + """Helper to create a mock ModelOutputThunk where we manually populate the queue.""" + mot = ModelOutputThunk(value=None) + mot._action = CBlock("mock_action") + mot._generate_type = GenerateType.ASYNC + mot._process = mock_process + mot._post_process = mock_post_process + mot._chunk_size = 0 # Read exactly what is available + return mot + + +@pytest.mark.asyncio +async def test_astream_returns_incremental_chunks(): + """Test that astream() returns only new content, not accumulated content.""" + mot = create_manual_mock_thunk() + + # Drop the first chunk and pull it + mot._async_queue.put_nowait("chunk1 ") + chunk1 = await mot.astream() + assert chunk1 == "chunk1 " + + # Drop the second chunk and pull it + mot._async_queue.put_nowait("chunk2 ") + chunk2 = await mot.astream() + assert chunk2 == "chunk2 " + + # Drop the third chunk and pull it + mot._async_queue.put_nowait("chunk3 ") + chunk3 = await mot.astream() + assert chunk3 == "chunk3 " + + # Send completion sentinel + mot._async_queue.put_nowait(None) + + # Wait until fully consumed + while not mot.is_computed(): + await mot.astream() + + final_val = await mot.avalue() + assert final_val == "chunk1 chunk2 chunk3 " + + +@pytest.mark.asyncio +async def test_astream_multiple_calls_accumulate_correctly(): + """Test that multiple astream() calls accumulate to the final value.""" + # Simulating a scenario where queue chunks outpace the reading loop + mot = create_manual_mock_thunk() + + # Drop multiple items at once to simulate fast network + mot._async_queue.put_nowait("c") + mot._async_queue.put_nowait("h") + mot._async_queue.put_nowait("u") + + # Calling astream should drain all currently queued items ("chu") + chunk1 = await mot.astream() + assert chunk1 == "chu" + + mot._async_queue.put_nowait("n") + mot._async_queue.put_nowait("k") + mot._async_queue.put_nowait(None) + + chunk2 = await mot.astream() + assert chunk2 == "chunk" + + final_val = await mot.avalue() + + assert mot.is_computed() + assert final_val == "chunk" + + +@pytest.mark.asyncio +async def test_astream_beginning_length_tracking(): + """Test that beginning_length is correctly tracked across astream calls.""" + mot = create_manual_mock_thunk() + + mot._async_queue.put_nowait("AAA") + chunk1 = await mot.astream() + assert chunk1 == "AAA" + + mot._async_queue.put_nowait("BBB") + chunk2 = await mot.astream() + # verify incremental length tracking works + assert not chunk2.startswith(chunk1) + assert chunk2 == "BBB" + + +@pytest.mark.asyncio +async def test_astream_empty_beginning(): + """Test astream when _underlying_value starts as None.""" + mot = create_manual_mock_thunk() + + mot._async_queue.put_nowait("First") + # At the start, _underlying_value is None, beginning_length is 0 + chunk = await mot.astream() + + # Because beginning length was 0, astream returns the full chunk + assert chunk == "First" + assert mot._underlying_value == "First" + + +@pytest.mark.asyncio +async def test_astream_computed_returns_full_value(): + """Test that astream returns full value when already computed.""" + # Precomputed thunk skips queue checking completely + mot = ModelOutputThunk(value="Hello, world!") + + # For a precomputed thunk, astream directly returns value + result = await mot.astream() + assert result == "Hello, world!" + + +@pytest.mark.asyncio +async def test_astream_final_call_returns_full_value(): + """Test that the final astream call returns the full value when computed.""" + mot = create_manual_mock_thunk() + + mot._async_queue.put_nowait("part1") + chunk1 = await mot.astream() + assert chunk1 == "part1" + + mot._async_queue.put_nowait("part2") + chunk2 = await mot.astream() + assert chunk2 == "part2" + + mot._async_queue.put_nowait("part3") + mot._async_queue.put_nowait(None) + + # Calling astream here processes "part3" and `None`, flagging it as done + chunk3 = await mot.astream() + + final_val = await mot.avalue() + + # The final chunk logically completes the thunk, returning the full value instead of a slice. + assert chunk3 == "part1part2part3" + assert chunk3 == final_val From 07e66f3718cd522bbfa200db443bb9ca1ca59e06 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 13 Mar 2026 13:53:47 +0000 Subject: [PATCH 3/4] test: adapt astream mock tests to upstream incremental semantics Update tests to match the astream() behavior change from PR #618: - astream() now always returns incremental content (including final call) - astream() on a computed MOT raises RuntimeError --- test/core/test_astream_mock.py | 37 ++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/test/core/test_astream_mock.py b/test/core/test_astream_mock.py index e3c58c697..6700754cb 100644 --- a/test/core/test_astream_mock.py +++ b/test/core/test_astream_mock.py @@ -1,8 +1,9 @@ """Deterministic Mock Tests for ModelOutputThunk.astream() incremental return behavior. -Tests that astream() returns only new content added since the beginning of -each astream() call, not the entire accumulated value. Uses manual queue -injection to bypass LLM calls and network operations, guaranteeing determinism. +Tests that astream() returns only the incremental content added during each call. +All astream() chunks concatenated should equal the full final value. Calling +astream() on a computed MOT raises RuntimeError. Uses manual queue injection to +bypass LLM calls and network operations, guaranteeing determinism. """ import asyncio @@ -87,12 +88,13 @@ async def test_astream_multiple_calls_accumulate_correctly(): mot._async_queue.put_nowait(None) chunk2 = await mot.astream() - assert chunk2 == "chunk" - - final_val = await mot.avalue() + # astream() returns only the incremental content added during this call + assert chunk2 == "nk" assert mot.is_computed() - assert final_val == "chunk" + # All astream() chunks concatenated should equal the full value + assert chunk1 + chunk2 == "chunk" + assert mot.value == "chunk" @pytest.mark.asyncio @@ -126,14 +128,14 @@ async def test_astream_empty_beginning(): @pytest.mark.asyncio -async def test_astream_computed_returns_full_value(): - """Test that astream returns full value when already computed.""" - # Precomputed thunk skips queue checking completely +async def test_astream_computed_raises_error(): + """Test that astream raises RuntimeError when already computed.""" + # Precomputed thunk is already computed mot = ModelOutputThunk(value="Hello, world!") - # For a precomputed thunk, astream directly returns value - result = await mot.astream() - assert result == "Hello, world!" + # astream() on a computed MOT now raises RuntimeError + with pytest.raises(RuntimeError, match="Streaming has finished"): + await mot.astream() @pytest.mark.asyncio @@ -155,8 +157,9 @@ async def test_astream_final_call_returns_full_value(): # Calling astream here processes "part3" and `None`, flagging it as done chunk3 = await mot.astream() - final_val = await mot.avalue() + # The final astream() call returns only the incremental content, not the full value + assert chunk3 == "part3" - # The final chunk logically completes the thunk, returning the full value instead of a slice. - assert chunk3 == "part1part2part3" - assert chunk3 == final_val + # All chunks concatenated equal the full value + assert chunk1 + chunk2 + chunk3 == "part1part2part3" + assert mot.value == "part1part2part3" From c5a632d158e14bbfc09cf4a309a094f1879d3a11 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 13 Mar 2026 14:24:40 +0000 Subject: [PATCH 4/4] chore: trigger CI rebuild