diff --git a/Dockerfile b/Dockerfile index 494f094..726afef 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,7 @@ # test via (provide payload): # curl http://localhost:9000/2015-03-31/functions/function/invocations -d "{payload}" # -# Deploy to AWS Lambda via ACR +# Deploy to AWS Lambda via ECR FROM --platform=linux/arm64 public.ecr.aws/lambda/python:3.13-arm64 @@ -53,7 +53,7 @@ RUN \ echo "###################" && \ echo "### pip installs ###" && \ echo "###################" && \ - pip install requests==2.31.0 urllib3==1.26.18 setuptools cryptography jsonschema PyJWT && \ + pip install requests==2.31.0 urllib3==1.26.18 setuptools cryptography jsonschema PyJWT psycopg2-binary && \ echo "######################" && \ echo "### confluent-kafka ###" && \ echo "######################" && \ @@ -71,7 +71,7 @@ RUN \ # Lambda and SASL_SSL_Artifacts COPY $SASL_SSL_ARTIFACTS /opt/sasl_ssl_artifacts/ -COPY src/event_gate_lambda.py $LAMBDA_TASK_ROOT +COPY src/ $LAMBDA_TASK_ROOT/ COPY conf $LAMBDA_TASK_ROOT/conf # Mark librdkafka to LD_LIBRARY_PATH diff --git a/conf/access.json b/conf/access.json index a401f38..9329d15 100644 --- a/conf/access.json +++ b/conf/access.json @@ -1,8 +1,8 @@ { - "run.topic": [ + "public.cps.za.runs": [ "FooBarUser" ], - "edla.change.topic": [ + "public.cps.za.dlchange": [ "FooUser", "BarUser" ] diff --git a/conf/config.json b/conf/config.json index 023fe64..84ab9f8 100644 --- a/conf/config.json +++ b/conf/config.json @@ -1,6 +1,5 @@ { "access_config": "s3:///access.json", - "topics_config": "s3:///topics.json", "token_provider_url": "https://", "token_public_key_url": "https://", "kafka_bootstrap_server": "localhost:9092", diff --git a/conf/topic_dlchange.json b/conf/topic_dlchange.json new file mode 100644 index 0000000..7cd7c22 --- /dev/null +++ b/conf/topic_dlchange.json @@ -0,0 +1,55 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID)" + }, + "tenant_id": { + 
"type": "string", + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": "string", + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "number", + "description": "Timestamp of the event in epoch milliseconds" + }, + "catalog_id": { + "type": "string", + "description": "Identifier for the data definition (Glue/Hive) database and table name for example " + }, + "operation": { + "type": "string", + "enum": ["overwrite", "append", "archive", "delete"], + "description": "Operation performed" + }, + "location": { + "type": "string", + "description": "Location of the data" + }, + "format": { + "type": "string", + "description": "Format of the data (parquet, delta, crunch, etc)." + }, + "format_options": { + "type": "object", + "description": "When possible, add additional options related to the format" + }, + "additional_info": { + "type": "object", + "description": "Optional additional fields structured as an inner JSON" + } + }, + "required": ["event_id", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_event", "catalog_id", "operation", "format"] +} diff --git a/conf/topic_runs.json b/conf/topic_runs.json new file mode 100644 index 0000000..a13b6f5 --- /dev/null +++ b/conf/topic_runs.json @@ -0,0 +1,73 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID), generated for each unique event, for de-duplication purposes" + }, + "job_ref": { + "type": "string", + "description": "Identifier of the job in it’s respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." 
+ }, + "tenant_id": { + "type": "string", + "description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)" + }, + "source_app": { + "type": "string", + "description": "Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_start": { + "type": "number", + "description": "Start timestamp of the run in epoch milliseconds" + }, + "timestamp_end": { + "type": "number", + "description": "End timestamp of the run in epoch milliseconds" + }, + "jobs": { + "type": "array", + "description": "List of individual jobs within the run", + "items": { + "type": "object", + "properties": { + "catalog_id": { + "type": "string", + "description": "Identifier for the data definition (Glue/Hive) database and table name for example" + }, + "status": { + "type": "string", + "enum": ["succeeded", "failed", "killed", "skipped"], + "description": "Status of the job." + }, + "timestamp_start": { + "type": "number", + "description": "Start timestamp of a job that is a part of a run in epoch milliseconds" + }, + "timestamp_end": { + "type": "number", + "description": "End timestamp of a job that is a part of a run in epoch milliseconds" + }, + "message": { + "type": "string", + "description": "Job status/error message." 
+ }, + "additional_info": { + "type": "object", + "description": "Optional additional fields structured as an inner JSON" + } + }, + "required": ["catalog_id", "status", "timestamp_start", "timestamp_end"] + } + } + }, + "required": ["event_id", "job_ref", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_start", "timestamp_end", "jobs"] +} diff --git a/conf/topics.json b/conf/topics.json deleted file mode 100644 index 2f83383..0000000 --- a/conf/topics.json +++ /dev/null @@ -1,119 +0,0 @@ -{ - "run.topic": { - "type": "object", - "properties": { - "event_id": { - "type": "string", - "description": "Unique identifier for the event (GUID), generated for each unique event, for de-duplication purposes" - }, - "job_ref": { - "type": "string", - "description": "Identifier of the job in it’s respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." - }, - "tenant_id ": { - "type": "string", - "description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)" - }, - "source_app": { - "type": "string", - "description": "Standardized source application name (aqueduct, unify, lum, etc)" - }, - "source_app_version": { - "type": "string", - "description": "Source application version (SemVer preferred)" - }, - "environment": { - "type": "string", - "description": "Environment (dev, uat, pre-prod, prod, test or others)" - }, - "timestamp_start": { - "type": "number", - "description": "Start timestamp of the run in epoch milliseconds" - }, - "timestamp_end": { - "type": "number", - "description": "End timestamp of the run in epoch milliseconds" - }, - "jobs": { - "type": "array", - "description": "List of individual jobs withing the run", - "element_type": "object", - "object_schema": { - "catalog_id": { - "type": "string", - "description": "Identifier for the data definition (Glue/Hive) database and table name for example" - }, - "status": { - "type": "string", - 
"enum": ["succeeded", "failed", "killed", "skipped"], - "description": "Status of the job." - }, - "timestamp_start": { - "type": "number", - "description": "Start timestamp of a job that is a part of a run in epoch milliseconds" - }, - "timestamp_end": { - "type": "number", - "description": "End timestamp of a job that is a part of a run in epoch milliseconds" - }, - "message": { - "type": "string", - "description": "Job status/error message." - } - } - } - }, - "required": ["event_id", "job_ref", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_start", "timestamp_end", "jobs"] - }, - "edla.change.topic": { - "type": "object", - "properties": { - "event_id": { - "type": "string", - "description": "Unique identifier for the event (GUID)" - }, - "tenant_id": { - "type": "string", - "description": "Application ID or ServiceNow identifier" - }, - "source_app": { - "type": "string", - "description": " Standardized source application name (aqueduct, unify, lum, etc)" - }, - "source_app_version": { - "type": "string", - "description": "Source application version (SemVer preferred)" - }, - "environment": { - "type": "string", - "description": "Environment (dev, uat, pre-prod, prod, test or others)" - }, - "timestamp_event": { - "type": "number", - "description": "Timestamp of the event in epoch milliseconds" - }, - "catalog_id": { - "type": "string", - "description": "Identifier for the data definition (Glue/Hive) database and table name for example " - }, - "operation": { - "type": "string", - "enum": ["overwrite", "append", "archive", "delete"], - "description": "Operation performed" - }, - "location": { - "type": "string", - "description": "Location of the data" - }, - "format": { - "type": "string", - "description": "Format of the data (parquet, delta, crunch, etc)." 
- }, - "format_options": { - "type": "object", - "description": "When possible, add additional options related to the format" - } - }, - "required": ["event_id", "tenant_id", "source_app", "source_app_version", "environment", "timestamp_event", "catalog_id", "operation", "format"] - } -} diff --git a/scripts/notebook.ipynb b/scripts/notebook.ipynb index ccdb3b8..945a0d9 100644 --- a/scripts/notebook.ipynb +++ b/scripts/notebook.ipynb @@ -3,16 +3,29 @@ { "cell_type": "code", "execution_count": null, - "id": "e8e37945-f8c5-4769-bdde-226edeb8465f", + "id": "b0ddfccc-0a08-4c37-89c0-fa144ef516e3", "metadata": {}, "outputs": [], "source": [ - "%load_ext autoreload\n", - "%autoreload 2\n", + "# Set postgres secret\n", + "import os\n", "\n", + "os.environ[\"POSTGRES_SECRET_NAME\"] = \"\"\n", + "os.environ[\"POSTGRES_SECRET_REGION\"] = \"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e8e37945-f8c5-4769-bdde-226edeb8465f", + "metadata": {}, + "outputs": [], + "source": [ + "# Load lambda core\n", "# Jump out of the \"Scripts\" box for the lambda source\n", "import sys\n", "import os\n", + "os.environ[\"LOG_LEVEL\"] = \"DEBUG\"\n", "parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))\n", "sys.path.insert(0, parent_dir)\n", "if not os.path.exists('src'):\n", @@ -27,7 +40,8 @@ "metadata": {}, "outputs": [], "source": [ - "jwtToken = \"eyJhb...\"" + "# Set token for querying lambda\n", + "jwtToken = \"eyJhb\"" ] }, { @@ -37,6 +51,7 @@ "metadata": {}, "outputs": [], "source": [ + "# GET API\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/api\"\n", @@ -50,6 +65,7 @@ "metadata": {}, "outputs": [], "source": [ + "# GET TOKEN => path to token source\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/token\"\n", @@ -63,6 +79,7 @@ "metadata": {}, "outputs": [], "source": [ + "# GET TOPICS\n", "src.event_gate_lambda.lambda_handler({\n", " 
\"httpMethod\": \"GET\",\n", " \"resource\": \"/topics\"\n", @@ -76,10 +93,11 @@ "metadata": {}, "outputs": [], "source": [ + "# GET TOPIC SCHEMA\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"GET\",\n", " \"resource\": \"/topics/{topic_name}\",\n", - " \"pathParameters\": {\"topic_name\": \"run.topic\"}\n", + " \"pathParameters\": {\"topic_name\": \"public.cps.za.dlchange\"}\n", "}, {})" ] }, @@ -90,24 +108,25 @@ "metadata": {}, "outputs": [], "source": [ + "# POST MESSAGE\n", "import json\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"POST\",\n", " \"resource\": \"/topics/{topic_name}\",\n", - " \"pathParameters\": {\"topic_name\": \"edla.change.topic\"},\n", + " \"pathParameters\": {\"topic_name\": \"public.cps.za.dlchange\"},\n", " \"headers\": {\"bearer\": jwtToken},\n", " \"body\": json.dumps({\n", " \"event_id\": \"JupyterEventId\",\n", " \"tenant_id\": \"JupyterTenantId\",\n", " \"source_app\": \"JupyterSrc\",\n", - " \"source_app_version\": \"v2024-10-17\",\n", + " \"source_app_version\": \"v2025-05-20\",\n", " \"environment\": \"JupyterEnv\",\n", " \"timestamp_event\": 1729602770000,\n", " \"catalog_id\": \"TestCatalog\",\n", " \"operation\": \"delete\",\n", " \"location\": \"UnitTest\",\n", " \"format\": \"TestFormat\",\n", - " \"formatOptions\": {\"Foo\" : \"Bar\"}\n", + " \"format_options\": {\"Foo\" : \"Bar\"}\n", " })\n", "}, {})" ] @@ -119,6 +138,7 @@ "metadata": {}, "outputs": [], "source": [ + "# CYCLE LAMBDA ENVIRONMENT\n", "src.event_gate_lambda.lambda_handler({\n", " \"httpMethod\": \"POST\",\n", " \"resource\": \"/terminate\"\n", diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 5345566..49eaa06 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -13,46 +13,51 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import os import base64 import json import logging +import os import sys import urllib3 -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +import boto3 +import jwt +import requests from cryptography.hazmat.primitives import serialization from jsonschema import validate from jsonschema.exceptions import ValidationError -import jwt -import requests -import boto3 -from confluent_kafka import Producer +sys.path.append(os.path.join(os.path.dirname(__file__))) + +import writer_eventbridge +import writer_kafka +import writer_postgres + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logger = logging.getLogger(__name__) -log_level = os.environ.get('LOG_LEVEL', 'INFO') +log_level = os.environ.get("LOG_LEVEL", "INFO") logger.setLevel(log_level) logger.addHandler(logging.StreamHandler()) +logger.debug("Initialized LOGGER") with open("conf/api.yaml", "r") as file: API = file.read() +logger.debug("Loaded API definition") + +TOPICS = {} +with open("conf/topic_runs.json", "r") as file: + TOPICS["public.cps.za.runs"] = json.load(file) +with open("conf/topic_dlchange.json", "r") as file: + TOPICS["public.cps.za.dlchange"] = json.load(file) +logger.debug("Loaded TOPICS") with open("conf/config.json", "r") as file: CONFIG = json.load(file) +logger.debug("Loaded main CONFIG") -aws_session = boto3.Session() -aws_s3 = aws_session.resource('s3', verify=False) -aws_eventbridge = boto3.client('events') - -if CONFIG["topics_config"].startswith("s3://"): - name_parts = CONFIG["topics_config"].split('/') - bucket_name = name_parts[2] - bucket_object = "/".join(name_parts[3:]) - TOPICS = json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object).get()["Body"].read().decode("utf-8")) -else: - with open(CONFIG["topics_config"], "r") as file: - TOPICS = json.load(file) +aws_s3 = boto3.Session().resource('s3', verify=False) +logger.debug("Initialized AWS S3 Client") if CONFIG["access_config"].startswith("s3://"): name_parts = 
CONFIG["access_config"].split('/') @@ -62,67 +67,16 @@ else: with open(CONFIG["access_config"], "r") as file: ACCESS = json.load(file) - -TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] - -if "event_bus_arn" in CONFIG: - EVENT_BUS_ARN = CONFIG["event_bus_arn"] -else: - EVENT_BUS_ARN = "" - -logger.debug("Loaded configs") +logger.debug("Loaded ACCESS definitions") +TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"] TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded)) -logger.debug("Loaded token public key") - -producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]} -if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG: - producer_config.update({ - "security.protocol": "SASL_SSL", - "sasl.mechanism": "GSSAPI", - "sasl.kerberos.service.name": "kafka", - "sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"], - "sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"], - "ssl.ca.location": CONFIG["kafka_ssl_ca_path"], - "ssl.certificate.location": CONFIG["kafka_ssl_cert_path"], - "ssl.key.location": CONFIG["kafka_ssl_key_path"], - "ssl.key.password": CONFIG["kafka_ssl_key_password"] - }) - logger.debug("producer will use SASL_SSL") - -kafka_producer = Producer(producer_config) -logger.debug("Initialized kafka producer") - -def kafka_write(topicName, message): - logger.debug(f"Sending to kafka {topicName}") - error = [] - kafka_producer.produce(topicName, - key="", - value=json.dumps(message).encode("utf-8"), - callback = lambda err, msg: error.append(err) if err is not None else None) - kafka_producer.flush() - if error: - raise Exception(error) - -def event_bridge_write(topicName, message): - if not EVENT_BUS_ARN: - logger.debug("No EventBus Arn - skipping") - return - - logger.debug(f"Sending to eventBridge {topicName}") - response = aws_eventbridge.put_events( 
- Entries=[ - { - "Source": topicName, - 'DetailType': 'JSON', - 'Detail': json.dumps(message), - 'EventBusName': EVENT_BUS_ARN, - } - ] - ) - if response["FailedEntryCount"] > 0: - raise Exception(response) +logger.debug("Loaded TOKEN_PUBLIC_KEY") + +writer_eventbridge.init(logger, CONFIG) +writer_kafka.init(logger, CONFIG) +writer_postgres.init(logger) def get_api(): return { @@ -183,21 +137,12 @@ def post_topic_message(topicName, topicMessage, tokenEncoded): "body": e.message } - wasError = False - try: - kafka_write(topicName, topicMessage) - except Exception as e: - logger.error(str(e)) - wasError = True - try: - event_bridge_write(topicName, topicMessage) - except Exception as e: - logger.error(str(e)) - wasError = True - if wasError: - return {"statusCode": 500} - else: - return {"statusCode": 202} + results = [ + writer_kafka.write(topicName, topicMessage), + writer_eventbridge.write(topicName, topicMessage), + writer_postgres.write(topicName, topicMessage) + ] + return {"statusCode": 202} if all(results) else {"statusCode": 500} def lambda_handler(event, context): try: diff --git a/src/requirements.txt b/src/requirements.txt index 6411363..89ddd0e 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -4,4 +4,5 @@ jsonschema PyJWT requests boto3 -confluent_kafka \ No newline at end of file +confluent_kafka +psycopg2-binary \ No newline at end of file diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py new file mode 100644 index 0000000..1a9446d --- /dev/null +++ b/src/writer_eventbridge.py @@ -0,0 +1,40 @@ +import json + +import boto3 + +def init(logger, CONFIG): + global _logger + global EVENT_BUS_ARN + global aws_eventbridge + + _logger = logger + + aws_eventbridge = boto3.client('events') + EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else "" + _logger.debug("Initialized EVENTBRIDGE writer") + +def write(topicName, message): + if not EVENT_BUS_ARN: + _logger.debug("No EventBus Arn - skipping") + return True + 
+ try: + _logger.debug(f"Sending to eventBridge {topicName}") + response = aws_eventbridge.put_events( + Entries=[ + { + "Source": topicName, + 'DetailType': 'JSON', + 'Detail': json.dumps(message), + 'EventBusName': EVENT_BUS_ARN, + } + ] + ) + if response["FailedEntryCount"] > 0: + _logger.error(str(response)) + return False + except Exception as e: + _logger.error(str(e)) + return False + + return True diff --git a/src/writer_kafka.py b/src/writer_kafka.py new file mode 100644 index 0000000..13289d8 --- /dev/null +++ b/src/writer_kafka.py @@ -0,0 +1,45 @@ +import json + +import boto3 +from confluent_kafka import Producer + +def init(logger, CONFIG): + global _logger + global kafka_producer + + _logger = logger + + producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]} + if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG: + producer_config.update({ + "security.protocol": "SASL_SSL", + "sasl.mechanism": "GSSAPI", + "sasl.kerberos.service.name": "kafka", + "sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"], + "sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"], + "ssl.ca.location": CONFIG["kafka_ssl_ca_path"], + "ssl.certificate.location": CONFIG["kafka_ssl_cert_path"], + "ssl.key.location": CONFIG["kafka_ssl_key_path"], + "ssl.key.password": CONFIG["kafka_ssl_key_password"] + }) + _logger.debug("producer will use SASL_SSL") + kafka_producer = Producer(producer_config) + _logger.debug("Initialized KAFKA writer") + +def write(topicName, message): + try: + _logger.debug(f"Sending to kafka {topicName}") + error = [] + kafka_producer.produce(topicName, + key="", + value=json.dumps(message).encode("utf-8"), + callback = lambda err, msg: error.append(err) if err is not None else None) + kafka_producer.flush() + if error: + _logger.error(str(error)) + return False + except Exception as e: + _logger.error(str(e)) + return False + + return True diff --git a/src/writer_postgres.py 
b/src/writer_postgres.py new file mode 100644 index 0000000..cd56488 --- /dev/null +++ b/src/writer_postgres.py @@ -0,0 +1,168 @@ +import json +import os + +import boto3 +from botocore.exceptions import ClientError +import psycopg2 + +def init(logger): + global _logger + global POSTGRES + + _logger = logger + POSTGRES = {} + + secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") + secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") + + if secret_name and secret_region: + aws_secrets = boto3.Session().client(service_name='secretsmanager', region_name=secret_region) + postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] + POSTGRES = json.loads(postgres_secret) + + _logger.debug("Initialized POSTGRES writer") + +def postgres_edla_write(cursor, table, message): + _logger.debug(f"Sending to Postgres - {table}") + cursor.execute(f""" + INSERT INTO {table} + ( + event_id, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_event, + catalog_id, + operation, + "location", + "format", + format_options, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_event"], + message["catalog_id"], + message["operation"], + message["location"] if "location" in message else None, + message["format"], + json.dumps(message["format_options"]) if "format_options" in message else None, + json.dumps(message["additional_info"]) if "additional_info" in message else None + ) + ) + +def postgres_run_write(cursor, table_runs, table_jobs, message): + _logger.debug(f"Sending to Postgres - {table_runs} and {table_jobs}") + cursor.execute(f""" + INSERT INTO {table_runs} + ( + event_id, + job_ref, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_start, + timestamp_end + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, 
+ %s, + %s, + %s + )""", ( + message["event_id"], + message["job_ref"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_start"], + message["timestamp_end"] + ) + ) + + for job in message["jobs"]: + cursor.execute(f""" + INSERT INTO {table_jobs} + ( + event_id, + catalog_id, + status, + timestamp_start, + timestamp_end, + message, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( + message["event_id"], + job["catalog_id"], + job["status"], + job["timestamp_start"], + job["timestamp_end"], + job["message"] if "message" in job else None, + json.dumps(job["additional_info"]) if "additional_info" in job else None + ) + ) + +def write(topicName, message): + try: + if not POSTGRES.get("database"): + _logger.debug("No Postgres - skipping") + return True + + with psycopg2.connect( + database=POSTGRES["database"], + host=POSTGRES["host"], + user=POSTGRES["user"], + password=POSTGRES["password"], + port=POSTGRES["port"] + ) as connection: + with connection.cursor() as cursor: + if topicName == "public.cps.za.dlchange": + postgres_edla_write(cursor, "public_cps_za_dlchange", message) + elif topicName == "public.cps.za.runs": + postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) + else: + _logger.error(f"unknown topic for postgres {topicName}") + return False + + connection.commit() + except Exception as e: + _logger.error(str(e)) + return False + + return True