diff --git a/packages/ai-server/scripts/cost_probe.py b/packages/ai-server/scripts/cost_probe.py new file mode 100644 index 00000000..577541d7 --- /dev/null +++ b/packages/ai-server/scripts/cost_probe.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +# pyright: reportMissingImports=false +"""Single raw_post 의 전체 Gemini 파이프라인 비용을 실측한다. + +목적: cost-tracking 시스템 (DB pricing + per-call recorder) 을 만들기 전에 +"한 raw_post 처리에 실제로 token / image / grounding query 가 얼마나 +드는지" 를 ad-hoc 으로 측정. 결과로 plan 의 비용 추정치를 보강. + +원리: + - prod assets DB 에서 image_url 있는 raw_post 1건 random pick. + - prod 파이프라인이 만드는 8가지 호출 타입을 1회씩 (item 단위는 N=3 cap): + 1. hero_reframe gemini-2.5-flash-image image out + 2. subject gemini-2.5-flash image+text + 3. items gemini-2.5-pro image+text → N items + 4. spots gemini-2.5-flash hero+items + 5. thumbnail × N gemini-2.5-flash-image image out + 6. url_grounded × N gemini-2.5-flash + googleSearch + 7. url_filter × N gemini-2.5-flash + image + - 각 호출의 usage_metadata 캡처 (prompt_token_count / candidates_token_count / + cached_content_token_count, image 모델은 candidates_token_count 가 image + bytes 분 ≈ 1290). + - 끝에 합계 + Gemini 공시 단가 (스크립트 상단 상수) 로 USD 환산. + +단가 상수는 *프로브 한정* (production code 는 plan 대로 DB SOT). +출처는 `https://ai.google.dev/pricing` (2026-05 기준 manual 입력). + +Usage: + cd packages/ai-server + uv run python scripts/cost_probe.py + uv run python scripts/cost_probe.py --id +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import logging +import os +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional + +import asyncpg +import httpx +from google import genai +from google.genai import types as genai_types + + +_THIS = Path(__file__).resolve() +_AI_SERVER_ROOT = _THIS.parent.parent +sys.path.insert(0, str(_AI_SERVER_ROOT)) + +from src.services.raw_posts.processors.prompts import ( # noqa: E402 + HERO_REFRAME_PROMPT, + ITEM_THUMBNAIL_PROMPT, + ITEMS_PROMPT, + SPOTS_PROMPT, + SUBJECT_PROMPT, +) +from src.services.raw_posts.processors.items_parser import _ItemsResponse # noqa: E402 +from src.services.raw_posts.processors.spots_parser import _SpotsResponse # noqa: E402 +from src.services.raw_posts.processors.subject_parser import _SubjectDraft # noqa: E402 + + +logger = logging.getLogger("cost-probe") + + +# === Gemini 단가 (USD / unit) — 2026-05 ai.google.dev/pricing 기준 manual === +# 프로브 스크립트 한정. 운영 시스템은 DB pricing 테이블 SOT. +PRICING = { + "gemini-2.5-pro": { + "input_token": 1.25 / 1_000_000, # ≤200k context + "output_token": 10.0 / 1_000_000, + "cached_input_token": 0.3125 / 1_000_000, + }, + "gemini-2.5-flash": { + "input_token": 0.30 / 1_000_000, + "output_token": 2.50 / 1_000_000, + "cached_input_token": 0.075 / 1_000_000, + }, + "gemini-2.5-flash-image": { + "image_output": 0.039, # per image (1024x1024) + "input_token": 0.30 / 1_000_000, # image input still tokenized + }, + "grounding_query": 35.0 / 1_000, # $35 / 1000 google search queries +} + +ITEM_CAP = 3 # 실제 prod 평균 ≈ 5, probe 는 3 으로 cap (비용 ↓) + + +@dataclass +class CallLog: + step: str + model: str + ok: bool + prompt_tokens: int = 0 + completion_tokens: int = 0 + cached_tokens: int = 0 + image_output: int = 0 + grounding_queries: int = 0 + latency_ms: int = 0 + err: Optional[str] = None + est_cost_usd: float = 0.0 + + +calls: list[CallLog] = [] + + +def _extract_usage(resp: Any) -> tuple[int, int, int]: + um = getattr(resp, "usage_metadata", None) + if not um: + return (0, 0, 0) + return ( + getattr(um, "prompt_token_count", 0) or 0, + getattr(um, "candidates_token_count", 0) or 0, + getattr(um, "cached_content_token_count", 0) or 0, + ) + + +def _price_text(model: str, prompt: int, completion: int, cached: int) -> float: + p = PRICING.get(model, {}) + cost = 0.0 + cost += (prompt - (cached or 0)) * p.get("input_token", 0) + cost += completion * p.get("output_token", 0) + cost += (cached or 0) * p.get("cached_input_token", p.get("input_token", 0)) + return cost + + +async def _download(url: str) -> tuple[bytes, str]: + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as c: + r = await c.get(url) + r.raise_for_status() + ct = (r.headers.get("content-type") or "image/jpeg").split(";")[0].strip() + return r.content, ct + + +async def _pick_sample(db_url: str, raw_id: Optional[str]) -> dict: + conn = await asyncpg.connect(db_url) + try: + if raw_id: + row = await conn.fetchrow( + "SELECT id::text AS id, image_url, caption, parse_result FROM public.raw_posts WHERE id=$1::uuid", + raw_id, + ) + else: + row = await conn.fetchrow( + """ + SELECT id::text AS id, image_url, caption, parse_result + FROM public.raw_posts + WHERE image_url IS NOT NULL AND image_url != '' + AND status = 'COMPLETED' + AND parse_result IS NOT NULL + AND jsonb_typeof(parse_result -> 'items') = 'array' + AND jsonb_array_length(parse_result -> 'items') >= 3 + ORDER BY random() LIMIT 1 + """ + ) + if not row: + raise SystemExit("no eligible raw_post found") + return dict(row) + finally: + await conn.close() + + +async def _call_text( + client: genai.Client, + *, + step: str, + model: str, + contents: list, + response_schema: Optional[Any] = None, +) -> Any: + cfg = genai_types.GenerateContentConfig(temperature=0.1) + if response_schema is not None: + cfg = genai_types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=response_schema, + temperature=0.1, + ) + t0 = time.monotonic() + log = CallLog(step=step, model=model, ok=False) + try: + resp = await client.aio.models.generate_content(model=model, contents=contents, config=cfg) + log.ok = True + p, c, ca = _extract_usage(resp) + log.prompt_tokens, log.completion_tokens, log.cached_tokens = p, c, ca + log.est_cost_usd = _price_text(model, p, c, ca) + return resp + except Exception as exc: # noqa: BLE001 + log.err = f"{type(exc).__name__}: {exc}" + raise + finally: + log.latency_ms = int((time.monotonic() - t0) * 1000) + calls.append(log) + + +async def _call_image( + client: genai.Client, + *, + step: str, + image_bytes: bytes, + content_type: str, + prompt: str, + aspect_ratio: str, + image_size: str, +) -> Any: + cfg = genai_types.GenerateContentConfig( + response_modalities=["IMAGE"], + image_config=genai_types.ImageConfig(aspect_ratio=aspect_ratio, image_size=image_size), + ) + t0 = time.monotonic() + log = CallLog(step=step, model="gemini-2.5-flash-image", ok=False) + try: + resp = await client.aio.models.generate_content( + model="gemini-2.5-flash-image", + contents=[ + genai_types.Part.from_bytes(data=image_bytes, mime_type=content_type), + prompt, + ], + config=cfg, + ) + log.ok = True + p, c, ca = _extract_usage(resp) + log.prompt_tokens = p + log.image_output = 1 + log.est_cost_usd = ( + PRICING["gemini-2.5-flash-image"]["image_output"] + + p * PRICING["gemini-2.5-flash-image"]["input_token"] + ) + return resp + except Exception as exc: # noqa: BLE001 + log.err = f"{type(exc).__name__}: {exc}" + raise + finally: + log.latency_ms = int((time.monotonic() - t0) * 1000) + calls.append(log) + + +async def _call_grounded( + api_key: str, *, step: str, brand: str, title: str, model: str +) -> dict: + prompt = f"Find official PDP URLs for fashion item: brand={brand!r}, product={title!r}. Return top 5 URLs." + payload = { + "contents": [{"role": "user", "parts": [{"text": prompt}]}], + "tools": [{"googleSearch": {}}], + "generationConfig": {"temperature": 0.2}, + } + url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent" + t0 = time.monotonic() + log = CallLog(step=step, model=model, ok=False, grounding_queries=1) + try: + async with httpx.AsyncClient(timeout=60.0) as c: + r = await c.post(url, params={"key": api_key}, json=payload) + r.raise_for_status() + data = r.json() + log.ok = True + um = (data or {}).get("usageMetadata") or {} + p = um.get("promptTokenCount", 0) or 0 + cc = um.get("candidatesTokenCount", 0) or 0 + log.prompt_tokens, log.completion_tokens = p, cc + log.est_cost_usd = ( + _price_text(model, p, cc, 0) + PRICING["grounding_query"] * 1 + ) + return data + except Exception as exc: # noqa: BLE001 + log.err = f"{type(exc).__name__}: {exc}" + raise + finally: + log.latency_ms = int((time.monotonic() - t0) * 1000) + calls.append(log) + + +async def main() -> int: + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") + parser = argparse.ArgumentParser() + parser.add_argument("--id", type=str, default=None) + args = parser.parse_args() + + db_url = os.environ.get("ASSETS_DATABASE_URL") + api_key = os.environ.get("GEMINI_API_KEY") + if not db_url or not api_key: + raise SystemExit("ASSETS_DATABASE_URL + GEMINI_API_KEY required (source .env.backend.prod)") + + row = await _pick_sample(db_url, args.id) + logger.info("sample raw_post: id=%s image_url=%s", row["id"], row["image_url"]) + image_bytes, content_type = await _download(row["image_url"]) + logger.info("image: %d bytes, ct=%s", len(image_bytes), content_type) + + client = genai.Client(api_key=api_key) + img_part = genai_types.Part.from_bytes(data=image_bytes, mime_type=content_type) + + # === 1. hero_reframe (flash-image) === + logger.info("[1/7] hero_reframe …") + await _call_image( + client, step="hero_reframe", image_bytes=image_bytes, content_type=content_type, + prompt=HERO_REFRAME_PROMPT, aspect_ratio="4:5", image_size="2K", + ) + + # === 2. subject (flash) === + logger.info("[2/7] subject …") + caption_text = (row.get("caption") or "").strip() + subj_contents = [img_part, SUBJECT_PROMPT + (f"\n\nCaption: {caption_text[:300]}" if caption_text else "")] + await _call_text(client, step="subject", model="gemini-2.5-flash", + contents=subj_contents, response_schema=_SubjectDraft) + + # === 3. items (pro) === + logger.info("[3/7] items …") + items_resp = await _call_text(client, step="items", model="gemini-2.5-pro", + contents=[img_part, ITEMS_PROMPT], + response_schema=_ItemsResponse) + # raw image_url 은 합성 전이라 items parser 가 0 으로 돌아오는 게 정상. + # 실제 N 은 prod parse_result.items 에서 가져와 per-item 호출 시뮬레이션. + cached: list[dict] = [] + pr = row.get("parse_result") + if pr: + try: + pr_obj = pr if isinstance(pr, dict) else json.loads(pr) + cached = (pr_obj or {}).get("items", []) or [] + except Exception: # noqa: BLE001 + pass + n_items = min(len(cached), ITEM_CAP) + logger.info("items in cached parse_result: %d (probing %d)", len(cached), n_items) + items = cached + + # === 4. spots (flash) — image + items text === + logger.info("[4/7] spots …") + items_text = json.dumps([{"brand": (it or {}).get("brand"), "product": (it or {}).get("product")} for it in items[:n_items]]) + await _call_text(client, step="spots", model="gemini-2.5-flash", + contents=[img_part, SPOTS_PROMPT + "\n\nitems=" + items_text], + response_schema=_SpotsResponse) + + # === 5. thumbnail × N (flash-image) === + for i in range(n_items): + logger.info("[5/7] thumbnail %d/%d …", i + 1, n_items) + await _call_image( + client, step=f"thumbnail#{i+1}", image_bytes=image_bytes, content_type=content_type, + prompt=ITEM_THUMBNAIL_PROMPT, aspect_ratio="1:1", image_size="1K", + ) + + # === 6. url_search.grounded × N (flash + googleSearch) === + for i in range(n_items): + it = items[i] or {} + brand = (it.get("brand") or "unknown brand")[:80] + product = (it.get("product") or "unknown product")[:120] + logger.info("[6/7] url_grounded %d/%d (%s · %s) …", i + 1, n_items, brand, product) + try: + await _call_grounded(api_key, step=f"url_grounded#{i+1}", brand=brand, title=product, + model="gemini-2.5-flash") + except Exception as exc: # noqa: BLE001 + logger.warning("url_grounded failed: %s", exc) + + # === 7. url_search.filter × N (flash + thumbnail image) === + for i in range(n_items): + logger.info("[7/7] url_filter %d/%d …", i + 1, n_items) + await _call_text( + client, step=f"url_filter#{i+1}", model="gemini-2.5-flash", + contents=[img_part, "Evaluate top product URL candidates. Return JSON with best_url, confidence, domain_class."], + ) + + # === 결과 출력 === + print() + print(f"{'step':<22}{'model':<26}{'ok':<4}{'in tok':>8}{'out tok':>9}{'img':>5}{'grnd':>6}{'lat ms':>9}{'$':>10}") + print("-" * 99) + total = {"in": 0, "out": 0, "img": 0, "grnd": 0, "ms": 0, "usd": 0.0} + for c in calls: + print(f"{c.step:<22}{c.model:<26}{('✓' if c.ok else '✗'):<4}" + f"{c.prompt_tokens:>8}{c.completion_tokens:>9}{c.image_output:>5}" + f"{c.grounding_queries:>6}{c.latency_ms:>9}{c.est_cost_usd:>10.5f}") + total["in"] += c.prompt_tokens; total["out"] += c.completion_tokens + total["img"] += c.image_output; total["grnd"] += c.grounding_queries + total["ms"] += c.latency_ms; total["usd"] += c.est_cost_usd + print("-" * 99) + print(f"{'TOTAL':<22}{'':<26}{'':<4}{total['in']:>8}{total['out']:>9}{total['img']:>5}" + f"{total['grnd']:>6}{total['ms']:>9}{total['usd']:>10.5f}") + print() + print(f"raw_post id = {row['id']}") + print(f"items detected (prod 평균 ≈ 5) = {len(items)}, probe 사용 = {n_items}") + print(f"실측 ${total['usd']:.4f} / 1 raw_post (ITEM_CAP={ITEM_CAP})") + if n_items > 0: + per_item = total['usd'] - sum(c.est_cost_usd for c in calls if not c.step.startswith(('thumbnail', 'url_'))) + prod_est = (total['usd'] - per_item) + per_item / n_items * 5 + print(f"prod 추정 (5 items): ${prod_est:.4f}") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/packages/ai-server/scripts/cost_tracking_e2e_test.py b/packages/ai-server/scripts/cost_tracking_e2e_test.py new file mode 100644 index 00000000..a744b8ec --- /dev/null +++ b/packages/ai-server/scripts/cost_tracking_e2e_test.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +"""cost_tracking 모듈 end-to-end 검증. + +local DB (port 54322) 에 마이그레이션 적용 후 실행: + ASSETS_DATABASE_URL=postgresql://postgres:postgres@127.0.0.1:54322/postgres \\ + uv run python scripts/cost_tracking_e2e_test.py + +검증: + 1. PricingCache 가 DB seed 단가 로드 + 2. track_call 이 mock Gemini 호출 (usage_metadata 포함) 을 wrap + 3. estimate 가 단가 × token 으로 cost 계산 + 4. recorder 가 fire-and-forget INSERT + 5. 실패 path 도 row 적립 (ok=false, error_class) +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sys +import uuid +from dataclasses import dataclass +from pathlib import Path + +import asyncpg + + +_THIS = Path(__file__).resolve() +_AI_SERVER_ROOT = _THIS.parent.parent +sys.path.insert(0, str(_AI_SERVER_ROOT)) + +from src.services import cost_tracking # noqa: E402 + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger("cost-tracking-test") + + +# --- mock Gemini response objects --- + +@dataclass +class _MockUsage: + prompt_token_count: int = 0 + candidates_token_count: int = 0 + cached_content_token_count: int = 0 + + +@dataclass +class _MockResp: + usage_metadata: _MockUsage + + +async def _fake_call(usage: _MockUsage): + """Mock coroutine — returns response with usage_metadata.""" + await asyncio.sleep(0.01) + return _MockResp(usage_metadata=usage) + + +async def _fake_failure(): + """Mock coroutine that raises.""" + await asyncio.sleep(0.005) + raise RuntimeError("safety_block: simulated") + + +async def main() -> int: + dsn = os.environ.get("ASSETS_DATABASE_URL") + if not dsn: + print("ASSETS_DATABASE_URL required", file=sys.stderr) + return 2 + + raw_post_id = str(uuid.uuid4()) + cost_tracking.set_context(raw_post_id=raw_post_id, pipeline="raw_post") + logger.info("context: raw_post_id=%s pipeline=raw_post", raw_post_id) + + # === 1. happy path — Pro text call === + logger.info("[1] items_parser (Pro) — mock 1000 in / 500 out") + await cost_tracking.track_call( + "items_parser", + "gemini-2.5-pro", + cost_tracking.extract_text_usage, + _fake_call(_MockUsage(prompt_token_count=1000, candidates_token_count=500)), + ) + + # === 2. happy path — Flash text call === + logger.info("[2] subject_parser (Flash) — mock 800 in / 100 out") + await cost_tracking.track_call( + "subject_parser", + "gemini-2.5-flash", + cost_tracking.extract_text_usage, + _fake_call(_MockUsage(prompt_token_count=800, candidates_token_count=100)), + ) + + # === 3. happy path — Flash text with cache === + logger.info("[3] subject_parser cached — mock 1000 in (400 cached) / 100 out") + await cost_tracking.track_call( + "subject_parser", + "gemini-2.5-flash", + cost_tracking.extract_text_usage, + _fake_call( + _MockUsage( + prompt_token_count=1000, + candidates_token_count=100, + cached_content_token_count=400, + ) + ), + ) + + # === 4. image output (flash-image) === + logger.info("[4] hero_reframe (flash-image)") + await cost_tracking.track_call( + "hero_reframe", + "gemini-2.5-flash-image", + cost_tracking.extract_image_usage, + _fake_call(_MockUsage(prompt_token_count=540, candidates_token_count=0)), + ) + + # === 5. failure path === + logger.info("[5] items_parser failure (safety_block)") + try: + await cost_tracking.track_call( + "items_parser", + "gemini-2.5-pro", + cost_tracking.extract_text_usage, + _fake_failure(), + ) + except RuntimeError as exc: + logger.info(" propagated as expected: %s", exc) + + # === 6. no-pricing model === + logger.info("[6] unknown model (no pricing)") + await cost_tracking.track_call( + "experimental_step", + "gemini-999-future", + cost_tracking.extract_text_usage, + _fake_call(_MockUsage(prompt_token_count=100, candidates_token_count=50)), + ) + + # === 모든 fire-and-forget task 끝날 때까지 대기 === + logger.info("waiting for fire-and-forget INSERTs to drain …") + await asyncio.sleep(2.0) + # 추가로 명시 shutdown — 풀의 모든 pending INSERT 가 끝남 + pending = [t for t in asyncio.all_tasks() if not t.done() and t is not asyncio.current_task()] + if pending: + await asyncio.gather(*pending, return_exceptions=True) + + # === DB 확인 === + conn = await asyncpg.connect(dsn) + try: + rows = await conn.fetch( + """ + SELECT step, model, ok, prompt_tokens, completion_tokens, cached_tokens, + image_output_count, est_cost_usd::float8 AS cost, + error_class, pricing_snapshot::text AS snap, latency_ms + FROM public.gemini_usage_events + WHERE raw_post_id = $1::uuid + ORDER BY id + """, + raw_post_id, + ) + finally: + await conn.close() + + print() + print(f"{'step':<24}{'model':<26}{'ok':<4}{'in':>6}{'out':>5}{'cache':>6}" + f"{'img':>4}{'cost $':>12}{'err':<16}") + print("-" * 110) + for r in rows: + print(f"{r['step']:<24}{r['model']:<26}{('✓' if r['ok'] else '✗'):<4}" + f"{(r['prompt_tokens'] or 0):>6}{(r['completion_tokens'] or 0):>5}" + f"{(r['cached_tokens'] or 0):>6}{(r['image_output_count'] or 0):>4}" + f"{r['cost']:>12.6f} {r['error_class'] or '':<14}") + print() + print(f"{len(rows)} rows inserted (expected 6)") + await cost_tracking.shutdown() + return 0 if len(rows) == 6 else 1 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/packages/ai-server/scripts/cost_tracking_parser_verify.py b/packages/ai-server/scripts/cost_tracking_parser_verify.py new file mode 100644 index 00000000..19d9e807 --- /dev/null +++ b/packages/ai-server/scripts/cost_tracking_parser_verify.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +"""실제 parser wrap 검증 — `SubjectParser` 가 cost_tracking 경유해 DB 적립하는지. + +이전 `cost_tracking_e2e_test.py` 는 mock coroutine + `track_call` 직접 호출. +이 스크립트는 *진짜 parser 클래스* 를 import + 호출 → 그 안에 박힌 wrap +코드 라인이 작동해서 `gemini_usage_events` 에 row 가 적립되는지 검증. + + ASSETS_DATABASE_URL=postgresql://postgres:postgres@127.0.0.1:54322/postgres \\ + GEMINI_API_KEY=$(grep '^GEMINI_API_KEY=' ../../.env.backend.prod | cut -d= -f2-) \\ + uv run python scripts/cost_tracking_parser_verify.py + +비용: 약 \$0.0005 (Subject parser 1 회). +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sys +import uuid +from pathlib import Path + +import asyncpg +import httpx + + +_THIS = Path(__file__).resolve() +_AI_SERVER_ROOT = _THIS.parent.parent +sys.path.insert(0, str(_AI_SERVER_ROOT)) + +from src.services import cost_tracking # noqa: E402 +from src.services.raw_posts.processors.subject_parser import SubjectParser # noqa: E402 + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger("cost-tracking-parser-verify") + + +# 테스트 이미지 — 운영 환경의 한 raw_post (Pinterest, 셀럽 사진). +TEST_IMAGE_URL = "https://pub-64ff29549cdf47ee94d338bca8d04819.r2.dev/pinterest/75/75646468738215900.png" + + +async def main() -> int: + dsn = os.environ.get("ASSETS_DATABASE_URL") + api_key = os.environ.get("GEMINI_API_KEY") + if not dsn or not api_key: + print("ASSETS_DATABASE_URL + GEMINI_API_KEY required", file=sys.stderr) + return 2 + + # 가짜 raw_post_id 로 context 설정 — DB 적립 시 이 ID 로 row 검증. + fake_raw_post_id = str(uuid.uuid4()) + cost_tracking.set_context(raw_post_id=fake_raw_post_id, pipeline="raw_post") + logger.info("context: raw_post_id=%s pipeline=raw_post", fake_raw_post_id) + + # 이미지 fetch + logger.info("fetching test image …") + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as c: + r = await c.get(TEST_IMAGE_URL) + r.raise_for_status() + img_bytes = r.content + ct = (r.headers.get("content-type") or "image/jpeg").split(";")[0].strip() + logger.info("image: %d bytes, ct=%s", len(img_bytes), ct) + + # 실제 SubjectParser 호출 — 내부 `await cost_tracking.track_call(...)` 발동. + logger.info("calling SubjectParser.parse() — wrap 경유 …") + parser = SubjectParser(api_key=api_key, model="gemini-2.5-flash") + subject = await parser.parse( + composite_bytes=img_bytes, + content_type=ct, + caption=None, + ) + logger.info("subject result: %s", subject) + + # fire-and-forget INSERT drain + logger.info("waiting for fire-and-forget INSERT …") + await asyncio.sleep(2.0) + pending = [t for t in asyncio.all_tasks() if not t.done() and t is not asyncio.current_task()] + if pending: + await asyncio.gather(*pending, return_exceptions=True) + + # DB 검증 + conn = await asyncpg.connect(dsn) + try: + rows = await conn.fetch( + """ + SELECT step, model, ok, prompt_tokens, completion_tokens, + est_cost_usd::float8 AS cost, latency_ms, pipeline, + pricing_snapshot::text AS snap, error_class + FROM public.gemini_usage_events + WHERE raw_post_id = $1::uuid + ORDER BY id + """, + fake_raw_post_id, + ) + finally: + await conn.close() + + print() + if not rows: + print(f"❌ FAIL — no row found for raw_post_id={fake_raw_post_id}") + await cost_tracking.shutdown() + return 1 + + print(f"✅ {len(rows)} row(s) inserted for raw_post_id={fake_raw_post_id}") + print() + for r in rows: + print(f" step {r['step']}") + print(f" model {r['model']}") + print(f" pipeline {r['pipeline']}") + print(f" ok {r['ok']}") + print(f" prompt_tokens {r['prompt_tokens']}") + print(f" completion_tok {r['completion_tokens']}") + print(f" cost_usd ${r['cost']:.6f}") + print(f" latency_ms {r['latency_ms']}") + print(f" pricing_snapshot {r['snap']}") + print(f" error_class {r['error_class']}") + print() + + # 핵심 invariant 검증 + fail = False + row = rows[0] + if row["step"] != "subject_parser": + print(f"❌ step mismatch: {row['step']!r} != 'subject_parser'") + fail = True + if row["model"] != "gemini-2.5-flash": + print(f"❌ model mismatch: {row['model']!r} != 'gemini-2.5-flash'") + fail = True + if row["pipeline"] != "raw_post": + print(f"❌ pipeline mismatch: {row['pipeline']!r} != 'raw_post'") + fail = True + if not row["ok"]: + print(f"❌ ok=false (error_class={row['error_class']})") + fail = True + if (row["prompt_tokens"] or 0) <= 0: + print(f"❌ prompt_tokens missing: {row['prompt_tokens']}") + fail = True + if row["cost"] <= 0: + print(f"❌ cost_usd <= 0: {row['cost']}") + fail = True + + await cost_tracking.shutdown() + if fail: + print("❌ invariant check failed") + return 1 + print("✅ all invariants passed — wrap 코드 경로가 실제로 작동함") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/packages/ai-server/src/managers/llm/adapters/nano_banana.py b/packages/ai-server/src/managers/llm/adapters/nano_banana.py index 2be2e48e..891bc8ae 100644 --- a/packages/ai-server/src/managers/llm/adapters/nano_banana.py +++ b/packages/ai-server/src/managers/llm/adapters/nano_banana.py @@ -17,6 +17,8 @@ from google import genai from google.genai import types as genai_types +from src.services import cost_tracking + logger = logging.getLogger(__name__) @@ -50,24 +52,33 @@ async def reframe( prompt: str, aspect_ratio: str = "4:5", image_size: str = "2K", + step: str = "nano_banana", ) -> bytes: """Call Nano Banana with an input image + prompt, return output bytes. Raises NanoBananaError when the API errors or returns no inline_data. + + `step` 은 cost_tracking 의 step 라벨 — 호출자 (hero_reframe / + items_thumbnail) 가 own 라벨 전달 시 cost 대시보드에서 분리 가능. """ try: - resp = await self._client.aio.models.generate_content( - model=self._model, - contents=[ - genai_types.Part.from_bytes( - data=image_bytes, mime_type=image_mime_type - ), - prompt, - ], - config=genai_types.GenerateContentConfig( - response_modalities=["IMAGE"], - image_config=genai_types.ImageConfig( - aspect_ratio=aspect_ratio, image_size=image_size + resp = await cost_tracking.track_call( + step, + self._model, + cost_tracking.extract_image_usage, + self._client.aio.models.generate_content( + model=self._model, + contents=[ + genai_types.Part.from_bytes( + data=image_bytes, mime_type=image_mime_type + ), + prompt, + ], + config=genai_types.GenerateContentConfig( + response_modalities=["IMAGE"], + image_config=genai_types.ImageConfig( + aspect_ratio=aspect_ratio, image_size=image_size + ), ), ), ) diff --git a/packages/ai-server/src/post_editorial/nodes/celeb_search.py b/packages/ai-server/src/post_editorial/nodes/celeb_search.py index 25beb73f..0e13ebb7 100644 --- a/packages/ai-server/src/post_editorial/nodes/celeb_search.py +++ b/packages/ai-server/src/post_editorial/nodes/celeb_search.py @@ -12,6 +12,7 @@ from pydantic import BaseModel from src.managers.database import DatabaseManager +from src.services import cost_tracking from ..state import PostEditorialState from ..config import get_settings @@ -189,13 +190,18 @@ def _build_celeb_ranking_prompt( async def _rank_celebs( client: genai.Client, prompt: str, model: str ) -> CelebRankingOutput: - response = await client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=CelebRankingOutput, - temperature=0.3, + response = await cost_tracking.track_call( + "celeb_search", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=CelebRankingOutput, + temperature=0.3, + ), ), ) raw_text = response.text or "{}" diff --git a/packages/ai-server/src/post_editorial/nodes/design_spec.py b/packages/ai-server/src/post_editorial/nodes/design_spec.py index b3a6cd0e..22454e39 100644 --- a/packages/ai-server/src/post_editorial/nodes/design_spec.py +++ b/packages/ai-server/src/post_editorial/nodes/design_spec.py @@ -7,6 +7,8 @@ from google import genai from google.genai import types +from src.services import cost_tracking + from ..models import DesignSpec, default_design_spec from ..state import PostEditorialState from ..config import get_settings @@ -54,13 +56,18 @@ def _get_genai_client() -> genai.Client: async def _generate_spec(client: genai.Client, prompt: str, model: str) -> DesignSpec: - response = await client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=DesignSpec, - temperature=0.7, + response = await cost_tracking.track_call( + "design_spec", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=DesignSpec, + temperature=0.7, + ), ), ) raw_text = response.text or "{}" diff --git a/packages/ai-server/src/post_editorial/nodes/editorial.py b/packages/ai-server/src/post_editorial/nodes/editorial.py index 2db9ff10..c039dcf7 100644 --- a/packages/ai-server/src/post_editorial/nodes/editorial.py +++ b/packages/ai-server/src/post_editorial/nodes/editorial.py @@ -8,6 +8,8 @@ from google.genai import types from pydantic import BaseModel +from src.services import cost_tracking + from ..state import PostEditorialState from ..config import get_settings from ..gemini_retry import call_gemini_with_fallback @@ -163,13 +165,18 @@ def _get_genai_client() -> genai.Client: async def _generate_editorial(client: genai.Client, prompt: str, model: str) -> EditorialOutput: - response = await client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=EditorialOutput, - temperature=0.7, + response = await cost_tracking.track_call( + "editorial", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=EditorialOutput, + temperature=0.7, + ), ), ) raw_text = response.text or "{}" diff --git a/packages/ai-server/src/post_editorial/nodes/image_analysis.py b/packages/ai-server/src/post_editorial/nodes/image_analysis.py index fcbf0ea4..24e52a97 100644 --- a/packages/ai-server/src/post_editorial/nodes/image_analysis.py +++ b/packages/ai-server/src/post_editorial/nodes/image_analysis.py @@ -9,6 +9,8 @@ from google.genai import types from pydantic import BaseModel +from src.services import cost_tracking + from ..state import PostEditorialState from ..config import get_settings from ..gemini_retry import call_gemini_with_fallback @@ -49,16 +51,21 @@ async def _download_image(url: str) -> tuple[bytes, str]: async def _analyze_image( client: genai.Client, model: str, prompt: str, image_bytes: bytes, mime_type: str ) -> ImageAnalysisOutput: - response = await client.aio.models.generate_content( - model=model, - contents=[ - prompt, - types.Part.from_bytes(data=image_bytes, mime_type=mime_type), - ], - config=types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=ImageAnalysisOutput, - temperature=0.3, + response = await cost_tracking.track_call( + "image_analysis", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=[ + prompt, + types.Part.from_bytes(data=image_bytes, mime_type=mime_type), + ], + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=ImageAnalysisOutput, + temperature=0.3, + ), ), ) raw_text = response.text or "{}" diff --git a/packages/ai-server/src/post_editorial/nodes/item_search.py b/packages/ai-server/src/post_editorial/nodes/item_search.py index 3c45124f..32f22347 100644 --- a/packages/ai-server/src/post_editorial/nodes/item_search.py +++ b/packages/ai-server/src/post_editorial/nodes/item_search.py @@ -13,6 +13,8 @@ from google.genai import types from pydantic import BaseModel +from src.services import cost_tracking + from src.managers.database import DatabaseManager from ..state import PostEditorialState @@ -253,13 +255,18 @@ def _build_ranking_prompt( async def _rank_items( client: genai.Client, prompt: str, model: str ) -> ItemRankingOutput: - response = await client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=ItemRankingOutput, - temperature=0.3, + response = await cost_tracking.track_call( + "item_search", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=ItemRankingOutput, + temperature=0.3, + ), ), ) raw_text = response.text or "{}" diff --git a/packages/ai-server/src/post_editorial/nodes/news_research.py b/packages/ai-server/src/post_editorial/nodes/news_research.py index 3fb40f6e..f83434c9 100644 --- a/packages/ai-server/src/post_editorial/nodes/news_research.py +++ b/packages/ai-server/src/post_editorial/nodes/news_research.py @@ -15,6 +15,8 @@ from google.genai import types from pydantic import BaseModel +from src.services import cost_tracking + from ..state import PostEditorialState from ..config import get_settings from ..gemini_retry import call_gemini_with_fallback @@ -243,13 +245,18 @@ async def _filter_with_gemini( client = genai.Client(api_key=settings.gemini_api_key) async def _generate(model: str) -> NewsFilterOutput: - response = await client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=NewsFilterOutput, - temperature=0.1, + response = await cost_tracking.track_call( + "news_research", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=NewsFilterOutput, + temperature=0.1, + ), ), ) return NewsFilterOutput.model_validate_json(response.text or '{"articles":[]}') diff --git a/packages/ai-server/src/post_editorial/nodes/review.py b/packages/ai-server/src/post_editorial/nodes/review.py index e0d86c66..d566bb0e 100644 --- a/packages/ai-server/src/post_editorial/nodes/review.py +++ b/packages/ai-server/src/post_editorial/nodes/review.py @@ -11,6 +11,8 @@ from google.genai import types from pydantic import ValidationError +from src.services import cost_tracking + from ..models import PostMagazineLayout, ReviewResult, CriterionResult from ..state import PostEditorialState from ..config import get_settings @@ -147,11 +149,16 @@ async def review_node(state: PostEditorialState) -> dict: async def _call_review(model: str): return await asyncio.wait_for( - client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", temperature=0.0 + cost_tracking.track_call( + "review", + model, + cost_tracking.extract_text_usage, + client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + response_mime_type="application/json", temperature=0.0 + ), ), ), timeout=_REVIEW_TIMEOUT, diff --git a/packages/ai-server/src/services/cost_tracking/__init__.py b/packages/ai-server/src/services/cost_tracking/__init__.py new file mode 100644 index 00000000..953110a6 --- /dev/null +++ b/packages/ai-server/src/services/cost_tracking/__init__.py @@ -0,0 +1,43 @@ +"""Gemini API per-call 비용 추적 (#cost-tracking). + +ai-server 모든 Gemini 호출의 사용량 / 비용을 *호출 시점에* DB +(`gemini_usage_events`) 에 적립한다. 단가는 DB SOT (`gemini_pricing`, +SCD-2) 에서 동적 lookup — 코드 하드코딩 없음. + +설계 원칙: + - **fire-and-forget**: 추적이 본 파이프라인 latency / 실패에 영향 없음. + - **실패도 row 적립**: ok=false, error_class 로 분류 — 가시화. + - **lazy init**: ASSETS_DATABASE_URL 미설정이면 silent no-op. + - **단가 cache**: 5 분 TTL 메모리 캐시, 어드민 단가 변경 시 자동 반영. + +호출 사이트 (parser 별): + resp = await track_call( + "items_parser", model, extract_text_usage, + client.aio.models.generate_content(model=..., contents=..., config=...), + ) + +Pipeline runner 가 진입 시 한 번: + cost_tracking.set_context(raw_post_id=..., pipeline="raw_post") +""" + +from ._impl import ( + extract_grounded_response, + extract_grounded_usage, + extract_image_usage, + extract_rest_text_usage, + extract_text_usage, + set_context, + shutdown, + track_call, +) + +__all__ = [ + "extract_grounded_response", + "extract_grounded_usage", + "extract_image_usage", + "extract_rest_text_usage", + "extract_text_usage", + "set_context", + "shutdown", + "track_call", +] diff --git a/packages/ai-server/src/services/cost_tracking/_impl.py b/packages/ai-server/src/services/cost_tracking/_impl.py new file mode 100644 index 00000000..01490a1a --- /dev/null +++ b/packages/ai-server/src/services/cost_tracking/_impl.py @@ -0,0 +1,397 @@ +"""cost_tracking 내부 구현 — pool · pricing cache · recorder · estimator · context. + +본 모듈은 `__init__.py` 가 re-export 하는 5개 심볼만 외부에 노출: + track_call, set_context, extract_{text,image,grounded}_usage, shutdown +""" + +from __future__ import annotations + +import asyncio +import contextvars +import json +import logging +import os +import time +from typing import Any, Awaitable, Callable, Optional + +import asyncpg + + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# pool — 자체 asyncpg pool. DatabaseManager 와 분리 (DI 변경 없이 동작). +# --------------------------------------------------------------------------- + +_pool: Optional[asyncpg.Pool] = None +_pool_lock = asyncio.Lock() +_pool_failed = False # init 실패 후 재시도 안 함 (조용한 no-op) + + +async def _get_pool() -> Optional[asyncpg.Pool]: + """ASSETS_DATABASE_URL 로 lazy init. 미설정 / 실패 시 None — no-op.""" + global _pool, _pool_failed + if _pool is not None: + return _pool + if _pool_failed: + return None + async with _pool_lock: + if _pool is not None: + return _pool + if _pool_failed: + return None + dsn = ( + os.environ.get("ASSETS_DATABASE_URL") + or os.environ.get("OPERATION_DATABASE_URL") + ) + if not dsn: + logger.info("cost_tracking: no DSN — disabled (no-op)") + _pool_failed = True + return None + try: + _pool = await asyncpg.create_pool( + dsn=dsn, min_size=1, max_size=3, command_timeout=10 + ) + except Exception as exc: # noqa: BLE001 + logger.warning("cost_tracking: pool init failed (%s) — disabled", exc) + _pool_failed = True + return None + logger.info("cost_tracking: pool initialized") + return _pool + + +async def shutdown() -> None: + """선택적 종료 — 테스트 / app shutdown 용.""" + global _pool + if _pool is not None: + await _pool.close() + _pool = None + + +# --------------------------------------------------------------------------- +# pricing cache — DB SOT 의 5분 TTL 메모리 캐시 +# --------------------------------------------------------------------------- + +_PRICE_TTL_SEC = 300.0 +_price_cache: dict[tuple[str, str, str], float] = {} +_price_loaded_at = 0.0 +_price_lock = asyncio.Lock() + + +async def _refresh_pricing_if_stale() -> None: + global _price_cache, _price_loaded_at + now = time.monotonic() + if now - _price_loaded_at < _PRICE_TTL_SEC: + return + async with _price_lock: + if now - _price_loaded_at < _PRICE_TTL_SEC: + return + pool = await _get_pool() + if pool is None: + return + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT model, unit_kind, tier, usd_per_unit::float8 AS rate + FROM public.gemini_pricing + WHERE effective_to IS NULL + """ + ) + except Exception as exc: # noqa: BLE001 + logger.warning("cost_tracking: pricing reload failed: %s", exc) + return + _price_cache = {(r["model"], r["unit_kind"], r["tier"]): r["rate"] for r in rows} + _price_loaded_at = time.monotonic() + logger.debug("cost_tracking: pricing cache reloaded (%d rows)", len(_price_cache)) + + +async def _price_lookup(model: str, unit_kind: str, tier: str = "default") -> Optional[float]: + await _refresh_pricing_if_stale() + return _price_cache.get((model, unit_kind, tier)) + + +async def _model_snapshot(model: str, tier: str = "default") -> dict[str, float]: + """현재 model 의 모든 unit_kind 단가 dict — pricing_snapshot 빌드용.""" + await _refresh_pricing_if_stale() + return { + uk: rate + for (m, uk, t), rate in _price_cache.items() + if m == model and t == tier + } + + +# --------------------------------------------------------------------------- +# estimator — usage dict + model → (cost, snapshot, error_class) +# --------------------------------------------------------------------------- + + +async def _estimate(model: str, usage: dict[str, Any]) -> tuple[float, dict[str, float], Optional[str]]: + snap = await _model_snapshot(model) + cost = 0.0 + used_snap: dict[str, float] = {} + err: Optional[str] = None + + prompt = int(usage.get("prompt_tokens") or 0) + completion = int(usage.get("completion_tokens") or 0) + cached = int(usage.get("cached_tokens") or 0) + image_n = int(usage.get("image_output_count") or 0) + grounding_n = int(usage.get("grounding_queries") or 0) + + has_any_token = (prompt + completion + cached) > 0 + if has_any_token and "input_token" not in snap and "output_token" not in snap: + err = "no_pricing" + + if "input_token" in snap: + rate = snap["input_token"] + used_snap["input_token"] = rate + cost += max(0, prompt - cached) * rate + if "output_token" in snap and completion: + rate = snap["output_token"] + used_snap["output_token"] = rate + cost += completion * rate + if cached and "cached_input_token" in snap: + rate = snap["cached_input_token"] + used_snap["cached_input_token"] = rate + cost += cached * rate + if image_n and "image_output" in snap: + rate = snap["image_output"] + used_snap["image_output"] = rate + cost += image_n * rate + if grounding_n: + grate = await _price_lookup("grounding", "grounding_query") + if grate is not None: + used_snap["grounding_query"] = grate + cost += grounding_n * grate + else: + err = err or "no_pricing" + + return (cost, used_snap, err) + + +# --------------------------------------------------------------------------- +# context — contextvars (raw_post_id / post_id / pipeline 자동 전파) +# --------------------------------------------------------------------------- + +_raw_post_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar( + "ct_raw_post_id", default=None +) +_post_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar( + "ct_post_id", default=None +) +_pipeline_var: contextvars.ContextVar[str] = contextvars.ContextVar( + "ct_pipeline", default="ad_hoc" +) + + +def set_context( + *, + raw_post_id: Optional[str] = None, + post_id: Optional[str] = None, + pipeline: str = "ad_hoc", +) -> None: + """파이프라인 진입부에서 한 번 호출. 이후 같은 async task 안의 모든 + `track_call` 이 자동으로 read.""" + _raw_post_id_var.set(raw_post_id) + _post_id_var.set(post_id) + _pipeline_var.set(pipeline) + + +# --------------------------------------------------------------------------- +# recorder — fire-and-forget INSERT +# --------------------------------------------------------------------------- + + +def _classify(exc: BaseException) -> str: + msg = str(exc).lower() + if "safety" in msg or "blocked" in msg or "block_reason" in msg: + return "safety_block" + if "quota" in msg or "rate limit" in msg or "resource_exhausted" in msg: + return "quota" + if "timeout" in msg or "deadline" in msg: + return "timeout" + if "parse" in msg or "json" in msg: + return "parse_error" + if any(c in msg for c in ("500", "502", "503", "504")): + return "http_5xx" + return "error" + + +def _record( # fire-and-forget + *, + step: str, + model: str, + ok: bool, + usage: dict[str, Any], + cost: float, + snapshot: dict[str, float], + error_class: Optional[str], + latency_ms: int, +) -> None: + raw_post_id = _raw_post_id_var.get() + post_id = _post_id_var.get() + pipeline = _pipeline_var.get() + + async def _insert() -> None: + pool = await _get_pool() + if pool is None: + return + try: + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO public.gemini_usage_events( + raw_post_id, post_id, step, pipeline, model, ok, + prompt_tokens, completion_tokens, cached_tokens, + image_output_count, grounding_queries, + est_cost_usd, pricing_snapshot, error_class, latency_ms + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13::jsonb,$14,$15) + """, + raw_post_id, + post_id, + step, + pipeline, + model, + ok, + usage.get("prompt_tokens"), + usage.get("completion_tokens"), + usage.get("cached_tokens"), + usage.get("image_output_count"), + usage.get("grounding_queries"), + cost, + json.dumps(snapshot) if snapshot else None, + error_class, + latency_ms, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("cost_tracking: insert failed (step=%s): %s", step, exc) + + try: + asyncio.create_task(_insert()) + except RuntimeError: + # event loop 없는 컨텍스트 — silent skip (테스트 등) + pass + + +# --------------------------------------------------------------------------- +# usage extractors — SDK response → 공통 dict +# --------------------------------------------------------------------------- + + +def extract_text_usage(resp: Any) -> dict[str, Any]: + """google.genai 의 resp.usage_metadata → 공통 dict.""" + um = getattr(resp, "usage_metadata", None) + if not um: + return {} + return { + "prompt_tokens": getattr(um, "prompt_token_count", 0) or 0, + "completion_tokens": getattr(um, "candidates_token_count", 0) or 0, + "cached_tokens": getattr(um, "cached_content_token_count", 0) or 0, + } + + +def extract_image_usage(resp: Any) -> dict[str, Any]: + """flash-image: 1 image output + input image 토큰.""" + base = extract_text_usage(resp) + base["image_output_count"] = 1 + return base + + +def extract_grounded_usage(resp_dict: dict) -> dict[str, Any]: + """url_search.grounded_search — httpx response.json() (camelCase 키).""" + um = (resp_dict or {}).get("usageMetadata") or {} + return { + "prompt_tokens": um.get("promptTokenCount", 0) or 0, + "completion_tokens": um.get("candidatesTokenCount", 0) or 0, + "cached_tokens": um.get("cachedContentTokenCount", 0) or 0, + "grounding_queries": 1, + } + + +def extract_grounded_response(resp: Any) -> dict[str, Any]: + """httpx.Response → grounded usage. .json() 결과를 cache (httpx 가 알아서).""" + try: + data = resp.json() + except Exception: # noqa: BLE001 + return {"grounding_queries": 1} + return extract_grounded_usage(data) + + +def extract_rest_text_usage(resp: Any) -> dict[str, Any]: + """httpx.Response → text usage (camelCase 키, grounding 미사용).""" + try: + data = resp.json() + except Exception: # noqa: BLE001 + return {} + um = (data or {}).get("usageMetadata") or {} + return { + "prompt_tokens": um.get("promptTokenCount", 0) or 0, + "completion_tokens": um.get("candidatesTokenCount", 0) or 0, + "cached_tokens": um.get("cachedContentTokenCount", 0) or 0, + } + + +# --------------------------------------------------------------------------- +# track_call — public wrapper (어떤 Gemini 호출이든 통합) +# --------------------------------------------------------------------------- + + +async def track_call( + step: str, + model: str, + extract: Callable[[Any], dict[str, Any]], + coro: Awaitable[Any], +) -> Any: + """Gemini SDK 호출 coroutine 을 감싸 usage / cost 적립. + + `coro` 는 *이미 생성된* coroutine (e.g. `client.aio.models.generate_content(...)`). + 이 함수가 await + 결과 반환. 호출자 인터페이스는 변하지 않음. + + 실패 시 record_failure + re-raise. 본 파이프라인은 정상 에러 흐름 유지. + """ + t0 = time.monotonic() + try: + resp = await coro + except BaseException as exc: + latency = int((time.monotonic() - t0) * 1000) + _record( + step=step, + model=model, + ok=False, + usage={}, + cost=0.0, + snapshot={}, + error_class=_classify(exc), + latency_ms=latency, + ) + raise + latency = int((time.monotonic() - t0) * 1000) + try: + usage = extract(resp) or {} + cost, snap, err = await _estimate(model, usage) + except Exception as exc: # noqa: BLE001 + # extract / pricing 실패 — 본 결과는 반환, ok=true row 만 cost=0 + logger.warning("cost_tracking: usage/estimate failed (step=%s): %s", step, exc) + _record( + step=step, + model=model, + ok=True, + usage={}, + cost=0.0, + snapshot={}, + error_class="extract_error", + latency_ms=latency, + ) + return resp + _record( + step=step, + model=model, + ok=True, + usage=usage, + cost=cost, + snapshot=snap, + error_class=err, + latency_ms=latency, + ) + return resp diff --git a/packages/ai-server/src/services/post_editorial/post_editorial_service.py b/packages/ai-server/src/services/post_editorial/post_editorial_service.py index 25e3b543..eaadd170 100644 --- a/packages/ai-server/src/services/post_editorial/post_editorial_service.py +++ b/packages/ai-server/src/services/post_editorial/post_editorial_service.py @@ -67,6 +67,15 @@ async def post_editorial_job( metadata_extract_service = ctx.get("metadata_extract_service") from src.post_editorial.graph import create_post_editorial_graph + from src.services import cost_tracking + + # cost_tracking 컨텍스트 — 이후 모든 LangGraph 노드의 Gemini 호출이 + # post_id + pipeline 라벨로 자동 적립. + post_id = (post_data or {}).get("id") if isinstance(post_data, dict) else None + cost_tracking.set_context( + post_id=str(post_id) if post_id else None, + pipeline="post_editorial", + ) graph = create_post_editorial_graph() diff --git a/packages/ai-server/src/services/raw_posts/processors/hero_reframe.py b/packages/ai-server/src/services/raw_posts/processors/hero_reframe.py index 8c5d1078..d08fda27 100644 --- a/packages/ai-server/src/services/raw_posts/processors/hero_reframe.py +++ b/packages/ai-server/src/services/raw_posts/processors/hero_reframe.py @@ -57,6 +57,7 @@ async def reframe( image_mime_type=content_type or "image/jpeg", prompt=prompt, aspect_ratio="4:5", + step="hero_reframe", ) except NanoBananaError as exc: raise HeroReframeError(str(exc)) from exc diff --git a/packages/ai-server/src/services/raw_posts/processors/items_parser.py b/packages/ai-server/src/services/raw_posts/processors/items_parser.py index aed555f0..01dfc91a 100644 --- a/packages/ai-server/src/services/raw_posts/processors/items_parser.py +++ b/packages/ai-server/src/services/raw_posts/processors/items_parser.py @@ -15,6 +15,8 @@ from google.genai import types as genai_types from pydantic import BaseModel +from src.services import cost_tracking + from .prompts import ITEMS_PROMPT from .schemas import ParsedItem, SpotBox @@ -85,19 +87,24 @@ async def parse( if caption: text += f"\n\nThe pin caption is: {caption[:200]}" - resp = await self._client.aio.models.generate_content( - model=self._model, - contents=[ - genai_types.Part.from_bytes( - data=composite_bytes, - mime_type=content_type or "image/jpeg", + resp = await cost_tracking.track_call( + "items_parser", + self._model, + cost_tracking.extract_text_usage, + self._client.aio.models.generate_content( + model=self._model, + contents=[ + genai_types.Part.from_bytes( + data=composite_bytes, + mime_type=content_type or "image/jpeg", + ), + text, + ], + config=genai_types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=_ItemsResponse, + temperature=0.1, ), - text, - ], - config=genai_types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=_ItemsResponse, - temperature=0.1, ), ) diff --git a/packages/ai-server/src/services/raw_posts/processors/items_thumbnail.py b/packages/ai-server/src/services/raw_posts/processors/items_thumbnail.py index b50eb036..0751d7e6 100644 --- a/packages/ai-server/src/services/raw_posts/processors/items_thumbnail.py +++ b/packages/ai-server/src/services/raw_posts/processors/items_thumbnail.py @@ -162,6 +162,7 @@ async def _maybe_generate_one( prompt=prompt, aspect_ratio="1:1", image_size="1K", + step="items_thumbnail", ) except NanoBananaError as exc: logger.warning( @@ -272,6 +273,7 @@ async def _maybe_download_one( prompt=prompt, aspect_ratio="1:1", image_size="1K", + step="items_thumbnail_refine", ) except NanoBananaError as exc: logger.warning( diff --git a/packages/ai-server/src/services/raw_posts/processors/spots_parser.py b/packages/ai-server/src/services/raw_posts/processors/spots_parser.py index 02094fbc..07755266 100644 --- a/packages/ai-server/src/services/raw_posts/processors/spots_parser.py +++ b/packages/ai-server/src/services/raw_posts/processors/spots_parser.py @@ -20,6 +20,8 @@ from google.genai import types as genai_types from pydantic import BaseModel, Field +from src.services import cost_tracking + from .prompts import SPOTS_PROMPT from .schemas import ParsedItem, SpotBox @@ -85,19 +87,24 @@ async def detect( ) text = f"{self._prompt}\n\nItems:\n{listing}" - resp = await self._client.aio.models.generate_content( - model=self._model, - contents=[ - genai_types.Part.from_bytes( - data=hero_bytes, - mime_type=hero_content_type or "image/png", + resp = await cost_tracking.track_call( + "spots_parser", + self._model, + cost_tracking.extract_text_usage, + self._client.aio.models.generate_content( + model=self._model, + contents=[ + genai_types.Part.from_bytes( + data=hero_bytes, + mime_type=hero_content_type or "image/png", + ), + text, + ], + config=genai_types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=_SpotsResponse, + temperature=0.1, ), - text, - ], - config=genai_types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=_SpotsResponse, - temperature=0.1, ), ) diff --git a/packages/ai-server/src/services/raw_posts/processors/subject_parser.py b/packages/ai-server/src/services/raw_posts/processors/subject_parser.py index 456b75a8..5ff013cf 100644 --- a/packages/ai-server/src/services/raw_posts/processors/subject_parser.py +++ b/packages/ai-server/src/services/raw_posts/processors/subject_parser.py @@ -29,6 +29,8 @@ from google.genai import types as genai_types from pydantic import BaseModel +from src.services import cost_tracking + from .prompts import SUBJECT_PROMPT from .schemas import CONTEXT_ENUM, Subject @@ -80,19 +82,24 @@ async def parse( if caption: text += f"\n\nThe pin caption is: {caption[:300]}" - resp = await self._client.aio.models.generate_content( - model=self._model, - contents=[ - genai_types.Part.from_bytes( - data=composite_bytes, - mime_type=content_type or "image/jpeg", + resp = await cost_tracking.track_call( + "subject_parser", + self._model, + cost_tracking.extract_text_usage, + self._client.aio.models.generate_content( + model=self._model, + contents=[ + genai_types.Part.from_bytes( + data=composite_bytes, + mime_type=content_type or "image/jpeg", + ), + text, + ], + config=genai_types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=_SubjectDraft, + temperature=0.1, ), - text, - ], - config=genai_types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=_SubjectDraft, - temperature=0.1, ), ) diff --git a/packages/ai-server/src/services/raw_posts/processors/url_search.py b/packages/ai-server/src/services/raw_posts/processors/url_search.py index 30fc1cd7..4ba60755 100644 --- a/packages/ai-server/src/services/raw_posts/processors/url_search.py +++ b/packages/ai-server/src/services/raw_posts/processors/url_search.py @@ -33,6 +33,8 @@ import httpx +from src.services import cost_tracking + from .schemas import ParsedItem @@ -211,10 +213,19 @@ async def _grounded_search( } url = f"{_GEMINI_BASE}/{self._gemini_model}:generateContent" try: - resp = await client.post( - url, params={"key": self._gemini_key}, json=payload, timeout=60 + async def _do_post(): + r = await client.post( + url, params={"key": self._gemini_key}, json=payload, timeout=60 + ) + r.raise_for_status() + return r + + resp = await cost_tracking.track_call( + "url_grounded_search", + self._gemini_model, + cost_tracking.extract_grounded_response, + _do_post(), ) - resp.raise_for_status() data = resp.json() except Exception as exc: logger.warning("grounded_search HTTP error: %s", exc) @@ -287,10 +298,19 @@ async def _filter( } url = f"{_GEMINI_BASE}/{self._gemini_model}:generateContent" try: - resp = await client.post( - url, params={"key": self._gemini_key}, json=payload, timeout=60 + async def _do_post(): + r = await client.post( + url, params={"key": self._gemini_key}, json=payload, timeout=60 + ) + r.raise_for_status() + return r + + resp = await cost_tracking.track_call( + "url_filter", + self._gemini_model, + cost_tracking.extract_rest_text_usage, + _do_post(), ) - resp.raise_for_status() data = resp.json() except Exception as exc: logger.warning("filter HTTP error: %s", exc) diff --git a/packages/ai-server/src/services/raw_posts/scheduler.py b/packages/ai-server/src/services/raw_posts/scheduler.py index ab335ebe..1e05520f 100644 --- a/packages/ai-server/src/services/raw_posts/scheduler.py +++ b/packages/ai-server/src/services/raw_posts/scheduler.py @@ -31,6 +31,7 @@ from apscheduler.triggers.interval import IntervalTrigger from src.managers.storage.r2_client import R2Client +from src.services import cost_tracking from .discovery.composite_filter import CompositeFilter, CompositeFilterError from .models import FetchRequest @@ -968,6 +969,13 @@ async def step_cb(step: str, ok: bool, note: Optional[str]) -> None: # 가 items / url_search / subject 단계 우회. prelabeled = from_platform_metadata(target.platform_metadata) + # cost_tracking 컨텍스트 — 이후 처리 안 Gemini 호출이 raw_post_id 와 + # pipeline 라벨을 자동 적립. + cost_tracking.set_context( + raw_post_id=str(target.raw_post_id), + pipeline="raw_post", + ) + try: processed = await self._processor.process( composite_bytes=composite_bytes, diff --git a/packages/api-server/src/domains/admin/gemini_cost.rs b/packages/api-server/src/domains/admin/gemini_cost.rs new file mode 100644 index 00000000..5e8e022e --- /dev/null +++ b/packages/api-server/src/domains/admin/gemini_cost.rs @@ -0,0 +1,526 @@ +//! Admin — Gemini API cost tracking dashboard endpoints. +//! +//! 데이터 SOT: `public.gemini_usage_events` + `public.gemini_spend_daily` (view). +//! 단가 SOT: `public.gemini_pricing` (SCD-2). ai-server 의 cost_tracking 이 +//! 호출 시점에 적립한다. + +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + routing::{delete, get}, + Json, Router, +}; +use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::{ + config::{AppConfig, AppState}, + error::{AppError, AppResult}, + middleware::auth::User, +}; + +// === Query / Response types === + +#[derive(Debug, Deserialize)] +pub struct DaysQuery { + /// 조회 기간 (일). 1..=90. 기본 7. + #[serde(default)] + pub days: Option, +} + +#[derive(Debug, Deserialize)] +pub struct TopRawPostsQuery { + #[serde(default)] + pub days: Option, + #[serde(default)] + pub limit: Option, +} + +#[derive(Debug, Serialize)] +pub struct DailySpendRow { + pub day: String, // YYYY-MM-DD + pub spend_usd: f64, + pub ok_calls: i64, + pub failed_calls: i64, + pub prompt_tokens: i64, + pub completion_tokens: i64, + pub images: i64, + pub groundings: i64, +} + +#[derive(Debug, Serialize)] +pub struct DailySpendResponse { + pub days: Vec, + pub total_usd: f64, +} + +#[derive(Debug, Serialize)] +pub struct GroupSpendRow { + pub key: String, + pub spend_usd: f64, + pub ok_calls: i64, + pub failed_calls: i64, +} + +#[derive(Debug, Serialize)] +pub struct GroupSpendResponse { + pub rows: Vec, + pub total_usd: f64, +} + +#[derive(Debug, Serialize)] +pub struct TodaySpendResponse { + pub today_usd: f64, + pub today_calls: i64, + pub today_failed: i64, + pub yesterday_usd: f64, + pub last_7d_usd: f64, + pub last_30d_usd: f64, +} + +#[derive(Debug, Serialize)] +pub struct TopRawPostRow { + pub raw_post_id: String, + pub spend_usd: f64, + pub call_count: i64, +} + +#[derive(Debug, Serialize)] +pub struct TopRawPostsResponse { + pub rows: Vec, +} + +#[derive(Debug, Serialize)] +pub struct PricingRow { + pub id: String, + pub model: String, + pub unit_kind: String, + pub tier: String, + pub usd_per_unit: f64, + pub effective_from: String, + pub effective_to: Option, + pub source: String, + pub notes: Option, +} + +#[derive(Debug, Serialize)] +pub struct PricingListResponse { + pub active: Vec, + pub history: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct UpsertPricingRequest { + pub model: String, + pub unit_kind: String, + pub usd_per_unit: f64, + #[serde(default)] + pub tier: Option, + #[serde(default)] + pub notes: Option, +} + +// === Handlers === + +fn clamp_days(days: Option, default: i64) -> i64 { + days.unwrap_or(default).clamp(1, 90) +} + +pub async fn get_daily_spend( + State(state): State, + _user: axum::Extension, + Query(q): Query, +) -> AppResult> { + let days = clamp_days(q.days, 7); + let db = state.assets_db.as_ref(); + let stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT date_trunc('day', occurred_at)::date::text AS day, + COALESCE(SUM(est_cost_usd), 0)::float8 AS spend_usd, + COUNT(*) FILTER (WHERE ok) AS ok_calls, + COUNT(*) FILTER (WHERE NOT ok) AS failed_calls, + COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens, + COALESCE(SUM(completion_tokens), 0) AS completion_tokens, + COALESCE(SUM(image_output_count), 0) AS images, + COALESCE(SUM(grounding_queries), 0) AS groundings + FROM public.gemini_usage_events + WHERE occurred_at >= now() - ($1::int * interval '1 day') + GROUP BY 1 + ORDER BY 1 DESC", + vec![(days as i32).into()], + ); + let rows = db.query_all(stmt).await.map_err(AppError::DatabaseError)?; + let mut days_out = Vec::with_capacity(rows.len()); + let mut total = 0.0f64; + for r in rows { + let row = DailySpendRow { + day: r.try_get("", "day").map_err(AppError::DatabaseError)?, + spend_usd: r + .try_get("", "spend_usd") + .map_err(AppError::DatabaseError)?, + ok_calls: r.try_get("", "ok_calls").map_err(AppError::DatabaseError)?, + failed_calls: r + .try_get("", "failed_calls") + .map_err(AppError::DatabaseError)?, + prompt_tokens: r + .try_get("", "prompt_tokens") + .map_err(AppError::DatabaseError)?, + completion_tokens: r + .try_get("", "completion_tokens") + .map_err(AppError::DatabaseError)?, + images: r.try_get("", "images").map_err(AppError::DatabaseError)?, + groundings: r + .try_get("", "groundings") + .map_err(AppError::DatabaseError)?, + }; + total += row.spend_usd; + days_out.push(row); + } + Ok(Json(DailySpendResponse { + days: days_out, + total_usd: total, + })) +} + +async fn group_spend( + db: &DatabaseConnection, + group_col: &str, + days: i64, +) -> AppResult { + // group_col 은 whitelist 만 — SQL injection 회피. + let sql = format!( + "SELECT {gc}::text AS key, + COALESCE(SUM(est_cost_usd), 0)::float8 AS spend_usd, + COUNT(*) FILTER (WHERE ok) AS ok_calls, + COUNT(*) FILTER (WHERE NOT ok) AS failed_calls + FROM public.gemini_usage_events + WHERE occurred_at >= now() - ($1::int * interval '1 day') + GROUP BY 1 + ORDER BY spend_usd DESC", + gc = group_col, + ); + let stmt = + Statement::from_sql_and_values(DatabaseBackend::Postgres, &sql, vec![(days as i32).into()]); + let rows = db.query_all(stmt).await.map_err(AppError::DatabaseError)?; + let mut out = Vec::with_capacity(rows.len()); + let mut total = 0.0f64; + for r in rows { + let row = GroupSpendRow { + key: r.try_get("", "key").map_err(AppError::DatabaseError)?, + spend_usd: r + .try_get("", "spend_usd") + .map_err(AppError::DatabaseError)?, + ok_calls: r.try_get("", "ok_calls").map_err(AppError::DatabaseError)?, + failed_calls: r + .try_get("", "failed_calls") + .map_err(AppError::DatabaseError)?, + }; + total += row.spend_usd; + out.push(row); + } + Ok(GroupSpendResponse { + rows: out, + total_usd: total, + }) +} + +pub async fn get_spend_by_step( + State(state): State, + _user: axum::Extension, + Query(q): Query, +) -> AppResult> { + let days = clamp_days(q.days, 7); + Ok(Json( + group_spend(state.assets_db.as_ref(), "step", days).await?, + )) +} + +pub async fn get_spend_by_model( + State(state): State, + _user: axum::Extension, + Query(q): Query, +) -> AppResult> { + let days = clamp_days(q.days, 7); + Ok(Json( + group_spend(state.assets_db.as_ref(), "model", days).await?, + )) +} + +pub async fn get_spend_by_pipeline( + State(state): State, + _user: axum::Extension, + Query(q): Query, +) -> AppResult> { + let days = clamp_days(q.days, 7); + Ok(Json( + group_spend(state.assets_db.as_ref(), "pipeline", days).await?, + )) +} + +pub async fn get_today_spend( + State(state): State, + _user: axum::Extension, +) -> AppResult> { + let db = state.assets_db.as_ref(); + let stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT + COALESCE(SUM(est_cost_usd) FILTER ( + WHERE occurred_at >= date_trunc('day', now())), 0)::float8 AS today_usd, + COUNT(*) FILTER ( + WHERE occurred_at >= date_trunc('day', now()) AND ok) AS today_calls, + COUNT(*) FILTER ( + WHERE occurred_at >= date_trunc('day', now()) AND NOT ok) AS today_failed, + COALESCE(SUM(est_cost_usd) FILTER ( + WHERE occurred_at >= date_trunc('day', now()) - interval '1 day' + AND occurred_at < date_trunc('day', now())), 0)::float8 AS yesterday_usd, + COALESCE(SUM(est_cost_usd) FILTER ( + WHERE occurred_at >= now() - interval '7 days'), 0)::float8 AS last_7d_usd, + COALESCE(SUM(est_cost_usd) FILTER ( + WHERE occurred_at >= now() - interval '30 days'), 0)::float8 AS last_30d_usd + FROM public.gemini_usage_events", + Vec::::new(), + ); + let row = db + .query_one(stmt) + .await + .map_err(AppError::DatabaseError)? + .ok_or_else(|| AppError::internal("today spend row missing"))?; + Ok(Json(TodaySpendResponse { + today_usd: row + .try_get("", "today_usd") + .map_err(AppError::DatabaseError)?, + today_calls: row + .try_get("", "today_calls") + .map_err(AppError::DatabaseError)?, + today_failed: row + .try_get("", "today_failed") + .map_err(AppError::DatabaseError)?, + yesterday_usd: row + .try_get("", "yesterday_usd") + .map_err(AppError::DatabaseError)?, + last_7d_usd: row + .try_get("", "last_7d_usd") + .map_err(AppError::DatabaseError)?, + last_30d_usd: row + .try_get("", "last_30d_usd") + .map_err(AppError::DatabaseError)?, + })) +} + +pub async fn get_top_raw_posts( + State(state): State, + _user: axum::Extension, + Query(q): Query, +) -> AppResult> { + let days = clamp_days(q.days, 7); + let limit = q.limit.unwrap_or(10).clamp(1, 100); + let db = state.assets_db.as_ref(); + let stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT raw_post_id::text AS raw_post_id, + COALESCE(SUM(est_cost_usd), 0)::float8 AS spend_usd, + COUNT(*) AS call_count + FROM public.gemini_usage_events + WHERE raw_post_id IS NOT NULL + AND occurred_at >= now() - ($1::int * interval '1 day') + GROUP BY raw_post_id + ORDER BY spend_usd DESC + LIMIT $2", + vec![(days as i32).into(), (limit as i64).into()], + ); + let rows = db.query_all(stmt).await.map_err(AppError::DatabaseError)?; + let mut out = Vec::with_capacity(rows.len()); + for r in rows { + out.push(TopRawPostRow { + raw_post_id: r + .try_get("", "raw_post_id") + .map_err(AppError::DatabaseError)?, + spend_usd: r + .try_get("", "spend_usd") + .map_err(AppError::DatabaseError)?, + call_count: r + .try_get("", "call_count") + .map_err(AppError::DatabaseError)?, + }); + } + Ok(Json(TopRawPostsResponse { rows: out })) +} + +// === Pricing CRUD === + +fn row_to_pricing(r: sea_orm::QueryResult) -> AppResult { + let id: Uuid = r.try_get("", "id").map_err(AppError::DatabaseError)?; + let effective_from: chrono::DateTime = r + .try_get("", "effective_from") + .map_err(AppError::DatabaseError)?; + let effective_to: Option> = r + .try_get("", "effective_to") + .map_err(AppError::DatabaseError)?; + Ok(PricingRow { + id: id.to_string(), + model: r.try_get("", "model").map_err(AppError::DatabaseError)?, + unit_kind: r + .try_get("", "unit_kind") + .map_err(AppError::DatabaseError)?, + tier: r.try_get("", "tier").map_err(AppError::DatabaseError)?, + usd_per_unit: r + .try_get("", "usd_per_unit") + .map_err(AppError::DatabaseError)?, + effective_from: effective_from.to_rfc3339(), + effective_to: effective_to.map(|t| t.to_rfc3339()), + source: r.try_get("", "source").map_err(AppError::DatabaseError)?, + notes: r.try_get("", "notes").map_err(AppError::DatabaseError)?, + }) +} + +pub async fn list_pricing( + State(state): State, + _user: axum::Extension, +) -> AppResult> { + let db = state.assets_db.as_ref(); + let stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT id, model, unit_kind, tier, usd_per_unit::float8 AS usd_per_unit, + effective_from, effective_to, source, notes + FROM public.gemini_pricing + ORDER BY model, unit_kind, tier, effective_from DESC", + Vec::::new(), + ); + let rows = db.query_all(stmt).await.map_err(AppError::DatabaseError)?; + let mut active = Vec::new(); + let mut history = Vec::new(); + for r in rows { + let p = row_to_pricing(r)?; + if p.effective_to.is_none() { + active.push(p); + } else { + history.push(p); + } + } + Ok(Json(PricingListResponse { active, history })) +} + +pub async fn upsert_pricing( + State(state): State, + _user: axum::Extension, + Json(body): Json, +) -> AppResult> { + if body.usd_per_unit < 0.0 { + return Err(AppError::bad_request("usd_per_unit must be >= 0")); + } + let allowed_units = [ + "input_token", + "output_token", + "cached_input_token", + "image_output", + "grounding_query", + ]; + if !allowed_units.contains(&body.unit_kind.as_str()) { + return Err(AppError::bad_request( + "unit_kind must be one of: input_token, output_token, cached_input_token, image_output, grounding_query", + )); + } + let tier = body.tier.unwrap_or_else(|| "default".to_string()); + let notes = body.notes; + + let db = state.assets_db.as_ref(); + // Two-step: close existing active row, then insert new active row. + // SeaORM `query_one` can run inside a Statement — but for transaction we + // use raw SQL fenced by BEGIN/COMMIT via `db.execute_unprepared`. + let txn_sql = " + BEGIN; + UPDATE public.gemini_pricing + SET effective_to = now(), updated_at = now() + WHERE model = $1 AND unit_kind = $2 AND tier = $3 AND effective_to IS NULL; + INSERT INTO public.gemini_pricing(model, unit_kind, tier, usd_per_unit, source, notes) + VALUES ($1, $2, $3, $4, 'manual', $5) + RETURNING id, model, unit_kind, tier, usd_per_unit::float8 AS usd_per_unit, + effective_from, effective_to, source, notes; + COMMIT; + "; + // sea_orm 의 Statement 는 single-statement 라 트랜잭션을 묶기 어렵다 — SQL 분할 실행. + let txn = db.begin().await.map_err(AppError::DatabaseError)?; + let close_stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "UPDATE public.gemini_pricing + SET effective_to = now(), updated_at = now() + WHERE model = $1 AND unit_kind = $2 AND tier = $3 AND effective_to IS NULL", + vec![ + body.model.clone().into(), + body.unit_kind.clone().into(), + tier.clone().into(), + ], + ); + txn.execute(close_stmt) + .await + .map_err(AppError::DatabaseError)?; + + let insert_stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "INSERT INTO public.gemini_pricing(model, unit_kind, tier, usd_per_unit, source, notes) + VALUES ($1, $2, $3, $4, 'manual', $5) + RETURNING id, model, unit_kind, tier, usd_per_unit::float8 AS usd_per_unit, + effective_from, effective_to, source, notes", + vec![ + body.model.into(), + body.unit_kind.into(), + tier.into(), + body.usd_per_unit.into(), + notes + .clone() + .map(|n| n.into()) + .unwrap_or(sea_orm::Value::String(None)), + ], + ); + let new_row = txn + .query_one(insert_stmt) + .await + .map_err(AppError::DatabaseError)? + .ok_or_else(|| AppError::internal("pricing insert returned no row"))?; + let pricing = row_to_pricing(new_row)?; + txn.commit().await.map_err(AppError::DatabaseError)?; + + let _ = txn_sql; // doc reference + Ok(Json(pricing)) +} + +pub async fn retire_pricing( + State(state): State, + _user: axum::Extension, + Path(id): Path, +) -> AppResult { + let db = state.assets_db.as_ref(); + let stmt = Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "UPDATE public.gemini_pricing + SET effective_to = now(), updated_at = now() + WHERE id = $1 AND effective_to IS NULL", + vec![id.into()], + ); + db.execute(stmt).await.map_err(AppError::DatabaseError)?; + Ok(StatusCode::NO_CONTENT) +} + +// === Router === + +pub fn router(state: AppState, app_config: AppConfig) -> Router { + Router::new() + .route("/spend/daily", get(get_daily_spend)) + .route("/spend/by-step", get(get_spend_by_step)) + .route("/spend/by-model", get(get_spend_by_model)) + .route("/spend/by-pipeline", get(get_spend_by_pipeline)) + .route("/spend/today", get(get_today_spend)) + .route("/spend/top-raw-posts", get(get_top_raw_posts)) + .route("/pricing", get(list_pricing).post(upsert_pricing)) + .route("/pricing/{id}", delete(retire_pricing)) + .layer(axum::middleware::from_fn_with_state( + state, + crate::middleware::admin_db_middleware, + )) + .layer(axum::middleware::from_fn_with_state( + app_config, + crate::middleware::auth_middleware, + )) +} diff --git a/packages/api-server/src/domains/admin/handlers.rs b/packages/api-server/src/domains/admin/handlers.rs index 50a4e9fb..e1eecb84 100644 --- a/packages/api-server/src/domains/admin/handlers.rs +++ b/packages/api-server/src/domains/admin/handlers.rs @@ -9,7 +9,8 @@ use crate::{app_state::AppState, config::AppConfig}; use super::{ badges, categories, curations, dashboard, editorial_article_chat, editorial_articles, editorial_candidates, editorial_discovery_settings, editorial_pipeline_settings, - editorial_recommendations, magazine_sessions, monitoring, posts, solutions, spots, synonyms, + editorial_recommendations, gemini_cost, magazine_sessions, monitoring, posts, solutions, spots, + synonyms, }; use crate::domains::reports; @@ -53,6 +54,10 @@ pub fn router(state: AppState, app_config: AppConfig) -> Router { "/dashboard", dashboard::router(state.clone(), app_config.clone()), ) + .nest( + "/gemini-cost", + gemini_cost::router(state.clone(), app_config.clone()), + ) .nest("/badges", badges::router(app_config.clone())) .nest("/reports", reports::admin_router(app_config.clone())) .nest( diff --git a/packages/api-server/src/domains/admin/mod.rs b/packages/api-server/src/domains/admin/mod.rs index f79112df..4b124107 100644 --- a/packages/api-server/src/domains/admin/mod.rs +++ b/packages/api-server/src/domains/admin/mod.rs @@ -13,6 +13,7 @@ pub mod editorial_candidates; pub mod editorial_discovery_settings; pub mod editorial_pipeline_settings; pub mod editorial_recommendations; +pub mod gemini_cost; pub mod magazine_sessions; pub mod monitoring; pub mod posts; diff --git a/packages/web/app/admin/gemini-cost/page.tsx b/packages/web/app/admin/gemini-cost/page.tsx new file mode 100644 index 00000000..c9283f1d --- /dev/null +++ b/packages/web/app/admin/gemini-cost/page.tsx @@ -0,0 +1,475 @@ +"use client"; + +/** + * /admin/gemini-cost — Gemini API 비용 대시보드 (#cost-tracking). + * + * 데이터 SOT: `public.gemini_usage_events` + `public.gemini_pricing` (assets DB). + * ai-server cost_tracking 모듈이 호출 시점에 fire-and-forget 으로 적립. + */ + +import { useState } from "react"; +import { + ResponsiveContainer, + AreaChart, + Area, + BarChart, + Bar, + XAxis, + YAxis, + CartesianGrid, + Tooltip, +} from "recharts"; + +import { + useGeminiDailySpend, + useGeminiSpendByStep, + useGeminiSpendByModel, + useGeminiSpendByPipeline, + useGeminiTodaySpend, + useGeminiTopRawPosts, + useGeminiPricing, + useUpsertGeminiPricing, + useRetireGeminiPricing, + type PricingRow, +} from "@/lib/hooks/admin/useGeminiCost"; + +const PERIOD_OPTIONS = [ + { label: "7D", value: 7 }, + { label: "14D", value: 14 }, + { label: "30D", value: 30 }, +]; + +const UNIT_KINDS = [ + "input_token", + "output_token", + "cached_input_token", + "image_output", + "grounding_query", +]; + +function fmtUsd(v: number | undefined): string { + if (v == null) return "—"; + if (v < 0.01) return `$${v.toFixed(5)}`; + if (v < 10) return `$${v.toFixed(3)}`; + return `$${v.toFixed(2)}`; +} + +function fmtInt(v: number | undefined): string { + if (v == null) return "—"; + return v.toLocaleString(); +} + +// ─── KPI Row ────────────────────────────────────────────────────────────────── + +function KpiRow() { + const { data } = useGeminiTodaySpend(); + const items = [ + { + label: "Today", + value: fmtUsd(data?.today_usd), + sub: `${fmtInt(data?.today_calls)} calls / ${fmtInt(data?.today_failed)} failed`, + }, + { label: "Yesterday", value: fmtUsd(data?.yesterday_usd), sub: "" }, + { label: "Last 7d", value: fmtUsd(data?.last_7d_usd), sub: "" }, + { label: "Last 30d", value: fmtUsd(data?.last_30d_usd), sub: "" }, + ]; + return ( +
+ {items.map((it) => ( +
+
+ {it.label} +
+
+ {it.value} +
+ {it.sub && ( +
{it.sub}
+ )} +
+ ))} +
+ ); +} + +// ─── Daily Spend Chart ──────────────────────────────────────────────────────── + +function DailyChart({ days }: { days: number }) { + const { data, isLoading } = useGeminiDailySpend(days); + const rows = (data?.days ?? []).slice().reverse(); // oldest → newest + + return ( +
+
+

