[REVIEW] Add multimodal reader/writer pipeline and scoped benchmarks#1517
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
Pull request overview
This PR introduces a comprehensive multimodal data processing pipeline for NVIDIA NeMo Curator, enabling WebDataset (MINT-1T style) to Parquet conversion with filtering and materialization capabilities. The implementation follows the established task-centric API design and provides both reader and writer stages specifically designed for multimodal data containing text, images, and metadata.
Changes:
- Adds `MultiBatchTask` for row-wise multimodal records with a standardized schema supporting `sample_id`, `position`, `modality`, `content_type`, `text_content`, `binary_content`, `metadata_source`, `metadata_json`, and `materialize_error` fields
- Implements `WebdatasetReader` (composite stage) and `MultimodalParquetWriter` with support for optional binary materialization, configurable parquet backends (pandas/pyarrow), and error tracking for failed materializations
- Adds `MultimodalJpegAspectRatioFilterStage` for filtering multimodal rows based on JPEG aspect ratios, with support for loading images from source when binary content is not already present
- Introduces performance instrumentation via a `_time_metric` context manager on `ProcessingStage` and a `split_table_by_group_max_bytes` utility for group-preserving table splitting
- Includes comprehensive test coverage (3 test files, 6 tests passing), a tutorial pipeline, and benchmark infrastructure with nightly configuration entries (currently disabled)
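To make the row-wise schema above concrete, here is a minimal illustrative sketch; the authoritative definition is `MULTIMODAL_SCHEMA` in `nemo_curator/tasks/multimodal.py`, and the comments on each field are assumptions drawn from this PR's description, not the actual Arrow types:

```python
# Illustrative field set for the row-wise multimodal schema described above.
INTERLEAVED_FIELDS = (
    "sample_id",          # groups rows belonging to one source document
    "position",           # interleaving order of the row within its sample
    "modality",           # "text", "image", or "metadata"
    "content_type",       # MIME-style type, e.g. "image/jpeg"
    "text_content",       # None for non-text rows
    "binary_content",     # None until materialized
    "metadata_source",
    "metadata_json",
    "materialize_error",  # populated when materialization fails
)

def make_row(sample_id: str, position: int, modality: str) -> dict:
    """Blank row with every schema field present (all None by default)."""
    row = dict.fromkeys(INTERLEAVED_FIELDS)
    row.update(sample_id=sample_id, position=position, modality=modality)
    return row

row = make_row("doc-0001", 0, "text")
assert set(row) == set(INTERLEAVED_FIELDS)
assert row["binary_content"] is None
```

Every row carries the full field set; a sample's interleaving is reconstructed by sorting its rows on `position`.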
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| nemo_curator/tasks/multimodal.py | Core MultiBatchTask implementation with multimodal schema, conversion methods, and metadata parsing utilities |
| nemo_curator/tasks/__init__.py | Export MultiBatchTask, update copyright year to 2026 |
| nemo_curator/stages/base.py | Add _time_metric helper, move imports to top level, update copyright to 2026 |
| nemo_curator/core/utils.py | Add split_table_by_group_max_bytes utility for group-preserving table splitting |
| nemo_curator/stages/multimodal/stages.py | Implement base annotator/filter stages and JPEG aspect ratio filter |
| nemo_curator/stages/multimodal/io/readers/webdataset.py | WebDataset reader stage parsing tar shards into multimodal rows |
| nemo_curator/stages/multimodal/io/readers/base.py | Base multimodal reader contract |
| nemo_curator/stages/multimodal/io/readers/__init__.py | Export WebdatasetReaderStage |
| nemo_curator/stages/multimodal/io/reader.py | Composite WebdatasetReader with file partitioning |
| nemo_curator/stages/multimodal/io/writers/multimodal.py | Parquet writer with optional materialization and error tracking |
| nemo_curator/stages/multimodal/io/writers/base.py | Base multimodal writer with output mode handling |
| nemo_curator/stages/multimodal/io/writers/__init__.py | Export writer classes |
| nemo_curator/stages/multimodal/io/writer.py | User-facing MultimodalParquetWriter alias |
| nemo_curator/stages/multimodal/io/__init__.py | Export reader and writer for public API |
| nemo_curator/stages/multimodal/__init__.py | Export multimodal stages |
| tests/stages/multimodal/conftest.py | Shared test fixtures for multimodal tests |
| tests/stages/multimodal/test_multimodal_reader.py | Tests for WebDataset reader functionality |
| tests/stages/multimodal/test_multimodal_writer.py | Tests for parquet writer with materialization |
| tests/stages/multimodal/test_multimodal_core.py | Tests for task methods, filter stages, and utilities |
| tests/stages/multimodal/__init__.py | Test package marker |
| tutorials/multimodal/mint1t_mvp_pipeline.py | Example pipeline for MINT1T processing |
| benchmarking/scripts/utils.py | Add collect_parquet_output_metrics helper |
| benchmarking/scripts/multimodal_mint1t_benchmark.py | Benchmark script for multimodal pipeline performance |
| benchmarking/nightly-benchmark.yaml | Add disabled benchmark entries for multimodal processing |
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Additional Comments (2)
Consider validating all output files, or at least a random sample.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Previously all passthrough fields were broadcast to every row. This adds per_image_fields and per_text_fields which distribute list values 1:1 to their respective non-None content rows, with sample-level passthrough now only on the metadata row. Includes length-mismatch warnings, ValueError for non-list per-modality values, and centralized helper methods. Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Made-with: Cursor
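The 1:1 distribution described in this commit can be sketched as follows; the function and row-field names here are hypothetical stand-ins for the reader-stage implementation:

```python
def distribute_per_modality(values: list, rows: list[dict], modality: str) -> None:
    """Assign list `values` 1:1, in order, to rows of `modality` whose
    content is present; other rows keep None. Illustrative sketch of the
    per_image_fields / per_text_fields behaviour described above."""
    if not isinstance(values, list):
        raise TypeError(f"per-{modality} fields must be lists, got {type(values)}")
    targets = [r for r in rows if r["modality"] == modality and r["content"] is not None]
    if len(values) != len(targets):
        print(f"warning: {len(values)} values for {len(targets)} {modality} rows")
    for row, value in zip(targets, values):
        row["extra"] = value

rows = [
    {"modality": "image", "content": b"\xff\xd8", "extra": None},
    {"modality": "text", "content": "hello", "extra": None},
    {"modality": "image", "content": None, "extra": None},  # no content: skipped
]
distribute_per_modality(["cap-a"], rows, "image")
assert rows[0]["extra"] == "cap-a"
assert rows[2]["extra"] is None
```

Rows with `None` content are skipped, so a list of N captions lines up with the N present images rather than being broadcast to every row.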
Additional Comments (4)
A function decorated with […] You'll also need to add:

```python
from typing import TYPE_CHECKING, Any, Generic, Iterator, TypeVar, final
```
Only […]. Consider catching a broader base class (at minimum):

```python
def _read_direct_file(path: str, storage_options: dict[str, object]) -> bytes | None:
    try:
        with fsspec.open(path, mode="rb", **storage_options) as fobj:
            return fobj.read()
    except Exception:
        return None
```
A subtle correctness issue: […]. Consider adding […]
For the common MINT-1T pattern (one multi-frame TIFF per sample, all tokens resolving to the same member) this is correct. However, if a sample references multiple distinct TIFF members, the frame indices for the second and subsequent files will be wrong. For example, with tokens resolving to […]. Consider tracking frame counts per resolved member name instead of a single global counter:

```python
frame_counter_per_member: dict[str, int] = {}
...
if content_key is not None and is_multiframe_candidate:
    frame_index = frame_counter_per_member.get(content_key, 0)
    frame_counter_per_member[content_key] = frame_index + 1
```
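A self-contained illustration of the per-member counter (the helper name is hypothetical):

```python
from collections import defaultdict

def assign_frame_indices(resolved_members: list[str]) -> list[int]:
    """Assign 0-based frame indices per resolved member name, so two
    distinct TIFF members within one sample each start counting at 0."""
    counters: defaultdict[str, int] = defaultdict(int)
    indices = []
    for member in resolved_members:
        indices.append(counters[member])
        counters[member] += 1
    return indices

# Three image tokens: two frames of a.tif, then the single frame of b.tif.
# A single global counter would wrongly give b.tif frame index 2.
assert assign_frame_indices(["a.tif", "a.tif", "b.tif"]) == [0, 1, 0]
```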
…ory to interleaved

Rename the task class, schema, module, stage directory, and all stage classes to use "Interleaved" naming, reflecting the interleaved nature of the data format:

- MultiBatchTask -> InterleavedBatch
- MULTIMODAL_SCHEMA -> INTERLEAVED_SCHEMA
- tasks/multimodal.py -> tasks/interleaved.py
- stages/multimodal/ -> stages/interleaved/
- BaseMultimodalAnnotatorStage -> BaseInterleavedAnnotatorStage
- BaseMultimodalFilterStage -> BaseInterleavedFilterStage
- MultimodalAspectRatioFilterStage -> InterleavedAspectRatioFilterStage
- MultimodalParquetWriterStage -> InterleavedParquetWriterStage
- BaseMultimodalReader -> BaseInterleavedReader
- BaseMultimodalWriter -> BaseInterleavedWriter

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Made-with: Cursor
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Made-with: Cursor
Additional Comments (6)
Later:

```python
blobs = fs.cat_ranges(dedup_paths, starts, ends)
```

…where […]. The fix: skip populating byte-range hints for compressed tars:

```python
# Only populate byte-range hints for uncompressed tars; compressed tars
# require sequential decompression, so offset_data is the decompressed-
# stream position rather than the raw file byte offset.
_COMPRESSED_TAR_EXTS = (".tar.gz", ".tgz")
is_compressed = ctx.tar_path.endswith(_COMPRESSED_TAR_EXTS)
if ctx.member_info and content_key in ctx.member_info and not is_compressed:
    info = ctx.member_info[content_key]
    byte_offset = info.offset_data
    byte_size = info.size
```

This ensures compressed-tar rows fall into the […]
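The offset semantics can be demonstrated with the standard library alone: for a plain tar, `TarInfo.offset_data` indexes the raw file bytes, while for a gzipped tar it only indexes the decompressed stream. A self-contained sketch:

```python
import gzip
import io
import tarfile

payload = b"interleaved sample payload " * 8

# Build a one-member tar in memory.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
    info = tarfile.TarInfo(name="sample.jpg")
    info.size = len(payload)
    tf.addfile(info, io.BytesIO(payload))
raw_tar = buf.getvalue()

with tarfile.open(fileobj=io.BytesIO(raw_tar), mode="r:") as tf:
    member = tf.getmember("sample.jpg")
    off, size = member.offset_data, member.size

# Uncompressed tar: a raw byte-range read at offset_data is valid.
assert raw_tar[off:off + size] == payload

# Gzipped tar: offset_data is only meaningful after full decompression,
# so byte-range hints must not be emitted for .tar.gz/.tgz shards.
raw_gz = gzip.compress(raw_tar)
assert gzip.decompress(raw_gz)[off:off + size] == payload
```

This is exactly why a `cat_ranges`-style ranged read works on `.tar` shards but returns garbage on `.tar.gz` ones.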
In the current usage inside […]. Consider either:

```python
def iter_materialized_bytes(self, task, df, row_mask):
    # Build a task wrapping only the passed-in df to avoid full re-materialization
    filtered_task = InterleavedBatch(
        task_id=task.task_id,
        dataset_name=task.dataset_name,
        data=df.reset_index(drop=True),
        _metadata=task._metadata,
        _stage_perf=task._stage_perf,
    )
    materialized_df = materialize_task_binary_content(filtered_task).to_pandas().reset_index(drop=True)
    ...
```
The outer […]. Consider separating the archive-open failure from per-member failures:

```python
# Inside the per-path loop:
try:
    fobj = fsspec.open(path, mode="rb", **storage_options).open()
    tf = tarfile.open(fileobj=fobj, mode="r:*")
except (OSError, tarfile.TarError):
    for idx, *_ in keyed_rows:
        error_values[idx] = "failed to open archive"
    continue
with fobj, tf:
    for idx, member, frame_idx in keyed_rows:
        # per-member try/except here
        ...
```

This prevents a single corrupt member from marking previously-successful rows as failed.
Consider adding:

```python
parser.add_argument("--min-aspect-ratio", type=float, default=1.0)
parser.add_argument("--max-aspect-ratio", type=float, default=2.0)
```

and forwarding them to […]
Two inconsistencies with the write-time materialization path:
Both the frame-extraction call and the failure-recording behaviour should be added here to keep the two materialization modes symmetric.
Interactive notebook showing the end-to-end interleaved pipeline: read WebDataset tar → inspect schema → display interleaved document (text + inline images) → filter by aspect ratio → write to Parquet. Includes executed outputs with rendered MINT-1T sample data.

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Made-with: Cursor
Additional Comments (2)
The import chain that triggers this:

```python
try:
    from PIL import Image
except ImportError:
    Image = None
```

```python
try:
    from PIL import Image as _Image
except ImportError:
    _Image = None  # type: ignore[assignment]
```

Then in […]:

```python
if _Image is None:
    return None  # PIL not available; cannot extract TIFF frame
```

This means the […]. Consider either: […]
- Use IPython.display.Image (image/png output) instead of display(HTML) so images render on GitHub, JupyterLab, and VS Code
- Add HuggingFace download cell (mlfoundations/MINT-1T-PDF-CC-2024-18) with MINT1T_TAR_PATH env var override for local data
- Fix all ruff lint errors (type annotations, import order, magic numbers)

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Made-with: Cursor
Additional Comments (4)
After […]. Additionally, […]. Consider adding an explicit sort:

```python
return filtered.sort_values(["sample_id", "position"]).reset_index(drop=True)
```
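A related post-filter concern mentioned later in this review (metadata rows whose sample has no remaining content) can be illustrated with a small stand-alone sketch; these simplified row dicts are hypothetical, not the actual DataFrame code:

```python
def drop_orphaned_metadata(rows: list[dict]) -> list[dict]:
    """After content rows are filtered, drop metadata rows whose sample
    has no remaining text/image content. Illustrative sketch only."""
    samples_with_content = {r["sample_id"] for r in rows if r["modality"] != "metadata"}
    return [
        r for r in rows
        if r["modality"] != "metadata" or r["sample_id"] in samples_with_content
    ]

rows = [
    {"sample_id": "s1", "modality": "image", "position": 0},
    {"sample_id": "s1", "modality": "metadata", "position": 1},
    # every content row of s2 was filtered out upstream:
    {"sample_id": "s2", "modality": "metadata", "position": 0},
]
kept = drop_orphaned_metadata(rows)
assert {r["sample_id"] for r in kept} == {"s1"}
assert len(kept) == 2
```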
Consider computing the estimate from the image rows only, or documenting the approximation prominently so callers set […]:

```python
# Current approximation: uniform average across all rows.
# For tables with sparse binary_content this can over-estimate splits.
avg_bytes_per_row = table.nbytes / n
```
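Group-preserving splitting by a byte budget can be sketched in plain Python; this is a stand-in for `split_table_by_group_max_bytes`, which operates on Arrow tables rather than tuples:

```python
from itertools import groupby

def split_by_group_max_bytes(rows, max_bytes):
    """rows: (sample_id, nbytes) pairs, ordered by sample_id. Split into
    chunks near max_bytes without ever separating one sample's rows.
    Illustrative stand-in for split_table_by_group_max_bytes."""
    chunks: list[list] = []
    current: list = []
    current_bytes = 0
    for _, group_iter in groupby(rows, key=lambda r: r[0]):
        group = list(group_iter)
        group_bytes = sum(n for _, n in group)
        # Close the current chunk before a group that would overflow it.
        if current and current_bytes + group_bytes > max_bytes:
            chunks.append(current)
            current, current_bytes = [], 0
        current.extend(group)
        current_bytes += group_bytes
    if current:
        chunks.append(current)
    return chunks

rows = [("a", 60), ("a", 30), ("b", 50), ("c", 40)]
chunks = split_by_group_max_bytes(rows, max_bytes=100)
# sample "a" (90 bytes) stays whole; "b" opens a new chunk.
assert chunks == [[("a", 60), ("a", 30)], [("b", 50), ("c", 40)]]
```

A group larger than the budget still lands in a chunk by itself, which is the only way to keep the group intact.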
During materialization, […]. The counter should be tracked per unique […]
The quickstart notebook embeds base64-encoded PNG thumbnails which trigger false positives in detect-secrets. Updated baseline to whitelist these along with existing pre-existing entries. Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Made-with: Cursor
Regenerated .secrets.baseline to include entries from the updated interleaved quickstart notebook (base64-encoded image outputs and hex strings from Parquet metadata) so the detect-secrets CI check passes. Signed-off-by: Varun Jawa <vjawa@nvidia.com> Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Made-with: Cursor
Additional Comments (3)
Because […]. The fix is to guard the import and raise a clear […]:

```python
try:
    from PIL import Image as _Image
except ImportError:
    _Image = None  # type: ignore[assignment]

# ... then inside _extract_tiff_frame:
def _extract_tiff_frame(tiff_bytes: bytes, frame_index: int) -> bytes | None:
    if _Image is None:
        raise RuntimeError(
            "Pillow is required for TIFF frame extraction. "
            "Install the 'image_cpu' dependency group."
        )
    ...
```

This pattern is already used correctly in […]
For a benchmark output that is meant to be inspected or compared later, prefer a serialization format that is both version-safe and human-readable. If the full task objects are genuinely needed, at minimum document that the […]. Alternatively, if preserving the full objects via pickle is a hard requirement, add a comment explaining that constraint and the version-pinning expectation.
When […]. This is harmless today but fragile; if […]:

```python
raw_bytes = self._extract_tar_member(tf, content_key, read_ctx.byte_cache)
if raw_bytes is None:
    row["materialize_error"] = f"missing member '{content_key}'"
    continue
frame_index = parsed_ref.get("frame_index")
if frame_index is not None:
    extracted = _extract_tiff_frame(raw_bytes, frame_index)
    if extracted is None:
        row["materialize_error"] = f"failed to extract frame {frame_index} from '{content_key}'"
        continue
    raw_bytes = extracted
row["binary_content"] = raw_bytes
```
The `--fields` default hardcoded field names (`language_id_whole_page_fasttext`, `previous_word_count`) that ~5.3% of MINT-1T samples lack, causing a ValueError. It now defaults to None (auto-discovery) to match benchmark behavior.

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Made-with: Cursor
Additional Comments (5)
Pillow is listed in the optional […]. Consider guarding it the same way […]:

```python
try:
    from PIL import Image as _Image
except ImportError:
    _Image = None
```

Then inside […]:

```python
def _extract_tiff_frame(tiff_bytes: bytes, frame_index: int) -> bytes | None:
    if _Image is None:
        return None  # or raise RuntimeError with install hint
    ...
```
When a sample has two distinct TIFF files (e.g. […])
At materialization time, […]. Consider tracking frame counts per resolved member name:

```python
frame_counters: dict[str, int] = {}
...
if content_key is not None and is_multiframe_candidate:
    frame_index = frame_counters.get(content_key, 0)
    frame_counters[content_key] = frame_index + 1
```
The knock-on effect is that […]
When […]. Consider adding a […]:

```python
if frame_index is not None:
    extracted = _extract_tiff_frame(raw_bytes, frame_index)
    if extracted is None:
        row["materialize_error"] = f"failed to extract frame {frame_index} from '{content_key}'"
        row["binary_content"] = None
        continue
    raw_bytes = extracted
row["binary_content"] = raw_bytes
```

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
This means the "no materialize" benchmark configuration still pays the I/O cost of binary materialization for every image. If this is intentional (e.g. to validate filtering works end-to-end), it should be documented. If not, consider adding a […]
- validate_and_project_source_fields now logs a warning and fills None when requested passthrough fields are absent from a source sample, instead of raising ValueError (heterogeneous data resilience).
- _extract_per_modality_fields warns when a per-modality field is missing from the sample (previously silently ignored).
- Changed type-mismatch raise from ValueError to TypeError (TRY004 fix).
- Removed metadata_json from INTERLEAVED_SCHEMA -- passthrough fields already carry all source data; the redundant JSON blob is no longer needed.
- Updated all tests, notebook, and README to match.

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Made-with: Cursor
Additional Comments (5)
Example: a sample with […]
During write-time materialization, […]. The counter should be tracked per `content_key`:

```python
frame_counters: dict[str, int] = {}
# ...
if content_key is not None and is_multiframe_candidate:
    frame_index = frame_counters.get(content_key, 0)
    frame_counters[content_key] = frame_index + 1
```
When […]. If a caller later encounters […]:

```python
if frame_index is not None:
    extracted = _extract_tiff_frame(raw_bytes, frame_index)
    if extracted is None:
        row["materialize_error"] = f"failed to extract frame {frame_index} from '{content_key}'"
    else:
        raw_bytes = extracted  # only replace on success
row["binary_content"] = raw_bytes
```
Downstream consumers (e.g. the parquet writer, […])
Consider computing the average per-modality or using […]
A more robust key would be the actual content (e.g. […])
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Made-with: Cursor
Additional Comments (3)
The […]. If this exception escapes, it will propagate through […]:

```python
try:
    blobs = fs.cat_ranges(dedup_paths, starts, ends)
    _scatter_range_blobs(blobs, range_keys, unique_ranges, binary_values, error_values)
except (OSError, RuntimeError, ValueError) as exc:
    logger.warning(f"cat_ranges failed for {path} ({len(entries)} ranges): {exc}")
    for idx, *_ in entries:
        error_values[idx] = "cat_ranges failed"
    continue
```
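The deduplication feeding `fs.cat_ranges` can be sketched in isolation; the names here mirror `dedup_paths`/`unique_ranges` from the snippet above but are otherwise hypothetical:

```python
def dedup_ranges(entries: list[tuple[int, str, int, int]]):
    """entries: (row_idx, path, start, end). Returns the unique ranges to
    fetch once, plus a map from each row index to its range slot, so rows
    sharing one byte range reuse a single cat_ranges fetch."""
    unique_ranges: list[tuple[str, int, int]] = []
    slot_of: dict[tuple[str, int, int], int] = {}
    row_to_slot: dict[int, int] = {}
    for idx, path, start, end in entries:
        key = (path, start, end)
        if key not in slot_of:
            slot_of[key] = len(unique_ranges)
            unique_ranges.append(key)
        row_to_slot[idx] = slot_of[key]
    return unique_ranges, row_to_slot

entries = [
    (0, "shard-0.tar", 512, 1024),
    (1, "shard-0.tar", 512, 1024),   # same source range as row 0
    (2, "shard-0.tar", 2048, 4096),
]
unique_ranges, row_to_slot = dedup_ranges(entries)
assert len(unique_ranges) == 2
assert row_to_slot[0] == row_to_slot[1] == 0
assert row_to_slot[2] == 1
```

After the batched fetch, scattering the blobs back to rows is a lookup through `row_to_slot`, which is why a single `cat_ranges` failure must mark every dependent row.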
Consider sampling a few files (e.g., first, middle, last) or validating all files if the dataset is small enough:

```python
files_to_check = (
    parquet_files[:1]
    + ([parquet_files[len(parquet_files) // 2]] if len(parquet_files) > 2 else [])
    + (parquet_files[-1:] if len(parquet_files) > 1 else [])
)
for check_file in files_to_check:
    result = validate_parquet_ordering(check_file)
    if not result["valid"]:
        ordering_valid = False
        logger.error("Ordering validation failed on {}: {}", check_file.name, result["errors"])
    else:
        logger.info("Ordering validation passed on {}", check_file.name)
```
When a sample contains image tokens that resolve to different TIFF files, each gets an incorrect frame index derived from the shared counter rather than a per-member counter. For example, a sample with two distinct single-frame TIFF members would assign […]. Consider using a per-member counter (a […])
…, orphaned metadata, fragile cache

- Track frame_index per content_key so distinct TIFF members each start at 0
- Preserve original TIFF bytes in binary_content when frame extraction fails
- Drop metadata rows whose sample has no remaining content after filtering
- Remove _resolve_frame id()-based cache; inline _extract_tiff_frame directly

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Made-with: Cursor
Additional Comments (4)
When […]. The write-time path in […]:

```python
if payload is None:
    error_values[idx] = f"failed to extract frame {frame_idx} from '{member}'"
    continue  # ← skips binary_values[idx] assignment entirely
binary_values[idx] = payload
```

The fix is to skip assigning […]:

```python
if raw_bytes is None:
    row["materialize_error"] = f"missing member '{content_key}'"
else:
    frame_index = parsed_ref.get("frame_index")
    if frame_index is not None:
        extracted = _extract_tiff_frame(raw_bytes, frame_index)
        if extracted is None:
            row["materialize_error"] = f"failed to extract frame {frame_index} from '{content_key}'"
            continue  # or skip setting binary_content
        raw_bytes = extracted
    row["binary_content"] = raw_bytes
```

Without this fix, consumers of […]
When […]. Example scenario: […]

Consider tracking the last successfully-processed index or catching the error per-row to preserve successful results, e.g.:

```python
try:
    key_cache[member] = extracted.read() if extracted is not None else None
except (OSError, tarfile.TarError) as e:
    key_cache[member] = None
    error_values[idx] = f"error reading member '{member}': {e}"
    continue
```
Consider catching […]:

```python
except Exception as exc:
    logger.warning("Failed to read {}: {}", path, exc)
    return None
```
Additional Comments (3)
PIL is only used in […]:

```python
try:
    from PIL import Image as _Image
except ImportError:
    _Image = None  # type: ignore[assignment]
```

Then guard […]
This only passes today in the benchmark because […]. Consider validating that positions are strictly increasing within a sample rather than requiring dense […]:

```python
if positions != sorted(positions):
    errors.append(f"sample={sample_id}: content positions {positions} are not monotone")
```
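The relaxed check can be sketched as a small stand-alone helper (name hypothetical), flagging regressions while tolerating the gaps that filtering legitimately creates:

```python
def find_position_errors(rows: list[tuple[str, int]]) -> list[str]:
    """rows: (sample_id, position) pairs in row order. Flags samples whose
    content positions are not strictly increasing; gaps are allowed."""
    by_sample: dict[str, list[int]] = {}
    for sample_id, position in rows:
        by_sample.setdefault(sample_id, []).append(position)
    errors = []
    for sample_id, positions in by_sample.items():
        if any(b <= a for a, b in zip(positions, positions[1:])):
            errors.append(
                f"sample={sample_id}: positions {positions} are not strictly increasing"
            )
    return errors

# Gaps from upstream filtering are fine; a regression is not.
assert find_position_errors([("s1", 0), ("s1", 2), ("s1", 7)]) == []
assert find_position_errors([("s2", 0), ("s2", 3), ("s2", 1)]) != []
```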
When a tar opens successfully and some members are extracted without error, but a later […]. Track which rows have already been resolved before the exception occurs, so the outer handler only marks truly unresolved rows:

```python
resolved: set[int] = set()
try:
    with fsspec.open(path, ...) as fobj, tarfile.open(...) as tf:
        for idx, member, frame_idx in keyed_rows:
            ...
            binary_values[idx] = payload
            error_values[idx] = None
            resolved.add(idx)
except (OSError, tarfile.TarError):
    for idx, *_ in keyed_rows:
        if idx not in resolved:
            error_values[idx] = "failed to read path"
```
abhinavg4 left a comment:

Looks good. Thanks a ton for your efforts Vibhu
```python
@dataclass
class MultiBatchTask(Task[pa.Table | pd.DataFrame]):
    """Task carrying row-wise multimodal records.
```
Just having position means it's interleaved right? How about interleaved_multimodal?
```python
    msg = f"Unsupported data type: {type(self.data)}"
    raise TypeError(msg)
```

```python
def validate(self) -> bool:
```
But base is just True right? For us, I'm not sure if the end user would agree this is what it means for a task to be a valid task.
New parametrized and data-driven tests across 6 test files covering all previously untested branches in materialization, validation utils, InterleavedBatch task, filter/annotator stages, readers, and writers. Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Made-with: Cursor
Additional Comments (2)
When […], this contradicts the write-time materialization path (via […]).
Also add […]

Summary
Adds row-wise multimodal ingestion and write path for WebDataset tar shards (MINT-1T style), with lazy materialization support for local, remote, and tar-archived binary content.
What this enables
- Row-wise multimodal records (MultiBatchTask)
- frame_index in source_ref for multi-frame TIFF support

Architecture
By the numbers
Key components added

| Component | Location | Notes |
|---|---|---|
| MultiBatchTask | nemo_curator/tasks/multimodal.py | Standardized schema (MULTIMODAL_SCHEMA) plus user passthrough columns |
| WebdatasetReader | stages/multimodal/io/reader.py | CompositeStage that decomposes into FilePartitioningStage + WebdatasetReaderStage |
| WebdatasetReaderStage | stages/multimodal/io/readers/webdataset.py | source_ref locators and optional materialize-on-read |
| MultimodalParquetWriterStage | stages/multimodal/io/writers/tabular.py | |
| BaseMultimodalFilterStage | stages/multimodal/stages.py | |
| MultimodalJpegAspectRatioFilterStage | stages/multimodal/stages.py | |
| Materialization utilities | stages/multimodal/utils/materialization.py | Range reads (fs.cat_ranges), tar extract, direct read |
| split_table_by_group_max_bytes | nemo_curator/core/utils.py | |

Minimal usage
Benchmarking
Added `benchmarking/scripts/multimodal_mint1t_benchmark.py` with two nightly benchmark entries in `nightly-benchmark.yaml`: `multimodal_mint1t_xenna` and `multimodal_mint1t_xenna_materialize`.

Both benchmarks are initially `enabled: false` for opt-in activation. Tracked metrics: `throughput_rows_per_sec`, `num_rows`, `num_output_files`, `materialize_error_count`.

The benchmark script supports:
- Executor selection (`--executor xenna|ray_data`)

Other changes

- `nemo_curator/stages/base.py`: Added `_time_metric()` context manager for timing code blocks within stages
- `nemo_curator/core/utils.py`: Added `split_table_by_group_max_bytes()` for splitting Arrow tables by byte budget
- `pyproject.toml`: Added Pillow to the `image_cpu` dependency group
- `benchmarking/scripts/utils.py`: Added `collect_parquet_output_metrics()` for post-run output analysis
- `.gitignore`: Added `AGENTS.md`

Tests
39 unit tests across 3 test files (895 lines):
- `test_multimodal_core.py` (25 tests) -- source_ref parsing, classify_rows dispatch, all three materialization strategies (direct read, tar extract, range read), range deduplication, mixed-strategy batches, edge cases (empty tasks, missing paths), JPEG aspect-ratio filter, `split_table_by_group_max_bytes`, row-validity mask, composite stage decomposition
- `test_multimodal_reader.py` (6 tests) -- custom field mapping, default passthrough, content type resolution, image tokens with frame index, empty output schema, field validation errors
- `test_multimodal_writer.py` (3 tests) -- materialization error handling, direct content path without tar key, DataFrame index preservation

Documentation
Full design reference in `nemo_curator/stages/multimodal/README.md`, covering schema, source_ref format, materialization strategies, usage example, and file layout.