[HCR-226] analysis-server-v0.0.5#25
Conversation
[HCR-226] 회원별 키워드 집계기(Aggregator) 구현 및 추출 파이프라인(Stage 2, 3) 정확도 개선
…ytics into feat/HSC-228 # Conflicts: # app/pipeline/aggregator.py # tests/test_e2e.py
[HSC-228] Kafka 소비 기반 SQL 분석 워커 전환
[HCR-235] 추천 api + 상품 임베딩
[HCR-244] Realtime API 분리 및 batch/docker 실행 구조 정리
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! 이 Pull Request는 분석 서버의 배포 체계를 namespaced 릴리즈로 전환하고, 핵심 분석 로직을 Kafka 기반으로 재구축하여 처리 효율성과 확장성을 대폭 개선하는 것을 목표로 합니다. 또한, 회원별 키워드 집계 기능과 키워드 추출 정확도 개선을 통해 분석 결과의 유용성을 높였으며, Docker 환경에서의 배치 및 실시간 애플리케이션 실행 모드를 명확히 분리하여 운영 안정성을 강화했습니다. 더불어, 향후 추천 시스템 구축을 위한 기반 API와 상품 임베딩 파이프라인을 추가하여 서비스 확장의 발판을 마련했습니다. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This PR is an impressive refactoring that transitions the analysis server's architecture from EFS file-based to Kafka and PostgreSQL-based, incorporating improvements like multi-stage Docker builds, robust entrypoint scripts, and a real-time recommendation API. However, significant security concerns have been identified: a prompt injection vulnerability in the recommendation service and a lack of authentication on the new recommendation API endpoint. Additionally, some legacy code from the previous architecture remains, which could lead to runtime errors, and there are opportunities to enhance the performance and code design of the new Kafka consumer. These security issues should be addressed before deployment to production.
| from typing import Dict, Any, List | ||
|
|
||
| from app.core.config import Settings | ||
| from app.infra.efs.paths import build_res_dir |
There was a problem hiding this comment.
이 파일에서 app.infra.efs.paths 모듈의 build_res_dir 함수를 가져오고 있습니다. 하지만 이번 PR에서 app.infra.efs/paths.py 파일이 삭제되어, 애플리케이션 실행 시 ImportError가 발생할 것입니다. 이 집계기(Aggregator)는 이전 EFS 기반 아키텍처에 의존하는 것으로 보이며, 현재의 Kafka/DB 기반 흐름과 맞지 않습니다. 이 파일과 관련 테스트(tests/test_aggregator.py)가 여전히 필요한지 확인하고, 필요하다면 EFS 의존성을 제거하거나, 필요 없다면 삭제하는 것을 고려해 주세요.
| from pathlib import Path | ||
|
|
||
| from app.core.config import Settings | ||
| from app.services.analyze_service import AnalyzeService |
| prompt = f"""사용자 프로필: {profile_text} | ||
|
|
||
| 아래 상품들을 이 프로필에 맞춰 추천했습니다. 각 상품을 왜 추천했는지 한 문장으로만 설명해주세요. | ||
| 상품 목록: | ||
| {product_list} | ||
|
|
||
| 응답은 반드시 JSON만 주세요. 다른 말 없이 예시 형식만 따르세요. | ||
| 예시: {{"reasons": ["이유1", "이유2", "이유3"]}} | ||
| """ |
There was a problem hiding this comment.
The profile_text provided by the user is directly concatenated into the LLM prompt in the _generate_recommendation_reasons function. This allows an attacker to perform prompt injection attacks to manipulate the LLM's behavior, potentially leading to the generation of malicious content or bypassing intended constraints.
Remediation: Sanitize the profile_text input and use a more robust prompt structure. Consider using system messages to define the LLM's role and constraints, and use delimiters to clearly separate user input from instructions.
| async def _process_batch(self, batch: list[AnalysisRequestMessage]) -> None: | ||
| assert self._analysis_repository is not None | ||
| assert self._outbox_repository is not None | ||
| assert self._analysis_service is not None | ||
|
|
||
| unique_request_ids = list(dict.fromkeys(msg.dispatch_request_id for msg in batch)) | ||
| acked_request_ids = await self._outbox_repository.mark_acked_by_request_ids(unique_request_ids) | ||
| acked_count = len(acked_request_ids) | ||
|
|
||
| unique_pairs = list(dict.fromkeys((msg.case_id, msg.analyzer_version) for msg in batch)) | ||
| case_ids = [pair[0] for pair in unique_pairs] | ||
| analyzer_versions = [pair[1] for pair in unique_pairs] | ||
| target_rows = await self._analysis_repository.find_targets_by_case_and_version(case_ids, analyzer_versions) | ||
| target_by_pair = { | ||
| (int(row["case_id"]), int(row["analyzer_version"])): row | ||
| for row in target_rows | ||
| } | ||
| missing_pairs = [pair for pair in unique_pairs if pair not in target_by_pair] | ||
|
|
||
| keyword_rows = await self._analysis_repository.load_active_keyword_rows() | ||
| keyword_dict_rows = [dict(row) for row in keyword_rows] | ||
| self._analysis_service.load_dictionary(keyword_dict_rows) | ||
| keyword_name_by_id: dict[int, str] = {} | ||
| for row in keyword_dict_rows: | ||
| keyword_id = int(row["business_keyword_id"]) | ||
| if keyword_id not in keyword_name_by_id: | ||
| keyword_name_by_id[keyword_id] = str(row["keyword_name"]) | ||
|
|
||
| targets = [dict(row) for row in target_rows] | ||
| mapping_rows, completed_ids, failed_items = self._analysis_service.analyze_targets(targets) | ||
| # DB write 권한은 Spring에만 있으므로 Python에서는 결과를 DB에 반영하지 않는다. | ||
| # (business_keyword_mapping_result INSERT, consultation_analysis status UPDATE 미수행) | ||
|
|
||
| if self._settings.kafka_log_each_message: | ||
| self._log_message_outcomes( | ||
| batch=batch, | ||
| acked_request_ids=acked_request_ids, | ||
| target_by_pair=target_by_pair, | ||
| mapping_rows=mapping_rows, | ||
| completed_ids=completed_ids, | ||
| failed_items=failed_items, | ||
| keyword_name_by_id=keyword_name_by_id, | ||
| ) | ||
|
|
||
| logger.info( | ||
| "Kafka batch consumed. messages=%d unique_requests=%d acked=%d unique_pairs=%d loaded_targets=%d missing_pairs=%d completed=%d failed=%d mappings=%d (only-outbox-write=enabled)", | ||
| len(batch), | ||
| len(unique_request_ids), | ||
| acked_count, | ||
| len(unique_pairs), | ||
| len(target_rows), | ||
| len(missing_pairs), | ||
| len(completed_ids), | ||
| len(failed_items), | ||
| len(mapping_rows), | ||
| ) |
There was a problem hiding this comment.
| @router.post("/recommendations", response_model=RecommendationResponse) | ||
| async def post_recommendations( | ||
| body: RecommendationRequest, | ||
| session: AsyncSession = Depends(get_db_session), | ||
| ) -> RecommendationResponse: | ||
| return await get_recommendation( | ||
| session=session, | ||
| member_id=body.member_id, | ||
| profile_text=body.profile_text, | ||
| ) |
There was a problem hiding this comment.
The /recommendations endpoint is exposed without any authentication or authorization mechanisms. This allows any unauthenticated user to call the service, which could lead to unauthorized use of the recommendation logic and potential abuse of the underlying LLM API, leading to increased costs or denial of service.
Remediation: Implement an authentication layer (e.g., JWT, API Keys) and ensure that only authorized users or internal services can access this endpoint.
| service._db_pool = await create_postgres_pool(settings) # noqa: SLF001 | ||
| service._analysis_repository = AnalysisRepository(service._db_pool) # noqa: SLF001 | ||
| service._outbox_repository = DispatchOutboxRepository(service._db_pool) # noqa: SLF001 | ||
| service._analysis_service = SqlKeywordAnalysisService() # noqa: SLF001 |
There was a problem hiding this comment.
🚀배포 목적
analysis-server-v0.0.5는 키워드 분석 배치 앱의 첫 공식 namespaced 릴리즈입니다.기준 SHA
2eaf5f9를 대상으로 legacy linev0.0.4를 이어받아 발행하며, 분석 서버를 독립 배포 가능한 태그 체계로 전환한 뒤 처음 수행하는 배포입니다.🏷️ Release Level
📦 배포 대상 (1개 이상)
📝주요 변경 사항
[HCR-226] 회원별 키워드 집계기(Aggregator) 구현 및 추출 파이프라인(Stage 2, 3) 정확도 개선[HSC-228] Kafka 소비 기반 SQL 분석 워커 전환[HCR-244] Realtime API 분리 및 batch/docker 실행 구조 정리analysis-server-vX.Y.Z형식으로 발행됨v0.0.4를 이어받아 첫 namespaced tag는analysis-server-v0.0.5임✅배포 체크리스트
🔁롤백 계획
🎫 Jira Ticket
closed #23