Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions kg/neo4j_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import logging
import os
import re
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator
Expand All @@ -22,6 +23,11 @@

logger = logging.getLogger(__name__)

# 쿼리에 LIMIT 절이 이미 있는지 판별. substring " LIMIT " 는 줄바꿈/탭으로 시작하는
# LIMIT(예: "ORDER BY score DESC\nLIMIT $k")를 놓쳐 자동 부착이 중복되므로
# word-boundary 정규식으로 잡는다.
_LIMIT_RE = re.compile(r"\bLIMIT\b", re.IGNORECASE)


@dataclass(frozen=True)
class Neo4jConfig:
Expand Down Expand Up @@ -98,7 +104,6 @@ def write(self, query: str, **params) -> list[dict]:
@classmethod
def _ensure_limit(cls, query: str) -> str:
"""쿼리에 LIMIT 이 없으면 끝에 부착. 단순 휴리스틱이지만 Spike 충분."""
upper = query.upper()
if " LIMIT " in upper or upper.rstrip().endswith(";"):
if _LIMIT_RE.search(query) or query.rstrip().endswith(";"):
return query
return f"{query.rstrip()} LIMIT {cls.DEFAULT_READ_LIMIT}"
271 changes: 271 additions & 0 deletions retrieval/bm25_retriever.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
"""BM25 Retriever — 어휘 fulltext 검색 (Plan A, Hybrid 1단계).

자연어 질문 → Neo4j fulltext 인덱스(`chunk_fulltext`, Lucene/BM25 스코어) →
top-k 청크 → 자연어 답변.

기존 Layer A(text2cypher) / Layer B(local_retrieve) 와 달리 **그래프 구조를
전혀 타지 않는다**. entity 식별·MENTIONS traversal 없이 Chunk.text 를 어휘
매칭으로 직접 검색 → entity→MENTIONS 병목을 우회.

왜 만들었나 (근거):
- chunk-rerank 네거티브 결과(#70)가 가리킨 처방: 병목은 청크 *랭킹*이 아니라
후보 풀(graph-anchored, MENTIONS-only). 어휘 검색은 그 풀을 우회한다.
- BEIR (Thakur et al. 2021, arXiv 2104.08663): BM25 는 OOD 에서 dense 를 자주
능가하는 robust baseline. 정확 수치·고유명사(목표주가/종목코드/매출액)에서 강함.
- 본 프로젝트 80 QA 에서 doc-summary(BM25) 가 factual/numerical 을 압승
(AC 4.0 / Faithful 92% vs doc-graph 2.0 / 15~17%) — 같은 처방을 doc-graph 에 이식.

한국어 주의:
- fulltext 인덱스는 **cjk analyzer** 로 생성해야 함 (standard 는 공백 분리만 →
"두산밥캣의" ≠ "두산밥캣" 매칭 실패). 인덱스 DDL:
CREATE FULLTEXT INDEX chunk_fulltext IF NOT EXISTS
FOR (c:Chunk) ON EACH [c.text]
OPTIONS {indexConfig: {`fulltext.analyzer`: 'cjk'}}

안전장치:
1. read-only — queryNodes 는 조회 전용, 사용자 입력은 parameterized + Lucene escape.
2. 결과 크기 제한 — TOP_K_CHUNKS / CHUNK_TEXT_TRUNCATE 로 토큰 폭발 방지.
3. graceful fallback — 인덱스 부재 / 0건 / LLM 실패 모두 자연어 안내.

#24 Opik:
- 공개 진입점 `bm25_retrieve` 에 `@track`.

Reference:
- text2cypher.py / local_retriever.py — LLM 호출 / 프롬프트 로드 / @track 패턴 그대로.
- Cormack et al. 2009 (RRF) — 추후 graph 검색과 융합 시 사용 (Plan B).
"""

from __future__ import annotations

import logging
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from agent.llm_client import LLMClient
from kg.neo4j_client import Neo4jClient
from observability.tracing import track

logger = logging.getLogger(__name__)


# ── 상수 (text2cypher / local 과 일치) ───────────────────────
TEMPERATURE_ANSWER = 0.3
MAX_TOKENS_ANSWER = 700

FULLTEXT_INDEX = "chunk_fulltext" # cjk analyzer 로 생성된 Chunk.text fulltext 인덱스
TOP_K_CHUNKS = 8 # BM25 상위 N 청크 (어휘 검색은 graph 보다 넉넉히)
CHUNK_TEXT_TRUNCATE = 600 # 청크 1개당 char 상한 (LLM 컨텍스트용)

PROMPTS_DIR = Path(__file__).parent / "prompts"
ANSWER_PROMPT_PATH = PROMPTS_DIR / "bm25_answer_v1.md"

# Lucene 쿼리 특수문자 — escape 대상 (사용자 입력이 쿼리 연산자로 해석되는 것 방지)
_LUCENE_SPECIAL_RE = re.compile(r'([+\-!(){}\[\]^"~*?:\\/]|&&|\|\|)')


