IcebergDataset¶
IcebergDataset reads Apache Iceberg tables via pyiceberg, resolves them to data files, and streams batches using the same worker split mechanism as StructuredDataset.
Install¶
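IcebergDataset requires `pyiceberg` in addition to the base package. Assuming a standard PyPI setup, `pip install pyiceberg` is enough for the REST catalog; catalog-specific extras such as `pyiceberg[glue]` or `pyiceberg[hive]` pull in the additional client libraries those catalogs need.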
create_dataloader()¶
```python
from torch_dataloader_utils import IcebergDataset
import pyarrow.compute as pc

loader, dataset = IcebergDataset.create_dataloader(
    table="my_db.my_table",              # or "namespace.db.table"
    catalog_config={
        "type": "rest",                  # rest | glue | hive | jdbc
        "uri": "https://catalog.example.com",
        "credential": "token:abc123",
    },
    num_workers=4,
    batch_size=1024,
    columns=["feature_a", "feature_b", "label"],
    filters=pc.field("region_id") >= 5,  # auto-prunes files AND filters rows
    snapshot_id=None,                    # None = current; set for time travel
    shuffle=True,
    split_bytes="64MiB",
    output_format="torch",
)
```
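The returned `loader` is iterated like any other dataloader. A minimal consumption sketch, assuming `output_format="torch"` yields column-keyed tensor batches (the exact batch layout is an assumption; the column names match the `columns` argument above):

```python
# Sketch of a consumption loop. Batch layout assumed here: a mapping from
# column name to a torch.Tensor of length batch_size.
for batch in loader:
    features = batch["feature_a"]  # assumed column-keyed access
    labels = batch["label"]
    # ... forward/backward pass goes here ...
```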
Scan-Filter Auto-Derivation¶
Passing filters is all you need. The library auto-translates common pyarrow expressions into a native pyiceberg expression and pushes it into table.scan(row_filter=...) at plan_files() time — pruning entire partitions and files before splits are generated.
Supported auto-translation: >=, >, <=, <, ==, != with integer, float, or string literals; & (AND), | (OR), arbitrarily nested.
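For example, a compound predicate built only from these operators is auto-translated in full (the `country` column here is illustrative):

```python
import pyarrow.compute as pc

# Nested AND/OR over comparison predicates; every node falls within the
# auto-translation set above, so the whole expression is pushed into the scan.
filters = (
    ((pc.field("region_id") >= 5) & (pc.field("score") > 0.9))
    | (pc.field("country") == "US")
)
```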
With the `region_id >= 5` filter from the first example, the log reports the derived scan filter and the resulting file pruning:

```
INFO Auto-derived scan_filter: pc.Expression (region_id >= 5)
     → pyiceberg GreaterThanOrEqual(...)
INFO Iceberg scan complete: files=4 (down from 6 without filter)
```
Explicit Two-Layer Control¶
For cases where file-pruning and row-filtering need different predicates:
```python
from pyiceberg.expressions import GreaterThan

loader, dataset = IcebergDataset.create_dataloader(
    ...,
    scan_filter=GreaterThan("partition_dt", 20240101),  # file/partition pruning
    filters=pc.field("score") > 0.9,                    # row-level filter in workers
)
```
When scan_filter is provided explicitly, auto-derivation is skipped.
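Native pyiceberg expressions compose when the pruning predicate needs more than one clause; a sketch with illustrative column names (see `pyiceberg.expressions` for the full set):

```python
from pyiceberg.expressions import And, GreaterThanOrEqual, EqualTo

# Prune data files by a date partition and a region column at plan time;
# finer row-level filtering can still be layered on via `filters`.
scan_filter = And(
    GreaterThanOrEqual("partition_dt", 20240101),
    EqualTo("region_id", 5),
)
```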
Parameters¶
Most StructuredDataset parameters apply — except path, format, partitioning, and storage_options. Storage credentials for Iceberg are handled via catalog_config, not storage_options.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `table` | `str` | — | Fully qualified table identifier (`"db.table"` or `"ns.db.table"`) |
| `catalog_config` | `dict` | — | Forwarded to `pyiceberg.catalog.load_catalog()`. Pass credentials here, not in `storage_options` |
| `snapshot_id` | `int \| None` | `None` | Pin a snapshot for time travel. `None` = current snapshot (see the example below) |
| `scan_filter` | `BooleanExpression \| None` | `None` | Native pyiceberg expression for file/partition pruning. Auto-derived from `filters` when not set |
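Pinning a snapshot makes a run reproducible against a table that is still receiving writes. A sketch (the snapshot id is a placeholder; list real ids via the pyiceberg table API, e.g. `Table.history()`):

```python
loader, dataset = IcebergDataset.create_dataloader(
    table="my_db.my_table",
    catalog_config={"type": "rest", "uri": "https://catalog.example.com"},
    snapshot_id=1234567890123456789,  # placeholder; pin the snapshot you want to reproduce
    num_workers=4,
    batch_size=1024,
)
```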
Catalog Config Examples¶
=== "REST"
catalog_config = {
"type": "rest",
"uri": "https://catalog.example.com",
"credential": "token:abc123",
}
=== "AWS Glue"
catalog_config = {
"type": "glue",
"aws_access_key_id": "...",
"aws_secret_access_key": "...",
"region_name": "us-east-1",
}
=== "Local SQLite (testing)"
catalog_config = {
"name": "local",
"uri": "sqlite:////tmp/catalog.db",
"warehouse": "file:///tmp/warehouse",
}
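Because these dicts are forwarded to `pyiceberg.catalog.load_catalog()`, you can sanity-check a config independently of the dataloader. A sketch, assuming the keys above are passed through verbatim (the catalog name `"default"` is arbitrary):

```python
from pyiceberg.catalog import load_catalog

catalog_config = {
    "type": "rest",
    "uri": "https://catalog.example.com",
    "credential": "token:abc123",
}

# Same keys the dataloader would forward; raises if the catalog is unreachable.
catalog = load_catalog("default", **catalog_config)
table = catalog.load_table("my_db.my_table")
print(table.current_snapshot())
```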
Delete Files¶
When the Iceberg table contains position delete files (written by DELETE or MERGE INTO), IcebergDataset automatically switches to pyiceberg.io.pyarrow.ArrowScan per file — deleted rows are never returned.
Limitations with delete files

- Equality deletes are not supported by pyiceberg and will raise `NotImplementedError`. Compact the table first.
- Sub-file splitting is disabled when delete files are present — position delete offsets reference absolute row positions in the original file.
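To see ahead of time whether a table will trigger the per-file ArrowScan path, you can inspect the planned scan tasks directly with pyiceberg. A diagnostic sketch, not part of this library's API (catalog details as in the REST example above):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default", type="rest", uri="https://catalog.example.com")
table = catalog.load_table("my_db.my_table")

# Each FileScanTask lists the delete files attached to its data file.
tasks = list(table.scan().plan_files())
with_deletes = [t for t in tasks if t.delete_files]
print(f"{len(with_deletes)} of {len(tasks)} data files carry delete files")
```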