diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 62c3ee77..7ba1d54f 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -53,11 +53,12 @@ def my_task_function(self, task_context: TaskContext) -> None: from multiprocessing import Queue from threading import RLock, Thread from types import TracebackType -from typing import Generic, Literal, TypeVar +from typing import Any, Generic, Literal, TypeVar from humps import pascalize from typing_extensions import Self, assert_never +from cognite.client import CogniteClient from cognite.extractorutils._inner_util import _resolve_log_level from cognite.extractorutils.threading import CancellationToken from cognite.extractorutils.unstable.configuration.models import ( @@ -85,6 +86,55 @@ def my_task_function(self, task_context: TaskContext) -> None: _T = TypeVar("_T", bound=ExtractorConfig) +class _NoOpCogniteClient: + """A mock CogniteClient that performs no actions, for use in dry-run mode.""" + + class _MockResponse: + def __init__(self, url: str, current_config_revision: int | str, external_id: str) -> None: + self._url = url + self.current_config_revision = current_config_revision + self.external_id = external_id + + def json(self) -> dict: + if "integrations/checkin" in self._url or "/integrations/extractorinfo" in self._url: + if self.current_config_revision == "local": + return {"externalId": self.external_id} + elif isinstance(self.current_config_revision, int): + return {"externalId": self.external_id, "lastConfigRevision": self.current_config_revision} + return {} + + def __init__( + self, + config: ConnectionConfig | None, + current_config_revision: int | str, + client_name: str, + logger: logging.Logger, + ) -> None: + class MockSDKConfig: + def __init__(self, project: str) -> None: + self.project = project + + project_name = config.project if config else "dry-run-no-config" + self.external_id = config.integration.external_id if config and config.integration else "dry-run-no-integration" + self.config = MockSDKConfig(project_name) + self.current_config_revision = current_config_revision + # self._logger = logging.getLogger(__name__) + self._logger = logger + self._logger.info(f"CogniteClient is in no-op mode (dry-run). Client name: {client_name}") + + def post(self, url: str, json: dict, **kwargs: dict[str, Any]) -> _MockResponse: + response = self._MockResponse(url, self.current_config_revision, self.external_id) + self._logger.info(f"[DRY-RUN] SKIPPED POST to {url} with payload: {json}.") + self._logger.info(f"[DRY-RUN] Response: {response.json()}") + return response + + def get(self, url: str, **kwargs: dict[str, Any]) -> _MockResponse: + response = self._MockResponse(url, self.current_config_revision, self.external_id) + self._logger.info(f"[DRY-RUN] SKIPPED GET from {url}.") + self._logger.info(f"[DRY-RUN] Response: {response.json()}") + return response + + class FullConfig(Generic[_T]): """ A class that holds the full configuration for an extractor. @@ -95,15 +145,17 @@ class FullConfig(Generic[_T]): def __init__( self, - connection_config: ConnectionConfig, application_config: _T, current_config_revision: ConfigRevision, + connection_config: ConnectionConfig | None, log_level_override: str | None = None, + is_dry_run: bool = False, ) -> None: - self.connection_config = connection_config self.application_config = application_config self.current_config_revision = current_config_revision + self.connection_config = connection_config self.log_level_override = log_level_override + self.is_dry_run = is_dry_run class Extractor(Generic[ConfigType], CogniteLogger): @@ -125,8 +177,14 @@ class Extractor(Generic[ConfigType], CogniteLogger): CONFIG_TYPE: type[ConfigType] RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES + SUPPORTS_DRY_RUN: bool = False + cognite_client: _NoOpCogniteClient | CogniteClient def __init__(self, config: FullConfig[ConfigType]) -> None: + self.is_dry_run = config.is_dry_run + if self.is_dry_run and not self.SUPPORTS_DRY_RUN: + raise NotImplementedError(f"Extractor '{self.NAME}' does not support dry-run mode.") + self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") self.cancellation_token = CancellationToken() @@ -137,7 +195,14 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: self.current_config_revision = config.current_config_revision self.log_level_override = config.log_level_override - self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") + if self.is_dry_run: + self.cognite_client = _NoOpCogniteClient( + self.connection_config, self.current_config_revision, f"{self.EXTERNAL_ID}-{self.VERSION}", self._logger + ) + elif self.connection_config: + self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") + else: + raise ValueError("Connection config is missing and not in dry-run mode.") self._checkin_lock = RLock() self._runtime_messages: Queue[RuntimeMessage] | None = None @@ -221,6 +286,9 @@ def _set_runtime_message_queue(self, queue: Queue) -> None: self._runtime_messages = queue def _checkin(self) -> None: + if not self.connection_config: + return + with self._checkin_lock: task_updates = [t.model_dump() for t in self._task_updates] self._task_updates.clear() @@ -360,6 +428,9 @@ def run_task(task_context: TaskContext) -> None: ) def _report_extractor_info(self) -> None: + if not self.connection_config: + return + self.cognite_client.post( f"/api/v1/projects/{self.cognite_client.config.project}/integrations/extractorinfo", json={ diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index d843b0dc..038cd047 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -101,7 +101,7 @@ def _create_argparser(self) -> ArgumentParser: "--connection-config", nargs=1, type=Path, - required=True, + required=False, help="Connection parameters", ) argparser.add_argument( @@ -134,6 +134,11 @@ def _create_argparser(self) -> ArgumentParser: required=False, help="Set the current working directory for the extractor.", ) + argparser.add_argument( + "--dry-run", + action="store_true", + help="Run without writing to CDF. The extractor must support this feature for this to work.", + ) return argparser @@ -167,6 +172,9 @@ def _inner_run( with extractor: extractor.run() + except NotImplementedError as e: + logging.getLogger(__name__).critical(f"Configuration error: {e}") + except Exception: self.logger.exception("Extractor crashed, will attempt restart") message_queue.put(RuntimeMessage.RESTART) @@ -188,7 +196,7 @@ def _spawn_extractor( def _try_get_application_config( self, args: Namespace, - connection_config: ConnectionConfig, + connection_config: ConnectionConfig | None, ) -> tuple[ExtractorConfig, ConfigRevision]: current_config_revision: ConfigRevision @@ -205,7 +213,7 @@ def _try_get_application_config( self.logger.critical(str(e)) raise InvalidConfigError(str(e)) from e - else: + elif connection_config: self.logger.info("Loading application config from CDF") application_config, current_config_revision = load_from_cdf( @@ -214,6 +222,10 @@ def _try_get_application_config( self._extractor_class.CONFIG_TYPE, ) + else: + self.logger.critical("No connection config provided and no local config file specified.") + raise InvalidConfigError("No connection config provided and no local config file specified.") + return application_config, current_config_revision def _try_set_cwd(self, args: Namespace) -> None: @@ -230,8 +242,15 @@ def _try_set_cwd(self, args: Namespace) -> None: def _safe_get_application_config( self, args: Namespace, - connection_config: ConnectionConfig, + connection_config: ConnectionConfig | None, ) -> tuple[ExtractorConfig, ConfigRevision] | None: + if args.dry_run and not args.force_local_config: + self.logger.warning( + "Running in dry-run mode without a local application config file (-f). " + "The extractor will not perform any actions." + ) + return None + prev_error: str | None = None while not self._cancellation_token.is_cancelled: @@ -257,23 +276,28 @@ def _safe_get_application_config( task=None, ) - self._cognite_client.post( - f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", - json={ - "externalId": connection_config.integration.external_id, - "errors": [error.model_dump()], - }, - headers={"cdf-version": "alpha"}, - ) + if connection_config: + self._cognite_client.post( + f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", + json={ + "externalId": connection_config.integration.external_id, + "errors": [error.model_dump()], + }, + headers={"cdf-version": "alpha"}, + ) self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL)) return None - def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool: + def _verify_connection_config(self, connection_config: ConnectionConfig | None) -> bool: + if connection_config is None: + return False + self._cognite_client = connection_config.get_cognite_client( f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}" ) + try: self._cognite_client.post( f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", @@ -333,13 +357,26 @@ def run(self) -> None: try: self._try_set_cwd(args) - connection_config = load_file(args.connection_config[0], ConnectionConfig) + + if args.dry_run: + self.logger.info("Running in dry-run mode. No data will be written to CDF.") + + connection_config = ( + load_file(args.connection_config[0], ConnectionConfig) if args.connection_config else None + ) + else: + if not args.connection_config: + self.logger.critical("Connection config file is required when not in dry-run mode.") + sys.exit(1) + + connection_config = load_file(args.connection_config[0], ConnectionConfig) + except InvalidConfigError as e: self.logger.error(str(e)) self.logger.critical("Could not load connection config") sys.exit(1) - if not args.skip_init_checks and not self._verify_connection_config(connection_config): + if not args.dry_run and not args.skip_init_checks and not self._verify_connection_config(connection_config): sys.exit(1) # This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't @@ -363,6 +400,7 @@ def run(self) -> None: application_config=application_config, current_config_revision=current_config_revision, log_level_override=args.log_level, + is_dry_run=args.dry_run, ) ) process.join() diff --git a/tests/test_unstable/test_runtime.py b/tests/test_unstable/test_runtime.py index 7c045a23..a784e703 100644 --- a/tests/test_unstable/test_runtime.py +++ b/tests/test_unstable/test_runtime.py @@ -9,7 +9,7 @@ import pytest from cognite.extractorutils.unstable.configuration.models import ConnectionConfig -from cognite.extractorutils.unstable.core.base import ConfigRevision +from cognite.extractorutils.unstable.core.base import ConfigRevision, FullConfig from cognite.extractorutils.unstable.core.runtime import Runtime from test_unstable.conftest import TestConfig, TestExtractor @@ -97,7 +97,7 @@ def cancel_after_delay() -> None: start_time = time.time() result: tuple[TestConfig, ConfigRevision] | None = runtime._safe_get_application_config( - args=Namespace(force_local_config=None), + args=Namespace(force_local_config=None, dry_run=False), connection_config=connection_config, ) duration = time.time() - start_time @@ -134,3 +134,44 @@ def test_changing_cwd() -> None: assert os.getcwd() == str(Path(__file__).parent) assert os.getcwd() != original_cwd + + +def test_unsupported_dry_run_crashes(connection_config: ConnectionConfig) -> None: + """ + Tests that an extractor with SUPPORTS_DRY_RUN = False raises a + NotImplementedError if started in dry-run mode. + """ + extractor_class = TestExtractor + + full_config = FullConfig( + connection_config=connection_config, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + is_dry_run=True, + ) + + with pytest.raises(NotImplementedError, match="does not support dry-run mode"): + extractor_class(full_config) + + +def test_supported_dry_run_uses_noop_client(connection_config: ConnectionConfig) -> None: + """ + Tests that an extractor with SUPPORTS_DRY_RUN = True uses the + _NoOpCogniteClient when in dry-run mode. + """ + + class DryRunSupportedExtractor(TestExtractor): + SUPPORTS_DRY_RUN = True + + full_config = FullConfig( + connection_config=connection_config, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + is_dry_run=True, + ) + + extractor = DryRunSupportedExtractor(full_config) + + from cognite.extractorutils.unstable.core.base import _NoOpCogniteClient + + assert isinstance(extractor.cognite_client, _NoOpCogniteClient)