diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 9e994805..4e8054de 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -166,6 +166,7 @@ def __init__( self._next_chunk_index: int = 0 self._destination_identities = destination_identities self._sender_identity = sender_identity or self._local_participant.identity + self._closed = False async def _send_header(self): req = proto_ffi.FfiRequest( @@ -191,6 +192,8 @@ async def _send_header(self): raise ConnectionError(cb.send_stream_header.error) async def _send_chunk(self, chunk: proto_DataStream.Chunk): + if self._closed: + raise RuntimeError(f"Cannot send chunk after stream is closed: {chunk}") req = proto_ffi.FfiRequest( send_stream_chunk=proto_room.SendStreamChunkRequest( chunk=chunk, @@ -238,6 +241,9 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): async def aclose( self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None ): + if self._closed: + raise RuntimeError("Stream already closed") + self._closed = True await self._send_trailer( trailer=proto_DataStream.Trailer( stream_id=self._header.stream_id, reason=reason, attributes=attributes