Conversation
Pull request overview
This PR introduces configurable backpressure for the Topic Writer by adding a byte-size limit for the internal in-flight buffer and making writes block (up to a timeout) until space is freed by server acks.
Changes:
- Add `max_buffer_size_bytes` and `buffer_wait_timeout_sec` to writer settings and enforce them in `WriterAsyncIOReconnector.write_with_ack_future()`.
- Add a new public exception `TopicWriterBufferFullError`, surfaced via `ydb.topic`.
- Add sync/async tests and an example showing how to use the buffer limit.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `ydb/topic.py` | Exports the new `TopicWriterBufferFullError` as part of the public Topic API. |
| `ydb/_topic_writer/topic_writer.py` | Adds settings fields, a new exception type, and a helper for message size accounting. |
| `ydb/_topic_writer/topic_writer_asyncio.py` | Implements buffer accounting plus the backpressure wait loop, and updates ack handling/shutdown behavior. |
| `ydb/_topic_writer/topic_writer_test.py` | Adds sync writer tests around error propagation and blocking behavior. |
| `ydb/_topic_writer/topic_writer_asyncio_test.py` | Adds async tests for timeout, ack-unblocking, and concurrent waiting writers. |
| `examples/topic/writer_example.py` | Documents how to configure and handle backpressure in user code. |
```python
data_len = len(msg.data)
meta_len = sum(len(k) + len(v) for k, v in msg.metadata_items.items()) if msg.metadata_items else 0
return data_len + meta_len + 64  # 64 bytes overhead per message (seq_no, timestamps, etc.)
```
`internal_message_size_bytes()` currently uses `len(msg.data)` and `len(k)` for metadata keys. This can drift from the actual bytes held in the internal queues, because `msg.data` is later mutated during compression (`_encode_data_inplace`) and `len(k)` counts characters, not UTF-8 bytes. Consider accounting against a stable byte length (e.g., `msg.uncompressed_size` plus `len(k.encode('utf-8')) + len(v)`), or store the accounted size on the message at enqueue time and subtract the same value on ack.
Suggested change:

```python
# Prefer a stable uncompressed size for accounting, fall back to current data length.
uncompressed_size = getattr(msg, "uncompressed_size", None)
data_len = uncompressed_size if isinstance(uncompressed_size, int) and uncompressed_size >= 0 else len(msg.data)
if msg.metadata_items:
    meta_len = 0
    for k, v in msg.metadata_items.items():
        key_bytes_len = len(k.encode("utf-8"))
        if isinstance(v, str):
            value_bytes_len = len(v.encode("utf-8"))
        else:
            value_bytes_len = len(v)
        meta_len += key_bytes_len + value_bytes_len
else:
    meta_len = 0
# 64 bytes overhead per message (seq_no, timestamps, etc.)
return data_len + meta_len + 64
```
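A minimal standalone illustration of the character-vs-byte discrepancy the comment describes (the string used here is only an example, not from the PR):

```python
# len() on a str counts code points, while the wire carries UTF-8 bytes,
# so non-ASCII metadata keys are under-counted by len(k).
key = "ключ"  # 4 Cyrillic characters

char_count = len(key)                   # counts characters
byte_count = len(key.encode("utf-8"))   # counts encoded bytes

print(char_count, byte_count)  # 4 8
```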
```python
max_buf = self._settings.max_buffer_size_bytes
if max_buf is not None:
    new_bytes = sum(internal_message_size_bytes(m) for m in internal_messages)
    timeout_sec = self._settings.buffer_wait_timeout_sec
    deadline = self._loop.time() + timeout_sec
    while True:
        self._buffer_updated.clear()
        if self._buffer_bytes + new_bytes <= max_buf:
            break
        if self._loop.time() >= deadline:
            raise TopicWriterBufferFullError(
                "Topic writer buffer full: no free space within %.1f s (buffer=%d, need=%d, max=%d)"
                % (timeout_sec, self._buffer_bytes, new_bytes, max_buf)
            )
        try:
            await asyncio.wait_for(
                self._buffer_updated.wait(),
                timeout=min(0.5, max(0.01, deadline - self._loop.time())),
            )
        except asyncio.TimeoutError:
            pass
    self._buffer_bytes += new_bytes
```
Buffer reservation is computed before encoding (new_bytes = sum(internal_message_size_bytes(m) ...)), but buffer release on ack recomputes size from the (possibly mutated) InternalMessage. When codec selection/compression changes message.data, _buffer_bytes can drift (over- or under-count), breaking backpressure. A robust approach is to compute a single accounted size per message at enqueue time (or use a stable field like uncompressed_size) and subtract that same value on ack.
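A sketch of the enqueue-time accounting the comment suggests (class and field names here are hypothetical, not the PR's code): record one accounted size per message when reserving, and subtract that exact value on release, so later mutation of `msg.data` cannot make reserve and release disagree.

```python
from dataclasses import dataclass


@dataclass
class _Msg:
    data: bytes
    accounted_size: int = 0  # hypothetical field, set once at enqueue time


class _Buffer:
    def __init__(self) -> None:
        self.bytes = 0

    def reserve(self, msg: _Msg) -> None:
        # Compute the size exactly once and remember it on the message.
        msg.accounted_size = len(msg.data) + 64  # per-message overhead, as in the PR
        self.bytes += msg.accounted_size

    def release(self, msg: _Msg) -> None:
        # Subtract the remembered value, even if msg.data was mutated since.
        self.bytes -= msg.accounted_size


buf = _Buffer()
m = _Msg(b"payload")
buf.reserve(m)
m.data = b"gz"  # simulate in-place compression shrinking the payload
buf.release(m)
print(buf.bytes)  # 0 — reserve and release stay balanced
```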
```python
deadline = self._loop.time() + timeout_sec
while True:
    self._buffer_updated.clear()
    if self._buffer_bytes + new_bytes <= max_buf:
        break
    if self._loop.time() >= deadline:
        raise TopicWriterBufferFullError(
            "Topic writer buffer full: no free space within %.1f s (buffer=%d, need=%d, max=%d)"
            % (timeout_sec, self._buffer_bytes, new_bytes, max_buf)
        )
    try:
        await asyncio.wait_for(
            self._buffer_updated.wait(),
            timeout=min(0.5, max(0.01, deadline - self._loop.time())),
        )
    except asyncio.TimeoutError:
        pass
```
The backpressure wait loop doesn’t react to writer stop/close while waiting for _buffer_updated (it only calls _check_stop() once at the start). If the writer is stopped while a caller is blocked here, it will typically wait until buffer_wait_timeout_sec and then raise TopicWriterBufferFullError, masking the real stop reason. Consider checking _stop_reason inside the loop (or waiting on both _buffer_updated and _stop_reason) and/or setting _buffer_updated in _stop() to wake waiters immediately.
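One way to make the wait stop-aware, sketched with a hypothetical `stopped` event standing in for the writer's stop signal (names and call shape are assumptions, not the PR's code): wait on both events with `asyncio.wait(..., return_when=FIRST_COMPLETED)` so a stop wakes blocked callers immediately instead of letting them run into the buffer-full timeout.

```python
import asyncio


async def wait_buffer_or_stop(buffer_updated: asyncio.Event,
                              stopped: asyncio.Event,
                              timeout: float) -> None:
    waiters = [
        asyncio.ensure_future(buffer_updated.wait()),
        asyncio.ensure_future(stopped.wait()),
    ]
    # Return as soon as either event fires (or the timeout elapses).
    done, pending = await asyncio.wait(
        waiters, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.wait(pending)  # let cancellations settle
    if stopped.is_set():
        raise RuntimeError("writer stopped")  # stand-in for the real stop reason


async def demo() -> str:
    buffer_updated = asyncio.Event()
    stopped = asyncio.Event()
    stopped.set()  # simulate a concurrent stop while a caller is blocked
    try:
        await wait_buffer_or_stop(buffer_updated, stopped, timeout=1.0)
    except RuntimeError as exc:
        return str(exc)
    return "no stop observed"


print(asyncio.run(demo()))  # writer stopped
```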
```python
f.set_exception(reason)
f.exception()  # mark as retrieved so asyncio does not log "Future exception was never retrieved"
```
_stop() unconditionally calls f.set_exception(reason) and then f.exception() for every future. If a caller cancels a returned ack future (or it’s already done), set_exception() can raise InvalidStateError, and f.exception() can raise CancelledError, potentially breaking shutdown/error propagation. Guard with if not f.done(): ... (and handle cancelled futures) before setting/reading exceptions.
Suggested change:

```python
if not f.done():
    try:
        f.set_exception(reason)
    except asyncio.InvalidStateError:
        # Future might have been completed or cancelled concurrently; ignore.
        pass
try:
    # Mark exception as retrieved so asyncio does not log
    # "Future exception was never retrieved".
    f.exception()
except asyncio.CancelledError:
    # It is valid for callers to cancel ack futures; ignore.
    pass
```