Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions app/analysis_server/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""FastAPI entrypoint for the ephemeral analysis server."""

from __future__ import annotations

import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException

from app.core.config import get_settings
from app.core.logging import configure_logging
from app.services.kafka_analysis_consumer_service import KafkaAnalysisConsumerService

settings = get_settings()
configure_logging(settings.debug)


@asynccontextmanager
async def lifespan(application: FastAPI):
consumer_service = KafkaAnalysisConsumerService(settings)
await consumer_service.start()
application.state.analysis_consumer_service = consumer_service
try:
yield
finally:
await consumer_service.stop()


def create_app() -> FastAPI:
application = FastAPI(title=f"{settings.app_name}-analysis-server", lifespan=lifespan)

@application.get("/")
async def root() -> dict[str, str]:
return {"app": settings.app_name, "mode": "analysis-server", "health": "/health", "ready": "/ready"}

@application.get("/health")
async def health() -> dict[str, object]:
return application.state.analysis_consumer_service.health_payload()

@application.get("/ready")
async def ready() -> dict[str, object]:
payload = application.state.analysis_consumer_service.readiness_payload()
if payload["ready"]:
return payload
raise HTTPException(status_code=503, detail=payload)

return application


app = create_app()


def run() -> None:
import uvicorn

uvicorn.run(
"app.analysis_server.main:app",
host=os.getenv("APP_HOST", "0.0.0.0"),
port=int(os.getenv("APP_PORT", "8000")),
)


if __name__ == "__main__":
run()
3 changes: 2 additions & 1 deletion app/batch/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from app.core.config import get_settings
from app.core.logging import configure_logging
from app.infra.kafka.client_options import build_kafka_client_options
from app.infra.postgres.analysis_repository import AnalysisRepository
from app.infra.postgres.client import create_postgres_pool
from app.infra.postgres.dispatch_outbox_repository import DispatchOutboxRepository
Expand All @@ -32,11 +33,11 @@ async def run_once() -> int:

consumer = AIOKafkaConsumer(
settings.kafka_analysis_request_topic,
bootstrap_servers=[s.strip() for s in settings.kafka_bootstrap_servers.split(",") if s.strip()],
group_id=settings.kafka_consumer_group_id,
auto_offset_reset=settings.kafka_auto_offset_reset,
enable_auto_commit=False,
value_deserializer=lambda value: json.loads(value.decode("utf-8")),
**build_kafka_client_options(settings),
)

processed_count = 0
Expand Down
13 changes: 13 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Settings(BaseSettings):
database_ssl: bool = Field(default=False, validation_alias=AliasChoices("DATABASE_SSL", "DB_SSL"))
openai_api_key: str = ""
openai_chat_model: str = "gpt-4o-mini"
# ์ฟผ๋ฆฌ ์ž„๋ฒ ๋”ฉ์šฉ. product.embedding_vector ์ €์žฅ ์‹œ ์‚ฌ์šฉํ•œ ๋ชจ๋ธ๊ณผ ๋™์ผํ•ด์•ผ ํ•จ (๊ธฐ๋ณธ 1536์ฐจ์›, DB VECTOR(1536)).
openai_embedding_model: str = "text-embedding-3-small"
recommend_top_k: int = 3
cache_ttl_days: int = 7
Expand All @@ -32,12 +33,24 @@ class Settings(BaseSettings):
kafka_consumer_enabled: bool = False
kafka_bootstrap_servers: str = "localhost:9092"
kafka_analysis_request_topic: str = "analysis.request.v1"
kafka_recommendation_topic: str = Field(
default="recommendation",
validation_alias=AliasChoices("KAFKA_RECOMMENDATION_TOPIC", "RECOMMENDATION_TOPIC"),
)
kafka_analysis_response_topic: str = "analysis.response.v1"
kafka_consumer_group_id: str = "counseling-analytics-consumer"
kafka_auto_offset_reset: str = "earliest"
kafka_batch_size: int = 1000
kafka_poll_timeout_ms: int = 1000
kafka_log_each_message: bool = False
kafka_log_result_limit: int = 20
kafka_response_max_attempts: int = 3
kafka_security_protocol: str = Field(default="PLAINTEXT", validation_alias=AliasChoices("KAFKA_SECURITY_PROTOCOL"))
kafka_sasl_mechanism: str = Field(default="", validation_alias=AliasChoices("KAFKA_SASL_MECHANISM"))
kafka_aws_region: str = Field(default="", validation_alias=AliasChoices("KAFKA_AWS_REGION", "AWS_REGION", "AWS_DEFAULT_REGION"))
kafka_max_poll_interval_ms: int = Field(default=1800000, validation_alias=AliasChoices("KAFKA_MAX_POLL_INTERVAL_MS"))
kafka_session_timeout_ms: int = Field(default=60000, validation_alias=AliasChoices("KAFKA_SESSION_TIMEOUT_MS"))
kafka_heartbeat_interval_ms: int = Field(default=15000, validation_alias=AliasChoices("KAFKA_HEARTBEAT_INTERVAL_MS"))

# PostgreSQL connection for bulk lookup
postgres_dsn: str = Field(default="", validation_alias=AliasChoices("POSTGRES_DSN", "DB_DSN"))
Expand Down
61 changes: 61 additions & 0 deletions app/infra/kafka/client_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

import asyncio
import time
from typing import Any

from aiokafka.abc import AbstractTokenProvider
from aiokafka.helpers import create_ssl_context
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

from app.core.config import Settings


class MskIamTokenProvider(AbstractTokenProvider):
def __init__(self, region: str) -> None:
self._region = region
self._token = ""
self._expiry_ms = 0
self._lock = asyncio.Lock()

async def token(self) -> str:
async with self._lock:
now_ms = int(time.time() * 1000)
if self._token and now_ms < self._expiry_ms - 60_000:
return self._token

token, expiry_ms = await asyncio.get_running_loop().run_in_executor(
None,
MSKAuthTokenProvider.generate_auth_token,
self._region,
)
self._token = token
self._expiry_ms = int(expiry_ms)
return token


def build_kafka_client_options(settings: Settings) -> dict[str, Any]:
options: dict[str, Any] = {
"bootstrap_servers": [server.strip() for server in settings.kafka_bootstrap_servers.split(",") if server.strip()],
}
security_protocol = settings.kafka_security_protocol.strip().upper()
if not security_protocol:
return options

options["security_protocol"] = security_protocol
if security_protocol in {"SSL", "SASL_SSL"}:
options["ssl_context"] = create_ssl_context()

if security_protocol.startswith("SASL"):
sasl_mechanism = settings.kafka_sasl_mechanism.strip().upper()
if not sasl_mechanism:
raise RuntimeError("KAFKA_SASL_MECHANISM must be set when using a SASL security protocol.")

options["sasl_mechanism"] = sasl_mechanism
if sasl_mechanism == "OAUTHBEARER":
region = settings.kafka_aws_region.strip()
if not region:
raise RuntimeError("KAFKA_AWS_REGION or AWS_REGION must be set for SASL/OAUTHBEARER.")
options["sasl_oauth_token_provider"] = MskIamTokenProvider(region)

return options
72 changes: 65 additions & 7 deletions app/infra/postgres/dispatch_outbox_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,79 @@ class DispatchOutboxRepository:
def __init__(self, pool: Pool) -> None:
self._pool = pool

async def mark_acked_by_request_ids(self, request_ids: list[str]) -> set[str]:
async def load_metadata_by_request_ids(self, request_ids: list[str]) -> dict[str, dict[str, str | None]]:
if not request_ids:
return set()
return {}

sql = """
SELECT
request_id,
chunk_id,
type::text AS type,
dispatch_status::text AS dispatch_status
FROM analysis_dispatch_outbox
WHERE request_id = ANY($1::text[])
"""

async with self._pool.acquire() as conn:
rows = await conn.fetch(sql, request_ids)

return {
str(row["request_id"]): {
"chunkId": row["chunk_id"],
"type": row["type"],
"dispatchStatus": row["dispatch_status"],
}
for row in rows
}

async def prepare_response_dispatch(self, request_id: str, analysis_status: str) -> bool:
sql = """
UPDATE analysis_dispatch_outbox
SET
dispatch_status = 'ACKED'::dispatch_status,
analysis_status = 'READY'::analysis_status,
type = 'RESPONSE'::dispatch_outbox_type,
dispatch_status = 'SENT'::dispatch_status,
analysis_status = $2::analysis_status,
last_error = NULL,
updated_at = NOW()
WHERE request_id = ANY($1::text[])
WHERE request_id = $1
AND dispatch_status <> 'ACKED'::dispatch_status
Comment on lines +43 to +44

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

