From 710446e142ce911321cefc7bf8c172b2af6b9d05 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Fri, 9 May 2025 10:14:45 +0200 Subject: [PATCH 1/3] #34 Send copy to EventBridge --- conf/config.json | 3 ++- src/event_gate_lambda.py | 40 ++++++++++++++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/conf/config.json b/conf/config.json index 6dbc4ef..023fe64 100644 --- a/conf/config.json +++ b/conf/config.json @@ -3,5 +3,6 @@ "topics_config": "s3:///topics.json", "token_provider_url": "https://", "token_public_key_url": "https://", - "kafka_bootstrap_server": "localhost:9092" + "kafka_bootstrap_server": "localhost:9092", + "event_bus_arn": "arn:aws:events:" } \ No newline at end of file diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index fc43dee..e35c226 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -41,6 +41,7 @@ aws_session = boto3.Session() aws_s3 = aws_session.resource('s3', verify=False) +aws_eventbridge = boto3.client('events') if CONFIG["topics_config"].startswith("s3://"): name_parts = CONFIG["topics_config"].split('/') @@ -61,6 +62,12 @@ ACCESS = json.load(file) TOKEN_PROVIDER_URL = CONFIG["token_provider_url"] + +if "event_bus_arn" in CONFIG: + EVENT_BUS_ARN = CONFIG["event_bus_arn"] +else: + EVENT_BUS_ARN = "" + logger.info("Loaded configs") token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"] @@ -94,11 +101,26 @@ def kafkaWrite(topicName, message): callback = lambda err, msg: error.append(err) if err is not None else None) kafka_producer.flush() if error: - logger.error(error) - return 500 - else: - logger.info("OK") - return 202 + raise Exception(error) + +def eventBridgeWrite(topicName, message): + if not EVENT_BUS_ARN: + logger.info("No EventBus Arn - skipping") + return + + logger.info(f"Sending to eventBridge {topicName}") + response = aws_eventbridge.put_events( + Entries=[ + { + "Source": topicName, + 'DetailType': 'JSON', + 'Detail': json.dumps(message), + 'EventBusName': EVENT_BUS_ARN, + } + ] + ) + if response["FailedEntryCount"] > 0: + raise Exception(response) def getApi(): return { @@ -159,7 +181,13 @@ def postTopicMessage(topicName, topicMessage, tokenEncoded): "body": e.message } - return {"statusCode": kafkaWrite(topicName, topicMessage)} + try: + kafkaWrite(topicName, topicMessage) + eventBridgeWrite(topicName, topicMessage) + return {"statusCode": 202} + except Exception as e: + logger.error(str(e)) + return {"statusCode": 500} def lambda_handler(event, context): try: From 0dc0026db14aa3f6cb2b38d9c9b53f8b320aa2a5 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 13 May 2025 13:56:31 +0200 Subject: [PATCH 2/3] review takeaway --- src/event_gate_lambda.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index e35c226..dd7cff5 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -92,7 +92,7 @@ kafka_producer = Producer(producer_config) logger.info("Initialized kafka producer") -def kafkaWrite(topicName, message): +def kafka_write(topicName, message): logger.info(f"Sending to kafka {topicName}") error = [] kafka_producer.produce(topicName, @@ -103,7 +103,7 @@ def kafkaWrite(topicName, message): if error: raise Exception(error) -def eventBridgeWrite(topicName, message): +def event_bridge_write(topicName, message): if not EVENT_BUS_ARN: logger.info("No EventBus Arn - skipping") return @@ -122,20 +122,20 @@ def eventBridgeWrite(topicName, message): if response["FailedEntryCount"] > 0: raise Exception(response) -def getApi(): +def get_api(): return { "statusCode": 200, "body": API } -def getToken(): +def get_token(): logger.info("Handling GET Token") return { "statusCode": 303, "headers": {"Location": TOKEN_PROVIDER_URL} } -def getTopics(): +def get_topics(): logger.info("Handling GET Topics") return { "statusCode": 200, @@ -143,7 +143,7 @@ def getTopics(): "body": json.dumps([topicName for topicName in TOPICS]) } -def getTopicSchema(topicName): +def get_topic_schema(topicName): logger.info(f"Handling GET TopicSchema({topicName})") if topicName not in TOPICS: return { "statusCode": 404 } @@ -154,7 +154,7 @@ def getTopicSchema(topicName): "body": json.dumps(TOPICS[topicName]) } -def postTopicMessage(topicName, topicMessage, tokenEncoded): +def post_topic_message(topicName, topicMessage, tokenEncoded): logger.info(f"Handling POST {topicName}") try: token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"]) @@ -182,8 +182,8 @@ def postTopicMessage(topicName, topicMessage, tokenEncoded): } try: - kafkaWrite(topicName, topicMessage) - eventBridgeWrite(topicName, topicMessage) + kafka_write(topicName, topicMessage) + event_bridge_write(topicName, topicMessage) return {"statusCode": 202} except Exception as e: logger.error(str(e)) @@ -192,16 +192,16 @@ def postTopicMessage(topicName, topicMessage, tokenEncoded): def lambda_handler(event, context): try: if event["resource"].lower() == "/api": - return getApi() + return get_api() if event["resource"].lower() == "/token": - return getToken() + return get_token() if event["resource"].lower() == "/topics": - return getTopics() + return get_topics() if event["resource"].lower() == "/topics/{topic_name}": if event["httpMethod"] == "GET": - return getTopicSchema(event["pathParameters"]["topic_name"].lower()) + return get_topic_schema(event["pathParameters"]["topic_name"].lower()) if event["httpMethod"] == "POST": - return postTopicMessage(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), event["headers"]["bearer"]) + return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), event["headers"]["bearer"]) if event["resource"].lower() == "/terminate": sys.exit("TERMINATING") return {"statusCode": 404} From 61d0bb2ee08955957b4224c18a657e565902610d Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Tue, 13 May 2025 14:06:55 +0200 Subject: [PATCH 3/3] review takeaway --- src/event_gate_lambda.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index dd7cff5..ffdf1f8 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -181,13 +181,21 @@ def post_topic_message(topicName, topicMessage, tokenEncoded): "body": e.message } + wasError = False try: kafka_write(topicName, topicMessage) + except Exception as e: + logger.error(str(e)) + wasError = True + try: event_bridge_write(topicName, topicMessage) - return {"statusCode": 202} except Exception as e: logger.error(str(e)) + wasError = True + if wasError: return {"statusCode": 500} + else: + return {"statusCode": 202} def lambda_handler(event, context): try: