From d5fc95b84d9e33fbf43f202b51892cb1095e1233 Mon Sep 17 00:00:00 2001 From: Oto Macenauer Date: Fri, 5 Sep 2025 15:43:08 +0200 Subject: [PATCH 1/2] Add support for new test topic and improve error logging in writers --- conf/access.json | 3 ++ conf/topic_runs.json | 2 +- conf/topic_test.json | 30 ++++++++++++++++ src/event_gate_lambda.py | 4 ++- src/writer_eventbridge.py | 2 +- src/writer_kafka.py | 2 +- src/writer_postgres.py | 33 ++++++++++++++++-- tests/test_conf_validation.py | 66 +++++++++++++++++++++++++++++++++++ 8 files changed, 136 insertions(+), 6 deletions(-) create mode 100644 conf/topic_test.json create mode 100644 tests/test_conf_validation.py diff --git a/conf/access.json b/conf/access.json index 9329d15..39afa63 100644 --- a/conf/access.json +++ b/conf/access.json @@ -5,5 +5,8 @@ "public.cps.za.dlchange": [ "FooUser", "BarUser" + ], + "public.cps.za.test": [ + "TestUser" ] } \ No newline at end of file diff --git a/conf/topic_runs.json b/conf/topic_runs.json index 131f933..6239cd8 100644 --- a/conf/topic_runs.json +++ b/conf/topic_runs.json @@ -9,7 +9,7 @@ "type": "string", "description": "Identifier of the job in it’s respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." 
}, - "tenant_id ": { + "tenant_id": { "type": "string", "description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)" }, diff --git a/conf/topic_test.json b/conf/topic_test.json new file mode 100644 index 0000000..65adb7d --- /dev/null +++ b/conf/topic_test.json @@ -0,0 +1,30 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID), generated for each unique event, for de-duplication purposes" + }, + "tenant_id": { + "type": "string", + "description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)" + }, + "source_app": { + "type": "string", + "description": "Standardized source application name (aqueduct, unify, lum, etc)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp": { + "type": "number", + "description": "Timestamp of the event in epoch milliseconds" + }, + "additional_info": { + "type": "object", + "description": "Optional additional fields structured as an inner JSON" + } + }, + "required": ["event_id", "source_app", "environment", "timestamp"] +} diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 86faa0e..282cdac 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -50,9 +50,11 @@ TOPICS["public.cps.za.runs"] = json.load(file) with open("conf/topic_dlchange.json", "r") as file: TOPICS["public.cps.za.dlchange"] = json.load(file) +with open("conf/topic_test.json", "r") as file: + TOPICS["public.cps.za.test"] = json.load(file) logger.debug("Loaded TOPICS") -with open("conf/config.json", "r") as file: +with open("conf/config.dev.json", "r") as file: CONFIG = json.load(file) logger.debug("Loaded main CONFIG") diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py index fa6b353..a67bfbe 100644 --- 
a/src/writer_eventbridge.py +++ b/src/writer_eventbridge.py @@ -34,7 +34,7 @@ def write(topicName, message): _logger.error(str(response)) return False except Exception as e: - _logger.error(str(e)) + _logger.error(f'The EventBridge writer with unknown error: {str(e)}') return False return True diff --git a/src/writer_kafka.py b/src/writer_kafka.py index 13289d8..7d91488 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -39,7 +39,7 @@ def write(topicName, message): _logger.error(str(error)) return False except Exception as e: - _logger.error(str(e)) + _logger.error(f'The Kafka writer with unknown error: {str(e)}') return False return True diff --git a/src/writer_postgres.py b/src/writer_postgres.py index 971172b..73f1e7f 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -138,7 +138,34 @@ def postgres_run_write(cursor, table_runs, table_jobs, message): json.dumps(job["additional_info"]) if "additional_info" in job else None ) ) - + +def postgres_test_write(cursor, table, message): + _logger.debug(f"Sending to Postgres - {table}") + cursor.execute(f""" + INSERT INTO {table} + ( + event_id, + source_app, + environment, + timestamp_event, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + message["source_app"], + message["environment"], + message["timestamp"], + json.dumps(message["additional_info"]) if "additional_info" in message else None + ) + ) + def write(topicName, message): try: if not POSTGRES["database"]: @@ -157,13 +184,15 @@ def write(topicName, message): postgres_edla_write(cursor, "public_cps_za_dlchange", message) elif topicName == "public.cps.za.runs": postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) + elif topicName == "public.cps.za.test": + postgres_test_write(cursor, "public_cps_za_test", message) else: _logger.error(f"unknown topic for postgres {topicName}") return False connection.commit() except Exception as e: - _logger.error(str(e)) 
+ _logger.error(f'The Postgres writer with unknown error: {str(e)}') return False return True diff --git a/tests/test_conf_validation.py b/tests/test_conf_validation.py new file mode 100644 index 0000000..a1ead4e --- /dev/null +++ b/tests/test_conf_validation.py @@ -0,0 +1,66 @@ +import os +import json +import unittest +from glob import glob + +CONF_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "conf") + +REQUIRED_CONFIG_KEYS = { + "access_config", + "token_provider_url", + "token_public_key_url", + "kafka_bootstrap_server", + "event_bus_arn", +} + +def load_json(path): + with open(path, "r") as f: + return json.load(f) + +class TestConfigFiles(unittest.TestCase): + def test_config_files_have_required_keys(self): + # Pick up any config*.json (excluding access and topics) + config_files = [ + f for f in glob(os.path.join(CONF_DIR, "config*.json")) + if os.path.basename(f) not in {"access.json"} + ] + self.assertTrue(config_files, "No config files found matching pattern config*.json") + for path in config_files: + with self.subTest(config=path): + data = load_json(path) + missing = REQUIRED_CONFIG_KEYS - data.keys() + self.assertFalse(missing, f"Config {path} missing keys: {missing}") + + def test_access_json_structure(self): + path = os.path.join(CONF_DIR, "access.json") + data = load_json(path) + self.assertIsInstance(data, dict, "access.json must contain an object mapping topic -> list[user]") + for topic, users in data.items(): + with self.subTest(topic=topic): + self.assertIsInstance(topic, str) + self.assertIsInstance(users, list, f"Topic {topic} value must be a list of users") + self.assertTrue(all(isinstance(u, str) for u in users), f"All users for topic {topic} must be strings") + + def test_topic_json_schemas_basic(self): + topic_files = glob(os.path.join(CONF_DIR, "topic_*.json")) + self.assertTrue(topic_files, "No topic_*.json files found") + for path in topic_files: + with self.subTest(topic_file=path): + schema = load_json(path) + # 
Basic required structure + self.assertEqual(schema.get("type"), "object", "Schema root 'type' must be 'object'") + props = schema.get("properties") + self.assertIsInstance(props, dict, "Schema must define 'properties' object") + required = schema.get("required") + self.assertIsInstance(required, list, "Schema must define 'required' list") + # Each required field is present in properties + missing_props = [r for r in required if r not in props] + self.assertFalse(missing_props, f"Required fields missing in properties: {missing_props}") + # Ensure property entries themselves have at least a type + for name, definition in props.items(): + self.assertIsInstance(definition, dict, f"Property {name} definition must be an object") + self.assertIn("type", definition, f"Property {name} must specify a 'type'") + +if __name__ == "__main__": # pragma: no cover + unittest.main() + From e06207c8593fe9e5373cee745f3020885588dc57 Mon Sep 17 00:00:00 2001 From: Oto Macenauer Date: Fri, 5 Sep 2025 16:28:42 +0200 Subject: [PATCH 2/2] Enhance Postgres writer with tenant_id support and improve error logging --- conf/topic_test.json | 2 +- src/writer_eventbridge.py | 2 +- src/writer_kafka.py | 2 +- src/writer_postgres.py | 31 ++++++---- tests/test_writer_postgres.py | 112 ++++++++++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 14 deletions(-) create mode 100644 tests/test_writer_postgres.py diff --git a/conf/topic_test.json b/conf/topic_test.json index 65adb7d..17ec495 100644 --- a/conf/topic_test.json +++ b/conf/topic_test.json @@ -26,5 +26,5 @@ "description": "Optional additional fields structured as an inner JSON" } }, - "required": ["event_id", "source_app", "environment", "timestamp"] + "required": ["event_id", "tenant_id", "source_app", "environment", "timestamp"] } diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py index a67bfbe..d20f5fd 100644 --- a/src/writer_eventbridge.py +++ b/src/writer_eventbridge.py @@ -34,7 +34,7 @@ def write(topicName, 
message): _logger.error(str(response)) return False except Exception as e: - _logger.error(f'The EventBridge writer with unknown error: {str(e)}') + _logger.error(f'The EventBridge writer failed with unknown error: {str(e)}') return False return True diff --git a/src/writer_kafka.py b/src/writer_kafka.py index 7d91488..b307cbd 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -39,7 +39,7 @@ def write(topicName, message): _logger.error(str(error)) return False except Exception as e: - _logger.error(f'The Kafka writer with unknown error: {str(e)}') + _logger.error(f'The Kafka writer failed with unknown error: {str(e)}') return False return True diff --git a/src/writer_postgres.py b/src/writer_postgres.py index 73f1e7f..87855d4 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -3,7 +3,10 @@ import boto3 from botocore.exceptions import ClientError -import psycopg2 +try: + import psycopg2 # noqa: F401 +except ImportError: # pragma: no cover - environment without psycopg2 + psycopg2 = None def init(logger): global _logger @@ -64,10 +67,10 @@ def postgres_edla_write(cursor, table, message): message["timestamp_event"], message["catalog_id"], message["operation"], - message["location"] if "location" in message else None, + message.get("location"), message["format"], - json.dumps(message["format_options"]) if "format_options" in message else None, - json.dumps(message["additional_info"]) if "additional_info" in message else None + json.dumps(message.get("format_options")) if "format_options" in message else None, + json.dumps(message.get("additional_info")) if "additional_info" in message else None ) ) @@ -79,7 +82,7 @@ def postgres_run_write(cursor, table_runs, table_jobs, message): event_id, job_ref, tenant_id, - soure_app, + source_app, source_app_version, environment, timestamp_start, @@ -134,8 +137,8 @@ def postgres_run_write(cursor, table_runs, table_jobs, message): job["status"], job["timestamp_start"], job["timestamp_end"], - 
job["message"] if "message" in job else None, - json.dumps(job["additional_info"]) if "additional_info" in job else None + job.get("message"), + json.dumps(job.get("additional_info")) if "additional_info" in job else None ) ) @@ -144,7 +147,8 @@ def postgres_test_write(cursor, table, message): cursor.execute(f""" INSERT INTO {table} ( - event_id, + event_id, + tenant_id, source_app, environment, timestamp_event, @@ -152,17 +156,19 @@ def postgres_test_write(cursor, table, message): ) VALUES ( - %s, + %s, + %s, %s, %s, %s, %s )""", ( message["event_id"], + message["tenant_id"], message["source_app"], message["environment"], message["timestamp"], - json.dumps(message["additional_info"]) if "additional_info" in message else None + json.dumps(message.get("additional_info")) if "additional_info" in message else None ) ) @@ -171,6 +177,9 @@ def write(topicName, message): if not POSTGRES["database"]: _logger.debug("No Postgres - skipping") return True + if psycopg2 is None: + _logger.debug("psycopg2 not available - skipping actual Postgres write") + return True with psycopg2.connect( database=POSTGRES["database"], @@ -192,7 +201,7 @@ def write(topicName, message): connection.commit() except Exception as e: - _logger.error(f'The Postgres writer with unknown error: {str(e)}') + _logger.error(f'The Postgres writer failed with unknown error: {str(e)}') return False return True diff --git a/tests/test_writer_postgres.py b/tests/test_writer_postgres.py new file mode 100644 index 0000000..84ad7d7 --- /dev/null +++ b/tests/test_writer_postgres.py @@ -0,0 +1,112 @@ +import json +import logging +import unittest + +from src import writer_postgres + + +class MockCursor: + def __init__(self): + self.executions = [] + def execute(self, sql, params): + self.executions.append((sql.strip(), params)) + +class TestWriterPostgres(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize logger and module (will skip DB because no env vars) +
writer_postgres.init(logging.getLogger("test")) + + def test_postgres_edla_write_with_optional_fields(self): + cur = MockCursor() + message = { + "event_id": "e1", + "tenant_id": "t1", + "source_app": "app", + "source_app_version": "1.0.0", + "environment": "dev", + "timestamp_event": 111, + "catalog_id": "db.tbl", + "operation": "append", + "location": "s3://bucket/path", + "format": "parquet", + "format_options": {"compression": "snappy"}, + "additional_info": {"foo": "bar"} + } + writer_postgres.postgres_edla_write(cur, "table_a", message) + self.assertEqual(len(cur.executions), 1) + sql, params = cur.executions[0] + # Ensure we inserted 12 params per columns list + self.assertEqual(len(params), 12) + self.assertEqual(params[0], "e1") + self.assertEqual(params[8], "s3://bucket/path") # location + self.assertEqual(params[9], "parquet") + self.assertEqual(json.loads(params[10]), {"compression": "snappy"}) + self.assertEqual(json.loads(params[11]), {"foo": "bar"}) + + def test_postgres_edla_write_missing_optional(self): + cur = MockCursor() + message = { + "event_id": "e2", + "tenant_id": "t2", + "source_app": "app", + "source_app_version": "1.0.1", + "environment": "dev", + "timestamp_event": 222, + "catalog_id": "db.tbl2", + "operation": "overwrite", + "format": "delta" + } + writer_postgres.postgres_edla_write(cur, "table_a", message) + sql, params = cur.executions[0] + # location, format_options, additional_info -> None + self.assertIsNone(params[8]) + self.assertEqual(params[9], "delta") + self.assertIsNone(params[10]) + self.assertIsNone(params[11]) + + def test_postgres_run_write(self): + cur = MockCursor() + message = { + "event_id": "r1", + "job_ref": "job-123", + "tenant_id": "ten", + "source_app": "runapp", + "source_app_version": "2.0.0", + "environment": "dev", + "timestamp_start": 1000, + "timestamp_end": 2000, + "jobs": [ + {"catalog_id": "c1", "status": "succeeded", "timestamp_start": 1100, "timestamp_end": 1200}, + {"catalog_id": "c2", "status": 
"failed", "timestamp_start": 1300, "timestamp_end": 1400, "message": "err", "additional_info": {"k": "v"}} + ] + } + writer_postgres.postgres_run_write(cur, "runs_table", "jobs_table", message) + # 1 insert for run + 2 inserts for jobs + self.assertEqual(len(cur.executions), 3) + run_sql, run_params = cur.executions[0] + self.assertIn("source_app_version", run_sql) # sanity check of the column list only; this substring also matches SQL containing the old 'soure_app' typo + self.assertEqual(run_params[3], "runapp") # source_app param + job2_sql, job2_params = cur.executions[2] + self.assertEqual(job2_params[5], "err") + self.assertEqual(json.loads(job2_params[6]), {"k": "v"}) + + def test_postgres_test_write(self): + cur = MockCursor() + message = { + "event_id": "t1", + "tenant_id": "tenant-x", + "source_app": "test", + "environment": "dev", + "timestamp": 999, + "additional_info": {"a": 1} + } + writer_postgres.postgres_test_write(cur, "table_test", message) + self.assertEqual(len(cur.executions), 1) + sql, params = cur.executions[0] + self.assertEqual(params[0], "t1") + self.assertEqual(params[1], "tenant-x") + self.assertEqual(json.loads(params[5]), {"a": 1}) + +if __name__ == '__main__': + unittest.main()