Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ uv run --extra dev python -m pytest tests/test_registry_schema_artifact_postgres
## PostgreSQL integration test

The `postgres`-marked test is opt-in and requires a reachable PostgreSQL
database. The fixture resolves the engine in this order:
database. The fixture resolves the integration source in this order:

1. `ENGINE_CDM`
2. `ENGINE`
1. the configured `cdm_db` resource from `oa-configurator`
2. `ENGINE_CDM`
3. `ENGINE`

During the test, `ENGINE` is populated automatically from `ENGINE_CDM` when
needed so the scratch database is visible to resolver-backed construct imports.
When a stack config is present, the fixture reuses the effective `cdm_db`
schema specification and writes a temporary stack config that points `cdm_db`
at the disposable scratch database. Resolver-backed construct imports therefore
exercise the same `oa-configurator` path as normal runtime code while remaining
isolated from the configured database.

The test does not modify the database named in `ENGINE_CDM` / `ENGINE`
directly. Instead it:
The test does not modify the configured source database directly. Instead it:

1. connects to that server
2. creates a disposable scratch database with a generated unique name
3. bootstraps the OMOP tables there
3. bootstraps the OMOP tables into the configured CDM and vocabulary schemas
4. runs the full registry compile check
5. creates all materialized views with `WITH NO DATA`
6. drops the scratch database on teardown
Expand Down
191 changes: 179 additions & 12 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
import os
from pathlib import Path
import sys
import tempfile
import time
from uuid import uuid4

import pytest
import sqlalchemy as sa
import oa_configurator.loader as oa_loader
from oa_configurator import (
DatabaseConfig,
Resolver,
ResourceConfig,
StackConfig,
load_stack_config,
save_stack_config,
)
from omop_alchemy.maintenance import create_missing_tables
from omop_alchemy.maintenance._cli_utils import ensure_schema
from omop_alchemy.maintenance.tables import (
collect_maintenance_tables,
schema_adjusted_metadata,
)
from omop_constructs.config import resolve_cdm_resource_name


ROOT = Path(__file__).resolve().parents[1]
Expand All @@ -20,6 +36,91 @@
sys.path.insert(0, str(SEMANTICS_SRC))


def _build_database_config(url: sa.engine.URL) -> DatabaseConfig:
return DatabaseConfig(
dialect=url.drivername,
host=url.host,
port=url.port,
user=url.username,
password=url.password,
database_name=url.database,
)


def _load_effective_cdm_resource() -> ResourceConfig | None:
try:
stack = load_stack_config()
except FileNotFoundError:
return None

resolved = Resolver(stack).resolve_resource(resolve_cdm_resource_name(stack))
return ResourceConfig(
database="cdm_db",
vocab_database="cdm_db",
cdm_schema=resolved.cdm_schema,
vocab_schema=resolved.vocab_schema,
results_schema=resolved.results_schema,
)


def _build_scratch_stack(
scratch_url: sa.engine.URL,
*,
resource_config: ResourceConfig,
) -> StackConfig:
return StackConfig.for_session(
databases={"cdm_db": _build_database_config(scratch_url)},
resources={"cdm_db": resource_config},
)


def _bootstrap_scratch_cdm(
engine: sa.Engine,
*,
resource_config: ResourceConfig,
) -> None:
cdm_schema = resource_config.cdm_schema
vocab_schema = resource_config.vocab_schema or cdm_schema
results_schema = resource_config.results_schema

if results_schema:
ensure_schema(engine, results_schema)

if vocab_schema == cdm_schema:
create_missing_tables(
engine,
db_schema=cdm_schema,
vocabulary_included=True,
)
return

ensure_schema(engine, cdm_schema)
ensure_schema(engine, vocab_schema)

cdm_tables = []
vocab_tables = []
for maintenance_table in collect_maintenance_tables():
if maintenance_table.is_vocabulary:
vocab_tables.append(maintenance_table)
else:
cdm_tables.append(maintenance_table)

with engine.begin() as connection:
for tables, db_schema in (
(cdm_tables, cdm_schema),
(vocab_tables, vocab_schema),
):
metadata, adjusted_tables = schema_adjusted_metadata(
tables,
db_schema=db_schema,
)
metadata.create_all(
bind=connection,
tables=[adjusted_tables[table.table_name] for table in tables],
checkfirst=True,
)


def _wait_for_engine(engine: sa.Engine, *, attempts: int = 20) -> None:
for attempt in range(attempts):
try:
Expand Down Expand Up @@ -103,17 +204,52 @@ def pg_engine():
Session-scoped engine connecting to a PostgreSQL database for opt-in tests.

Resolution order:
1. ``ENGINE_CDM``
2. ``ENGINE``
1. configured ``cdm_db`` resource from ``oa-configurator``
2. ``ENGINE_CDM``
3. ``ENGINE``
"""
engine_url = os.getenv("ENGINE_CDM") or os.getenv("ENGINE")
resource_config = _load_effective_cdm_resource() or ResourceConfig(
database="cdm_db",
vocab_database="cdm_db",
cdm_schema="public",
vocab_schema="public",
)

engine_url = None
configured_resource = None
try:
stack = load_stack_config()
except FileNotFoundError:
stack = None

if stack is not None:
configured_resource = Resolver(stack).resolve_resource(
resolve_cdm_resource_name(stack)
)
if configured_resource.database.url.startswith("postgresql"):
engine_url = configured_resource.database.url

if engine_url is None:
engine_url = os.getenv("ENGINE_CDM") or os.getenv("ENGINE")

if not engine_url:
pytest.skip("No PostgreSQL engine configured. Set ENGINE_CDM or ENGINE.")
pytest.skip(
"No PostgreSQL source configured. Configure a PostgreSQL cdm_db "
"resource or set ENGINE_CDM / ENGINE."
)

engine = sa.create_engine(engine_url, future=True)
if engine.url.drivername != "postgresql+psycopg" and not engine.url.drivername.startswith(
"postgresql"
):
engine.dispose()
pytest.skip(
"The configured PostgreSQL integration source must use a PostgreSQL "
f"SQLAlchemy URL, got {engine.url.drivername!r}."
)
_wait_for_engine(engine)
try:
yield engine
yield engine, resource_config
finally:
engine.dispose()

Expand All @@ -123,21 +259,28 @@ def pg_bootstrapped_engine(pg_engine):
"""
Function-scoped PostgreSQL engine bound to a disposable scratch database.

The configured database in ``ENGINE_CDM`` / ``ENGINE`` is treated only as a
connection source. The fixture creates a separate temporary database,
bootstraps OMOP tables there, points ``ENGINE`` at that scratch database for
import-time resolver setup, and drops the scratch database on teardown.
The configured PostgreSQL source is treated only as a connection source.
The fixture creates a separate temporary database, bootstraps OMOP tables
into the configured CDM and vocabulary schemas, points resolver-backed
imports at that scratch database via a temporary ``oa-configurator`` stack
config, and drops the scratch database on teardown.
"""
source_engine, resource_config = pg_engine
source_url = sa.engine.make_url(
pg_engine.url.render_as_string(hide_password=False)
source_engine.url.render_as_string(hide_password=False)
)

admin_engine = sa.create_engine(
source_url,
future=True,
isolation_level="AUTOCOMMIT",
)
temp_config_dir = tempfile.TemporaryDirectory(prefix="omop-constructs-pg-")
temp_config_path = Path(temp_config_dir.name) / "config.toml"
original_engine = os.environ.get("ENGINE")
original_engine_cdm = os.environ.get("ENGINE_CDM")
original_oa_config_path = os.environ.get("OA_CONFIG_PATH")
original_loader_config_path = oa_loader.CONFIG_PATH
scratch_engine = None
scratch_name = None

Expand All @@ -154,8 +297,22 @@ def pg_bootstrapped_engine(pg_engine):
)
_wait_for_engine(scratch_engine)

create_missing_tables(scratch_engine, vocabulary_included=True)
os.environ["ENGINE"] = scratch_url.render_as_string(hide_password=False)
_bootstrap_scratch_cdm(
scratch_engine,
resource_config=resource_config,
)

scratch_stack = _build_scratch_stack(
scratch_url,
resource_config=resource_config,
)
save_stack_config(scratch_stack, temp_config_path)

scratch_url_str = scratch_url.render_as_string(hide_password=False)
oa_loader.CONFIG_PATH = temp_config_path
os.environ["OA_CONFIG_PATH"] = str(temp_config_path)
os.environ["ENGINE_CDM"] = scratch_url_str
os.environ["ENGINE"] = scratch_url_str
yield scratch_engine
finally:
try:
Expand All @@ -165,7 +322,17 @@ def pg_bootstrapped_engine(pg_engine):
_drop_database_if_exists(admin_engine, scratch_name)
finally:
admin_engine.dispose()
temp_config_dir.cleanup()
oa_loader.CONFIG_PATH = original_loader_config_path
if original_engine is None:
os.environ.pop("ENGINE", None)
else:
os.environ["ENGINE"] = original_engine
if original_engine_cdm is None:
os.environ.pop("ENGINE_CDM", None)
else:
os.environ["ENGINE_CDM"] = original_engine_cdm
if original_oa_config_path is None:
os.environ.pop("OA_CONFIG_PATH", None)
else:
os.environ["OA_CONFIG_PATH"] = original_oa_config_path
Loading
Loading