Skip to content

[HSC-288] recommendation-server-v0.0.2#31

Merged
tkv00 merged 12 commits into
mainfrom
release/HSC-288
Mar 12, 2026
Merged

[HSC-288] recommendation-server-v0.0.2#31
tkv00 merged 12 commits into
mainfrom
release/HSC-288

Conversation

@tkv00

@tkv00 tkv00 commented Mar 12, 2026

Copy link
Copy Markdown
Contributor

🚀배포 목적

recommendation-server-v0.0.2를 배포합니다.

이전 recommendation 릴리즈 이후 반영된 추천 파이프라인 개선과 Kafka 연동 변경사항을 운영 환경에 배포하는 목적입니다.


🏷️ Release Level

  • release:major
  • release:minor
  • release:patch

📦 배포 대상 (1개 이상)

  • deploy:api-server
  • deploy:worker
  • deploy:counseling-analytics
  • deploy:analysis-server
  • deploy:recommendation-server
  • deploy:customer-web
  • deploy:admin-web

📝작업 내용

  • recommendation-server 신규 릴리즈 v0.0.2 배포
  • 추천 서비스 RAG 파이프라인 구조 개선 반영
  • Kafka 연동 및 공통 인증 처리 반영

👀변경 사항

구분 변경 내용 비고
추천 파이프라인 RAG 파이프라인 구조 개선 및 클래스화 추천 흐름 정리
Kafka 연동 recommendation-server Kafka 연결 경로 반영 운영 연동
Kafka 인증 MSK IAM 기반 인증 처리 반영 공통 Kafka 설정 확장
포함 PR #29, #28 이번 recommendation 릴리즈 포함 범위

🎫 Jira Ticket

  • Jira Ticket: HSC-288

#️⃣관련 이슈

@tkv00 tkv00 added 🏷️ release 릴리즈 준비/버전 태깅/릴리즈 노트/릴리즈 브랜치 작업 ☁️ area: INFRA 인프라/운영/배포 영역 🔥 priority: P0 즉시 처리 필요(서비스/데모 블로커) release:patch 버전 patch bump: X.Y.(Z+1) deploy:recommendation-server 배포 대상: recommendation-server labels Mar 12, 2026
@github-actions github-actions Bot changed the title recommendation-server-v0.0.2 [HSC-288] recommendation-server-v0.0.2 Mar 12, 2026
@gemini-code-assist

Copy link
Copy Markdown

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

이 Pull Request는 추천 시스템의 핵심 구성 요소인 추천 파이프라인과 Kafka 연동 방식을 대폭 개선하여 recommendation-server-v0.0.2를 배포하는 것을 목표로 합니다. 고객 컨텍스트를 활용한 개인화된 추천 로직을 강화하고, Kafka를 통한 비동기 메시지 처리 및 보안 인증을 도입하여 시스템의 안정성과 확장성을 높였습니다. 또한, 새로운 analysis-server 모드를 추가하여 서비스 운영의 유연성을 확보했습니다.

Highlights

  • 추천 파이프라인 개선: RAG(Retrieval Augmented Generation) 파이프라인 구조를 개선하고 클래스 기반으로 재구성하여, 고객 컨텍스트 기반의 동적 쿼리 생성, 타입 가중치 벡터 검색, 세그먼트/페르소나 기반 프롬프트 엔지니어링을 도입했습니다.
  • Kafka 연동 및 인증 강화: Kafka 연결 경로를 반영하고 MSK IAM 기반 인증 처리를 위한 공통 Kafka 설정 확장 및 aws-msk-iam-sasl-signer-python 라이브러리를 추가했습니다.
  • 비동기 추천 API 도입: 추천 API 엔드포인트(POST /recommendations)가 202 Accepted를 즉시 반환하고, 실제 추천 생성 및 Kafka 발행은 백그라운드 태스크로 처리하도록 변경하여 응답성을 향상시켰습니다.
  • 새로운 analysis-server 모드 추가: 독립적인 analysis-server FastAPI 애플리케이션을 추가하여 분석 서버의 시작, 중지, 상태 확인(health/readiness) 기능을 제공합니다.
  • Kafka 메시지 처리 로직 개선: Kafka 컨슈머 서비스에서 메시지 처리 후 응답 발행 로직을 강화하고, 실패 시 재시도 및 데드 레터 처리 기능을 추가했습니다.
