UN-2860 [FIX] Fixed active file cache not preventing duplicate file processing#1570
Conversation
…rocessing
Fixed critical bug where ActiveFileFilter cache checks were failing to detect
files already being processed, causing duplicate file processing in concurrent
workflow executions.
Key fixes:
- Fixed cache data access: Extract execution_id from nested cache structure
(cached_data["data"]["execution_id"] instead of cached_data["execution_id"])
- Changed cache status from "EXECUTING" to "PENDING" for queued files
- Increased MAX_ACTIVE_FILE_CACHE_TTL from 1hr to 2hrs for resource-constrained environments
- Added cache cleanup in finally blocks to prevent stale entries
- Fixed cache key format consistency (hash-based) between backend and workers
- Optimized DB queries to skip files already found in cache
- Removed ~370 lines of dead code (file_management_utils.py and unused methods)
Root cause: RedisCacheBackend wraps data in {data: {...}, cached_at, ttl} but
filter_pipeline was accessing execution_id directly instead of from nested data key.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Summary by CodeRabbit
WalkthroughRemoved per-file cache lookups in backend active-file checks and replaced them with a single bulk database query (path-aware and UUID-only branches); workers now perform cache-first filtering, submit only remaining composite identifiers to the backend, and always attempt cache cleanup after pre-create attempts; several utility modules and exports were removed and cache TTL/status semantics adjusted. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant W as Worker FilterPipeline
participant AFM as ActiveFileManager (cache)
participant API as Backend internal_views
participant DB as Database
rect rgb(245,248,255)
note over W,AFM: Cache-first filtering
W->>AFM: Create cache keys, read cache entries
AFM-->>W: Cached-active entries (data.execution_id)
W->>W: Compute remaining_identifiers (exclude cached-active)
end
alt no remaining_identifiers
W-->>W: Skip backend call
else remaining_identifiers present
rect rgb(248,245,255)
note over W,API: Path-aware bulk query
W->>API: Request with composite ids (uuid:path)
API->>DB: Bulk check (path-aware or UUID-only)
DB-->>API: Active results
API-->>W: active_identifiers (preferred) or legacy active_uuids
end
W->>W: Map response to original identifiers
end
note over W: Proceed with non-active files only
sequenceDiagram
autonumber
participant T as Worker Task (_pre_create_file_executions)
participant C as Cache
participant L as Logger
T->>L: Log using file history (includes execution_id)
rect rgb(245,255,247)
note over T: Attempt pre-create per file (try)
T->>T: Create WorkflowFileExecution
end
rect rgb(255,249,245)
note over T,C: Guaranteed cache cleanup (finally)
T->>C: _cleanup_file_cache_entry(file_hash, workflow_id, file_name)
C-->>T: Success or error
T->>L: Warn on cleanup failure (non-fatal)
end
sequenceDiagram
autonumber
participant API as Backend internal_views
participant DB as Database
note over API: Backend no longer reads cache for active-file checks
API->>DB: Bulk query active files (path-aware or UUID-only)
DB-->>API: Results
API->>API: Log db_queries only
API-->>Caller: { active_identifiers | active_uuids, db_queries }
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro Cache: Disabled due to Reviews > Disable Cache setting Knowledge base: Disabled due to 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/workflow_manager/internal_views.py (1)
756-767: db_queries count is always set to 2 (inaccurate metrics)Set db_queries based on whether path-aware and/or legacy queries will actually run.
- self._bulk_database_check( + # Pre-compute which query paths will run + has_path = any(f["path"] is not None for f in files_needing_db_check) + has_legacy = any(f["path"] is None for f in files_needing_db_check) + self._bulk_database_check( files_needing_db_check=files_needing_db_check, workflow_id=workflow_id, current_execution_id=current_execution_id, active_files=active_files, active_identifiers=active_identifiers, ) - db_queries = 2 # At most 2 bulk queries (path-aware + legacy) + db_queries = int(has_path) + int(has_legacy) # 0–2 depending on pathsAlso applies to: 771-772
🧹 Nitpick comments (6)
workers/sample.env (1)
260-262: Add units/bounds note for clarityACTIVE_FILE_CACHE_TTL is seconds and capped in code. Consider adding a brief comment (e.g., “seconds, 60–7200”) to avoid misconfiguration.
workers/shared/clients/file_client.py (1)
130-132: Logging additions look goodIncluding execution_id improves traceability. Consider structured logs (extra=…) for easier filtering downstream.
Also applies to: 223-223, 227-229
backend/workflow_manager/internal_views.py (1)
739-754: Avoid duplicate entries in files_needing_db_checkIf input contains repeated uuid/path, we’ll add duplicates and grow Q unnecessarily. Deduplicate by composite_id for performance.
- for file_data in files: + seen = set() + for file_data in files: provider_uuid = file_data["uuid"] file_path = file_data.get("path") composite_id = ( f"{provider_uuid}:{file_path}" if file_path else provider_uuid ) - - # All files need database check - files_needing_db_check.append( + if composite_id in seen: + continue + seen.add(composite_id) + files_needing_db_check.append( { "uuid": provider_uuid, "path": file_path, "composite_id": composite_id, } )workers/file_processing/tasks.py (1)
840-875: Tolerate cleanup errors, but silence BLE001Catching broad Exception is acceptable in a finally cleanup; annotate to satisfy Ruff BLE001.
- except Exception as cleanup_error: + except Exception as cleanup_error: # noqa: BLE001 logger.warning(f"Cache cleanup failed for {file_name}: {cleanup_error}") # Don't raise - cache will expire anywayPlease confirm this cleanup timing is intentional: removing the “active” cache right after pre-creating WorkflowFileExecution relies on DB-based active checks to prevent duplicates during processing.
workers/shared/processing/filter_pipeline.py (2)
639-646: Correct: read execution_id from cached_data['data'] and reuse manager key logicThis fixes the root cause and ensures key parity. LGTM.
Prefer a public helper (e.g., ActiveFileManager.create_cache_key) rather than calling a private method (_create_cache_key) across modules to reduce brittleness.
654-656: Consider lowering to debugThis cache-hit count log may be noisy at INFO in high volume pipelines.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (8)
backend/workflow_manager/internal_views.py(4 hunks)workers/file_processing/tasks.py(3 hunks)workers/sample.env(1 hunks)workers/shared/clients/file_client.py(2 hunks)workers/shared/processing/filter_pipeline.py(4 hunks)workers/shared/workflow/execution/__init__.py(0 hunks)workers/shared/workflow/execution/active_file_manager.py(3 hunks)workers/shared/workflow/execution/file_management_utils.py(0 hunks)
💤 Files with no reviewable changes (2)
- workers/shared/workflow/execution/init.py
- workers/shared/workflow/execution/file_management_utils.py
🧰 Additional context used
🪛 Ruff (0.13.3)
workers/file_processing/tasks.py
872-872: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (3)
workers/shared/workflow/execution/active_file_manager.py (1)
421-421: Status PENDING alignmentChanging cache entry status to PENDING is consistent with upstream use. LGTM.
Also applies to: 538-538
workers/file_processing/tasks.py (1)
976-979: Finally cleanup is good; ensure file_hash is always setfile_hash is created before the try, so this is safe. LGTM.
If any early-return paths are added later, ensure file_hash exists to avoid UnboundLocalError.
workers/shared/processing/filter_pipeline.py (1)
660-675: No staleexecution_idlookups detected
Repository-wide search found no directcached_data.get("execution_id")calls. LGTM.
…-duplicate-processing
…processing' of github.com:Zipstack/unstract into fix/UN-2860-FIX_active-file-cache-preventing-duplicate-processing
…-duplicate-processing
|
|
|



What
Why
RedisCacheBackendwraps cached data in{data: {...}, cached_at: "...", ttl: 300}butfilter_pipeline.py:643was accessingexecution_iddirectly instead of from the nesteddatakeyNoneforcached_exec_idHow
Critical fix in filter_pipeline.py (line 645):
cached_data.get("execution_id")→cached_data.get("data", {}).get("execution_id")Cache lifecycle improvements in active_file_manager.py:
Guaranteed cache cleanup in file_processing/tasks.py:
_cleanup_file_cache_entry()helper methodCache key consistency:
Database query optimization in filter_pipeline.py:
Code cleanup:
file_management_utils.py(~150 lines unused)filter_and_cache_files()method (~197 lines unused)_check_database_active_files()method (~23 lines unused)Can this PR break any existing features. If yes, please list possible items. If no, please explain why.
No, this PR will not break existing features:
get("data", {}).get("execution_id")pattern safely handles both old and new cache formats (returns None if structure differs)Risk mitigation:
Database Migrations
Env Config
ACTIVE_FILE_CACHE_TTLto customize cache TTL (default: 300s, max: 7200s)Relevant Docs
workers/shared/cache/cache_backends.pyworkers/shared/processing/filter_pipeline.pyworkers/shared/workflow/execution/active_file_manager.pyRelated Issues or PRs
Dependencies Versions
Notes on Testing
How to test:
file_active:*keys exist with correct execution IDsRedis verification:
Log verification:
Screenshots
N/A - Backend/worker cache logic fix
Checklist
✅ I have read and understood the Contribution Guidelines.
🤖 Generated with Claude Code