Daily Spend

+
+ Total {days}d:{" "} + {fmtUsd(data?.total_usd)} +
+
+
+ {isLoading ? ( +
+ Loading… +
+ ) : rows.length === 0 ? ( +
+ No data yet +
+ ) : ( + + + + + + fmtUsd(typeof v === "number" ? v : Number(v))} + /> + + + + )} +
+
+ ); +} + +// ─── Breakdown bar charts ───────────────────────────────────────────────────── + +function BreakdownCard({ + title, + data, +}: { + title: string; + data: + | { + key: string; + spend_usd: number; + ok_calls: number; + failed_calls: number; + }[] + | undefined; +}) { + const rows = (data ?? []).slice(0, 10); + return ( +
+

{title}

+ {rows.length === 0 ? ( +
+ No data +
+ ) : ( +
+ + + + + + fmtUsd(typeof v === "number" ? v : Number(v))} + /> + + + +
+ )} +
+ ); +} + +// ─── Top raw_posts ──────────────────────────────────────────────────────────── + +function TopRawPostsCard({ days }: { days: number }) { + const { data } = useGeminiTopRawPosts(days, 10); + const rows = data?.rows ?? []; + return ( +
+

+ Top expensive raw_posts ({days}d) +

+ {rows.length === 0 ? ( +
No data
+ ) : ( + + + + + + + + + + {rows.map((r) => ( + + + + + + ))} + +
raw_post_idCallsSpend
+ + {r.raw_post_id.slice(0, 8)}… + + + {fmtInt(r.call_count)} + + {fmtUsd(r.spend_usd)} +
+ )} +
+ ); +} + +// ─── Pricing Editor ─────────────────────────────────────────────────────────── + +function PricingEditor() { + const { data } = useGeminiPricing(); + const upsert = useUpsertGeminiPricing(); + const retire = useRetireGeminiPricing(); + const [showHistory, setShowHistory] = useState(false); + + // 신규 row 추가 입력 상태 + const [model, setModel] = useState(""); + const [unitKind, setUnitKind] = useState("input_token"); + const [rate, setRate] = useState(""); + const [notes, setNotes] = useState(""); + + const handleAdd = async () => { + const value = parseFloat(rate); + if (!model.trim() || !Number.isFinite(value)) { + return; + } + await upsert.mutateAsync({ + model: model.trim(), + unit_kind: unitKind, + usd_per_unit: value, + notes: notes.trim() || null, + }); + setModel(""); + setRate(""); + setNotes(""); + }; + + const renderRow = (p: PricingRow, isActive: boolean) => ( + + {p.model} + {p.unit_kind} + + {p.usd_per_unit < 0.001 + ? `$${p.usd_per_unit.toExponential(3)}` + : `$${p.usd_per_unit.toFixed(6)}`} + + + {new Date(p.effective_from).toLocaleDateString()} →{" "} + {p.effective_to + ? new Date(p.effective_to).toLocaleDateString() + : "active"} + + {p.source} + + {isActive && ( + + )} + + + ); + + return ( +
+
+

+ Pricing (DB SOT) +

+ +
+ + + + + + + + + + + + + {(data?.active ?? []).map((p) => renderRow(p, true))} + {showHistory && (data?.history ?? []).map((p) => renderRow(p, false))} + +
ModelUnitUSD / unitEffectiveSource +
+ +
+ setModel(e.target.value)} + className="rounded border border-neutral-700 bg-neutral-900 px-2 py-1 text-xs text-neutral-100" + /> + + setRate(e.target.value)} + className="rounded border border-neutral-700 bg-neutral-900 px-2 py-1 text-xs font-mono text-neutral-100" + /> + setNotes(e.target.value)} + className="rounded border border-neutral-700 bg-neutral-900 px-2 py-1 text-xs text-neutral-100" + /> + +
+ {upsert.isError && ( +
+ {upsert.error?.message ?? "Failed"} +
+ )} +
+ ); +} + +// ─── Page ───────────────────────────────────────────────────────────────────── + +export default function GeminiCostPage() { + const [days, setDays] = useState(7); + const byStep = useGeminiSpendByStep(days); + const byModel = useGeminiSpendByModel(days); + const byPipeline = useGeminiSpendByPipeline(days); + + return ( +
+
+
+

+ Gemini Cost +

+

+ Per-call cost tracking. Pricing SOT is the DB table below — no + hardcoding in code. +

+
+
+ {PERIOD_OPTIONS.map((opt) => ( + + ))} +
+
+ + + + + +
+ + + +
+ + + + +
+ ); +} diff --git a/packages/web/app/admin/page.tsx b/packages/web/app/admin/page.tsx index 359cdee9..8f340341 100644 --- a/packages/web/app/admin/page.tsx +++ b/packages/web/app/admin/page.tsx @@ -19,6 +19,7 @@ import { TodaySummarySkeleton, } from "@/lib/components/admin/dashboard/TodaySummary"; import { PipelineHealthCard } from "@/lib/components/admin/dashboard/PipelineHealthCard"; +import { GeminiCostMini } from "@/lib/components/admin/dashboard/GeminiCostMini"; /** * Admin Dashboard Page @@ -56,6 +57,9 @@ export default function AdminDashboardPage() { {/* Pipeline 헬스 — IN_PROGRESS > 0 시 라이브 폴링 (#361) */} + {/* Gemini API cost mini (#cost-tracking) */} + + {/* Traffic Chart */} {chartQuery.isLoading ? ( diff --git a/packages/web/app/api/admin/gemini-cost/[...path]/route.ts b/packages/web/app/api/admin/gemini-cost/[...path]/route.ts new file mode 100644 index 00000000..ccd85cf8 --- /dev/null +++ b/packages/web/app/api/admin/gemini-cost/[...path]/route.ts @@ -0,0 +1,79 @@ +import { NextRequest, NextResponse } from "next/server"; +import { createSupabaseServerClient } from "@/lib/supabase/server"; +import { checkIsAdmin } from "@/lib/supabase/admin"; +import { API_BASE_URL } from "@/lib/server-env"; + +/** + * Catch-all proxy: /api/v1/admin/gemini-cost/* → api-server. + * + * Supports GET / POST / DELETE. Auth check + bearer forward. + */ +async function proxy( + request: NextRequest, + context: { params: Promise<{ path: string[] }> } +): Promise { + const supabase = await createSupabaseServerClient(); + const { + data: { user }, + } = await supabase.auth.getUser(); + if (!user) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + const isAdmin = await checkIsAdmin(supabase, user.id); + if (!isAdmin) { + return NextResponse.json({ error: "Forbidden" }, { status: 403 }); + } + const { + data: { session }, + } = await supabase.auth.getSession(); + if (!session?.access_token) { + return NextResponse.json({ error: "No session" }, { status: 401 }); + } + + const { path } = await context.params; + const subpath = (path || []).join("/"); + const qs = request.nextUrl.searchParams.toString(); + const url = `${API_BASE_URL}/api/v1/admin/gemini-cost/${subpath}${ + qs ? `?${qs}` : "" + }`; + + const init: RequestInit = { + method: request.method, + headers: { + Authorization: `Bearer ${session.access_token}`, + }, + }; + if (request.method === "POST" || request.method === "PATCH") { + const body = await request.text(); + init.body = body; + (init.headers as Record)["Content-Type"] = + request.headers.get("content-type") || "application/json"; + } + + try { + const response = await fetch(url, init); + const text = await response.text(); + if (!text) { + return new NextResponse(null, { status: response.status }); + } + let data: unknown; + try { + data = JSON.parse(text); + } catch { + data = { message: text }; + } + return NextResponse.json(data, { status: response.status }); + } catch (error) { + if (process.env.NODE_ENV === "development") { + console.error("gemini-cost proxy error:", error); + } + return NextResponse.json( + { message: error instanceof Error ? error.message : "Proxy error" }, + { status: 502 } + ); + } +} + +export const GET = proxy; +export const POST = proxy; +export const DELETE = proxy; diff --git a/packages/web/lib/components/admin/AdminSidebar.tsx b/packages/web/lib/components/admin/AdminSidebar.tsx index 66876b76..de7b8c4b 100644 --- a/packages/web/lib/components/admin/AdminSidebar.tsx +++ b/packages/web/lib/components/admin/AdminSidebar.tsx @@ -15,6 +15,7 @@ import { Tag, UsersRound, Link2, + DollarSign, } from "lucide-react"; import { cn } from "@/lib/utils"; import { useAuthStore } from "@/lib/stores/authStore"; @@ -62,6 +63,12 @@ const SIDEBAR_ENTRIES: SidebarEntry[] = [ }, ], }, + { + label: "Observability", + items: [ + { href: "/admin/gemini-cost", label: "Gemini Cost", icon: DollarSign }, + ], + }, ]; interface AdminSidebarProps { diff --git a/packages/web/lib/components/admin/dashboard/GeminiCostMini.tsx b/packages/web/lib/components/admin/dashboard/GeminiCostMini.tsx new file mode 100644 index 00000000..25caeb4a --- /dev/null +++ b/packages/web/lib/components/admin/dashboard/GeminiCostMini.tsx @@ -0,0 +1,70 @@ +"use client"; + +/** + * Compact today-Gemini-cost card for the /admin home (#cost-tracking). + * Full breakdown lives at /admin/gemini-cost. + */ + +import Link from "next/link"; +import { useGeminiTodaySpend } from "@/lib/hooks/admin/useGeminiCost"; + +function fmtUsd(v: number | undefined): string { + if (v == null) return "—"; + if (v < 0.01) return `$${v.toFixed(5)}`; + if (v < 10) return `$${v.toFixed(3)}`; + return `$${v.toFixed(2)}`; +} + +export function GeminiCostMini() { + const { data, isLoading } = useGeminiTodaySpend(); + + return ( + +
+

Gemini Cost

+ view → +
+ {isLoading ? ( +
+ ) : ( +
+
+
+ Today +
+
+ {fmtUsd(data?.today_usd)} +
+
+
+
+ Yesterday +
+
+ {fmtUsd(data?.yesterday_usd)} +
+
+
+
+ 7d +
+
+ {fmtUsd(data?.last_7d_usd)} +
+
+
+
+ 30d +
+
+ {fmtUsd(data?.last_30d_usd)} +
+
+
+ )} + + ); +} diff --git a/packages/web/lib/hooks/admin/useGeminiCost.ts b/packages/web/lib/hooks/admin/useGeminiCost.ts new file mode 100644 index 00000000..2e9ce085 --- /dev/null +++ b/packages/web/lib/hooks/admin/useGeminiCost.ts @@ -0,0 +1,235 @@ +"use client"; + +/** + * Gemini cost tracking dashboard (#cost-tracking). + * + * GET /api/admin/gemini-cost/spend/daily?days= + * GET /api/admin/gemini-cost/spend/by-step?days= + * GET /api/admin/gemini-cost/spend/by-model?days= + * GET /api/admin/gemini-cost/spend/by-pipeline?days= + * GET /api/admin/gemini-cost/spend/today + * GET /api/admin/gemini-cost/spend/top-raw-posts?days=&limit= + * GET /api/admin/gemini-cost/pricing + * POST /api/admin/gemini-cost/pricing + * DEL /api/admin/gemini-cost/pricing/:id + */ + +import { + useMutation, + useQuery, + useQueryClient, + type UseMutationResult, + type UseQueryResult, +} from "@tanstack/react-query"; + +const BASE = "/api/admin/gemini-cost"; + +// ─── Types ────────────────────────────────────────────────────────────────── + +export interface DailySpendRow { + day: string; + spend_usd: number; + ok_calls: number; + failed_calls: number; + prompt_tokens: number; + completion_tokens: number; + images: number; + groundings: number; +} + +export interface DailySpendResponse { + days: DailySpendRow[]; + total_usd: number; +} + +export interface GroupSpendRow { + key: string; + spend_usd: number; + ok_calls: number; + failed_calls: number; +} + +export interface GroupSpendResponse { + rows: GroupSpendRow[]; + total_usd: number; +} + +export interface TodaySpendResponse { + today_usd: number; + today_calls: number; + today_failed: number; + yesterday_usd: number; + last_7d_usd: number; + last_30d_usd: number; +} + +export interface TopRawPostRow { + raw_post_id: string; + spend_usd: number; + call_count: number; +} + +export interface TopRawPostsResponse { + rows: TopRawPostRow[]; +} + +export interface PricingRow { + id: string; + model: string; + unit_kind: string; + tier: string; + usd_per_unit: number; + effective_from: string; + effective_to: string | null; + source: string; + notes: string | null; +} + +export interface PricingListResponse { + active: PricingRow[]; + history: PricingRow[]; +} + +export interface UpsertPricingRequest { + model: string; + unit_kind: string; + usd_per_unit: number; + tier?: string; + notes?: string | null; +} + +// ─── Fetch helper ──────────────────────────────────────────────────────────── + +async function getJSON(path: string, signal?: AbortSignal): Promise { + const res = await fetch(`${BASE}${path}`, { signal }); + if (!res.ok) { + throw new Error(`gemini-cost: ${path} → ${res.status}`); + } + return (await res.json()) as T; +} + +// ─── Queries ───────────────────────────────────────────────────────────────── + +export function useGeminiDailySpend( + days: number +): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "daily", days], + queryFn: ({ signal }) => + getJSON(`/spend/daily?days=${days}`, signal), + refetchInterval: 60_000, + staleTime: 30_000, + }); +} + +export function useGeminiSpendByStep( + days: number +): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "by-step", days], + queryFn: ({ signal }) => + getJSON(`/spend/by-step?days=${days}`, signal), + staleTime: 30_000, + }); +} + +export function useGeminiSpendByModel( + days: number +): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "by-model", days], + queryFn: ({ signal }) => + getJSON(`/spend/by-model?days=${days}`, signal), + staleTime: 30_000, + }); +} + +export function useGeminiSpendByPipeline( + days: number +): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "by-pipeline", days], + queryFn: ({ signal }) => + getJSON(`/spend/by-pipeline?days=${days}`, signal), + staleTime: 30_000, + }); +} + +export function useGeminiTodaySpend(): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "today"], + queryFn: ({ signal }) => + getJSON(`/spend/today`, signal), + refetchInterval: 60_000, + staleTime: 30_000, + }); +} + +export function useGeminiTopRawPosts( + days: number, + limit = 10 +): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "top-raw-posts", days, limit], + queryFn: ({ signal }) => + getJSON( + `/spend/top-raw-posts?days=${days}&limit=${limit}`, + signal + ), + staleTime: 60_000, + }); +} + +export function useGeminiPricing(): UseQueryResult { + return useQuery({ + queryKey: ["admin", "gemini-cost", "pricing"], + queryFn: ({ signal }) => getJSON(`/pricing`, signal), + staleTime: 60_000, + }); +} + +// ─── Mutations ─────────────────────────────────────────────────────────────── + +export function useUpsertGeminiPricing(): UseMutationResult< + PricingRow, + Error, + UpsertPricingRequest +> { + const qc = useQueryClient(); + return useMutation({ + mutationFn: async (body) => { + const res = await fetch(`${BASE}/pricing`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + }); + if (!res.ok) { + const errBody = await res.text().catch(() => ""); + throw new Error(`upsert pricing: ${res.status} ${errBody}`.trim()); + } + return (await res.json()) as PricingRow; + }, + onSuccess: () => { + qc.invalidateQueries({ queryKey: ["admin", "gemini-cost", "pricing"] }); + }, + }); +} + +export function useRetireGeminiPricing(): UseMutationResult< + void, + Error, + string +> { + const qc = useQueryClient(); + return useMutation({ + mutationFn: async (id) => { + const res = await fetch(`${BASE}/pricing/${id}`, { method: "DELETE" }); + if (!res.ok && res.status !== 204) { + throw new Error(`retire pricing: ${res.status}`); + } + }, + onSuccess: () => { + qc.invalidateQueries({ queryKey: ["admin", "gemini-cost", "pricing"] }); + }, + }); +} diff --git a/supabase-assets/migrations/20260514140000_gemini_cost_tracking.sql b/supabase-assets/migrations/20260514140000_gemini_cost_tracking.sql new file mode 100644 index 00000000..023c7c64 --- /dev/null +++ b/supabase-assets/migrations/20260514140000_gemini_cost_tracking.sql @@ -0,0 +1,172 @@ +-- Gemini API 비용 추적 시스템 (#cost-tracking). +-- +-- 두 테이블: +-- public.gemini_pricing — 단가 SOT. SCD type 2 (effective_from/to). +-- 어드민이 UI 에서 update. source='manual' (default) +-- 또는 'catalog_sync' (future Vertex 이전 시). +-- public.gemini_usage_events — per-call 적립. raw_post / editorial 호출 모두. +-- fire-and-forget INSERT from ai-server. +-- ok=false 도 row 적립 (실패 가시화). +-- +-- 단가 변경 워크플로우 (admin UI): +-- 1. 기존 active row → UPDATE SET effective_to = now() -- 히스토리 close +-- 2. 새 row INSERT (effective_to NULL = active) +-- → 과거 호출은 pricing_snapshot 으로 호출 시점 단가 보존, 신규는 새 단가. +-- +-- View: +-- public.gemini_spend_daily — 일/step/model/pipeline 별 집계. 대시보드 read-side. +-- +-- 단가 seed 는 2026-05 ai.google.dev/pricing 기준. 추후 어드민이 update 가능. + +------------------------------------------------------------------------------ +-- 1. pricing SOT +------------------------------------------------------------------------------ + +CREATE TABLE IF NOT EXISTS public.gemini_pricing ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + model text NOT NULL, + -- input_token | output_token | cached_input_token | image_output | grounding_query + unit_kind text NOT NULL, + usd_per_unit numeric(18,10) NOT NULL CHECK (usd_per_unit >= 0), + tier text NOT NULL DEFAULT 'default', + effective_from timestamptz NOT NULL DEFAULT now(), + effective_to timestamptz, + source text NOT NULL DEFAULT 'manual', + notes text, + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT gemini_pricing_unit_kind_chk CHECK ( + unit_kind IN ('input_token','output_token','cached_input_token', + 'image_output','grounding_query') + ), + CONSTRAINT gemini_pricing_source_chk CHECK ( + source IN ('manual','catalog_sync') + ), + CONSTRAINT gemini_pricing_active_period_chk CHECK ( + effective_to IS NULL OR effective_to > effective_from + ), + UNIQUE (model, unit_kind, tier, effective_from) +); + +CREATE INDEX IF NOT EXISTS gemini_pricing_active_idx + ON public.gemini_pricing (model, unit_kind, tier) + WHERE effective_to IS NULL; + +COMMENT ON TABLE public.gemini_pricing IS + 'Gemini API 단가 SOT. SCD type 2 — admin UI 에서 update 시 기존 row close + 새 row insert.'; +COMMENT ON COLUMN public.gemini_pricing.unit_kind IS + 'input_token / output_token / cached_input_token (text models), image_output (flash-image, per image), grounding_query (googleSearch tool).'; +COMMENT ON COLUMN public.gemini_pricing.effective_to IS + 'NULL = active. 단가 변경 시 기존 row 의 이 컬럼을 now() 로 set.'; + +------------------------------------------------------------------------------ +-- 2. usage events (per Gemini call) +------------------------------------------------------------------------------ + +CREATE TABLE IF NOT EXISTS public.gemini_usage_events ( + id bigserial PRIMARY KEY, + occurred_at timestamptz NOT NULL DEFAULT now(), + raw_post_id uuid, + post_id uuid, + step text NOT NULL, + -- raw_post | post_editorial | editorial_article | ad_hoc + pipeline text NOT NULL, + model text NOT NULL, + ok boolean NOT NULL, + prompt_tokens integer, + completion_tokens integer, + cached_tokens integer, + image_output_count integer, + grounding_queries integer, + est_cost_usd numeric(14,8) NOT NULL DEFAULT 0, + pricing_snapshot jsonb, + error_class text, + latency_ms integer, + request_id text, + CONSTRAINT gemini_usage_pipeline_chk CHECK ( + pipeline IN ('raw_post','post_editorial','editorial_article','ad_hoc') + ) +); + +-- BRIN 으로 시간순 append 패턴 효율화 (millions 의 row 예상). step / model / +-- pipeline 별 일별 집계 쿼리는 covering composite 로. +CREATE INDEX IF NOT EXISTS gemini_usage_occurred_at_brin_idx + ON public.gemini_usage_events USING brin (occurred_at); +CREATE INDEX IF NOT EXISTS gemini_usage_raw_post_idx + ON public.gemini_usage_events (raw_post_id) + WHERE raw_post_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS gemini_usage_post_idx + ON public.gemini_usage_events (post_id) + WHERE post_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS gemini_usage_step_occurred_idx + ON public.gemini_usage_events (step, occurred_at); +CREATE INDEX IF NOT EXISTS gemini_usage_model_occurred_idx + ON public.gemini_usage_events (model, occurred_at); +CREATE INDEX IF NOT EXISTS gemini_usage_pipeline_occurred_idx + ON public.gemini_usage_events (pipeline, occurred_at); + +COMMENT ON TABLE public.gemini_usage_events IS + 'Per-Gemini-call usage log. ai-server fire-and-forget INSERT. 실패도 row 적립 (ok=false, error_class).'; +COMMENT ON COLUMN public.gemini_usage_events.pricing_snapshot IS + '호출 시점 단가 fingerprint. 단가 row 변경 후에도 과거 cost 재현/audit 가능.'; +COMMENT ON COLUMN public.gemini_usage_events.error_class IS + 'safety_block | http_5xx | quota | parse_error | no_pricing | timeout | null'; + +------------------------------------------------------------------------------ +-- 3. daily aggregation view +------------------------------------------------------------------------------ + +CREATE OR REPLACE VIEW public.gemini_spend_daily AS +SELECT + date_trunc('day', occurred_at) AS day, + step, + model, + pipeline, + count(*) FILTER (WHERE ok) AS ok_calls, + count(*) FILTER (WHERE NOT ok) AS failed_calls, + coalesce(sum(prompt_tokens), 0) AS prompt_tokens, + coalesce(sum(completion_tokens), 0) AS completion_tokens, + coalesce(sum(cached_tokens), 0) AS cached_tokens, + coalesce(sum(image_output_count), 0) AS images, + coalesce(sum(grounding_queries), 0) AS groundings, + sum(est_cost_usd) AS spend_usd +FROM public.gemini_usage_events +GROUP BY 1, 2, 3, 4; + +COMMENT ON VIEW public.gemini_spend_daily IS + 'Daily aggregation by step / model / pipeline. 대시보드 read-side.'; + +------------------------------------------------------------------------------ +-- 4. seed — 2026-05 ai.google.dev/pricing 기준 +------------------------------------------------------------------------------ + +-- gemini-2.5-pro (≤200k context) +INSERT INTO public.gemini_pricing (model, unit_kind, usd_per_unit, notes) VALUES + ('gemini-2.5-pro', 'input_token', 0.00000125, 'Pro ≤200k context'), + ('gemini-2.5-pro', 'output_token', 0.00001000, 'Pro output'), + ('gemini-2.5-pro', 'cached_input_token', 0.00000031, 'Pro cached input (~25%% of fresh)') +ON CONFLICT (model, unit_kind, tier, effective_from) DO NOTHING; + +-- gemini-2.5-flash +INSERT INTO public.gemini_pricing (model, unit_kind, usd_per_unit, notes) VALUES + ('gemini-2.5-flash', 'input_token', 0.00000030, 'Flash 2.5 input'), + ('gemini-2.5-flash', 'output_token', 0.00000250, 'Flash 2.5 output'), + ('gemini-2.5-flash', 'cached_input_token', 0.000000075, 'Flash cached input') +ON CONFLICT (model, unit_kind, tier, effective_from) DO NOTHING; + +-- gemini-2.5-flash-lite +INSERT INTO public.gemini_pricing (model, unit_kind, usd_per_unit, notes) VALUES + ('gemini-2.5-flash-lite', 'input_token', 0.000000075, 'Flash-Lite input'), + ('gemini-2.5-flash-lite', 'output_token', 0.00000030, 'Flash-Lite output'), + ('gemini-2.5-flash-lite', 'cached_input_token', 0.0000000187,'Flash-Lite cached input') +ON CONFLICT (model, unit_kind, tier, effective_from) DO NOTHING; + +-- gemini-2.5-flash-image (image generation) +INSERT INTO public.gemini_pricing (model, unit_kind, usd_per_unit, notes) VALUES + ('gemini-2.5-flash-image', 'image_output', 0.0390000000, 'per generated image (1024x1024)'), + ('gemini-2.5-flash-image', 'input_token', 0.00000030, 'image input still tokenized at flash rate') +ON CONFLICT (model, unit_kind, tier, effective_from) DO NOTHING; + +-- grounding query — model-agnostic (googleSearch tool) +INSERT INTO public.gemini_pricing (model, unit_kind, usd_per_unit, notes) VALUES + ('grounding', 'grounding_query', 0.0350000000, 'googleSearch tool: $35 / 1000 queries') +ON CONFLICT (model, unit_kind, tier, effective_from) DO NOTHING;