prepare_response_dispatch ๋ฉ”์†Œ๋“œ์˜ WHERE ์กฐ๊ฑด์ด ๋ถˆ์ถฉ๋ถ„ํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ์ค‘๋ณต ๋ฐœํ–‰์˜ ์œ„ํ—˜์ด ์žˆ์Šต๋‹ˆ๋‹ค. ํ˜„์žฌ dispatch_status <> 'ACKED'๋งŒ ํ™•์ธํ•˜๊ณ  ์žˆ์–ด, SENT ์ƒํƒœ์ธ ๋ฉ”์‹œ์ง€๋„ ๋‹ค์‹œ ์ฒ˜๋ฆฌ ๋Œ€์ƒ์œผ๋กœ ๊ฐ„์ฃผ๋ฉ๋‹ˆ๋‹ค. kafka_analysis_consumer_service์—์„œ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ์ค‘ ์ผ๋ถ€ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰ ์‹คํŒจ๋กœ ์žฌ์‹œ๋„๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด, ์ด๋ฏธ ์„ฑ๊ณต์ ์œผ๋กœ ๋ฐœํ–‰๋œ ๋ฉ”์‹œ์ง€๋“ค์ด ๋‹ค์‹œ ๋ฐœํ–‰๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. WHERE ์กฐ๊ฑด์— SENT ์ƒํƒœ๋„ ์ œ์™ธํ•˜์—ฌ ์ค‘๋ณต ์ฒ˜๋ฆฌ๋ฅผ ๋ฐฉ์ง€ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

