# torch-dataloader-utils
A lightweight PyTorch library for reading structured tabular data from cloud object storage directly into a DataLoader. No Ray, no Spark, no heavy infrastructure.
## Why?
PyTorch has no good built-in story for streaming structured data from cloud storage into a DataLoader. The existing ecosystem is either deprecated, too heavy, or built for different data shapes:
| Solution | Problem |
|---|---|
| torchdata | Solves checkpointing (`StatefulDataLoader`) and pipeline composition (nodes); no built-in cloud file sharding or Iceberg support — requires significant custom wiring |
| Ray Data | Requires a Ray cluster — overkill for most training jobs |
| HuggingFace datasets | No Iceberg support, opinionated schema |
| WebDataset | Designed for unstructured data (images, audio) |
This library fills that gap. But to understand why it exists, it helps to understand exactly where torch.utils.data falls short.
### Problem 1 — Map-style Dataset requires indexable data
PyTorch's canonical pattern uses a map-style Dataset:
```python
from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __len__(self): ...            # must know total row count
    def __getitem__(self, idx): ...   # must seek to any row by index
```
This works well for in-memory data or image folders. It breaks for cloud-stored tabular data because:
- `__len__` is expensive — you'd need to scan metadata across potentially thousands of Parquet files just to return a number.
- `__getitem__(idx)` has poor granularity — Parquet supports seeking to a row group (typically 100k–1M rows) via footer metadata, but not to an individual row. Fetching row 1,234,567 means decoding the entire row group it lives in — potentially hundreds of MBs — to return one row (see the sketch after this list).
- `DistributedSampler` destroys I/O locality — it generates a globally shuffled list of row indices and assigns each rank a scattered slice. Say you have 3 files with 3 row groups each (9 row groups total) and 2 DDP ranks:

  ```
  DistributedSampler assigns:
    Rank 0 → rows [0, 2, 4, 6, 8, 10, 12, 14, 16]   (every other row — from ALL row groups)
    Rank 1 → rows [1, 3, 5, 7, 9, 11, 13, 15, 17]   (every other row — from ALL row groups)

  To serve those rows, both ranks must open all 3 files and decode all 9
  row groups — just to use half the rows from each. Every row group is
  decoded twice. Total I/O: 2×.
  ```

  On cloud storage, each row group seek is a separate HTTP range request (~50–100 ms latency each). Sequential reads are fast; scattered seeks across row groups are slow and expensive. The more ranks you add, the worse the amplification.
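To make the row group granularity concrete, here is a minimal sketch using `pyarrow` directly; the file path and row index are illustrative:

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile("data.parquet")  # illustrative path

# The footer tells us row group boundaries without reading any data...
rg_sizes = [pf.metadata.row_group(i).num_rows
            for i in range(pf.metadata.num_row_groups)]

# ...but serving __getitem__(1_234_567) still means locating the row
# group that holds the row and decoding that group in full:
idx, rg = 1_234_567, 0
while idx >= rg_sizes[rg]:
    idx -= rg_sizes[rg]
    rg += 1

table = pf.read_row_group(rg)  # decodes the whole group (100k-1M rows)
row = table.slice(idx, 1)      # all that work to return one row
```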
### Problem 2 — IterableDataset has no distributed sharding
IterableDataset drops the index contract, which fits file streaming naturally. But PyTorch gives workers no mechanism to know which files belong to them:
```python
from torch.utils.data import IterableDataset

class MyIterableDataset(IterableDataset):
    def __iter__(self):
        # get_worker_info() tells you your worker id and num_workers,
        # but PyTorch never tells you *which files* are yours
        for file in all_files:
            yield from read(file)
```
The standard workaround: every worker reads every file and filters by index parity.
```python
worker = torch.utils.data.get_worker_info()
for i, row in enumerate(all_rows):
    if i % worker.num_workers == worker.id:
        yield row  # keep 1-in-N rows, discard the rest
```
Every file is read N times. On S3 or GCS, you pay per-byte — including bytes you immediately throw away.
### Problem 3 — Accelerate / DDP gives no file-level sharding either
HuggingFace Accelerate and PyTorch DDP wrap your DataLoader across GPU ranks. But wrapping an IterableDataset-based loader doesn't partition the underlying files — it just splits the batch stream after data has already been loaded. Each rank still runs its own DataLoader that reads the full dataset:
```
Rank 0: DataLoader → reads ALL files → sends every other batch to GPU 0
Rank 1: DataLoader → reads ALL files → sends every other batch to GPU 1

→ full dataset read twice, twice the I/O cost
```
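A minimal sketch of that behavior, reusing the `MyIterableDataset` from Problem 2 (`accelerator.prepare()` is Accelerate's real API; the comments describe its documented handling of iterable datasets):

```python
from accelerate import Accelerator
from torch.utils.data import DataLoader

accelerator = Accelerator()
loader = DataLoader(MyIterableDataset(), batch_size=32)
loader = accelerator.prepare(loader)

# On each rank, the underlying DataLoader still iterates the FULL dataset;
# Accelerate merely skips the batches that belong to other ranks, so every
# byte is read num_ranks times.
```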
Accelerate's documentation acknowledges this and says "implement sharding yourself" — which is exactly what this library does.
## What this library does
Instead of distributing row indices after data is loaded, this library distributes files (or row group ranges for large Parquet files) before iteration begins. Same 3-file, 2-rank example:
```
This library assigns at create_dataloader() time:
  Rank 0 → File A (all row groups) + File B (all row groups)
  Rank 1 → File C (all row groups)

Each file is opened once, read sequentially start to finish,
by exactly one worker. No row group is ever decoded twice.
Total I/O: 1×.
```
### What is a split?

A split (called a `Shard` internally) is a worker's read assignment — the list of file chunks it is responsible for. A chunk is either a whole file or a row group range within a file (`RowRange(offset, length)`). How chunks are generated depends on the format:
| Format | Chunk granularity | Sub-file splitting |
|---|---|---|
| Parquet | Row group (from footer metadata) | Yes — one file → many chunks |
| Iceberg | Resolves to Parquet data files → same as Parquet | Yes |
| ORC | Whole file | No — one file = one chunk |
| CSV / JSON / JSONL | Whole file | No — one file = one chunk |
For Parquet and Iceberg, footer metadata is scanned once in the main process (no data read) to determine row group sizes, then row groups are packed into target-sized chunks. For ORC and text formats, each file becomes one unsplittable chunk — so for good parallelism, shard large ORC/CSV files into many smaller ones.
Parquet example — 3 files with different sizes:
```
File A: 3 row groups → chunk A1 (rg 0-1), chunk A2 (rg 2)
File B: 2 row groups → chunk B1 (rg 0-1)
File C: 4 row groups → chunk C1 (rg 0-1), chunk C2 (rg 2-3)

All chunks: [A1, A2, B1, C1, C2]
  → shuffle (optional)
  → round-robin assign to workers:

Worker 0 Shard: [A1, C1]   ← reads row groups 0-1 of A, then 0-1 of C
Worker 1 Shard: [A2, C2]   ← reads row group 2 of A, then 2-3 of C
Worker 2 Shard: [B1]       ← reads row groups 0-1 of B
```
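As an illustration of the footer-only planning step described above, a minimal sketch using `pyarrow` (the function name, `target_rows`, and the tuple chunk encoding are hypothetical, not the library's internals):

```python
import pyarrow.parquet as pq

def plan_parquet_chunks(fs, path, target_rows=1_000_000):
    """Pack row groups into target-sized chunks from footer metadata alone."""
    meta = pq.ParquetFile(fs.open(path)).metadata  # footer only, no data read
    chunks, start, rows = [], 0, 0
    for rg in range(meta.num_row_groups):
        rows += meta.row_group(rg).num_rows
        if rows >= target_rows:
            chunks.append((path, start, rg + 1))   # row groups [start, rg]
            start, rows = rg + 1, 0
    if start < meta.num_row_groups:                # leftover tail chunk
        chunks.append((path, start, meta.num_row_groups))
    return chunks
```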
The assignment is serialized into each worker's initializer — workers receive their shard before they start and never need to communicate with each other or the main process during iteration.
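A minimal sketch of that pattern (names are illustrative; `read_chunk` is a hypothetical reader, not this library's API):

```python
from torch.utils.data import IterableDataset, get_worker_info

class ShardedDataset(IterableDataset):
    def __init__(self, shards):
        # Computed once in the main process; pickled into each worker
        # when the DataLoader spawns it.
        self.shards = shards

    def __iter__(self):
        info = get_worker_info()
        shard = self.shards[info.id if info else 0]
        for chunk in shard:               # disjoint from every other worker
            yield from read_chunk(chunk)  # hypothetical sequential reader
```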
See Splits & Workers for full details on split strategies, sub-file splitting, shuffle, and tuning chunk size.
- File-level sharding at startup — splits are assigned once in the main process at `create_dataloader()` time, before any worker spawns. Workers never coordinate.
- Sub-file splitting for large Parquet — a single 10 GB file can be split across multiple workers at row group boundaries; each worker reads a disjoint range sequentially.
- Each byte read exactly once — no worker reads a file another worker owns. No amplification regardless of how many workers or ranks you use.
- Standard `DataLoader` output — returns a plain `torch.utils.data.DataLoader`. Nothing else in your training stack changes. Works with Accelerate, FSDP, and DDP unchanged.
- Epoch-level shuffle — chunks are shuffled before assignment each epoch via `dataset.set_epoch(epoch)`, giving good randomness without record-level I/O overhead (see the sketch after this list).
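A sketch of how an epoch-seeded chunk shuffle can give every rank the same assignment without communication (the function and seeding scheme are illustrative assumptions, not the library's exact code):

```python
import random

def assign_shards(chunks, num_workers, epoch, shuffle=True):
    order = list(chunks)
    if shuffle:
        # Seeding with the epoch makes the shuffle identical on every
        # rank, so no coordination is needed, yet it differs per epoch.
        random.Random(epoch).shuffle(order)
    return [order[w::num_workers] for w in range(num_workers)]
```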
## One Interface Across Everything
A key design goal is that where your data lives and how it is stored should not change your training code. The library abstracts three independent axes:
```
Storage backend        Format              Catalog
──────────────         ──────────────      ──────────────
S3                     Parquet             —
GCS               ×    ORC             ×   Apache Iceberg
Azure                  CSV                 (REST, Glue,
Local                  JSON / JSONL         Hive, JDBC)
──────────────────────────────────────────────────────────
        Any combination → same DataLoader interface
```
Whether you are reading raw Parquet files from S3, ORC files from GCS, or an Iceberg table in AWS Glue — the call looks the same and the output is the same:
```python
from torch_dataloader_utils import StructuredDataset, IcebergDataset

# Raw files on S3
loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/", format="parquet", ...
)

# Raw files on GCS
loader, dataset = StructuredDataset.create_dataloader(
    path="gcs://bucket/data/", format="orc", ...
)

# Iceberg table via Glue catalog
loader, dataset = IcebergDataset.create_dataloader(
    table="my_db.my_table",
    catalog_config={"type": "glue", "region_name": "us-east-1"},
    ...
)

# All three return the same standard DataLoader
for batch in loader:
    loss = model(batch["feature_a"], batch["label"])
```
How this is achieved:

- `fsspec` provides a unified filesystem interface — S3, GCS, Azure, and local all expose the same `open()` / `ls()` API. Adding a new cloud provider requires only installing the right `fsspec` plugin (`s3fs`, `gcsfs`, `adlfs`).
- `pyarrow` provides a unified reader across Parquet, ORC, CSV, and JSON with consistent predicate pushdown, column projection, and Arrow `RecordBatch` output regardless of the underlying format (see the sketch after this list).
- `pyiceberg` resolves Iceberg tables to their underlying data files — which are just Parquet files — and hands them to the same reader pipeline. Partition pruning, snapshot pinning, and delete file handling are all transparent.
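For a feel of what those first two layers look like when combined directly (paths and columns are illustrative; this is plain `fsspec` + `pyarrow`, not this library's code):

```python
import fsspec
import pyarrow.dataset as ds

fs = fsspec.filesystem("s3")  # swap for "gcs", "abfs", or "file"
dataset = ds.dataset("bucket/data/", format="parquet", filesystem=fs)

# The same call works for ORC or CSV: only format= changes.
for record_batch in dataset.to_batches(
    columns=["feature_a", "label"],          # column projection
    filter=ds.field("date") > "2024-01-01",  # predicate pushdown
):
    ...
```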
The result: switching from local Parquet during development to Iceberg-on-S3 in production is a one-line config change, not a rewrite.
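Concretely, with the API shown above, the dev-to-prod switch could be as small as (paths are illustrative):

```python
# Development: local Parquet
loader, dataset = StructuredDataset.create_dataloader(
    path="./data/train/", format="parquet", num_workers=4, batch_size=1024,
)

# Production: the same data as Iceberg-on-S3. Only the entry point and
# location change; the training loop is untouched.
loader, dataset = IcebergDataset.create_dataloader(
    table="my_db.my_table",
    catalog_config={"type": "glue", "region_name": "us-east-1"},
    num_workers=4, batch_size=1024,
)
```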
## Install
```bash
pip install torch-dataloader-utils            # core (Parquet, ORC, CSV, JSON)
pip install torch-dataloader-utils[s3]        # + S3 support
pip install torch-dataloader-utils[iceberg]   # + Apache Iceberg support
pip install torch-dataloader-utils[all]       # everything
```
## Quick Start
```python
import pyarrow.compute as pc
from torch_dataloader_utils import StructuredDataset

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://my-bucket/data/train/",
    format="parquet",
    num_workers=4,
    batch_size=1024,
    columns=["feature_a", "feature_b", "label"],
    filters=pc.field("date") > "2024-01-01",
    shuffle=True,
)

for epoch in range(num_epochs):
    dataset.set_epoch(epoch)
    for batch in loader:
        loss = model(batch["feature_a"], batch["label"])
        loss.backward()
```
For Iceberg tables:
```python
import pyarrow.compute as pc
from torch_dataloader_utils import IcebergDataset

loader, dataset = IcebergDataset.create_dataloader(
    table="my_db.my_table",
    catalog_config={"type": "rest", "uri": "https://catalog.example.com"},
    num_workers=4,
    batch_size=1024,
    filters=pc.field("region_id") >= 5,  # auto-prunes files + filters rows
)
```
## Architecture
```
FilesystemLayer   →   FormatLayer                →   DatasetLayer
fsspec                pyarrow.dataset                torch.IterableDataset
                      pyiceberg (Iceberg only)
```
Files are discovered once at `create_dataloader()` time, partitioned into per-worker splits, and read lazily during iteration. The `DataLoader` is always constructed with `batch_size=None` — Arrow owns batching internally.
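To illustrate the `batch_size=None` convention, a hedged sketch of the pattern; `ArrowBatchDataset` and `scan()` are hypothetical stand-ins, not this library's classes:

```python
import torch
from torch.utils.data import DataLoader, IterableDataset

class ArrowBatchDataset(IterableDataset):
    def __iter__(self):
        for rb in scan():  # hypothetical Arrow RecordBatch stream
            # Each yielded item is already a full batch of tensors...
            yield {
                name: torch.from_numpy(col.to_numpy(zero_copy_only=False))
                for name, col in zip(rb.schema.names, rb.columns)
            }

# ...so the DataLoader must not batch or collate again:
loader = DataLoader(ArrowBatchDataset(), batch_size=None, num_workers=4)
```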
## Dependencies
### Core
| Library | Version | Role |
|---|---|---|
| PyTorch | ≥2.2 | `IterableDataset`, `get_worker_info()` |
| pyarrow | ≥15.0 | All file format reading, predicate pushdown |
| fsspec | ≥2024.2 | Filesystem abstraction |
### Optional
| Extra | Install |
|---|---|
| `s3` | `pip install torch-dataloader-utils[s3]` |
| `gcs` | `pip install torch-dataloader-utils[gcs]` |
| `azure` | `pip install torch-dataloader-utils[azure]` |
| `iceberg` | `pip install torch-dataloader-utils[iceberg]` |
| `all` | `pip install torch-dataloader-utils[all]` |