Skip to content

Commit a3fd65e

Browse files
committed
Annotate common_service.py and CommonService
1 parent 5835843 commit a3fd65e

1 file changed

Lines changed: 118 additions & 63 deletions

File tree

src/workflows/services/common_service.py

Lines changed: 118 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
import enum
55
import itertools
66
import logging
7+
import multiprocessing.connection
78
import queue
89
import threading
910
import time
11+
from collections.abc import Callable, Generator, Mapping
1012
from typing import Any
1113

1214
from opentelemetry import trace
@@ -17,7 +19,7 @@
1719

1820
import workflows
1921
import workflows.logging
20-
from workflows.transport.common_transport import CommonTransport
22+
from workflows.transport.common_transport import CommonTransport, MessageCallback
2123
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
2224

2325

@@ -57,7 +59,7 @@ class Status(enum.Enum):
5759
NONE = (-1, "no service loaded") # Node has no service instance loaded
5860
TEARDOWN = (-2, "shutdown") # Node is shutting down
5961

60-
def __init__(self, intval, description):
62+
def __init__(self, intval: int, description: str):
6163
"""
6264
Each status is defined as a tuple of a unique integer value and a
6365
descriptive string. These are available via enum properties
@@ -98,18 +100,19 @@ class CommonService:
98100

99101
# Logger name ---------------------------------------------------------------
100102

101-
_logger_name = "workflows.service" # The logger can be accessed via self.log
103+
#: The logger can be accessed via self.log
104+
_logger_name = "workflows.service"
102105

103106
# Overrideable functions ----------------------------------------------------
104107

105-
def initializing(self):
108+
def initializing(self) -> None:
106109
"""Service initialization. This function is run before any commands are
107110
received from the frontend. This is the place to request channel
108111
subscriptions with the messaging layer, and register callbacks.
109112
This function can be overridden by specific service implementations."""
110113
pass
111114

112-
def in_shutdown(self):
115+
def in_shutdown(self) -> None:
113116
"""Service shutdown. This function is run before the service is terminated.
114117
No more commands are received, but communications can still be sent.
115118
This function can be overridden by specific service implementations."""
@@ -136,39 +139,47 @@ def in_shutdown(self):
136139

137140
# Any keyword arguments set on service invocation
138141

139-
start_kwargs: dict[Any, Any] = {}
142+
start_kwargs: dict[str, Any]
143+
_transport_interceptor_counter: itertools.count[int]
140144

141145
# Not so overrideable functions ---------------------------------------------
142146

143-
def __init__(self, *args, **kwargs):
144-
"""Service constructor. Parameters include optional references to two
145-
pipes: frontend= for messages from the service to the frontend,
146-
and commands= for messages from the frontend to the service.
147-
A dictionary can optionally be passed with environment=, which is then
148-
available to the service during runtime."""
149-
self.__pipe_frontend = None
150-
self.__pipe_commands = None
151-
self._environment = kwargs.get("environment", {})
152-
self._transport = None
153-
self.__callback_register = {}
154-
self.__log_extensions = []
155-
self.__service_status = None
156-
self.__shutdown = False
157-
self.__update_service_status(self.SERVICE_STATUS_NEW)
158-
self.__queue = queue.PriorityQueue()
159-
self._idle_callback = None
160-
self._idle_time = None
147+
def __init__(self, *, environment: dict[str, Any] | None = None):
148+
"""
149+
Service constructor.
150+
151+
Args:
152+
environment:
153+
Optional dictionary made available to the service at runtime
154+
via ``self._environment``. Typically used by the frontend to
155+
pass configuration (e.g. ``config``, ``metrics``, ``liveness``)
156+
into the spawned service process.
157+
"""
158+
self.__pipe_frontend: multiprocessing.connection.Connection | None = None
159+
self.__pipe_commands: multiprocessing.connection.Connection | None = None
160+
self._environment = environment if environment is not None else {}
161+
self._transport: CommonTransport | None = None
162+
self.__callback_register: dict[str, Callable[[Any], None]] = {}
163+
self.__log_extensions: list[tuple[str, Any]] = []
164+
self.__service_status: int = self.SERVICE_STATUS_NEW
165+
self.__shutdown: bool = False
166+
self.__queue: queue.PriorityQueue[tuple[Priority, int, Any]] = (
167+
queue.PriorityQueue()
168+
)
169+
self._idle_callback: Callable[[], None] | None = None
170+
self._idle_time: float | None = None
171+
self.start_kwargs = {}
161172

162173
# Logger will be overwritten in start() function
163174
self.log = logging.getLogger(self._logger_name)
164175

165-
def __send_to_frontend(self, data_structure):
176+
def __send_to_frontend(self, data_structure: Any) -> None:
166177
"""Put a message in the pipe for the frontend."""
167178
if self.__pipe_frontend:
168179
self.__pipe_frontend.send(data_structure)
169180

170181
@property
171-
def config(self):
182+
def config(self) -> Any:
172183
return self._environment.get("config")
173184

174185
@property
@@ -184,8 +195,8 @@ def transport(self, value: CommonTransport) -> None:
184195
raise AttributeError("Transport already defined")
185196
self._transport = value
186197

187-
def start_transport(self):
188-
"""If a transport object has been defined then connect it now."""
198+
def start_transport(self) -> None:
199+
"""If a transport object has been defined, then connect it."""
189200
if self._transport:
190201
if self.transport.connect():
191202
self.log.debug("Service successfully connected to transport layer")
@@ -244,17 +255,17 @@ def start_transport(self):
244255
else:
245256
self.log.debug("No transport layer defined for service. Skipping.")
246257

247-
def stop_transport(self):
258+
def stop_transport(self) -> None:
248259
"""If a transport object has been defined then tear it down."""
249260
if self._transport:
250261
self.log.debug("Stopping transport object")
251262
self.transport.disconnect()
252263

253-
def _transport_interceptor(self, callback):
264+
def _transport_interceptor(self, callback: MessageCallback) -> MessageCallback:
254265
"""Takes a callback function and returns a function that takes headers and
255266
messages and places them on the main service queue."""
256267

257-
def add_item_to_queue(header, message):
268+
def add_item_to_queue(header: Mapping[str, Any], message: Any) -> None:
258269
queue_item = (
259270
Priority.TRANSPORT,
260271
next(
@@ -268,22 +279,52 @@ def add_item_to_queue(header, message):
268279

269280
return add_item_to_queue
270281

271-
def connect(self, frontend=None, commands=None):
272-
"""Inject pipes connecting the service to the frontend. Two arguments are
273-
supported: frontend= for messages from the service to the frontend,
274-
and commands= for messages from the frontend to the service.
275-
The injection should happen before the service is started, otherwise the
276-
underlying file descriptor references may not be handled correctly."""
277-
if frontend:
282+
def connect(
283+
self,
284+
frontend: multiprocessing.connection.Connection | None = None,
285+
commands: multiprocessing.connection.Connection | None = None,
286+
) -> None:
287+
"""Inject the pipes connecting this service to the frontend.
288+
289+
Injection should happen before :meth:`start` is called, otherwise
290+
the underlying file descriptor references may not be handled
291+
correctly across the process boundary.
292+
293+
Args:
294+
frontend: Write end of the pipe used to send messages from the
295+
service to the frontend (status updates, log records, etc.).
296+
Setting this also triggers an immediate status broadcast.
297+
commands: Read end of the pipe used to receive command messages
298+
from the frontend. If left as ``None`` the service has no
299+
way to receive commands and will shut itself down shortly
300+
after :meth:`start`.
301+
"""
302+
if frontend is not None:
278303
self.__pipe_frontend = frontend
279304
self.__send_service_status_to_frontend()
280-
if commands:
305+
if commands is not None:
281306
self.__pipe_commands = commands
282307

283308
@contextlib.contextmanager
284-
def extend_log(self, field, value):
285-
"""A context wherein a specified extra field in log messages is populated
286-
with a fixed value. This affects all log messages within the context."""
309+
def extend_log(self, field: str, value: Any) -> Generator[None, None, None]:
310+
"""Annotate log records emitted within the context with an extra field.
311+
312+
The ``(field, value)`` pair is attached to every log record produced
313+
while the context is active, and removed on exit. If an exception
314+
propagates out of the block, the field is also stashed on the
315+
exception as ``workflows_log_<field>`` so downstream handlers
316+
(notably :meth:`process_uncaught_exception`) can surface it.
317+
318+
Args:
319+
field: Name of the extra field to attach to log records. Must be
320+
a valid Python identifier suffix, as it is also used to
321+
build the attribute name on any escaping exception.
322+
value: Value to associate with ``field``. Anything that the
323+
log handler can serialize is acceptable.
324+
325+
Yields:
326+
Control to the wrapped block. No value is yielded.
327+
"""
287328
self.__log_extensions.append((field, value))
288329
try:
289330
yield
@@ -293,7 +334,7 @@ def extend_log(self, field, value):
293334
finally:
294335
self.__log_extensions.remove((field, value))
295336

296-
def __command_queue_listener(self):
337+
def __command_queue_listener(self) -> None:
297338
"""Function to continuously retrieve data from the frontend. Commands are
298339
sent to the central priority queue. If the pipe from the frontend is
299340
closed the service shutdown is initiated. Check every second if service
@@ -332,14 +373,14 @@ def __command_queue_listener(self):
332373
time.sleep(0.05)
333374
self.log.debug("Queue listener thread terminating")
334375

