@@ -242,7 +242,7 @@ def data_received(self, data: bytes) -> None:
242242 if not data :
243243 return
244244
245- # custom payload parser
245+ # custom payload parser - currently always WebSocketReader
246246 if self ._payload_parser is not None :
247247 eof , tail = self ._payload_parser .feed_data (data )
248248 if eof :
@@ -252,57 +252,56 @@ def data_received(self, data: bytes) -> None:
252252 if tail :
253253 self .data_received (tail )
254254 return
255- else :
256- if self ._upgraded or self ._parser is None :
257- # i.e. websocket connection, websocket parser is not set yet
258- self ._tail += data
255+
256+ if self ._upgraded or self ._parser is None :
257+ # i.e. websocket connection, websocket parser is not set yet
258+ self ._tail += data
259+ return
260+
261+ # parse http messages
262+ try :
263+ messages , upgraded , tail = self ._parser .feed_data (data )
264+ except BaseException as underlying_exc :
265+ if self .transport is not None :
266+ # connection.release() could be called BEFORE
267+ # data_received(), the transport is already
268+ # closed in this case
269+ self .transport .close ()
270+ # should_close is True after the call
271+ if isinstance (underlying_exc , HttpProcessingError ):
272+ exc = HttpProcessingError (
273+ code = underlying_exc .code ,
274+ message = underlying_exc .message ,
275+ headers = underlying_exc .headers ,
276+ )
259277 else :
260- # parse http messages
261- try :
262- messages , upgraded , tail = self ._parser .feed_data (data )
263- except BaseException as underlying_exc :
264- if self .transport is not None :
265- # connection.release() could be called BEFORE
266- # data_received(), the transport is already
267- # closed in this case
268- self .transport .close ()
269- # should_close is True after the call
270- if isinstance (underlying_exc , HttpProcessingError ):
271- exc = HttpProcessingError (
272- code = underlying_exc .code ,
273- message = underlying_exc .message ,
274- headers = underlying_exc .headers ,
275- )
276- else :
277- exc = HttpProcessingError ()
278- self .set_exception (exc , underlying_exc )
279- return
280-
281- self ._upgraded = upgraded
282-
283- payload : Optional [StreamReader ] = None
284- for message , payload in messages :
285- if message .should_close :
286- self ._should_close = True
287-
288- self ._payload = payload
289-
290- if self ._skip_payload or message .code in EMPTY_BODY_STATUS_CODES :
291- self .feed_data ((message , EMPTY_PAYLOAD ), 0 )
292- else :
293- self .feed_data ((message , payload ), 0 )
294- if payload is not None :
295- # new message(s) was processed
296- # register timeout handler unsubscribing
297- # either on end-of-stream or immediately for
298- # EMPTY_PAYLOAD
299- if payload is not EMPTY_PAYLOAD :
300- payload .on_eof (self ._drop_timeout )
301- else :
302- self ._drop_timeout ()
278+ exc = HttpProcessingError ()
279+ self .set_exception (exc , underlying_exc )
280+ return
303281
304- if tail :
305- if upgraded :
306- self .data_received (tail )
307- else :
308- self ._tail = tail
282+ self ._upgraded = upgraded
283+
284+ payload : Optional [StreamReader ] = None
285+ for message , payload in messages :
286+ if message .should_close :
287+ self ._should_close = True
288+
289+ self ._payload = payload
290+
291+ if self ._skip_payload or message .code in EMPTY_BODY_STATUS_CODES :
292+ self .feed_data ((message , EMPTY_PAYLOAD ), 0 )
293+ else :
294+ self .feed_data ((message , payload ), 0 )
295+
296+ if payload is not None :
297+ # new message(s) was processed
298+ # register timeout handler unsubscribing
299+ # either on end-of-stream or immediately for
300+ # EMPTY_PAYLOAD
301+ if payload is not EMPTY_PAYLOAD :
302+ payload .on_eof (self ._drop_timeout )
303+ else :
304+ self ._drop_timeout ()
305+
306+ if upgraded and tail :
307+ self .data_received (tail )
0 commit comments