Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import threading
import typing
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple
import uuid

import grpc # type: ignore
Expand Down Expand Up @@ -74,6 +74,15 @@
a subscription. We do this to reduce premature ack expiration.
"""

_DEFAULT_STREAM_ACK_DEADLINE: float = 60
"""The default stream ack deadline in seconds."""

_MAX_STREAM_ACK_DEADLINE: float = 600
"""The maximum stream ack deadline in seconds."""

_MIN_STREAM_ACK_DEADLINE: float = 10
"""The minimum stream ack deadline in seconds."""

_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
code_pb2.DEADLINE_EXCEEDED,
code_pb2.RESOURCE_EXHAUSTED,
Expand Down Expand Up @@ -270,7 +279,36 @@ def __init__(
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline: Union[int, float] = histogram.MIN_ACK_DEADLINE

# If max_duration_per_lease_extension is the default
# we set the stream_ack_deadline to the default of 60
if self._flow_control.max_duration_per_lease_extension == 0:
self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
# We will not be able to extend more than the default minimum
elif (
self._flow_control.max_duration_per_lease_extension
< _MIN_STREAM_ACK_DEADLINE
):
self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
# Will not be able to extend past the max
elif (
self._flow_control.max_duration_per_lease_extension
> _MAX_STREAM_ACK_DEADLINE
):
self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
else:
self._stream_ack_deadline = (
self._flow_control.max_duration_per_lease_extension
)

self._ack_deadline = max(
min(
self._flow_control.min_duration_per_lease_extension,
histogram.MAX_ACK_DEADLINE,
),
histogram.MIN_ACK_DEADLINE,
)

self._rpc: Optional[bidi.ResumableBidiRpc] = None
self._callback: Optional[functools.partial] = None
self._closing = threading.Lock()
Expand Down Expand Up @@ -741,10 +779,10 @@ def heartbeat(self) -> bool:

if send_new_ack_deadline:
request = gapic_types.StreamingPullRequest(
stream_ack_deadline_seconds=self.ack_deadline
stream_ack_deadline_seconds=self._stream_ack_deadline
)
_LOGGER.debug(
"Sending new ack_deadline of %d seconds.", self.ack_deadline
"Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
)
else:
request = gapic_types.StreamingPullRequest()
Expand Down Expand Up @@ -796,7 +834,7 @@ def open(

_LOGGER.debug(
"Creating a stream, default ACK deadline set to {} seconds.".format(
stream_ack_deadline_seconds
self._stream_ack_deadline
)
)

Expand Down Expand Up @@ -928,6 +966,8 @@ def _get_initial_request(
suitable for any other purpose).
"""
# Put the request together.
# We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
# anyway. Set to some big-ish value in case we modack late.
request = gapic_types.StreamingPullRequest(
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
modify_deadline_ack_ids=[],
Expand Down
70 changes: 61 additions & 9 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,61 @@ def test_constructor_and_default_state():
assert manager._client_id is not None


def test_constructor_with_options():
def test_constructor_with_default_options():
flow_control_ = types.FlowControl()
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=mock.sentinel.flow_control,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)

assert manager.flow_control == mock.sentinel.flow_control
assert manager.flow_control == flow_control_
assert manager._scheduler == mock.sentinel.scheduler
assert manager._ack_deadline == 10
assert manager._stream_ack_deadline == 60


def test_constructor_with_min_and_max_duration_per_lease_extension_():
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=15, max_duration_per_lease_extension=20
)
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)
assert manager._ack_deadline == 15
assert manager._stream_ack_deadline == 20


def test_constructor_with_min_duration_per_lease_extension_too_low():
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=9, max_duration_per_lease_extension=9
)
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)
assert manager._ack_deadline == 10
assert manager._stream_ack_deadline == 10


def test_constructor_with_max_duration_per_lease_extension_too_high():
flow_control_ = types.FlowControl(
max_duration_per_lease_extension=601, min_duration_per_lease_extension=601
)
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)
assert manager._ack_deadline == 600
assert manager._stream_ack_deadline == 600


def make_manager(**kwargs):
Expand Down Expand Up @@ -164,9 +209,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting():
manager._flow_control = types.FlowControl(
min_duration_per_lease_extension=0, max_duration_per_lease_extension=0
)
assert manager._stream_ack_deadline == 60
assert manager._ack_deadline == 10
assert manager._obtain_ack_deadline(maybe_update=False) == 10

deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MIN_ACK_DEADLINE
assert manager._stream_ack_deadline == 60

# When we get some historical data, the deadline is adjusted.
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2)
Expand All @@ -186,11 +235,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension():
manager._flow_control = types.FlowControl(
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1
)
assert manager._ack_deadline == 10

manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large

# The deadline configured in flow control should prevail.
deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MIN_ACK_DEADLINE + 1
assert manager._stream_ack_deadline == 60


def test__obtain_ack_deadline_with_min_duration_per_lease_extension():
Expand Down Expand Up @@ -292,12 +344,12 @@ def test__obtain_ack_deadline_no_value_update():

def test_client_id():
manager1 = make_manager()
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10)
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=60)
client_id_1 = request1.client_id
assert client_id_1

manager2 = make_manager()
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10)
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=60)
client_id_2 = request2.client_id
assert client_id_2

Expand All @@ -308,7 +360,7 @@ def test_streaming_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
request = manager._get_initial_request(stream_ack_deadline_seconds=60)
assert request.max_outstanding_messages == 10
assert request.max_outstanding_bytes == 1000

Expand All @@ -318,7 +370,7 @@ def test_streaming_flow_control_use_legacy_flow_control():
flow_control=types.FlowControl(max_messages=10, max_bytes=1000),
use_legacy_flow_control=True,
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
request = manager._get_initial_request(stream_ack_deadline_seconds=60)
assert request.max_outstanding_messages == 0
assert request.max_outstanding_bytes == 0

Expand Down Expand Up @@ -1046,12 +1098,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog):
result = manager.heartbeat()

manager._rpc.send.assert_called_once_with(
gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10)
gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60)
)
assert result
# Set to false after a send is initiated.
assert not manager._send_new_ack_deadline
assert "Sending new ack_deadline of 10 seconds." in caplog.text
assert "Sending new ack_deadline of 60 seconds." in caplog.text


@mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True)
Expand Down