Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ cls
coc
codegen
coro
culsans
datamodel
deepwiki
drivername
Expand Down Expand Up @@ -127,7 +128,7 @@ taskupdate
testuuid
Tful
tiangolo
TResponse
typ
typeerror
vulnz
TResponse
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies = [
"google-api-core>=1.26.0",
"json-rpc>=1.15.0",
"googleapis-common-protos>=1.70.0",
"culsans>=0.11.0 ; python_full_version < '3.13'",
]

classifiers = [
Expand Down
12 changes: 2 additions & 10 deletions src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import asyncio
import logging
import sys

from collections.abc import AsyncGenerator

from pydantic import ValidationError

from a2a.server.events.event_queue import Event, EventQueue
from a2a.server.events.event_queue import Event, EventQueue, QueueShutDown
from a2a.types.a2a_pb2 import (
Message,
Task,
Expand All @@ -17,13 +16,6 @@
from a2a.utils.telemetry import SpanKind, trace_class


# This is an alias to the exception for closed queue
QueueClosed: type[Exception] = asyncio.QueueEmpty

# When using python 3.13 or higher, the closed queue signal is QueueShutdown
if sys.version_info >= (3, 13):
QueueClosed = asyncio.QueueShutDown

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -130,7 +122,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
# This class was made an alias of built-in TimeoutError after 3.11
continue
except (QueueClosed, asyncio.QueueEmpty):
except (QueueShutDown, asyncio.QueueEmpty):
# Confirm that the queue is closed, e.g. we aren't on
# python 3.12 and get a queue empty error on an open queue
if self.queue.is_closed():
Expand Down
112 changes: 49 additions & 63 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,31 @@
import sys

from types import TracebackType
from typing import Any

from typing_extensions import Self


if sys.version_info >= (3, 13):
from asyncio import Queue as AsyncQueue
from asyncio import QueueShutDown

def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
"""Create a backwards-compatible queue object."""
return AsyncQueue(maxsize=maxsize)
else:
import culsans

from culsans import AsyncQueue # type: ignore[no-redef]
from culsans import (
AsyncQueueShutDown as QueueShutDown, # type: ignore[no-redef]
)

def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
"""Create a backwards-compatible queue object."""
return culsans.Queue(maxsize=maxsize).async_q # type: ignore[no-any-return]


from a2a.types.a2a_pb2 import (
Message,
Task,
Expand Down Expand Up @@ -41,7 +63,9 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
if max_queue_size <= 0:
raise ValueError('max_queue_size must be greater than 0')

self.queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=max_queue_size)
self.queue: AsyncQueue[Event] = _create_async_queue(
maxsize=max_queue_size
)
self._children: list[EventQueue] = []
self._is_closed = False
self._lock = asyncio.Lock()
Expand Down Expand Up @@ -73,8 +97,12 @@ async def enqueue_event(self, event: Event) -> None:

logger.debug('Enqueuing event of type: %s', type(event))

# Make sure to use put instead of put_nowait to avoid blocking the event loop.
await self.queue.put(event)
try:
await self.queue.put(event)
except QueueShutDown:
logger.warning('Queue was closed during enqueuing. Event dropped.')
return

for child in self._children:
await child.enqueue_event(event)

Expand Down Expand Up @@ -107,14 +135,9 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
asyncio.QueueShutDown: If the queue has been closed and is empty.
"""
async with self._lock:
if (
sys.version_info < (3, 13)
and self._is_closed
and self.queue.empty()
):
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
if self._is_closed and self.queue.empty():
logger.warning('Queue is closed. Event will not be dequeued.')
raise asyncio.QueueEmpty('Queue is closed.')
raise QueueShutDown('Queue is closed.')

if no_wait:
logger.debug('Attempting to dequeue event (no_wait=True).')
Expand Down Expand Up @@ -152,56 +175,26 @@ def tap(self) -> 'EventQueue':
async def close(self, immediate: bool = False) -> None:
"""Closes the queue for future push events and also closes all child queues.

Once closed, no new events can be enqueued. Behavior is consistent across
Python versions:
- Python >= 3.13: Uses `asyncio.Queue.shutdown` to stop the queue. With
`immediate=True` the queue is shut down and pending events are cleared; with
`immediate=False` the queue is shut down and we wait for it to drain via
`queue.join()`.
- Python < 3.13: Emulates the same semantics by clearing on `immediate=True`
or awaiting `queue.join()` on `immediate=False`.

Consumers attempting to dequeue after close on an empty queue will observe
`asyncio.QueueShutDown` on Python >= 3.13 and `asyncio.QueueEmpty` on
Python < 3.13.

Args:
immediate (bool):
- True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
- False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.

immediate: If True, immediately flushes the queue, discarding all pending
events, and causes any currently blocked `dequeue_event` calls to raise
`QueueShutDown`. If False (default), the queue is marked as closed to new
events, but existing events can still be dequeued and processed until the
queue is fully drained.
"""
logger.debug('Closing EventQueue.')
async with self._lock:
# If already closed, just return.
if self._is_closed and not immediate:
return
if not self._is_closed:
self._is_closed = True
# If using python 3.13 or higher, use shutdown but match <3.13 semantics
if sys.version_info >= (3, 13):
if immediate:
# Immediate: stop queue and clear any pending events, then close children
self.queue.shutdown(True)
await self.clear_events(True)
for child in self._children:
await child.close(True)
return
# Graceful: prevent further gets/puts via shutdown, then wait for drain and children
self.queue.shutdown(False)
await asyncio.gather(
self.queue.join(), *(child.close() for child in self._children)
)
# Otherwise, join the queue
else:
if immediate:
await self.clear_events(True)
for child in self._children:
await child.close(immediate)
return
await asyncio.gather(
self.queue.join(), *(child.close() for child in self._children)
)
self._is_closed = True

self.queue.shutdown(immediate)

await asyncio.gather(
*(child.close(immediate) for child in self._children)
)
if not immediate:
await self.queue.join()

def is_closed(self) -> bool:
"""Checks if the queue is closed."""
Expand Down Expand Up @@ -234,15 +227,8 @@ async def clear_events(self, clear_child_queues: bool = True) -> None:
cleared_count += 1
except asyncio.QueueEmpty:
pass
except Exception as e:
# Handle Python 3.13+ QueueShutDown
if (
sys.version_info >= (3, 13)
and type(e).__name__ == 'QueueShutDown'
):
pass
else:
raise
except QueueShutDown:
pass

if cleared_count > 0:
logger.debug(
Expand Down
Empty file.
Loading
Loading