Changelog
  • app/analysis_server/main.py
    • 새로운 FastAPI 애플리케이션을 추가하여 분석 서버의 진입점을 정의했습니다.
    • KafkaAnalysisConsumerService의 시작/중지 및 health/readiness 엔드포인트를 구현했습니다.
  • app/batch/main.py
    • Kafka 클라이언트 옵션 빌더(build_kafka_client_options)를 임포트하고 AIOKafkaConsumer 초기화에 적용했습니다.
  • app/core/config.py
    • Kafka 관련 설정 필드(예: kafka_recommendation_topic, kafka_analysis_response_topic, kafka_security_protocol, kafka_sasl_mechanism, kafka_aws_region 등)를 추가했습니다.
    • openai_embedding_model에 대한 주석을 추가하여 사용 목적을 명확히 했습니다.
  • app/infra/kafka/client_options.py
    • 새로운 파일을 추가하여 Kafka 클라이언트 연결 옵션을 동적으로 구성하는 build_kafka_client_options 함수를 정의했습니다.
    • MSK IAM 인증을 위한 MskIamTokenProvider 클래스를 구현했습니다.
  • app/infra/postgres/dispatch_outbox_repository.py
    • mark_acked_by_request_ids 메서드를 load_metadata_by_request_ids로 변경하여 아웃박스 메타데이터를 로드하도록 했습니다.
    • Kafka 응답 메시지 발송을 준비하는 prepare_response_dispatch 메서드를 추가했습니다.
    • Kafka 응답 메시지 발송 실패 시 재시도 로직을 처리하는 mark_response_retry 메서드를 추가했습니다.
  • app/realtime/api/v1/recommendation.py
    • post_recommendations 엔드포인트를 비동기 처리 방식으로 변경하여 202 Accepted를 즉시 반환하고, 추천 생성 및 Kafka 발행을 백그라운드 태스크로 실행하도록 했습니다.
    • 응답 모델을 RecommendationResponse에서 Response로 변경했습니다.
  • app/realtime/main.py
    • 로깅 및 정규식 모듈을 임포트하고, 데이터베이스 연결 대상을 마스킹하여 로깅하는 _mask_database_url 함수와 log_db_target 스타트업 이벤트를 추가했습니다.
  • app/schemas/analysis_request_message.py
    • AnalysisRequestMessage 스키마에 type 필드를 추가했습니다.
  • app/schemas/recommendation.py
    • RecommendationRequest 스키마에서 profile_text 필드를 제거하고 member_id만 남겼습니다.
    • RecommendedProductItem 스키마에 rank, product_name, product_type, product_price, sale_price, tags, llm_reason 필드를 추가하고 reason 필드를 제거했습니다.
    • RecommendationResponse 스키마에 sourceupdated_at 필드를 추가했습니다.
  • app/services/analysis_outcome_service.py
    • build_message_outcomes 메서드의 인수를 acked_request_ids에서 outbox_metadata_by_request_id로 변경했습니다.
    • 생성되는 아웃컴 메시지에 typechunkId 필드를 추가했습니다.
  • app/services/kafka_analysis_consumer_service.py
    • Kafka 컨슈머 서비스 초기화 시 KafkaResultPublisherServiceAnalysisOutcomeService를 사용하도록 변경했습니다.
    • Kafka 클라이언트 옵션 빌더(build_kafka_client_options)를 적용하여 Kafka 연결을 설정했습니다.
    • 서비스 시작/중지 로직을 개선하고, health/readiness 체크를 위한 페이로드 생성 메서드를 추가했습니다.
    • 메시지 처리 로직에서 DispatchOutboxRepository의 새로운 메서드를 사용하여 응답 발행 및 재시도 처리를 구현했습니다.
  • app/services/kafka_request_consumer_service.py
    • Kafka 클라이언트 옵션 빌더(build_kafka_client_options)를 임포트하고 AIOKafkaConsumer 초기화에 적용했습니다.
  • app/services/kafka_result_publisher_service.py
    • Kafka 프로듀서 초기화 시 build_kafka_client_options를 적용했습니다.
    • 단일 메시지를 발행하는 publish_response_message 메서드를 추가했습니다.
  • app/services/persona_recommendation_prompts.py
    • 새로운 파일을 추가하여 세그먼트 및 페르소나별 LLM 시스템 프롬프트와 유저 프롬프트 생성 로직을 정의했습니다.
    • 상품 목록을 LLM 입력 형식에 맞게 포맷팅하는 format_products 함수를 포함했습니다.
  • app/services/recommendation_service.py
    • 추천 로직을 RecommendationService 클래스로 캡슐화하고, 고객 컨텍스트(ctx) 기반의 RAG 파이프라인을 구현했습니다.
    • member_llm_context를 조회하여 동적으로 쿼리 텍스트를 생성하고 임베딩하는 기능을 추가했습니다.
    • pgvector를 이용한 유사도 검색 시 product_type_clicks 기반 가중치 및 CHURN_RISK 세그먼트의 가격 상한 필터링 로직을 도입했습니다.
    • 데이터 사용량 패턴에 따른 상품 재정렬 로직을 추가했습니다.
    • LLM을 활용하여 전반적인 마케팅 문구와 상품별 추천 이유를 생성하도록 변경했습니다.
    • Kafka를 통해 추천 결과를 발행하는 run_recommendation_and_publish_to_kafka 함수를 추가했습니다.
    • DB 연결 및 LLM 호출 실패 시 안전하게 폴백 추천을 제공하는 로직을 구현했습니다.
  • app/services/retrieval_query_builder.py
    • 새로운 파일을 추가하여 member_llm_context 데이터를 기반으로 retrieval 쿼리 텍스트를 생성하는 build_retrieval_query_text 함수를 정의했습니다.
  • docker-entrypoint.sh
    • 새로운 analysis-server APP_MODE를 추가하여 해당 모드에서 app.analysis_server.main:app을 실행하도록 했습니다.
  • requirements.txt
    • aws-msk-iam-sasl-signer-python==1.0.2 라이브러리를 추가했습니다.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