# ── 결과 객체 ────────────────────────────────────────────────
@dataclass
class BM25RetrieverResult:
"""BM25 retriever 한 회차의 전체 trace.

Opik / 디버깅용 모든 중간 산물 보존.
"""

question: str
retrieved_chunks: list[dict[str, Any]] = field(default_factory=list)
n_chunks: int = 0
answer: str = ""
elapsed_seconds: float = 0.0
error: str | None = None


# ── 프롬프트 로드 ────────────────────────────────────────────
def _load_prompt(path: Path) -> str:
return path.read_text(encoding="utf-8")


# ── 쿼리 sanitize ────────────────────────────────────────────
def _sanitize_query(question: str) -> str:
"""사용자 질문을 Lucene fulltext 쿼리로 안전하게 변환.

특수문자를 escape 해 Lucene 연산자로 오해석되는 것을 막는다. cjk analyzer 가
토큰화(bigram)하므로 별도 토큰 분해는 하지 않고, 양끝 공백만 정리.
"""
if not question:
return ""
escaped = _LUCENE_SPECIAL_RE.sub(r"\\\1", question)
return re.sub(r"\s+", " ", escaped).strip()


# ── BM25 fulltext 검색 ───────────────────────────────────────
# db.index.fulltext.queryNodes — Lucene 스코어(BM25) 내림차순. 인덱스 부재 시
# Cypher 예외 → 호출부에서 graceful 처리.
#
# 주의: Lucene 검색 문자열 파라미터를 $query 로 두면 Neo4jClient.read(self, query,
# **params) 의 첫 위치인자 `query` 와 이름이 겹쳐 TypeError 가 난다. 그래서
# $search_text 로 명명한다.
_BM25_CYPHER = """
CALL db.index.fulltext.queryNodes($index_name, $search_text) YIELD node, score
RETURN node.id AS chunk_id,
node.text AS text,
properties(node).page AS page,
score
ORDER BY score DESC
LIMIT $top_k
"""


def _bm25_chunks(
neo4j: Neo4jClient, query: str, top_k: int = TOP_K_CHUNKS
) -> tuple[list[dict[str, Any]], str | None]:
"""fulltext 인덱스에서 top-k 청크 검색.

Returns:
(chunks, error). 인덱스 부재 / 빈 쿼리 / 0건 모두 ([], reason) 로 graceful.
chunks 각 원소: {chunk_id, text(truncated), page, score}
"""
q = _sanitize_query(query)
if not q:
return [], "빈 질문입니다."
try:
rows = neo4j.read(
_BM25_CYPHER, index_name=FULLTEXT_INDEX, search_text=q, top_k=top_k
)
except Exception as exc: # noqa: BLE001
logger.warning("BM25 fulltext 검색 실패 (인덱스 미생성 가능): %s", exc)
return [], (
f"fulltext 인덱스 '{FULLTEXT_INDEX}' 검색 실패 — "
f"인덱스가 생성되지 않았을 수 있습니다: {exc}"
)

chunks: list[dict[str, Any]] = []
for r in rows:
cid = r.get("chunk_id")
text = r.get("text")
if not cid or not text:
continue
chunks.append(
{
"chunk_id": cid,
"text": text[:CHUNK_TEXT_TRUNCATE],
"page": r.get("page"),
"score": round(float(r.get("score") or 0.0), 3),
}
)

if not chunks:
return [], "fulltext 검색 결과 0건."
return chunks, None


# ── 답변 생성 ────────────────────────────────────────────────
@retry(
retry=retry_if_exception_type(Exception),
wait=wait_exponential(multiplier=1, min=2, max=10),
stop=stop_after_attempt(3),
reraise=True,
)
def _call_llm_text(llm: LLMClient, system_prompt: str, user_prompt: str) -> str:
return llm.chat(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
temperature=TEMPERATURE_ANSWER,
max_tokens=MAX_TOKENS_ANSWER,
)


def _generate_answer(
llm: LLMClient,
question: str,
chunks: list[dict[str, Any]],
answer_system_prompt: str,
note: str = "",
) -> str:
"""질문 + BM25 청크 → 자연어 답변. 0건 / 실패 모두 LLM 에 그대로 넘겨 안내."""
import json

user_payload = {
"question": question,
"retrieved_chunks": chunks,
"note": note,
}
user_prompt = json.dumps(user_payload, ensure_ascii=False, indent=2)

try:
answer = _call_llm_text(llm, answer_system_prompt, user_prompt)
except Exception as exc: # noqa: BLE001
logger.warning("BM25 답변 생성 LLM 호출 실패: %s", exc)
if not chunks:
return note or "검색된 자료에서 답을 찾지 못했습니다."
return f"{len(chunks)}개 청크를 검색했으나 자연어 변환에 실패했습니다."

return answer.strip()


