Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 60 additions & 27 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
from jsonschema import validate
from jsonschema.exceptions import ValidationError

# Resolve project root (parent directory of this file's directory)
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
_CONF_DIR = os.path.join(_PROJECT_ROOT, 'conf')

sys.path.append(os.path.join(os.path.dirname(__file__)))

import writer_eventbridge
Expand All @@ -41,20 +45,20 @@
logger.addHandler(logging.StreamHandler())
logger.debug("Initialized LOGGER")

with open("conf/api.yaml", "r") as file:
with open(os.path.join(_CONF_DIR, 'api.yaml'), 'r') as file:
API = file.read()
logger.debug("Loaded API definition")

TOPICS = {}
with open("conf/topic_runs.json", "r") as file:
with open(os.path.join(_CONF_DIR, 'topic_runs.json'), 'r') as file:
TOPICS["public.cps.za.runs"] = json.load(file)
with open("conf/topic_dlchange.json", "r") as file:
with open(os.path.join(_CONF_DIR, 'topic_dlchange.json'), 'r') as file:
TOPICS["public.cps.za.dlchange"] = json.load(file)
with open("conf/topic_test.json", "r") as file:
with open(os.path.join(_CONF_DIR, 'topic_test.json'), 'r') as file:
TOPICS["public.cps.za.test"] = json.load(file)
logger.debug("Loaded TOPICS")

with open("conf/config.json", "r") as file:
with open(os.path.join(_CONF_DIR, 'config.json'), 'r') as file:
CONFIG = json.load(file)
logger.debug("Loaded main CONFIG")

Expand All @@ -80,6 +84,17 @@
writer_kafka.init(logger, CONFIG)
writer_postgres.init(logger)

def _error_response(status, err_type, message):
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"success": False,
"statusCode": status,
"errors": [{"type": err_type, "message": message}]
})
}

def get_api():
return {
"statusCode": 200,
Expand All @@ -104,7 +119,7 @@ def get_topics():
def get_topic_schema(topicName):
logger.debug(f"Handling GET TopicSchema({topicName})")
if topicName not in TOPICS:
return { "statusCode": 404 }
return _error_response(404, "topic", f"Topic '{topicName}' not found")

return {
"statusCode": 200,
Expand All @@ -116,35 +131,53 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
logger.debug(f"Handling POST {topicName}")
try:
token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
except Exception as e:
return {
"statusCode": 401,
"headers": {"Content-Type": "text/plain"},
"body": str(e)
}
except Exception:
return _error_response(401, "auth", "Invalid or missing token")

if topicName not in TOPICS:
return { "statusCode": 404 }
return _error_response(404, "topic", f"Topic '{topicName}' not found")

user = token["sub"]
if topicName not in ACCESS or user not in ACCESS[topicName]:
return { "statusCode": 403 }
return _error_response(403, "auth", "User not authorized for topic")

try:
validate(instance=topicMessage, schema=TOPICS[topicName])
except ValidationError as e:
return {
"statusCode": 400,
"headers": {"Content-Type": "text/plain"},
"body": e.message
}
return _error_response(400, "validation", e.message)

success = (
writer_kafka.write(topicName, topicMessage) and
writer_eventbridge.write(topicName, topicMessage) and
writer_postgres.write(topicName, topicMessage)
)
return {"statusCode": 202} if success else {"statusCode": 500}
# Run all writers independently (avoid short-circuit so failures in one don't skip others)
kafka_ok, kafka_err = writer_kafka.write(topicName, topicMessage)
eventbridge_ok, eventbridge_err = writer_eventbridge.write(topicName, topicMessage)
postgres_ok, postgres_err = writer_postgres.write(topicName, topicMessage)

errors = []
if not kafka_ok:
errors.append({"type": "kafka", "message": kafka_err})
if not eventbridge_ok:
errors.append({"type": "eventbridge", "message": eventbridge_err})
if not postgres_ok:
errors.append({"type": "postgres", "message": postgres_err})

if errors:
return {
"statusCode": 500,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"success": False,
"statusCode": 500,
"errors": errors
})
}

return {
"statusCode": 202,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"success": True,
"statusCode": 202
})
}

def extract_token(eventHeaders):
# Initial implementation used bearer header directly
Expand All @@ -171,7 +204,7 @@ def lambda_handler(event, context):
return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), extract_token(event["headers"]))
if event["resource"].lower() == "/terminate":
sys.exit("TERMINATING")
return {"statusCode": 404}
return _error_response(404, "route", "Resource not found")
except Exception as e:
logger.error(f"Unexpected exception: {e}")
return {"statusCode": 500}
return _error_response(500, "internal", "Unexpected server error")
14 changes: 8 additions & 6 deletions src/writer_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def init(logger, CONFIG):
def write(topicName, message):
if not EVENT_BUS_ARN:
_logger.debug("No EventBus Arn - skipping")
return True
return True, None

try:
_logger.debug(f"Sending to eventBridge {topicName}")
Expand All @@ -31,10 +31,12 @@ def write(topicName, message):
]
)
if response["FailedEntryCount"] > 0:
_logger.error(str(response))
return False
msg = str(response)
_logger.error(msg)
return False, msg
except Exception as e:
_logger.error(f'The EventBridge writer failed with unknown error: {str(e)}')
return False
err_msg = f'The EventBridge writer failed with unknown error: {str(e)}'
_logger.error(err_msg)
return False, err_msg

return True
return True, None
26 changes: 15 additions & 11 deletions src/writer_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,21 @@ def init(logger, CONFIG):
def write(topicName, message):
try:
_logger.debug(f"Sending to kafka {topicName}")
error = []
kafka_producer.produce(topicName,
key="",
value=json.dumps(message).encode("utf-8"),
callback = lambda err, msg: error.append(err) if err is not None else None)
errors = []
kafka_producer.produce(
topicName,
key="",
value=json.dumps(message).encode("utf-8"),
callback=lambda err, msg: errors.append(str(err)) if err is not None else None
)
kafka_producer.flush()
if error:
_logger.error(str(error))
return False
if errors:
msg = "; ".join(errors)
_logger.error(msg)
return False, msg
except Exception as e:
_logger.error(f'The Kafka writer failed with unknown error: {str(e)}')
return False
err_msg = f'The Kafka writer failed with unknown error: {str(e)}'
_logger.error(err_msg)
return False, err_msg

return True
return True, None
16 changes: 9 additions & 7 deletions src/writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ def write(topicName, message):
try:
if not POSTGRES["database"]:
_logger.debug("No Postgres - skipping")
return True
return True, None
if psycopg2 is None:
_logger.debug("psycopg2 not available - skipping actual Postgres write")
return True
return True, None

with psycopg2.connect(
database=POSTGRES["database"],
Expand All @@ -196,12 +196,14 @@ def write(topicName, message):
elif topicName == "public.cps.za.test":
postgres_test_write(cursor, "public_cps_za_test", message)
else:
_logger.error(f"unknown topic for postgres {topicName}")
return False
msg = f"unknown topic for postgres {topicName}"
_logger.error(msg)
return False, msg

connection.commit()
except Exception as e:
_logger.error(f'The Postgres writer with failed unknown error: {str(e)}')
return False
err_msg = f'The Postgres writer with failed unknown error: {str(e)}'
_logger.error(err_msg)
return False, err_msg

return True
return True, None
Loading