335-
def __start_command_queue_listener(self):
376+
def __start_command_queue_listener(self) -> None:
336377
"""Start the function __command_queue_listener in a separate thread. This
337378
function continuously listens to the pipe connected to the frontend.
338379
"""
339380
thread_function = self.__command_queue_listener
340381

341382
class QueueListenerThread(threading.Thread):
342-
def run(self):
383+
def run(self) -> None:
343384
thread_function()
344385

345386
assert not hasattr(self, "__queue_listener_thread")
@@ -349,52 +390,52 @@ def run(self):
349390
self.__queue_listener_thread.name = "Command Queue Listener"
350391
self.__queue_listener_thread.start()
351392

352-
def _log_send(self, logrecord):
393+
def _log_send(self, logrecord: logging.LogRecord) -> None:
353394
"""Forward log records to the frontend."""
354395
for field, value in self.__log_extensions:
355396
setattr(logrecord, field, value)
356397
self.__send_to_frontend({"band": "log", "payload": logrecord})
357398

358-
def _register(self, message_band, callback):
399+
def _register(self, message_band: str, callback: Callable[[Any], None]) -> None:
359400
"""Register a callback function for a specific message band."""
360401
self.__callback_register[message_band] = callback
361402

362-
def _register_idle(self, idle_time, callback):
403+
def _register_idle(self, idle_time: float, callback: Callable[[], None]) -> None:
363404
"""Register a callback function that is run when idling for a given
364405
time span (in seconds)."""
365406
self._idle_callback = callback
366407
self._idle_time = idle_time
367408

