1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ from collections import deque
1516import logging
1617import threading
1718import warnings
2324_LOGGER = logging .getLogger (__name__ )
2425
2526
27+ class _QuantityReservation (object ):
28+ """A (partial) reservation of a quantifiable resource."""
29+
30+ def __init__ (self , reserved , needed ):
31+ self .reserved = reserved
32+ self .needed = needed
33+
34+
2635class FlowController (object ):
2736 """A class used to control the flow of messages passing through it.
2837
@@ -34,10 +43,22 @@ class FlowController(object):
3443 def __init__ (self , settings ):
3544 self ._settings = settings
3645
46+ # Load statistics. They represent the number of messages added, but not
47+ # yet released (and their total size).
3748 self ._message_count = 0
3849 self ._total_bytes = 0
3950
40- # The lock is used to protect the internal state (message and byte count).
51+ # A FIFO queue of threads blocked on adding a message, from first to last.
52+ # Only relevant if the configured limit exceeded behavior is BLOCK.
53+ self ._waiting = deque ()
54+
55+ # Reservations of available flow control bytes by the waiting threads.
56+ # Each value is a _QuantityReservation instance.
57+ self ._byte_reservations = dict ()
58+ self ._reserved_bytes = 0
59+
60+ # The lock is used to protect all internal state (message and byte count,
61+ # waiting threads to add, etc.).
4162 self ._operational_lock = threading .Lock ()
4263
4364 # The condition for blocking the flow if capacity is exceeded.
@@ -62,46 +83,80 @@ def add(self, message):
6283 return
6384
6485 with self ._operational_lock :
65- self ._message_count += 1
66- self ._total_bytes += message .ByteSize ()
67-
68- if not self ._is_overflow ():
86+ if not self ._would_overflow (message ):
87+ self ._message_count += 1
88+ self ._total_bytes += message .ByteSize ()
6989 return
7090
71- # We have an overflow, react.
91+ # Adding a message would overflow, react.
7292 if (
7393 self ._settings .limit_exceeded_behavior
7494 == types .LimitExceededBehavior .ERROR
7595 ):
96+ # Raising an error means rejecting a message, thus we do not
97+ # add anything to the existing load, but we do report the would-be
98+ # load if we accepted the message.
7699 msg = (
77- "Flow control limits exceeded "
100+ "Flow control limits would be exceeded "
78101 "(messages: {} / {}, bytes: {} / {})."
79102 ).format (
80- self ._message_count ,
103+ self ._message_count + 1 ,
81104 self ._settings .message_limit ,
82- self ._total_bytes ,
105+ self ._total_bytes + message . ByteSize () ,
83106 self ._settings .byte_limit ,
84107 )
85108 error = exceptions .FlowControlLimitError (msg )
86109
87- # Raising an error means rejecting a message, thus we need to deduct
88- # the latter's contribution to the total load.
89- self ._message_count -= 1
90- self ._total_bytes -= message .ByteSize ()
91110 raise error
92111
93112 assert (
94113 self ._settings .limit_exceeded_behavior
95114 == types .LimitExceededBehavior .BLOCK
96115 )
97116
98- while self ._is_overflow ():
117+ # Sanity check - if a message exceeds total flow control limits all
118+ # by itself, it would block forever, thus raise error.
119+ if (
120+ message .ByteSize () > self ._settings .byte_limit
121+ or self ._settings .message_limit < 1
122+ ):
123+ error_msg = (
124+ "Flow control limits too low for the message. "
125+ "(messages: {} / {}, bytes: {} / {})."
126+ ).format (
127+ 1 ,
128+ self ._settings .message_limit ,
129+ message .ByteSize (),
130+ self ._settings .byte_limit ,
131+ )
132+ raise exceptions .PermanentlyBlockedError (error_msg )
133+
134+ current_thread = threading .current_thread ()
135+
136+ while self ._would_overflow (message ):
137+ if current_thread not in self ._byte_reservations :
138+ self ._waiting .append (current_thread )
139+ self ._byte_reservations [current_thread ] = _QuantityReservation (
140+ reserved = 0 , needed = message .ByteSize ()
141+ )
142+
99143 _LOGGER .debug (
100144 "Blocking until there is enough free capacity in the flow."
101145 )
102146 self ._has_capacity .wait ()
103147 _LOGGER .debug ("Woke up from waiting on free capacity in the flow." )
104148
149+ # Message accepted, increase the load and remove thread stats if
150+ # they exist in the waiting queue.
151+ self ._message_count += 1
152+ self ._total_bytes += message .ByteSize ()
153+
154+ reservation = self ._byte_reservations .get (current_thread )
155+ if reservation :
156+ self ._reserved_bytes -= reservation .reserved
157+ del self ._byte_reservations [current_thread ]
158+ self ._waiting .remove (current_thread )
159+
105160 def release (self , message ):
106161 """Release a mesage from flow control.
107162
@@ -113,8 +168,7 @@ def release(self, message):
113168 return
114169
115170 with self ._operational_lock :
116- was_overflow = self ._is_overflow ()
117-
171+ # Releasing a message decreases the load.
118172 self ._message_count -= 1
119173 self ._total_bytes -= message .ByteSize ()
120174
@@ -127,19 +181,72 @@ def release(self, message):
127181 self ._message_count = max (0 , self ._message_count )
128182 self ._total_bytes = max (0 , self ._total_bytes )
129183
130- if was_overflow and not self ._is_overflow ():
184+ self ._distribute_available_bytes ()
185+
186+ # If at least one thread waiting to add() can be unblocked, wake them up.
187+ if self ._ready_to_unblock ():
131188 _LOGGER .debug ("Notifying threads waiting to add messages to flow." )
132189 self ._has_capacity .notify_all ()
133190
134- def _is_overflow (self ):
135- """Determine if the current message load exceeds flow control limits.
191+ def _distribute_available_bytes (self ):
192+ """Distribute availalbe free capacity among the waiting threads in FIFO order.
193+
194+ The method assumes that the caller has obtained ``_operational_lock``.
195+ """
196+ available = self ._settings .byte_limit - self ._total_bytes - self ._reserved_bytes
197+
198+ for thread in self ._waiting :
199+ if available <= 0 :
200+ break
201+
202+ reservation = self ._byte_reservations [thread ]
203+ still_needed = reservation .needed - reservation .reserved
204+ can_give = min (still_needed , available )
205+
206+ reservation .reserved += can_give
207+ self ._reserved_bytes += can_give
208+ available -= can_give
209+
210+ def _ready_to_unblock (self ):
211+ """Determine if any of the threads waiting to add a message can proceed.
136212
137213 The method assumes that the caller has obtained ``_operational_lock``.
138214
139215 Returns:
140216 bool
141217 """
142- return (
143- self ._message_count > self ._settings .message_limit
144- or self ._total_bytes > self ._settings .byte_limit
145- )
218+ if self ._waiting :
219+ # It's enough to only check the head of the queue, because FIFO
220+ # distribution of any free capacity.
221+ reservation = self ._byte_reservations [self ._waiting [0 ]]
222+ return (
223+ reservation .reserved >= reservation .needed
224+ and self ._message_count < self ._settings .message_limit
225+ )
226+
227+ return False
228+
229+ def _would_overflow (self , message ):
230+ """Determine if accepting a message would exceed flow control limits.
231+
232+ The method assumes that the caller has obtained ``_operational_lock``.
233+
234+ Args:
235+ message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
236+ The message entering the flow control.
237+
238+ Returns:
239+ bool
240+ """
241+ reservation = self ._byte_reservations .get (threading .current_thread ())
242+
243+ if reservation :
244+ enough_reserved = reservation .reserved >= reservation .needed
245+ else :
246+ enough_reserved = False
247+
248+ bytes_taken = self ._total_bytes + self ._reserved_bytes + message .ByteSize ()
249+ size_overflow = bytes_taken > self ._settings .byte_limit and not enough_reserved
250+ msg_count_overflow = self ._message_count + 1 > self ._settings .message_limit
251+
252+ return size_overflow or msg_count_overflow
0 commit comments