# ── 공개 인터페이스 ──────────────────────────────────────────
@track
def bm25_retrieve(
question: str,
llm: LLMClient | None = None,
neo4j: Neo4jClient | None = None,
top_k: int = TOP_K_CHUNKS,
) -> BM25RetrieverResult:
"""자연어 질문 → BM25 fulltext 검색 → top-k 청크 → 자연어 답변.

Args:
question: 사용자 자연어 질문.
llm: 테스트용 mock 주입 가능. 기본 LLMClient().
neo4j: 테스트용 mock 주입 가능. 기본 Neo4jClient().
top_k: 검색할 청크 수.

Returns:
BM25RetrieverResult — 전체 trace 포함. answer 는 항상 채워짐.
"""
started = time.perf_counter()
if not question or not question.strip():
return BM25RetrieverResult(
question=question,
answer="질문이 비어 있습니다. 무엇이 궁금한지 입력해 주세요.",
elapsed_seconds=time.perf_counter() - started,
)

llm = llm or LLMClient()
own_neo4j = neo4j is None
neo4j = neo4j or Neo4jClient()

answer_system_prompt = _load_prompt(ANSWER_PROMPT_PATH)
logger.info("BM25Retriever 시작 — question=%r", question)

try:
chunks, error = _bm25_chunks(neo4j, question, top_k=top_k)
answer = _generate_answer(
llm, question, chunks, answer_system_prompt, note=error or "",
)
elapsed = time.perf_counter() - started
logger.info(
"BM25Retriever 완료 — chunks=%d elapsed=%.2fs error=%r",
len(chunks), elapsed, error,
)
return BM25RetrieverResult(
question=question,
retrieved_chunks=chunks,
n_chunks=len(chunks),
answer=answer,
elapsed_seconds=elapsed,
error=error,
)
finally:
if own_neo4j:
neo4j.close()
59 changes: 59 additions & 0 deletions retrieval/prompts/bm25_answer_v1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
당신은 BM25 fulltext 검색으로 가져온 문서 청크에서 사용자 질문에 정확히 답하는 친절한 분석가입니다.

## 입력 (JSON)

```
{
"question": "원래 사용자 질문",
"retrieved_chunks": [
{"chunk_id": "...", "text": "청크 원문 (max 600자)", "page": 3, "score": 12.4}
],
"note": "비어 있으면 정상, 채워져 있으면 검색 실패 사유"
}
```

`retrieved_chunks` 는 BM25(Lucene) 점수 내림차순 — 위쪽이 더 관련 높음.

## 출력 가이드

1. **질문에 직접 답변**: 청크 원문에 있는 내용으로만 답한다. 청크를 나열하지 말고 질문 의도에 맞춰 핵심만.

2. **수치·고유명사는 원문 그대로**: 목표주가, 매출액, OPM, 종목코드 등은 청크 원문의 표기를 **변형 없이** 옮긴다. 반올림·추정 금지.

3. **자료에 없으면 정직하게**: 검색된 청크에 답이 없으면 "검색된 자료에는 해당 정보가 없습니다" 라고 명확히 안내. 청크에 없는 내용을 지어내지 않는다 (faithfulness 보호).

4. **note 채워짐 (검색 0건)**: `note` 그대로 친절히 안내하고 대안 제안 (다른 키워드로 재시도 등).

5. **간결성**: 2~4 문장. 군더더기·반복 금지.

6. **출처 표기**: 인용한 청크에 page 가 있으면 "(p.N 인근)" 식으로 1번만.

## 출력 형식

plain text 만. 마크다운 헤더, 코드펜스, 불릿 절대 금지.

## 예시

### 예시 1 — factual / numerical

**입력 (요약)**: 질문 "두산밥캣의 목표주가는?", retrieved_chunks[0].text = "...두산밥캣의 1분기 실적은 매출액 2조 1,676억원... 목표주가 80,000원...", page=2

**답변**:

두산밥캣의 목표주가는 80,000원입니다 (p.2 인근). 같은 리포트에서 1분기 매출액은 2조 1,676억원으로 제시되어 있습니다.

### 예시 2 — 검색됐으나 답 없음

**입력 (요약)**: 질문 "두산밥캣의 배당 정책은?", retrieved_chunks 는 실적·목표주가 청크만 포함, 배당 언급 없음.

**답변**:

검색된 자료에는 두산밥캣의 배당 정책에 대한 내용이 없습니다. 실적·목표주가 관련 청크만 확인되며, 배당 관련 정보는 적재된 문서에서 찾을 수 없습니다.

### 예시 3 — 검색 0건 (note)

**입력 (요약)**: 질문 "현대차 실적은?", retrieved_chunks = [], note = "fulltext 검색 결과 0건."

**답변**:

'현대차' 로 검색된 청크가 없습니다. 현재 적재된 문서에는 현대차 관련 내용이 없는 것으로 보입니다. 적재된 회사(두산밥캣, 미래에셋증권 등)로 다시 질문해 주세요.
Loading