이 PR은 추천 파이프라인을 개선하고 Kafka 연동을 강화하는 중요한 변경사항을 담고 있습니다. 전반적으로 RAG 파이프라인 도입, 비동기 처리, Kafka 인증 및 outbox 패턴 강화 등 아키텍처가 크게 개선되었습니다. 코드 구조가 잘 잡혀 있고, 특히 프롬프트 관리나 설정 중앙화 등 좋은 사례들이 많이 보입니다. 다만, 메시지 중복 발행 가능성과 Kafka 프로듀서 관리 효율성에 대한 두 가지 주요 개선점을 발견하여 코멘트를 남겼습니다. 이 부분들을 수정하면 시스템의 안정성과 성능이 더욱 향상될 것입니다.

Comment on lines +43 to +44
WHERE request_id = $1
AND dispatch_status <> 'ACKED'::dispatch_status

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

prepare_response_dispatch 메소드의 WHERE 조건이 불충분하여 메시지 중복 발행의 위험이 있습니다. 현재 dispatch_status <> 'ACKED'만 확인하고 있어, SENT 상태인 메시지도 다시 처리 대상으로 간주됩니다. kafka_analysis_consumer_service에서 배치 처리 중 일부 메시지 발행 실패로 재시도가 발생하면, 이미 성공적으로 발행된 메시지들이 다시 발행될 수 있습니다. WHERE 조건에 SENT 상태도 제외하여 중복 처리를 방지해야 합니다.

Suggested change
WHERE request_id = $1
AND dispatch_status <> 'ACKED'::dispatch_status
WHERE request_id = $1
AND dispatch_status NOT IN ('SENT', 'ACKED')

Comment on lines +651 to +674
async def publish_recommendation_to_kafka(
member_id: int,
response: RecommendationResponse,
) -> None:
"""추천 결과를 Kafka recommendation-topic으로 발행. Spring Consumer가 수신 후 persona_recommendation 적재."""
settings = get_settings()
topic = getattr(settings, "kafka_recommendation_topic", "recommendation")
bootstrap = getattr(settings, "kafka_bootstrap_servers", "").strip()
if not bootstrap:
logging.warning("recommendation: Kafka 미설정, 발행 스킵 member_id=%s", member_id)
return
payload = {"memberId": member_id, **response.model_dump(by_alias=True)}
producer = AIOKafkaProducer(
bootstrap_servers=[s.strip() for s in bootstrap.split(",") if s.strip()],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
)
try:
await producer.start()
await producer.send_and_wait(topic, value=payload, key=str(member_id).encode("utf-8"))
logging.info("recommendation: Kafka 발행 완료 member_id=%s topic=%s", member_id, topic)
except Exception as e:
logging.error("recommendation: Kafka 발행 실패 member_id=%s: %s", member_id, e, exc_info=True)
finally:
await producer.stop()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

publish_recommendation_to_kafka 함수가 호출될 때마다 AIOKafkaProducer 인스턴스를 새로 생성하고 시작/중지하고 있습니다. 이는 매 추천 발행 시마다 Kafka에 대한 새로운 연결을 설정하는 오버헤드를 유발하여 비효율적입니다. 프로듀서는 애플리케이션 시작 시 생성하고 종료 시 정리하는 긴 수명의 객체로 관리하는 것이 좋습니다. 예를 들어, recommendation-serverlifespan 컨텍스트 관리자나 유사한 메커니즘을 사용하여 프로듀서 인스턴스를 재사용하도록 리팩토링하는 것을 고려해 보세요.

@tkv00 tkv00 merged commit 118cca7 into main Mar 12, 2026
4 checks passed
@tkv00 tkv00 deleted the release/HSC-288 branch March 20, 2026 01:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

☁️ area: INFRA 인프라/운영/배포 영역 deploy:recommendation-server 배포 대상: recommendation-server Infra Team 🔥 priority: P0 즉시 처리 필요(서비스/데모 블로커) release:patch 버전 patch bump: X.Y.(Z+1) 🏷️ release 릴리즈 준비/버전 태깅/릴리즈 노트/릴리즈 브랜치 작업

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants