Skip to content
Open
1 change: 1 addition & 0 deletions mkdocs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ markdown_extensions:
- pymdownx.superfences
- toc:
permalink: true

227 changes: 214 additions & 13 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

import pyarrow as pa

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
Expand All @@ -28,12 +30,23 @@
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
import pyarrow as pa
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we still need this here


from pyiceberg.table import Table


class InspectTable:
"""A utility class for inspecting and analyzing Iceberg table metadata.

This class provides methods to extract and analyze information such as:
- Snapshots and their metadata.
- Manifests and partition data.
- Table history, references, and file-level statistics.

Attributes:
tbl (Table): The Iceberg table instance to inspect.


"""

tbl: Table

def __init__(self, tbl: Table) -> None:
Expand All @@ -45,6 +58,23 @@ def __init__(self, tbl: Table) -> None:
raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

def _get_snapshot(self, snapshot_id: Optional[int] = None) -> Snapshot:
"""Retrieve a specific snapshot or the current snapshot of the Iceberg table.

This method allows "time travel" to a specific snapshot by providing its ID.
If no `snapshot_id` is provided, the method returns the current snapshot
of the table.

Args:
snapshot_id (Optional[int]): The ID of the snapshot to retrieve.
If None, the current snapshot is returned.

Returns:
Snapshot: The requested snapshot instance.

Raises:
ValueError: If the specified snapshot ID is not found or if the table
does not have any snapshots.
"""
if snapshot_id is not None:
if snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id):
return snapshot
Expand All @@ -57,7 +87,20 @@ def _get_snapshot(self, snapshot_id: Optional[int] = None) -> Snapshot:
raise ValueError("Cannot get a snapshot as the table does not have any.")

def snapshots(self) -> "pa.Table":
import pyarrow as pa
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we actually need this here, in case someone imports this function directly

from pyiceberg.table.inspect import snapshots

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. I will keep the import for pyarrow as suggested, in case the function is used directly.

"""Generate a table of all snapshots in the Iceberg table.

This method retrieves metadata about all snapshots stored in the Iceberg table,
including details such as timestamps, snapshot IDs, and parent-child relationships.

Returns:
pa.Table: A PyArrow table containing metadata for all snapshots, including fields like:
- committed_at: Timestamp when the snapshot was committed.
- snapshot_id: Unique ID of the snapshot.
- parent_id: ID of the parent snapshot (if any).
- operation: Type of operation performed (e.g., append, overwrite).
- manifest_list: Path to the manifest list file.
- summary: Additional metadata properties.
"""

snapshots_schema = pa.schema(
[
Expand Down Expand Up @@ -95,7 +138,20 @@ def snapshots(self) -> "pa.Table":
)

def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table":
import pyarrow as pa
"""Generate a table of manifest entries for a specific snapshot.

This method retrieves metadata for all manifest entries within a given snapshot,
including file-level statistics such as column sizes and value counts.

Args:
snapshot_id (Optional[int]): The ID of the snapshot to retrieve entries for.
If None, entries for the current snapshot are returned.

Returns:
pa.Table: A PyArrow table containing manifest entries, including fields like:
- status: Status of the entry (e.g., added, existing, deleted).
- snapshot_id: The ID of the snapshot containing the entry.
- file details: Metadata such as file path, format, size, and partition."""

from pyiceberg.io.pyarrow import schema_to_pyarrow

Expand Down Expand Up @@ -226,7 +282,20 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
)

def refs(self) -> "pa.Table":
import pyarrow as pa
"""Retrieve references from the Iceberg table metadata as a PyArrow Table.

This method extracts reference information, such as branch or tag details,
snapshot IDs, and associated configuration parameters.

Returns:
pa.Table: A PyArrow table with reference details, including:
- name: Name of the reference.
- type: Type of reference (branch or tag).
- snapshot_id: ID of the associated snapshot.
- max_reference_age_in_ms: Maximum age of the reference in milliseconds.
- min_snapshots_to_keep: Minimum number of snapshots to retain.
- max_snapshot_age_in_ms: Maximum age of snapshots in milliseconds.
"""

