diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 8f5d64d..cc0d18c 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -27,6 +27,10 @@ from jsonschema import validate from jsonschema.exceptions import ValidationError +# Resolve project root (parent directory of this file's directory) +_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +_CONF_DIR = os.path.join(_PROJECT_ROOT, 'conf') + sys.path.append(os.path.join(os.path.dirname(__file__))) import writer_eventbridge @@ -41,20 +45,20 @@ logger.addHandler(logging.StreamHandler()) logger.debug("Initialized LOGGER") -with open("conf/api.yaml", "r") as file: +with open(os.path.join(_CONF_DIR, 'api.yaml'), 'r') as file: API = file.read() logger.debug("Loaded API definition") TOPICS = {} -with open("conf/topic_runs.json", "r") as file: +with open(os.path.join(_CONF_DIR, 'topic_runs.json'), 'r') as file: TOPICS["public.cps.za.runs"] = json.load(file) -with open("conf/topic_dlchange.json", "r") as file: +with open(os.path.join(_CONF_DIR, 'topic_dlchange.json'), 'r') as file: TOPICS["public.cps.za.dlchange"] = json.load(file) -with open("conf/topic_test.json", "r") as file: +with open(os.path.join(_CONF_DIR, 'topic_test.json'), 'r') as file: TOPICS["public.cps.za.test"] = json.load(file) logger.debug("Loaded TOPICS") -with open("conf/config.json", "r") as file: +with open(os.path.join(_CONF_DIR, 'config.json'), 'r') as file: CONFIG = json.load(file) logger.debug("Loaded main CONFIG") @@ -80,6 +84,17 @@ writer_kafka.init(logger, CONFIG) writer_postgres.init(logger) +def _error_response(status, err_type, message): + return { + "statusCode": status, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({ + "success": False, + "statusCode": status, + "errors": [{"type": err_type, "message": message}] + }) + } + def get_api(): return { "statusCode": 200, @@ -104,7 +119,7 @@ def get_topics(): def get_topic_schema(topicName): 
logger.debug(f"Handling GET TopicSchema({topicName})") if topicName not in TOPICS: - return { "statusCode": 404 } + return _error_response(404, "topic", f"Topic '{topicName}' not found") return { "statusCode": 200, @@ -116,35 +131,53 @@ def post_topic_message(topicName, topicMessage, tokenEncoded): logger.debug(f"Handling POST {topicName}") try: token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"]) - except Exception as e: - return { - "statusCode": 401, - "headers": {"Content-Type": "text/plain"}, - "body": str(e) - } + except Exception: + return _error_response(401, "auth", "Invalid or missing token") if topicName not in TOPICS: - return { "statusCode": 404 } + return _error_response(404, "topic", f"Topic '{topicName}' not found") user = token["sub"] if topicName not in ACCESS or user not in ACCESS[topicName]: - return { "statusCode": 403 } + return _error_response(403, "auth", "User not authorized for topic") try: validate(instance=topicMessage, schema=TOPICS[topicName]) except ValidationError as e: - return { - "statusCode": 400, - "headers": {"Content-Type": "text/plain"}, - "body": e.message - } + return _error_response(400, "validation", e.message) - success = ( - writer_kafka.write(topicName, topicMessage) and - writer_eventbridge.write(topicName, topicMessage) and - writer_postgres.write(topicName, topicMessage) - ) - return {"statusCode": 202} if success else {"statusCode": 500} + # Run all writers independently (avoid short-circuit so failures in one don't skip others) + kafka_ok, kafka_err = writer_kafka.write(topicName, topicMessage) + eventbridge_ok, eventbridge_err = writer_eventbridge.write(topicName, topicMessage) + postgres_ok, postgres_err = writer_postgres.write(topicName, topicMessage) + + errors = [] + if not kafka_ok: + errors.append({"type": "kafka", "message": kafka_err}) + if not eventbridge_ok: + errors.append({"type": "eventbridge", "message": eventbridge_err}) + if not postgres_ok: + errors.append({"type": "postgres", 
"message": postgres_err}) + + if errors: + return { + "statusCode": 500, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({ + "success": False, + "statusCode": 500, + "errors": errors + }) + } + + return { + "statusCode": 202, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({ + "success": True, + "statusCode": 202 + }) + } def extract_token(eventHeaders): # Initial implementation used bearer header directly @@ -171,7 +204,7 @@ def lambda_handler(event, context): return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), extract_token(event["headers"])) if event["resource"].lower() == "/terminate": sys.exit("TERMINATING") - return {"statusCode": 404} + return _error_response(404, "route", "Resource not found") except Exception as e: logger.error(f"Unexpected exception: {e}") - return {"statusCode": 500} + return _error_response(500, "internal", "Unexpected server error") diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py index d20f5fd..266e8d7 100644 --- a/src/writer_eventbridge.py +++ b/src/writer_eventbridge.py @@ -16,7 +16,7 @@ def init(logger, CONFIG): def write(topicName, message): if not EVENT_BUS_ARN: _logger.debug("No EventBus Arn - skipping") - return True + return True, None try: _logger.debug(f"Sending to eventBridge {topicName}") @@ -31,10 +31,12 @@ def write(topicName, message): ] ) if response["FailedEntryCount"] > 0: - _logger.error(str(response)) - return False + msg = str(response) + _logger.error(msg) + return False, msg except Exception as e: - _logger.error(f'The EventBridge writer failed with unknown error: {str(e)}') - return False + err_msg = f'The EventBridge writer failed with unknown error: {str(e)}' + _logger.error(err_msg) + return False, err_msg - return True + return True, None diff --git a/src/writer_kafka.py b/src/writer_kafka.py index b307cbd..2c2f253 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -29,17 +29,21 @@ 
def init(logger, CONFIG): def write(topicName, message): try: _logger.debug(f"Sending to kafka {topicName}") - error = [] - kafka_producer.produce(topicName, - key="", - value=json.dumps(message).encode("utf-8"), - callback = lambda err, msg: error.append(err) if err is not None else None) + errors = [] + kafka_producer.produce( + topicName, + key="", + value=json.dumps(message).encode("utf-8"), + callback=lambda err, msg: errors.append(str(err)) if err is not None else None + ) kafka_producer.flush() - if error: - _logger.error(str(error)) - return False + if errors: + msg = "; ".join(errors) + _logger.error(msg) + return False, msg except Exception as e: - _logger.error(f'The Kafka writer failed with unknown error: {str(e)}') - return False + err_msg = f'The Kafka writer failed with unknown error: {str(e)}' + _logger.error(err_msg) + return False, err_msg - return True + return True, None diff --git a/src/writer_postgres.py b/src/writer_postgres.py index 87855d4..c533a5b 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -176,10 +176,10 @@ def write(topicName, message): try: if not POSTGRES["database"]: _logger.debug("No Postgres - skipping") - return True + return True, None if psycopg2 is None: _logger.debug("psycopg2 not available - skipping actual Postgres write") - return True + return True, None with psycopg2.connect( database=POSTGRES["database"], @@ -196,12 +196,14 @@ def write(topicName, message): elif topicName == "public.cps.za.test": postgres_test_write(cursor, "public_cps_za_test", message) else: - _logger.error(f"unknown topic for postgres {topicName}") - return False + msg = f"unknown topic for postgres {topicName}" + _logger.error(msg) + return False, msg connection.commit() except Exception as e: - _logger.error(f'The Postgres writer with failed unknown error: {str(e)}') - return False + err_msg = f'The Postgres writer failed with unknown error: {str(e)}' + _logger.error(err_msg) + return False, err_msg - return True + return 
True, None diff --git a/tests/test_event_gate_lambda.py b/tests/test_event_gate_lambda.py new file mode 100644 index 0000000..9a60918 --- /dev/null +++ b/tests/test_event_gate_lambda.py @@ -0,0 +1,220 @@ +import base64 +import json +import unittest +import importlib +import sys +import types +from unittest.mock import patch, MagicMock + +# Inject dummy confluent_kafka if not installed so patching works +if 'confluent_kafka' not in sys.modules: + dummy_ck = types.ModuleType('confluent_kafka') + class DummyProducer: # minimal interface + def __init__(self, *a, **kw): + pass + def produce(self, *a, **kw): + cb = kw.get('callback') + if cb: + cb(None, None) + def flush(self): + return None + dummy_ck.Producer = DummyProducer + sys.modules['confluent_kafka'] = dummy_ck + +# Inject dummy psycopg2 (not needed but prevents optional import noise) +if 'psycopg2' not in sys.modules: + dummy_pg = types.ModuleType('psycopg2') + sys.modules['psycopg2'] = dummy_pg + +def load_event_gate_module(): + # Start patches before import to intercept side effects + mock_requests_get = patch('requests.get').start() + mock_requests_get.return_value.json.return_value = {"key": base64.b64encode(b'dummy_der').decode('utf-8')} + + mock_load_key = patch('cryptography.hazmat.primitives.serialization.load_der_public_key').start() + mock_load_key.return_value = object() + + # Mock S3 access_config retrieval + class MockS3ObjectBody: + def read(self): + return json.dumps({ + "public.cps.za.runs": ["FooBarUser"], + "public.cps.za.dlchange": ["FooUser", "BarUser"], + "public.cps.za.test": ["TestUser"] + }).encode('utf-8') + class MockS3Object: + def get(self): + return {"Body": MockS3ObjectBody()} + class MockS3Bucket: + def Object(self, key): + return MockS3Object() + class MockS3Resource: + def Bucket(self, name): + return MockS3Bucket() + mock_session = patch('boto3.Session').start() + mock_session.return_value.resource.return_value = MockS3Resource() + + # Mock EventBridge client + mock_boto_client 
= patch('boto3.client').start() + mock_events_client = MagicMock() + mock_events_client.put_events.return_value = {"FailedEntryCount": 0} + mock_boto_client.return_value = mock_events_client + + # Allow kafka producer patching (already stubbed) but still patch to inspect if needed + patch('confluent_kafka.Producer').start() + + module = importlib.import_module('src.event_gate_lambda') + return module + +class TestEventGateLambda(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.event_gate_lambda = load_event_gate_module() + + @classmethod + def tearDownClass(cls): + patch.stopall() + + # Helper to construct API Gateway like event + def make_event(self, resource, method='GET', body=None, topic=None, headers=None): + return { + 'resource': resource, + 'httpMethod': method, + 'headers': headers or {}, + 'pathParameters': {'topic_name': topic} if topic else {}, + 'body': json.dumps(body) if isinstance(body, dict) else body + } + + def valid_payload(self): + return {"event_id":"e1","tenant_id":"t1","source_app":"app","environment":"dev","timestamp":123} + + # --- GET flows --- + def test_get_topics(self): + event = self.make_event('/topics') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 200) + body = json.loads(resp['body']) + self.assertIn('public.cps.za.test', body) + + def test_get_topic_schema_found(self): + event = self.make_event('/topics/{topic_name}', method='GET', topic='public.cps.za.test') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 200) + schema = json.loads(resp['body']) + self.assertEqual(schema['type'], 'object') + + def test_get_topic_schema_not_found(self): + event = self.make_event('/topics/{topic_name}', method='GET', topic='no.such.topic') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 404) + + # --- POST auth / validation failures --- + def test_post_missing_token(self): + 
payload = self.valid_payload() + event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 401) + body = json.loads(resp['body']) + self.assertFalse(body['success']) + self.assertEqual(body['errors'][0]['type'], 'auth') + + def test_post_unauthorized_user(self): + payload = self.valid_payload() + with patch.object(self.event_gate_lambda.jwt, 'decode', return_value={'sub': 'NotAllowed'}, create=True): + event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={'Authorization':'Bearer token'}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 403) + body = json.loads(resp['body']) + self.assertEqual(body['errors'][0]['type'], 'auth') + + def test_post_schema_validation_error(self): + payload = {"event_id":"e1","tenant_id":"t1","source_app":"app","environment":"dev"} # missing timestamp + with patch.object(self.event_gate_lambda.jwt, 'decode', return_value={'sub': 'TestUser'}, create=True): + event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={'Authorization':'Bearer token'}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 400) + body = json.loads(resp['body']) + self.assertEqual(body['errors'][0]['type'], 'validation') + + # --- POST success & failure aggregation --- + def test_post_success_all_writers(self): + payload = self.valid_payload() + with patch.object(self.event_gate_lambda.jwt, 'decode', return_value={'sub': 'TestUser'}, create=True), \ + patch('src.event_gate_lambda.writer_kafka.write', return_value=(True, None)), \ + patch('src.event_gate_lambda.writer_eventbridge.write', return_value=(True, None)), \ + patch('src.event_gate_lambda.writer_postgres.write', return_value=(True, None)): + 
event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={'Authorization':'Bearer token'}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 202) + body = json.loads(resp['body']) + self.assertTrue(body['success']) + self.assertEqual(body['statusCode'], 202) + + def test_post_single_writer_failure(self): + payload = self.valid_payload() + with patch.object(self.event_gate_lambda.jwt, 'decode', return_value={'sub': 'TestUser'}, create=True), \ + patch('src.event_gate_lambda.writer_kafka.write', return_value=(False, 'Kafka boom')), \ + patch('src.event_gate_lambda.writer_eventbridge.write', return_value=(True, None)), \ + patch('src.event_gate_lambda.writer_postgres.write', return_value=(True, None)): + event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={'Authorization':'Bearer token'}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 500) + body = json.loads(resp['body']) + self.assertFalse(body['success']) + self.assertEqual(len(body['errors']), 1) + self.assertEqual(body['errors'][0]['type'], 'kafka') + + def test_post_multiple_writer_failures(self): + payload = self.valid_payload() + with patch.object(self.event_gate_lambda.jwt, 'decode', return_value={'sub': 'TestUser'}, create=True), \ + patch('src.event_gate_lambda.writer_kafka.write', return_value=(False, 'Kafka A')), \ + patch('src.event_gate_lambda.writer_eventbridge.write', return_value=(False, 'EB B')), \ + patch('src.event_gate_lambda.writer_postgres.write', return_value=(True, None)): + event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={'Authorization':'Bearer token'}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 500) + body = json.loads(resp['body']) + 
self.assertEqual(len(body['errors']), 2) + types = sorted(e['type'] for e in body['errors']) + self.assertEqual(types, ['eventbridge', 'kafka']) + + def test_token_extraction_lowercase_bearer_header(self): + payload = self.valid_payload() + with patch.object(self.event_gate_lambda.jwt, 'decode', return_value={'sub': 'TestUser'}, create=True), \ + patch('src.event_gate_lambda.writer_kafka.write', return_value=(True, None)), \ + patch('src.event_gate_lambda.writer_eventbridge.write', return_value=(True, None)), \ + patch('src.event_gate_lambda.writer_postgres.write', return_value=(True, None)): + event = self.make_event('/topics/{topic_name}', method='POST', topic='public.cps.za.test', body=payload, headers={'bearer':'token'}) + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 202) + + def test_unknown_resource(self): + event = self.make_event('/unknown') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 404) + body = json.loads(resp['body']) + self.assertEqual(body['errors'][0]['type'], 'route') + + def test_get_api_endpoint(self): + event = self.make_event('/api') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 200) + self.assertIn('openapi', resp['body'].lower()) # crude check + + def test_get_token_endpoint(self): + event = self.make_event('/token') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 303) + self.assertIn('Location', resp['headers']) + + def test_internal_error_path(self): + # Force exception by patching get_topics to raise + with patch('src.event_gate_lambda.get_topics', side_effect=RuntimeError('boom')): + event = self.make_event('/topics') + resp = self.event_gate_lambda.lambda_handler(event, None) + self.assertEqual(resp['statusCode'], 500) + body = json.loads(resp['body']) + self.assertEqual(body['errors'][0]['type'], 'internal') + +if __name__ == 
'__main__': + unittest.main()