UN-2946 [MISC] Flush embedding usage records on indexing path#1962
Conversation
The deferred-batch usage refactor stopped flushing embedding-callback records: ``UsageHandler.on_event_end`` now appends to ``_pending_usage`` instead of pushing directly, but ``_handle_index`` (and ``_run_pipeline_index``) never drained the list. Embedding rows fell off ``Usage`` for every workflow / API-deployment run, so the API response's ``metadata.usage.embedding_tokens`` reported 0 despite indexing actually happening. - Add ``EmbeddingCompat.flush_pending_usage()`` mirroring the LLM shim. - ``_handle_index`` flushes embedding into ``ExecutionResult.metadata["usage_records"]`` on all success exits and attaches partial rows via ``LegacyExecutorError.partial_usage_records`` on the error path. - ``_run_pipeline_index`` now returns ``(metrics, records)`` so ``_handle_structure_pipeline`` can absorb embedding rows into ``pipeline_records``. Existing IDE-index path already absorbs via ``metadata["usage_records"]`` and starts working automatically. - Fix the ``_run_pipeline_index`` mock in test_phase5d to return the new tuple shape.
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughEmbeddingCompat gains flush_pending_usage() to drain per-handler usage records. LegacyExecutor now collects and propagates per-output indexing usage: ChangesUsage Record Propagation Through Embedding and Indexing Pipeline
sequenceDiagram
participant HPL as _handle_structure_pipeline
participant RPI as _run_pipeline_index
participant HI as _handle_index
participant EC as EmbeddingCompat
HPL->>RPI: request indexing for outputs
RPI->>HI: call per-output _handle_index
activate HI
HI->>EC: create/use embedding adapter
EC->>EC: perform embedding/index operations
HI->>EC: call flush_pending_usage()
EC-->>HI: return usage_records
HI-->>RPI: return (metrics, metadata:{usage_records})
deactivate HI
RPI->>RPI: aggregate metrics and flatten usage_records into index_records
RPI-->>HPL: return (index_metrics, index_records)
HPL->>HPL: extend pipeline_records with index_records
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workers/executor/executors/legacy_executor.py (1)
889-993:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winPartial usage records can still be lost if a later iteration's
_handle_indexraises.
_run_pipeline_indexaccumulatesindex_recordsacross loop iterations but has notry/except. If_handle_indexraisesLegacyExecutorErroron iteration N, the accumulated records from iterations 1..N-1 are local-only and never make it ontoe.partial_usage_records. The outer handler in_handle_structure_pipeline(Lines 692-694) prependspipeline_records, which at that point only holds the extract/summarize records — not what this function has already collected.Given that this PR exists specifically to stop dropping embedding-usage rows, this edge case (multiple distinct param sets where a later one fails) is worth closing.
🛡️ Proposed fix
index_metrics: dict = {} index_records: list[dict] = [] seen_params: set = set() - for output in outputs: - chunk_size = output.get("chunk-size", 0) - ... - return index_metrics, index_records + try: + for output in outputs: + # ... existing loop body ... + except LegacyExecutorError as e: + e.partial_usage_records = index_records + e.partial_usage_records + raise + + return index_metrics, index_records🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@workers/executor/executors/legacy_executor.py` around lines 889 - 993, Wrap the call to self._handle_index(index_ctx) in _run_pipeline_index with a try/except that catches LegacyExecutorError, and on exception attach the currently accumulated index_records to the exception (e.g. set e.partial_usage_records = index_records) before re-raising so previously collected usage rows aren't lost; reference the call site in _run_pipeline_index and the LegacyExecutorError class when making the change.
🧹 Nitpick comments (1)
unstract/sdk1/src/unstract/sdk1/embedding.py (1)
285-293: 💤 Low valueConsider per-handler exception handling for consistency.
_flush_per_prompt_metricsinlegacy_executor.pywraps each handler'sflush_pending_usage()call in atry/exceptso a single misbehaving handler doesn't drop records from the others. This new method does not, so any handler that raises will skip the rest and bubble up — which is then re-attempted by the broader error path in_handle_indexthat callsflush_pending_usage()again. Aligning the two implementations avoids the inconsistency.♻️ Proposed refactor
def flush_pending_usage(self) -> list[dict]: """Drain pending usage rows from registered callback handlers.""" if not self.callback_manager: return [] records: list[dict] = [] for handler in self.callback_manager.handlers: - if hasattr(handler, "flush_pending_usage"): - records.extend(handler.flush_pending_usage()) + if not hasattr(handler, "flush_pending_usage"): + continue + try: + records.extend(handler.flush_pending_usage()) + except Exception: + logger.warning( + "Failed to flush usage from embedding handler %s", + type(handler).__name__, + exc_info=True, + ) return records🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@unstract/sdk1/src/unstract/sdk1/embedding.py` around lines 285 - 293, The new flush_pending_usage method currently calls each handler.flush_pending_usage() without per-handler error handling, so a single handler exception can abort processing; change flush_pending_usage in the class to wrap each handler.flush_pending_usage() call in a try/except (mirroring legacy_executor._flush_per_prompt_metrics) so exceptions from one handler are caught/logged and processing continues for remaining handlers, and ensure the caught exception does not suppress returned records from other handlers.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@workers/executor/executors/legacy_executor.py`:
- Around line 889-993: Wrap the call to self._handle_index(index_ctx) in
_run_pipeline_index with a try/except that catches LegacyExecutorError, and on
exception attach the currently accumulated index_records to the exception (e.g.
set e.partial_usage_records = index_records) before re-raising so previously
collected usage rows aren't lost; reference the call site in _run_pipeline_index
and the LegacyExecutorError class when making the change.
---
Nitpick comments:
In `@unstract/sdk1/src/unstract/sdk1/embedding.py`:
- Around line 285-293: The new flush_pending_usage method currently calls each
handler.flush_pending_usage() without per-handler error handling, so a single
handler exception can abort processing; change flush_pending_usage in the class
to wrap each handler.flush_pending_usage() call in a try/except (mirroring
legacy_executor._flush_per_prompt_metrics) so exceptions from one handler are
caught/logged and processing continues for remaining handlers, and ensure the
caught exception does not suppress returned records from other handlers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0d9d3d64-485c-47d4-bd25-b54eeac0d4a0
📒 Files selected for processing (3)
unstract/sdk1/src/unstract/sdk1/embedding.pyworkers/executor/executors/legacy_executor.pyworkers/tests/test_phase5d.py
|
| Filename | Overview |
|---|---|
| unstract/sdk1/src/unstract/sdk1/embedding.py | Adds flush_pending_usage() to EmbeddingCompat, iterating callback handlers with per-handler exception isolation — mirrors the existing LLM shim pattern exactly. |
| workers/executor/executors/legacy_executor.py | Refactors _run_pipeline_index to extract per-output logic into _index_pipeline_output; wires flush_pending_usage() into every _handle_index exit path (success, skip-reindex, and exception), and propagates accumulated records through the tuple return and LegacyExecutorError.partial_usage_records. |
| workers/tests/test_phase5d.py | Updates the existing mock for _run_pipeline_index to match the new tuple return type, and adds two new test cases that exercise the real _run_pipeline_index → _index_pipeline_output chain. |
Sequence Diagram
sequenceDiagram
participant HSP as _handle_structure_pipeline
participant RPI as _run_pipeline_index
participant IPO as _index_pipeline_output
participant HI as _handle_index
participant EC as EmbeddingCompat
HSP->>RPI: call(context, index_template, ...)
loop for each output
RPI->>IPO: call(output, index_records, ...)
IPO->>HI: call(index_ctx)
HI->>EC: instantiate embedding
alt indexing success
EC-->>HI: usage accumulated in callbacks
HI->>EC: flush_pending_usage()
EC-->>HI: usage_records
HI-->>IPO: "ExecutionResult(metadata={usage_records})"
IPO->>IPO: index_records.extend(child_records)
else LegacyExecutorError raised
HI->>EC: flush_pending_usage() [in except]
HI-->>IPO: "raises LegacyExecutorError(partial_usage_records=partial)"
IPO->>IPO: "e.partial_usage_records = index_records + e.partial_usage_records"
IPO-->>RPI: re-raises
RPI-->>HSP: re-raises
HSP->>HSP: "e.partial_usage_records = pipeline_records + e.partial_usage_records"
end
end
RPI-->>HSP: (index_metrics, index_records)
HSP->>HSP: pipeline_records.extend(index_records)
HSP-->>HSP: "metadata[usage_records] = pipeline_records"
Reviews (6): Last reviewed commit: "Merge branch 'main' into fix/embedding-u..." | Re-trigger Greptile
If ``_handle_index`` raises in iteration N of the per-output loop, records accumulated from iterations 1…N-1 were dropped because the exception escaped ``_run_pipeline_index`` unmodified and ``_handle_structure_pipeline``'s ``except`` branch only inherits ``e.partial_usage_records`` (the N-th iteration's partial rows) and ``pipeline_records`` (which never received the in-flight tuple). Mirror the ``_handle_ide_index`` pattern: wrap the ``_handle_index`` call in a try/except and prepend ``index_records`` to ``e.partial_usage_records`` before re-raising so the outer handler sees every row the worker had collected so far.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@workers/executor/executors/legacy_executor.py`:
- Around line 1200-1205: The error-path currently swallows exceptions from
embedding.flush_pending_usage() by calling logger.debug without exc_info and
loses partial usage rows; change the handler in the block that builds partial
(where partial = [] and embedding.flush_pending_usage() is called) to mirror
_flush_per_prompt_metrics by using logger.warning (or logger.exception) and pass
exc_info=True so failures are surfaced; keep the try/except structure around
embedding.flush_pending_usage() but replace the logger.debug call with a warning
that includes exc_info=True and a short descriptive message indicating
flush_pending_usage failed on the error path.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2288377d-a83a-4aa6-9420-5263a1a162f0
📒 Files selected for processing (1)
workers/executor/executors/legacy_executor.py
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com>
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workers/executor/executors/legacy_executor.py (1)
1167-1192:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winGuard success-path
flush_pending_usage()calls to avoid mis-reporting a successful index as failed.Both success returns invoke
embedding.flush_pending_usage()unguarded inside the metadata dict, still within the outertry(line 1125). If the flush raises, control falls through toexcept Exceptionat line 1193 and the handler re-raisesLegacyExecutorError(message=f"Error while indexing: {e}")— even though indexing itself succeeded (or the doc was already indexed). The error path (lines 1202-1208) already guards flush with atry/except, so the success paths should match for consistency and to prevent false-failure reporting.🛡️ Proposed fix
if doc_id_found and not reindex: shim.stream_log( "Document already indexed in vector store; skipping re-index." ) logger.info( "Skipping re-index: doc_id=%s already in vector DB and reindex=False", doc_id, ) + early_records: list[dict] = [] + try: + early_records = list(embedding.flush_pending_usage()) + except Exception: + logger.warning( + "Failed to flush embedding usage on already-indexed early return", + exc_info=True, + ) return ExecutionResult( success=True, data={IKeys.DOC_ID: doc_id}, - metadata={"usage_records": embedding.flush_pending_usage()}, + metadata={"usage_records": early_records}, ) shim.stream_log( "Re-indexing document" if doc_id_found else "Indexing document" ) index.perform_indexing( vector_db=vector_db, doc_id=doc_id, extracted_text=extracted_text, doc_id_found=doc_id_found, ) logger.info( "Indexing completed: doc_id=%s file=%s", doc_id, Path(file_path).name, ) shim.stream_log("Document indexing completed") + success_records: list[dict] = [] + try: + success_records = list(embedding.flush_pending_usage()) + except Exception: + logger.warning( + "Failed to flush embedding usage on successful indexing", + exc_info=True, + ) return ExecutionResult( success=True, data={IKeys.DOC_ID: doc_id}, - metadata={"usage_records": embedding.flush_pending_usage()}, + metadata={"usage_records": success_records}, )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@workers/executor/executors/legacy_executor.py` around lines 1167 - 1192, The success-path calls that build ExecutionResult currently invoke embedding.flush_pending_usage() directly inside the metadata dict so any exception from flush_pending_usage() will be caught by the outer try and convert a successful index into a failure; change both success-return sites (the two ExecutionResult returns after indexing and after early return when doc existed) to call flush_pending_usage() inside a local try/except (like the error-path at lines 1202-1208) and on exception log the flush error and set metadata["usage_records"] to an empty or safe fallback value before constructing and returning the ExecutionResult; ensure you reference the same embedding.flush_pending_usage(), ExecutionResult(...) return sites, and mirror the existing error-path handling behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@workers/executor/executors/legacy_executor.py`:
- Around line 1167-1192: The success-path calls that build ExecutionResult
currently invoke embedding.flush_pending_usage() directly inside the metadata
dict so any exception from flush_pending_usage() will be caught by the outer try
and convert a successful index into a failure; change both success-return sites
(the two ExecutionResult returns after indexing and after early return when doc
existed) to call flush_pending_usage() inside a local try/except (like the
error-path at lines 1202-1208) and on exception log the flush error and set
metadata["usage_records"] to an empty or safe fallback value before constructing
and returning the ExecutionResult; ensure you reference the same
embedding.flush_pending_usage(), ExecutionResult(...) return sites, and mirror
the existing error-path handling behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5c8c6ae4-c849-403f-9239-d28e3dd7f17f
📒 Files selected for processing (1)
workers/executor/executors/legacy_executor.py
|
@chandrasekharan-zipstack check the sonar issues. |
Extract structure-pipeline indexing loop body into _index_pipeline_output and missing-param validation into _missing_index_params to drop both functions below SonarCloud's cognitive complexity threshold. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@workers/executor/executors/legacy_executor.py`:
- Around line 1007-1015: The code currently only logs when _handle_index()
returns a failure (index_result) and continues, which hides indexing errors;
change the control flow in the caller (where index_result is checked) so that
when not index_result.success you propagate that failure upward instead of
merely warning — e.g., return or rethrow the failure (return index_result or
return ExecutionResult.failure(...) with index_result.error and metadata) so the
pipeline aborts on indexing errors; update any use of index_records/index_result
accordingly to avoid proceeding when index_result indicates failure.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f1cf4b3d-7675-4c40-9e66-679ab3ef83ef
📒 Files selected for processing (1)
workers/executor/executors/legacy_executor.py
- EmbeddingCompat.flush_pending_usage now wraps each handler call in try/except so one bad handler doesn't drop usage rows from the rest and doesn't escape into the indexing success path. - _index_pipeline_output now raises LegacyExecutorError when _handle_index returns a failure result, so the structure pipeline aborts instead of running downstream steps against an incomplete vector store. - Added regression tests asserting embedding usage rows propagate through the structure pipeline and that an indexing returned-failure short-circuits before answer_prompt. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Test ResultsSummary
Runner Tests - Full Report
SDK1 Tests - Full Report
|
|



What
EmbeddingCompatgets aflush_pending_usage()shim mirroring the LLM one._handle_indexreturns its embedding rows viaExecutionResult.metadata["usage_records"];_run_pipeline_indexpropagates them up to_handle_structure_pipelineto be absorbed intopipeline_records.LegacyExecutorError.partial_usage_recordscarries embedding rows across mid-pipeline failures.Why
The deferred-batch usage refactor (UN-2946) replaced
Audit.push_usage_data()direct push withUsageHandler._pending_usage+ aflush_pending_usage()drain pattern. The drain was wired for LLM (_handle_answer_prompt,_handle_summarize, IDE-index) but never for the workflow indexing path. As a result, embedding rows were appended to an in-memory list that died with the localembeddingvariable when_handle_indexreturned — never reaching theUsagetable.Visible symptom in API-deployment responses post-merge of
feat/lookups-v2to main on 2026-05-11:Staging
unstract_core.usageconfirms the cliff: lastusage_type='embedding'row at 2026-05-11T06:34Z; zero embedding rows since. LLM rows continue normally.How
EmbeddingCompat.flush_pending_usage()— walkscallback_manager.handlersand drains anything with aflush_pending_usagemethod (matches the LLM shim shape and the iterator pattern already used in_handle_answer_prompt)._handle_index— at each success exit (chunk_size==0early return is unchanged since no embedding is instantiated; document-already-indexed skip; post-perform_indexingreturn) it now passesmetadata={"usage_records": embedding.flush_pending_usage()}on theExecutionResult. The exception arm flushes any partial records intoLegacyExecutorError.partial_usage_records._run_pipeline_index— return type changes fromdicttotuple[dict, list[dict]]; the per-output loop accumulatesindex_result.metadata["usage_records"]intoindex_records._handle_structure_pipeline— unpacks the tuple and extendspipeline_records. IDE-index path (_handle_ide_index) already absorbsmetadata["usage_records"]from its child_handle_indexcall, so it starts working without changes.workers/tests/test_phase5d.py— mock updated to return the new tuple shape.Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
_run_pipeline_indexreturn type changed (dict→tuple[dict, list[dict]]). Only one caller (_handle_structure_pipeline) and one test mock; both updated._handle_indexnow always returnsmetadata={"usage_records": [...]}. Existing callers that readmetadata["usage_records"](_handle_ide_index,_absorb) already handle the key being present or absent. The earlychunk_size==0return is left without metadata because no embedding is instantiated — matches the pre-refactor behaviour.EmbeddingCompat.flush_pending_usage()is additive.Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
8aa675884(UN-2946 [FEAT] Deferred batch usage tracking with operation metrics, 2026-04-08) onfeat/lookups-v2.Dependencies Versions
Notes on Testing
test_phase5d,test_context_retrieval_metrics,test_legacy_executor_index,test_sanity_phase4/5/6g— 93/93.metadata.usage.embedding_tokens > 0and thatUsage.objects.filter(run_id=<file_execution_id>, usage_type='embedding')returns rows.Screenshots
Checklist
I have read and understood the Contribution Guidelines.