-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevent_gate_lambda.py
More file actions
139 lines (122 loc) · 5.47 KB
/
event_gate_lambda.py
File metadata and controls
139 lines (122 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Event Gate Lambda function implementation."""
import json
import logging
import os
import sys
from typing import Any, Dict
import boto3
from botocore.exceptions import BotoCoreError, NoCredentialsError
from src.handlers.handler_token import HandlerToken
from src.handlers.handler_topic import HandlerTopic
from src.handlers.handler_health import HandlerHealth
from src.utils.constants import SSL_CA_BUNDLE_KEY
from src.utils.utils import build_error_response
from src.writers.writer_eventbridge import WriterEventBridge
from src.writers.writer_kafka import WriterKafka
from src.writers.writer_postgres import WriterPostgres
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV
# Repository root: the parent of the directory that contains this module.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# Initialize logger
logger = logging.getLogger(__name__)
# logging only recognises upper-case level names, so normalise the environment
# value (e.g. "debug" or " Info ") before handing it to setLevel(). An unknown
# name still raises ValueError, so a genuine misconfiguration fails fast.
log_level = os.environ.get("LOG_LEVEL", "INFO").strip().upper()
logger.setLevel(log_level)
# Attach the stream handler only when none is present, so executing this module
# again does not stack duplicate handlers on the same logger.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())
logger.debug("Initialized logger with level %s", log_level)
# Load main configuration
logger.debug("Using CONF_DIR=%s", CONF_DIR)
if INVALID_CONF_ENV:
    # A truthy INVALID_CONF_ENV is reported alongside the directory actually in use.
    logger.warning(
        "CONF_DIR env var set to non-existent path: %s; fell back to %s",
        INVALID_CONF_ENV,
        CONF_DIR,
    )
_main_config_path = os.path.join(CONF_DIR, "config.json")
with open(_main_config_path, encoding="utf-8") as config_file:
    # Parsed once at import time; the resulting object is shared by everything below.
    config = json.loads(config_file.read())
logger.debug("Loaded main configuration")
# Load API definition
# The file is kept as raw text (not parsed); it is served verbatim by get_api().
_api_spec_path = os.path.join(PROJECT_ROOT, "api.yaml")
with open(_api_spec_path, encoding="utf-8") as api_file:
    API = api_file.read()
logger.debug("Loaded API definition")
# Initialize S3 client with SSL verification
# The configured value is passed straight through as boto3's ``verify`` argument;
# when the key is absent the default of True is used.
ssl_verify = config.get(SSL_CA_BUNDLE_KEY, True)
try:
    aws_s3 = boto3.Session().resource("s3", verify=ssl_verify)
except (BotoCoreError, NoCredentialsError) as exc:
    # Log with traceback, then surface a single error type to whoever imports this module.
    logger.exception("Failed to initialize AWS S3 client")
    raise RuntimeError("AWS S3 client initialization failed") from exc
else:
    logger.debug("Initialized AWS S3 Client")
# Load access configuration
# The "access_config" setting is either an s3://<bucket>/<key> URL or a local file path.
ACCESS: Dict[str, list[str]] = {}
_access_location = config["access_config"]
if _access_location.startswith("s3://"):
    # "s3://bucket/a/b".split("/") -> ["s3:", "", "bucket", "a", "b"]:
    # index 2 is the bucket, everything after it is the object key.
    name_parts = _access_location.split("/")
    BUCKET_NAME, *_key_segments = name_parts[2:]
    BUCKET_OBJECT_KEY = "/".join(_key_segments)
    _access_object = aws_s3.Bucket(BUCKET_NAME).Object(BUCKET_OBJECT_KEY)
    ACCESS = json.loads(_access_object.get()["Body"].read().decode("utf-8"))
else:
    with open(_access_location, encoding="utf-8") as access_file:
        ACCESS = json.load(access_file)
logger.debug("Loaded access configuration")
# Initialize token handler and load token public keys
# NOTE(review): the value bound here is whatever load_public_keys() returns --
# presumably the handler itself (fluent style); confirm in HandlerToken.
_token_handler = HandlerToken(config)
handler_token = _token_handler.load_public_keys()

# Initialize EventGate writers
# One writer per destination, all built from the same main configuration and
# shared by the topic and health handlers below.
writers = {
    "kafka": WriterKafka(config),
    "eventbridge": WriterEventBridge(config),
    "postgres": WriterPostgres(config),
}

# Initialize topic handler and load topic schemas
# NOTE(review): as above, the bound value is the result of load_topic_schemas().
_topic_handler = HandlerTopic(CONF_DIR, ACCESS, handler_token, writers)
handler_topic = _topic_handler.load_topic_schemas()

# Initialize health handler
handler_health = HandlerHealth(writers)
def get_api() -> Dict[str, Any]:
    """Build the response that serves the API definition.

    Returns:
        A dictionary with ``statusCode`` 200 and the raw text held in the
        module-level ``API`` constant as ``body``.
    """
    response: Dict[str, Any] = {"statusCode": 200, "body": API}
    return response
def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any]:
    """
    AWS Lambda entry point: route an API Gateway proxy event to the matching handler.

    The route is chosen from the lower-cased ``resource`` field. For
    ``/topics/{topic_name}`` the ``httpMethod`` field selects between returning the
    topic schema (GET) and posting a message (POST). Any other resource, and any other
    method on a topic, produces a 404 error response.

    Args:
        event: The event data from API Gateway.
        _context: The mandatory context argument for AWS Lambda invocation (unused).

    Returns:
        A dictionary compatible with API Gateway Lambda Proxy integration. The error
        types listed in the ``except`` clause below are logged and turned into a 500
        error response instead of being raised.

    Raises:
        SystemExit: When the ``/terminate`` resource is requested.
    """
    try:
        route = event.get("resource", "").lower()

        if route == "/topics/{topic_name}":
            http_method = event.get("httpMethod")
            # Only GET and POST are served; any other method falls through to the 404 below
            # without touching the path parameters.
            if http_method in ("GET", "POST"):
                topic_name = event["pathParameters"]["topic_name"].lower()
                if http_method == "GET":
                    return handler_topic.get_topic_schema(topic_name)
                # Body is parsed before the token is extracted, so a malformed body
                # is reported first.
                message = json.loads(event["body"])
                token = handler_token.extract_token(event.get("headers", {}))
                return handler_topic.post_topic_message(topic_name, message, token)

        if route == "/api":
            return get_api()
        if route == "/token":
            return handler_token.get_token_provider_info()
        if route == "/health":
            return handler_health.get_health()
        if route == "/topics":
            return handler_topic.get_topics_list()
        if route == "/terminate":
            # SystemExit is not in the except tuple below, so it leaves the handler.
            sys.exit("TERMINATING")

        return build_error_response(404, "route", "Resource not found")
    except (KeyError, json.JSONDecodeError, ValueError, AttributeError, TypeError, RuntimeError) as err:
        logger.exception("Request processing error: %s", err)
        return build_error_response(500, "internal", "Unexpected server error")