Suggested change
WHERE request_id = $1
AND dispatch_status <> 'ACKED'::dispatch_status
WHERE request_id = $1
AND dispatch_status NOT IN ('SENT', 'ACKED')

RETURNING request_id
"""

async with self._pool.acquire() as conn:
rows = await conn.fetch(sql, request_ids)
row = await conn.fetchrow(sql, request_id, analysis_status)

return row is not None

async def mark_response_retry(
self,
request_id: str,
last_error: str,
max_attempts: int,
analysis_status: str,
) -> str:
sql = """
UPDATE analysis_dispatch_outbox
SET
type = 'RESPONSE'::dispatch_outbox_type,
dispatch_status = CASE
WHEN attempt_count + 1 >= $3 THEN 'DEAD'::dispatch_status
ELSE 'RETRY'::dispatch_status
END,
analysis_status = $4::analysis_status,
attempt_count = attempt_count + 1,
next_retry_at = CASE
WHEN attempt_count + 1 >= $3 THEN NULL
ELSE NOW() + INTERVAL '5 minutes'
END,
last_error = LEFT($2, 1000),
updated_at = NOW()
WHERE request_id = $1
RETURNING dispatch_status::text
"""

async with self._pool.acquire() as conn:
row = await conn.fetchrow(sql, request_id, last_error, max_attempts, analysis_status)

return {str(row["request_id"]) for row in rows}
return str(row["dispatch_status"]) if row is not None else "RETRY"
24 changes: 14 additions & 10 deletions app/realtime/api/v1/recommendation.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
from fastapi import APIRouter, Depends
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi.responses import Response
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db_session
from app.schemas.recommendation import RecommendationRequest, RecommendationResponse
from app.services.recommendation_service import get_recommendation
from app.schemas.recommendation import RecommendationRequest
from app.services.recommendation_service import run_recommendation_and_publish_to_kafka

router = APIRouter()


@router.post("/recommendations", response_model=RecommendationResponse)
@router.post("/recommendations", status_code=202)
async def post_recommendations(
body: RecommendationRequest,
background_tasks: BackgroundTasks,
session: AsyncSession = Depends(get_db_session),
) -> RecommendationResponse:
return await get_recommendation(
session=session,
member_id=body.member_id,
profile_text=body.profile_text,
)
) -> Response:
"""
202 Accepted ์ฆ‰์‹œ ๋ฐ˜ํ™˜. ๋ฐฑ๊ทธ๋ผ์šด๋“œ์—์„œ ์ถ”์ฒœ ์ƒ์„ฑ ํ›„ Kafka recommendation-topic ๋ฐœํ–‰.
Spring์ด Kafka consume โ†’ persona_recommendation ์ ์žฌ โ†’ CompletableFuture.complete(๊ฒฐ๊ณผ).
"""
_ = session
background_tasks.add_task(run_recommendation_and_publish_to_kafka, body.member_id)
return Response(status_code=202, content=None)
14 changes: 14 additions & 0 deletions app/realtime/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from __future__ import annotations

import logging
import os
import re

from fastapi import FastAPI

Expand All @@ -14,10 +16,22 @@
configure_logging(settings.debug)


def _mask_database_url(url: str) -> str:
"""๋น„๋ฐ€๋ฒˆํ˜ธ๋งŒ ๋งˆ์Šคํ‚นํ•œ URL (์—ฐ๊ฒฐ ๋Œ€์ƒ ํ™•์ธ์šฉ)."""
if not url:
return "(empty)"
return re.sub(r"(:[^:@]+)(@)", r":****\2", url, count=1)


def create_app() -> FastAPI:
application = FastAPI(title=settings.app_name)
application.include_router(api_router, prefix=settings.api_v1_prefix)

@application.on_event("startup")
async def log_db_target() -> None:
url = get_settings().effective_database_url
logging.info("DB ์—ฐ๊ฒฐ ๋Œ€์ƒ: %s", _mask_database_url(url))

@application.get("/")
async def root() -> dict[str, str]:
return {"app": settings.app_name, "docs": "/docs", "health": "/health"}
Expand Down
1 change: 1 addition & 0 deletions app/schemas/analysis_request_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
class AnalysisRequestMessage(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="ignore")

type: str | None = Field(default=None, alias="type")
dispatch_request_id: str = Field(..., alias="dispatchRequestId", min_length=1)
case_id: int = Field(..., alias="caseId", ge=1)
analyzer_version: int = Field(..., alias="analyzerVersion", ge=1)
17 changes: 14 additions & 3 deletions app/schemas/recommendation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,22 @@ class Segment(str, Enum):


class RecommendationRequest(BaseModel):
member_id: int
profile_text: str | None = None # ํ…Œ์ŠคํŠธ์šฉ: ์žˆ์œผ๋ฉด ์ด ํ…์ŠคํŠธ์™€ ์œ ์‚ฌํ•œ ์ƒํ’ˆ top-k ์ถ”์ฒœ
"""POST /recommendations ์š”์ฒญ. Body: {"memberId": number}"""
member_id: int = Field(..., alias="memberId", description="ํšŒ์› PK")

model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True)


class RecommendedProductItem(BaseModel):
"""๋ช…์„ธ 2.3: Spring/ํ”„๋ก ํŠธ ์—ฐ๋™์šฉ ์ถ”์ฒœ ์ƒํ’ˆ ํ•œ ๊ฑด. embedding_text๋Š” ๋‚ด๋ถ€์šฉ์ด๋ผ ์‘๋‹ต์—์„œ ์ œ์™ธ."""
rank: int = Field(..., serialization_alias="rank")
product_id: int = Field(..., serialization_alias="productId")
reason: str
product_name: str = Field(..., serialization_alias="productName")
product_type: str = Field(..., serialization_alias="productType")
product_price: int = Field(..., serialization_alias="productPrice")
sale_price: int = Field(..., serialization_alias="salePrice")
tags: list[str] = Field(..., serialization_alias="tags")
llm_reason: str = Field(..., serialization_alias="llmReason")

model_config = ConfigDict(serialize_by_alias=True)

Expand All @@ -27,5 +36,7 @@ class RecommendationResponse(BaseModel):
recommended_products: list[RecommendedProductItem] = Field(
..., serialization_alias="recommendedProducts"
)
source: str = Field(..., serialization_alias="source") # CACHE | LIVE
updated_at: str = Field(..., serialization_alias="updatedAt") # ISO 8601

model_config = ConfigDict(serialize_by_alias=True)
Loading
Loading