Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions app/core/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from collections.abc import AsyncGenerator

from pgvector.asyncpg import register_vector
try:
from pgvector.asyncpg import register_vector
except Exception: # pragma: no cover
register_vector = None # type: ignore[assignment]
from sqlalchemy import event, text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
Expand Down Expand Up @@ -34,9 +37,10 @@ def create_engine() -> AsyncEngine:
if (settings.effective_database_url or "").strip():
engine = create_engine()
# pgvector 등둝: asyncpg μ—°κ²° μ‹œ vector νƒ€μž… 등둝
@event.listens_for(engine.sync_engine, "connect")
def register_pgvector(dbapi_connection, connection_record) -> None:
dbapi_connection.run_async(register_vector)
if register_vector is not None:
@event.listens_for(engine.sync_engine, "connect")
def register_pgvector(dbapi_connection, connection_record) -> None:
dbapi_connection.run_async(register_vector)

SessionLocal = async_sessionmaker(
bind=engine,
Expand Down
147 changes: 116 additions & 31 deletions app/services/recommendation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import re
from datetime import datetime, timezone

from aiokafka import AIOKafkaProducer
try:
from aiokafka import AIOKafkaProducer
except Exception: # pragma: no cover
# 평가 슀크립트/둜컬 ν™˜κ²½μ—μ„œ Kafka μ˜μ‘΄μ„±μ΄ 없을 수 μžˆλ‹€.
# Kafka λ―Έμ„€μ • μ‹œ publish_recommendation_to_kafkaμ—μ„œ μ•ˆμ „ν•˜κ²Œ μŠ€ν‚΅ν•œλ‹€.
AIOKafkaProducer = None # type: ignore[assignment]

logger = logging.getLogger(__name__)
from openai import AsyncOpenAI
Expand Down Expand Up @@ -71,6 +76,17 @@
LIMIT :k
""")

# product_type별 벑터 검색: νƒ€μž…μ„ λΆ„μ‚°μ‹œμΌœ 후보 ν’€ 편ν–₯을 μ™„ν™”
SEARCH_SIMILAR_BY_TYPE_SQL = text("""
SELECT product_id
FROM product
WHERE embedding_vector IS NOT NULL
AND product_type = :ptype
AND (NOT (product_id = ANY(:exclude_ids)))
ORDER BY embedding_vector <#> :query_vec
LIMIT :k
""")

# μΆ”μ²œ 후보 μƒν’ˆ 상세 쑰회. data_amountλŠ” OVER/UNDER μž¬μ •λ ¬μš© (mobile_planΒ·tab_watch_plan)
FETCH_PRODUCTS_FULL_SQL = text("""
SELECT
Expand All @@ -96,6 +112,16 @@
# retrieval 후보 수 (LLM에 넣은 λ’€ 3개 선택)
RETRIEVAL_CANDIDATES_K = 50

# νƒ€μž…λ³„λ‘œ μ΅œμ†Œ 후보λ₯Ό ν™•λ³΄ν•˜κΈ° μœ„ν•œ μ„€μ •
MAIN_PRODUCT_TYPES: tuple[str, ...] = (
"MOBILE_PLAN",
"INTERNET",
"IPTV",
"TAB_WATCH_PLAN",
"ADDON",
)
RETRIEVAL_PER_TYPE_K = 10

# member_llm_context 미ꡬ좕 μ‹œ μ‚¬μš©ν•  κΈ°λ³Έ 쿼리 ν…μŠ€νŠΈ
DEFAULT_RETRIEVAL_QUERY = "톡신 μš”κΈˆμ œ, 데이터 μš”κΈˆμ œ, λΆ€κ°€μ„œλΉ„μŠ€ μΆ”μ²œ"

Expand Down Expand Up @@ -654,30 +680,60 @@ async def _run_recommendation_with_context(
product_type_weights = compute_product_type_weights(ctx)
boost_type1, boost1, boost_type2, boost2 = _product_type_boost_from_weights(product_type_weights)
use_type_boost = (boost1 > 0 or boost2 > 0)
if use_type_boost:
result = await session.execute(
SEARCH_SIMILAR_WITH_TYPE_BOOST_SQL,
{
"query_vec": query_vec,
"exclude_ids": exclude_ids,
"k": RETRIEVAL_CANDIDATES_K,
"boost_type1": boost_type1,
"boost1": boost1,
"boost_type2": boost_type2,
"boost2": boost2,
},
)
else:
result = await session.execute(
SEARCH_SIMILAR_SQL,

# 2-1) νƒ€μž…λ³„ κ²€μƒ‰μœΌλ‘œ 후보 ν’€ λΆ„μ‚°(μ΅œμ†Œ 1개/νƒ€μž… μœ λ„)
seen: set[int] = set()
product_ids: list[int] = []
for ptype in MAIN_PRODUCT_TYPES:
r = await session.execute(
SEARCH_SIMILAR_BY_TYPE_SQL,
{
"query_vec": query_vec,
"exclude_ids": exclude_ids,
"k": RETRIEVAL_CANDIDATES_K,
"ptype": ptype,
"k": RETRIEVAL_PER_TYPE_K,
},
)
rows = result.fetchall()
product_ids = [r[0] for r in rows]
for row in r.fetchall():
pid = row[0]
if pid in seen:
continue
product_ids.append(pid)
seen.add(pid)

# 2-2) 뢀쑱뢄은 κΈ°μ‘΄ 전체 검색(κ°€μ€‘μΉ˜ boost 포함 κ°€λŠ₯)으둜 보좩
if len(product_ids) < RETRIEVAL_CANDIDATES_K:
if use_type_boost:
result = await session.execute(
SEARCH_SIMILAR_WITH_TYPE_BOOST_SQL,
{
"query_vec": query_vec,
"exclude_ids": exclude_ids,
"k": RETRIEVAL_CANDIDATES_K,
"boost_type1": boost_type1,
"boost1": boost1,
"boost_type2": boost_type2,
"boost2": boost2,
},
)
else:
result = await session.execute(
SEARCH_SIMILAR_SQL,
{
"query_vec": query_vec,
"exclude_ids": exclude_ids,
"k": RETRIEVAL_CANDIDATES_K,
},
)
for row in result.fetchall():
pid = row[0]
if pid in seen:
continue
product_ids.append(pid)
seen.add(pid)
if len(product_ids) >= RETRIEVAL_CANDIDATES_K:
break
Comment on lines +684 to +735

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

이 후보 μƒν’ˆ IDλ₯Ό κ°€μ Έμ˜€λŠ” λ‘œμ§μ€ _run_fallback_recommendation ν•¨μˆ˜ λ‚΄μ˜ 1078-1114행에 μžˆλŠ” 둜직과 맀우 μœ μ‚¬ν•©λ‹ˆλ‹€. μ½”λ“œ 쀑볡은 ν–₯ν›„ 버그 λ°œμƒ κ°€λŠ₯성을 높이고 μœ μ§€λ³΄μˆ˜λ₯Ό μ–΄λ ΅κ²Œ λ§Œλ“­λ‹ˆλ‹€.

이 μ€‘λ³΅λœ λ‘œμ§μ„ λ³„λ„μ˜ 헬퍼 ν•¨μˆ˜λ‘œ μΆ”μΆœν•˜μ—¬ 두 κ³³μ—μ„œ λͺ¨λ‘ μž¬μ‚¬μš©ν•˜λŠ” 것을 κ³ λ €ν•΄ λ³΄μ„Έμš”. 예λ₯Ό λ“€μ–΄, λ‹€μŒκ³Ό 같은 헬퍼 ν•¨μˆ˜λ₯Ό λ§Œλ“€ 수 μžˆμŠ΅λ‹ˆλ‹€.

async def _retrieve_candidate_products(
    session: AsyncSession,
    query_vec: list[float],
    exclude_ids: list[int],
    use_type_boost: bool = False,
    boost_type1: str = "",
    boost1: float = 0.0,
    boost_type2: str = "",
    boost2: float = 0.0,
) -> list[int]:
    # ... 쀑볡 둜직 κ΅¬ν˜„ ...
    return product_ids

μ΄λ ‡κ²Œ ν•˜λ©΄ _run_recommendation_with_context와 _run_fallback_recommendation ν•¨μˆ˜κ°€ 더 κ°„κ²°ν•΄μ§€κ³ , μΆ”μ²œ 후보λ₯Ό κ°€μ Έμ˜€λŠ” λ‘œμ§μ„ ν•œ κ³³μ—μ„œ 관리할 수 있게 λ©λ‹ˆλ‹€.


if not product_ids:
return RecommendationResponse(
segment=_segment_enum(ctx.get("segment")),
Expand Down Expand Up @@ -915,7 +971,6 @@ async def _run_recommendation_with_context(
)
)
used_ids.add(pid)
type_counts[tcode] = current + 1

return RecommendationResponse(
segment=_segment_enum(ctx.get("segment")),
Expand Down Expand Up @@ -1020,16 +1075,43 @@ async def _run_fallback_recommendation(
updated_at=_utc_now_iso(),
)

result = await fallback_session.execute(
SEARCH_SIMILAR_SQL,
{
"query_vec": query_vec,
"exclude_ids": [0],
"k": top_k,
},
)
rows = result.fetchall()
product_ids = [r[0] for r in rows]
# ν΄λ°±μ—μ„œλ„ νƒ€μž…λ³„ 후보λ₯Ό λ¨Όμ € 확보해 λͺ¨λ°”일 쏠림을 μ™„ν™”
seen: set[int] = set()
product_ids: list[int] = []
for ptype in MAIN_PRODUCT_TYPES:
r = await fallback_session.execute(
SEARCH_SIMILAR_BY_TYPE_SQL,
{
"query_vec": query_vec,
"exclude_ids": [0],
"ptype": ptype,
"k": RETRIEVAL_PER_TYPE_K,
},
)
for row in r.fetchall():
pid = row[0]
if pid in seen:
continue
product_ids.append(pid)
seen.add(pid)

if len(product_ids) < RETRIEVAL_CANDIDATES_K:
result = await fallback_session.execute(
SEARCH_SIMILAR_SQL,
{
"query_vec": query_vec,
"exclude_ids": [0],
"k": RETRIEVAL_CANDIDATES_K,
},
)
for row in result.fetchall():
pid = row[0]
if pid in seen:
continue
product_ids.append(pid)
seen.add(pid)
if len(product_ids) >= RETRIEVAL_CANDIDATES_K:
break
logger.info("recommendation: 폴백 벑터 검색 μ™„λ£Œ product_ids=%s", product_ids[:10] if len(product_ids) > 10 else product_ids)
if not product_ids:
return RecommendationResponse(
Expand Down Expand Up @@ -1133,6 +1215,9 @@ async def publish_recommendation_to_kafka(
settings = get_settings()
topic = getattr(settings, "kafka_recommendation_topic", "recommendation")
bootstrap = getattr(settings, "kafka_bootstrap_servers", "").strip()
if AIOKafkaProducer is None:
logger.warning("recommendation: aiokafka λ―Έμ„€μΉ˜, Kafka λ°œν–‰ μŠ€ν‚΅ member_id=%s", member_id)
return
if not bootstrap:
logger.warning("recommendation: Kafka λ―Έμ„€μ •, λ°œν–‰ μŠ€ν‚΅ member_id=%s", member_id)
return
Expand Down
Loading