368-
def __update_service_status(self, statuscode):
409+
def __update_service_status(self, statuscode: int) -> None:
369410
"""Set the internal status of the service object, and notify frontend."""
370411
if self.__service_status != statuscode:
371412
self.__service_status = statuscode
372413
self.__send_service_status_to_frontend()
373414

374-
def __send_service_status_to_frontend(self):
415+
def __send_service_status_to_frontend(self) -> None:
375416
"""Actually send the internal status of the service object to the frontend."""
376417
self.__send_to_frontend(
377418
{"band": "status_update", "statuscode": self.__service_status}
378419
)
379420

380-
def get_name(self):
421+
def get_name(self) -> str:
381422
"""Get the name for this service."""
382423
return self._service_name
383424

384-
def _set_name(self, name):
425+
def _set_name(self, name: str) -> None:
385426
"""Set a new name for this service, and notify the frontend accordingly."""
386427
self._service_name = name
387428
self.__send_to_frontend({"band": "set_name", "name": self._service_name})
388429

389-
def _request_termination(self):
430+
def _request_termination(self) -> None:
390431
"""Terminate the service from the frontend side"""
391432
self.__send_to_frontend({"band": "request_termination"})
392433

393-
def _shutdown(self):
434+
def _shutdown(self) -> None:
394435
"""Terminate the service from the service side."""
395436
self.__shutdown = True
396437

397-
def initialize_logging(self):
438+
def initialize_logging(self) -> None:
398439
"""Reset the logging for the service process. All logged messages are
399440
forwarded to the frontend. If any filtering is desired, then this must
400441
take place on the service side."""
@@ -426,14 +467,28 @@ def initialize_logging(self):
426467
console.setLevel(logging.CRITICAL)
427468
root_logger.addHandler(console)
428469

429-
def start(self, **kwargs):
430-
"""Start listening to command queue, process commands in main loop,
431-
set status, etc...
432-
This function is most likely called by the frontend in a separate
433-
process."""
434-
470+
def start(self, *, verbose_log: bool = False, **kwargs: Any) -> None:
471+
"""Run the service main loop until shutdown.
472+
473+
This is the entry point invoked by the frontend in the spawned service
474+
process. It sets up logging and transport, calls :meth:`initializing`,
475+
then enters the main loop, dispatching command-band and transport-band
476+
messages off the internal priority queue and emitting status updates as
477+
the service state changes. On shutdown - ``clean``, or via an unhandled
478+
exception - :meth:`in_shutdown`, is invoked and the transport is torn
479+
down.
480+
481+
Args:
482+
verbose_log:
483+
If set, initialises the service logger level to ``DEBUG``.
484+
**kwargs:
485+
Other arbitrary keyword arguments, forwarded by the frontend.
486+
Stored on :attr:`start_kwargs` for use by subclasses.
487+
"""
435488
# Keep a copy of keyword arguments for use in subclasses
436489
self.start_kwargs.update(kwargs)
490+
if verbose_log:
491+
self.start_kwargs["verbose_log"] = verbose_log
437492
try:
438493
self.initialize_logging()
439494

@@ -512,7 +567,7 @@ def start(self, **kwargs):
512567
self.process_uncaught_exception(e)
513568
self.__update_service_status(self.SERVICE_STATUS_ERROR)
514569

515-
def process_uncaught_exception(self, e):
570+
def process_uncaught_exception(self, e: BaseException) -> None:
516571
"""This is called to handle otherwise uncaught exceptions from the service.
517572
The service will terminate either way, but here we can do things such as
518573
gathering useful environment information and logging for posterity."""
@@ -539,7 +594,7 @@ def process_uncaught_exception(self, e):
539594
"Unhandled service exception: %s", e, exc_info=True, extra=added_information
540595
)
541596

542-
def __process_command(self, command):
597+
def __process_command(self, command: str) -> None:
543598
"""Process an incoming command message from the frontend."""
544599
if command == Commands.SHUTDOWN:
545600
self.__shutdown = True

0 commit comments

Comments
 (0)