ref_schema = pa.schema(
[
Expand Down Expand Up @@ -256,7 +325,22 @@ def refs(self) -> "pa.Table":
return pa.Table.from_pylist(ref_results, schema=ref_schema)

def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
import pyarrow as pa
"""Retrieve partition information from the Iceberg table as a PyArrow Table.

This method aggregates metadata and statistics for table partitions, enabling insights
into partition-level details like record counts and file sizes. If a `snapshot_id`
is provided, historical data can be retrieved from that specific snapshot.

Args:
snapshot_id (Optional[int]): The ID of the snapshot to retrieve partition
information from. If None, the current snapshot is used.

Returns:
pa.Table: A PyArrow table containing partition metadata, including:
- record_count: Number of records in the partition.
- file_count: Number of files in the partition.
- total_data_file_size_in_bytes: Total size of data files in bytes.
- other partition-level metrics and timestamps."""

from pyiceberg.io.pyarrow import schema_to_pyarrow

Expand Down Expand Up @@ -348,7 +432,17 @@ def update_partitions_map(
)

def _get_manifests_schema(self) -> "pa.Schema":
import pyarrow as pa
"""Define the schema for manifest data in the Iceberg table.

This schema specifies the structure of manifest metadata, including fields
for partition summaries and file-level statistics.

Returns:
pa.Schema: The schema for manifest data, with fields like:
- content: Content type of the manifest (data or deletes).
- path: Path to the manifest file.
- length: Size of the manifest file in bytes.
- partition summaries: Metadata summarizing partition details"""

partition_summary_schema = pa.struct(
[
Expand Down Expand Up @@ -378,14 +472,34 @@ def _get_manifests_schema(self) -> "pa.Schema":
return manifest_schema

def _get_all_manifests_schema(self) -> "pa.Schema":
import pyarrow as pa
"""Extend the manifest schema to include additional fields.

This method adds fields like reference snapshot IDs to the standard manifest schema,
enabling support for tables with historical and branching capabilities.

Returns:
pa.Schema: The extended manifest schema, including additional fields for:
- reference_snapshot_id: ID of the snapshot referencing the manifest."""

all_manifests_schema = self._get_manifests_schema()
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
return all_manifests_schema

def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
import pyarrow as pa
"""Generate a table of manifests with detailed metadata.

This method creates a structured table containing manifest metadata, including
partition summaries, file counts, and content type. It supports filtering by
snapshot and aggregating data for all manifests.

Args:
snapshot (Optional[Snapshot]): The snapshot for which manifests are generated.
is_all_manifests_table (bool): Whether to include data for all manifests,
including reference snapshots.

Returns:
pa.Table: A PyArrow table containing manifest details.
"""

def _partition_summaries_to_rows(
spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
Expand Down Expand Up @@ -454,10 +568,28 @@ def _partition_summaries_to_rows(
)

def manifests(self) -> "pa.Table":
"""Retrieve the table of manifests for the current snapshot.

This method extracts metadata about manifests, including file paths, partition summaries,
and snapshot-level metrics for the current state of the Iceberg table.

Returns:
pa.Table: A PyArrow table containing details about manifests.
"""
return self._generate_manifests_table(self.tbl.current_snapshot())

def metadata_log_entries(self) -> "pa.Table":
import pyarrow as pa
"""Retrieve the table of metadata log entries.

This method fetches historical metadata changes logged for the table, including
information about timestamps, schema updates, and snapshot IDs.

Returns:
pa.Table: A PyArrow table containing metadata log details, including:
- timestamp: Time of the metadata update.
- file: Path to the metadata file.
- latest_snapshot_id: The most recent snapshot ID.
"""

from pyiceberg.table.snapshots import MetadataLogEntry

Expand Down Expand Up @@ -493,7 +625,18 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any
)

def history(self) -> "pa.Table":
import pyarrow as pa
"""Retrieve the history table for the Iceberg table.

This method provides a historical view of the table's state changes, including
timestamps and parent-child relationships of snapshots.

Returns:
pa.Table: A PyArrow table with historical details, including:
- made_current_at: Timestamp when the snapshot was made current.
- snapshot_id: ID of the snapshot.
- parent_id: ID of the parent snapshot.
- is_current_ancestor: Whether the snapshot is an ancestor of the current state.
"""

history_schema = pa.schema(
[
Expand Down Expand Up @@ -524,7 +667,23 @@ def history(self) -> "pa.Table":
return pa.Table.from_pylist(history, schema=history_schema)

def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa
"""Retrieve a table of files from a specific snapshot, optionally filtered by content type.

This method fetches file-level metadata, including details about data and delete files,
for a given snapshot.

Args:
snapshot_id (Optional[int]): The snapshot ID to retrieve files for.
data_file_filter (Optional[Set[DataFileContent]]): A set of file content types
(e.g., data or delete files) to filter the results.

Returns:
pa.Table: A PyArrow table containing file metadata, including fields like:
- content: Type of file content (data or deletes).
- file_path: Path to the file.
- record_count: Number of records in the file.
- file_size_in_bytes: Size of the file in bytes.
"""

from pyiceberg.io.pyarrow import schema_to_pyarrow

Expand Down Expand Up @@ -637,16 +796,58 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
"""Retrieve a table of files for the current snapshot or a specific snapshot ID.

This method fetches file-level metadata for data and delete files in the table.

Args:
snapshot_id (Optional[int]): The snapshot ID to retrieve files for. If None,
the current snapshot is used.

Returns:
pa.Table: A PyArrow table containing file metadata.
"""
return self._files(snapshot_id)

def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
"""Retrieve a table of data files for the current snapshot or a specific snapshot ID.

This method fetches metadata for files containing table data (excluding delete files).

Args:
snapshot_id (Optional[int]): The snapshot ID to filter data files for. If None,
the current snapshot is used.

Returns:
pa.Table: A PyArrow table containing metadata for data files."""
return self._files(snapshot_id, {DataFileContent.DATA})

def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
"""Retrieve a table of delete files for the current snapshot or a specific snapshot ID.

This method fetches metadata for files containing delete markers.

Args:
snapshot_id (Optional[int]): The snapshot ID to filter delete files for. If None,
the current snapshot is used.

Returns:
pa.Table: A PyArrow table containing metadata for delete files.
"""
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})

def all_manifests(self) -> "pa.Table":
import pyarrow as pa
"""Retrieve a table of all manifests from all snapshots in the table.

This method aggregates metadata for all manifests, including historical ones,
providing a comprehensive view of the table's state.

Returns:
pa.Table: A PyArrow table containing metadata for all manifests, including fields like:
- content: Content type of the manifest.
- path: Path to the manifest file.
- added_snapshot_id: Snapshot ID when the manifest was added.
"""

snapshots = self.tbl.snapshots()
if not snapshots:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,4 @@ module = "datafusion.*"
ignore_missing_imports = true

[tool.coverage.run]
source = ['pyiceberg/']
source = ['pyiceberg/']