Skip to content

PR2: add concurrent_files for bounded concurrent reads#2

Draft
sumedhsakdeo wants to merge 4 commits intofix/arrow-scan-streaming-3036from
fix/arrow-scan-concurrent-3036
Draft

PR2: add concurrent_files for bounded concurrent reads#2
sumedhsakdeo wants to merge 4 commits intofix/arrow-scan-streaming-3036from
fix/arrow-scan-concurrent-3036

Conversation

@sumedhsakdeo
Copy link
Owner

@sumedhsakdeo sumedhsakdeo commented Feb 14, 2026

Part of apache#3036

Summary

  • Add concurrent_files parameter for bounded concurrent reads across multiple files in arrival order
  • Uses per-scan ThreadPoolExecutor(max_workers=concurrent_files) with bounded queue.Queue(maxsize=16) for backpressure
  • Lightweight threading.Event for cancellation on early termination
  • Refactors to_record_batches into helpers: _prepare_tasks_and_deletes, _iter_batches_arrival, _iter_batches_materialized, _apply_limit
  • Opt-in only — default concurrent_files=1 preserves sequential behavior

Ordering semantics

Config File ordering Within-file ordering
ScanOrder.TASK (default) Grouped by file, submission order Row order
ScanOrder.ARRIVAL, concurrent_files=1 Grouped by file, sequential Row order
ScanOrder.ARRIVAL, concurrent_files>1 Interleaved (no grouping guarantee) Row order

PR Stack

This is PR 2 of 3 for apache#3036:

  1. PR 0: batch_size forwarding
  2. PR 1: ScanOrder enum — stop materializing entire files
  3. PR 2 (this): concurrent_files — bounded concurrent reads in arrival order
  4. PR 3: benchmark

Are these changes tested?

Yes — 9 tests in test_bounded_concurrent_batches.py: correctness, backpressure, error propagation, early termination, concurrency limits, ArrowScan integration with limit. Plus 3 concurrent-specific tests in test_pyarrow.py.

Are there any user-facing changes?

Yes — new concurrent_files parameter on to_arrow_batch_reader() (used with order=ScanOrder.ARRIVAL)

@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch from 988d3f2 to 7cfb2b1 Compare February 14, 2026 22:40
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from b72b7ba to cbd6029 Compare February 15, 2026 00:18
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch from 7cfb2b1 to a66d7e1 Compare February 15, 2026 00:18
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from cbd6029 to 55d68b8 Compare February 15, 2026 00:33
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch from a66d7e1 to 8be61c8 Compare February 15, 2026 00:33
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from 55d68b8 to 444549f Compare February 15, 2026 00:35
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch 2 times, most recently from c6547c3 to a4cb212 Compare February 15, 2026 00:54
Comment on lines +1723 to +1725
if cancel_event.is_set():
return
acquired = True
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if cancel_event.is_set():
return
acquired = True
acquired = True
if cancel_event.is_set():
return

@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch 3 times, most recently from c383049 to d11eb43 Compare February 15, 2026 01:12
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch 2 times, most recently from 13feb8d to 86b5a4a Compare February 15, 2026 02:10
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from 444549f to 1f20655 Compare February 15, 2026 02:10
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch from 86b5a4a to c643dd2 Compare February 15, 2026 02:27
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from 1f20655 to 07287b6 Compare February 15, 2026 02:27
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch from c643dd2 to dcf9b13 Compare February 15, 2026 02:29
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from 07287b6 to a0a29c8 Compare February 15, 2026 02:29
sumedhsakdeo and others added 4 commits February 16, 2026 21:01
Add _bounded_concurrent_batches() with proper lock discipline:
- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded
as they arrive from parallel file reads. File ordering is not
guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace shared ExecutorFactory + Semaphore with per-scan
ThreadPoolExecutor(max_workers=concurrent_files) for deterministic
shutdown and simpler concurrency control.

Refactor to_record_batches into helpers:
- _prepare_tasks_and_deletes: resolve delete files
- _iter_batches_streaming: bounded concurrent streaming path
- _iter_batches_materialized: executor.map materialization path
- _apply_limit: unified row limit logic (was duplicated)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tests and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-concurrent-3036 branch from 91c6fa0 to 7c415d4 Compare February 17, 2026 05:07
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-streaming-3036 branch from a0a29c8 to 2474b12 Compare February 17, 2026 05:07
@sumedhsakdeo sumedhsakdeo changed the title PR2: feat arrow scan concurrent PR2: add concurrent_files for bounded concurrent reads Feb 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant