Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ def my_task_function(self, task_context: TaskContext) -> None:
from multiprocessing import Queue
from threading import RLock, Thread
from types import TracebackType
from typing import Generic, Literal, TypeVar
from typing import Any, Generic, Literal, TypeVar

from humps import pascalize
from typing_extensions import Self, assert_never

from cognite.client import CogniteClient
from cognite.extractorutils._inner_util import _resolve_log_level
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import (
Expand Down Expand Up @@ -85,6 +86,55 @@ def my_task_function(self, task_context: TaskContext) -> None:
_T = TypeVar("_T", bound=ExtractorConfig)


class _NoOpCogniteClient:
"""A mock CogniteClient that performs no actions, for use in dry-run mode."""

class _MockResponse:
def __init__(self, url: str, current_config_revision: int | str, external_id: str) -> None:
self._url = url
self.current_config_revision = current_config_revision
self.external_id = external_id

def json(self) -> dict:
if "integrations/checkin" in self._url or "/integrations/extractorinfo" in self._url:
if self.current_config_revision == "local":
return {"externalId": self.external_id}
elif isinstance(self.current_config_revision, int):
return {"externalId": self.external_id, "lastConfigRevision": self.current_config_revision}
return {}

def __init__(
self,
config: ConnectionConfig | None,
current_config_revision: int | str,
client_name: str,
logger: logging.Logger,
) -> None:
class MockSDKConfig:
def __init__(self, project: str) -> None:
self.project = project

project_name = config.project if config else "dry-run-no-config"
self.external_id = config.integration.external_id if config and config.integration else "dry-run-no-integration"
self.config = MockSDKConfig(project_name)
self.current_config_revision = current_config_revision
# self._logger = logging.getLogger(__name__)
self._logger = logger
self._logger.info(f"CogniteClient is in no-op mode (dry-run). Client name: {client_name}")

def post(self, url: str, json: dict, **kwargs: dict[str, Any]) -> _MockResponse:
response = self._MockResponse(url, self.current_config_revision, self.external_id)
self._logger.info(f"[DRY-RUN] SKIPPED POST to {url} with payload: {json}.")
self._logger.info(f"[DRY-RUN] Response: {response.json()}")
return response

def get(self, url: str, **kwargs: dict[str, Any]) -> _MockResponse:
response = self._MockResponse(url, self.current_config_revision, self.external_id)
self._logger.info(f"[DRY-RUN] SKIPPED GET from {url}.")
self._logger.info(f"[DRY-RUN] Response: {response.json()}")
return response


class FullConfig(Generic[_T]):
"""
A class that holds the full configuration for an extractor.
Expand All @@ -95,15 +145,17 @@ class FullConfig(Generic[_T]):

def __init__(
self,
connection_config: ConnectionConfig,
application_config: _T,
current_config_revision: ConfigRevision,
connection_config: ConnectionConfig | None,
log_level_override: str | None = None,
is_dry_run: bool = False,
) -> None:
self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision = current_config_revision
self.connection_config = connection_config
self.log_level_override = log_level_override
self.is_dry_run = is_dry_run


class Extractor(Generic[ConfigType], CogniteLogger):
Expand All @@ -125,8 +177,14 @@ class Extractor(Generic[ConfigType], CogniteLogger):
CONFIG_TYPE: type[ConfigType]

RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES
SUPPORTS_DRY_RUN: bool = False
cognite_client: _NoOpCogniteClient | CogniteClient

def __init__(self, config: FullConfig[ConfigType]) -> None:
self.is_dry_run = config.is_dry_run
if self.is_dry_run and not self.SUPPORTS_DRY_RUN:
raise NotImplementedError(f"Extractor '{self.NAME}' does not support dry-run mode.")

self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")

self.cancellation_token = CancellationToken()
Expand All @@ -137,7 +195,14 @@ def __init__(self, config: FullConfig[ConfigType]) -> None:
self.current_config_revision = config.current_config_revision
self.log_level_override = config.log_level_override

self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}")
if self.is_dry_run:
self.cognite_client = _NoOpCogniteClient(
self.connection_config, self.current_config_revision, f"{self.EXTERNAL_ID}-{self.VERSION}", self._logger
)
elif self.connection_config:
self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}")
else:
raise ValueError("Connection config is missing and not in dry-run mode.")

self._checkin_lock = RLock()
self._runtime_messages: Queue[RuntimeMessage] | None = None
Expand Down Expand Up @@ -221,6 +286,9 @@ def _set_runtime_message_queue(self, queue: Queue) -> None:
self._runtime_messages = queue

def _checkin(self) -> None:
if not self.connection_config:
return

with self._checkin_lock:
task_updates = [t.model_dump() for t in self._task_updates]
self._task_updates.clear()
Expand Down Expand Up @@ -360,6 +428,9 @@ def run_task(task_context: TaskContext) -> None:
)

def _report_extractor_info(self) -> None:
if not self.connection_config:
return

self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/integrations/extractorinfo",
json={
Expand Down
68 changes: 53 additions & 15 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def _create_argparser(self) -> ArgumentParser:
"--connection-config",
nargs=1,
type=Path,
required=True,
required=False,
help="Connection parameters",
)
argparser.add_argument(
Expand Down Expand Up @@ -134,6 +134,11 @@ def _create_argparser(self) -> ArgumentParser:
required=False,
help="Set the current working directory for the extractor.",
)
argparser.add_argument(
"--dry-run",
action="store_true",
help="Run without writing to CDF. The extractor must support this feature for this to work.",
)

return argparser

Expand Down Expand Up @@ -167,6 +172,9 @@ def _inner_run(
with extractor:
extractor.run()

except NotImplementedError as e:
logging.getLogger(__name__).critical(f"Configuration error: {e}")

except Exception:
self.logger.exception("Extractor crashed, will attempt restart")
message_queue.put(RuntimeMessage.RESTART)
Expand All @@ -188,7 +196,7 @@ def _spawn_extractor(
def _try_get_application_config(
self,
args: Namespace,
connection_config: ConnectionConfig,
connection_config: ConnectionConfig | None,
) -> tuple[ExtractorConfig, ConfigRevision]:
current_config_revision: ConfigRevision

Expand All @@ -205,7 +213,7 @@ def _try_get_application_config(
self.logger.critical(str(e))
raise InvalidConfigError(str(e)) from e

else:
elif connection_config:
self.logger.info("Loading application config from CDF")

application_config, current_config_revision = load_from_cdf(
Expand All @@ -214,6 +222,10 @@ def _try_get_application_config(
self._extractor_class.CONFIG_TYPE,
)

else:
self.logger.critical("No connection config provided and no local config file specified.")
raise InvalidConfigError("No connection config provided and no local config file specified.")

return application_config, current_config_revision
Copy link
Contributor

@toondaey toondaey Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably try to use pyright as your lsp because this is very unsafe. You could result in a situation where you get UnboundLocalError since resolving application_config and current_config_revision is behind a condition. What then happens where connection_config is None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to fix UnboundLocalError


def _try_set_cwd(self, args: Namespace) -> None:
Expand All @@ -230,8 +242,15 @@ def _try_set_cwd(self, args: Namespace) -> None:
def _safe_get_application_config(
self,
args: Namespace,
connection_config: ConnectionConfig,
connection_config: ConnectionConfig | None,
) -> tuple[ExtractorConfig, ConfigRevision] | None:
if args.dry_run and not args.force_local_config:
self.logger.warning(
"Running in dry-run mode without a local application config file (-f). "
"The extractor will not perform any actions."
)
return None

prev_error: str | None = None

while not self._cancellation_token.is_cancelled:
Expand All @@ -257,23 +276,28 @@ def _safe_get_application_config(
task=None,
)

self._cognite_client.post(
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
json={
"externalId": connection_config.integration.external_id,
"errors": [error.model_dump()],
},
headers={"cdf-version": "alpha"},
)
if connection_config:
self._cognite_client.post(
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
json={
"externalId": connection_config.integration.external_id,
"errors": [error.model_dump()],
},
headers={"cdf-version": "alpha"},
)

self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL))

return None

def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool:
def _verify_connection_config(self, connection_config: ConnectionConfig | None) -> bool:
if connection_config is None:
return False

self._cognite_client = connection_config.get_cognite_client(
f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}"
)

try:
self._cognite_client.post(
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
Expand Down Expand Up @@ -333,13 +357,26 @@ def run(self) -> None:

try:
self._try_set_cwd(args)
connection_config = load_file(args.connection_config[0], ConnectionConfig)

if args.dry_run:
self.logger.info("Running in dry-run mode. No data will be written to CDF.")

connection_config = (
load_file(args.connection_config[0], ConnectionConfig) if args.connection_config else None
)
else:
if not args.connection_config:
self.logger.critical("Connection config file is required when not in dry-run mode.")
sys.exit(1)

connection_config = load_file(args.connection_config[0], ConnectionConfig)

except InvalidConfigError as e:
self.logger.error(str(e))
self.logger.critical("Could not load connection config")
sys.exit(1)

if not args.skip_init_checks and not self._verify_connection_config(connection_config):
if not args.dry_run and not args.skip_init_checks and not self._verify_connection_config(connection_config):
sys.exit(1)

# This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't
Expand All @@ -363,6 +400,7 @@ def run(self) -> None:
application_config=application_config,
current_config_revision=current_config_revision,
log_level_override=args.log_level,
is_dry_run=args.dry_run,
)
)
process.join()
Expand Down
45 changes: 43 additions & 2 deletions tests/test_unstable/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pytest

from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
from cognite.extractorutils.unstable.core.base import ConfigRevision
from cognite.extractorutils.unstable.core.base import ConfigRevision, FullConfig
from cognite.extractorutils.unstable.core.runtime import Runtime
from test_unstable.conftest import TestConfig, TestExtractor

Expand Down Expand Up @@ -97,7 +97,7 @@ def cancel_after_delay() -> None:

start_time = time.time()
result: tuple[TestConfig, ConfigRevision] | None = runtime._safe_get_application_config(
args=Namespace(force_local_config=None),
args=Namespace(force_local_config=None, dry_run=False),
connection_config=connection_config,
)
duration = time.time() - start_time
Expand Down Expand Up @@ -134,3 +134,44 @@ def test_changing_cwd() -> None:

assert os.getcwd() == str(Path(__file__).parent)
assert os.getcwd() != original_cwd


def test_unsupported_dry_run_crashes(connection_config: ConnectionConfig) -> None:
"""
Tests that an extractor with SUPPORTS_DRY_RUN = False raises a
NotImplementedError if started in dry-run mode.
"""
extractor_class = TestExtractor

full_config = FullConfig(
connection_config=connection_config,
application_config=TestConfig(parameter_one=1, parameter_two="a"),
current_config_revision=1,
is_dry_run=True,
)

with pytest.raises(NotImplementedError, match="does not support dry-run mode"):
extractor_class(full_config)


def test_supported_dry_run_uses_noop_client(connection_config: ConnectionConfig) -> None:
"""
Tests that an extractor with SUPPORTS_DRY_RUN = True uses the
_NoOpCogniteClient when in dry-run mode.
"""

class DryRunSupportedExtractor(TestExtractor):
SUPPORTS_DRY_RUN = True

full_config = FullConfig(
connection_config=connection_config,
application_config=TestConfig(parameter_one=1, parameter_two="a"),
current_config_revision=1,
is_dry_run=True,
)

extractor = DryRunSupportedExtractor(full_config)

from cognite.extractorutils.unstable.core.base import _NoOpCogniteClient

assert isinstance(extractor.cognite_client, _NoOpCogniteClient)
Loading