# StructuredDataset
StructuredDataset reads file-based formats (Parquet, ORC, CSV, JSON, JSONL) from any fsspec-compatible filesystem.
## create_dataloader()

```python
import pyarrow.compute as pc
from torch_dataloader_utils import StructuredDataset

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://my-bucket/data/train/",
    format="parquet",                         # parquet | orc | csv | json | jsonl
    num_workers=None,                         # None = auto-detect, 0 = single process
    batch_size=1024,
    columns=["feature_a", "feature_b", "label"],
    filters=pc.field("date") > "2024-01-01",  # predicate pushdown via pyarrow
    shuffle=True,
    shuffle_seed=42,
    split_bytes="128MiB",                     # target chunk size (default 128 MiB)
    split_rows=None,                          # target rows per chunk (overrides split_bytes)
    split_strategy=None,                      # None = auto-select TargetSizeSplitStrategy
    output_format="torch",                    # torch | numpy | arrow | dict
    storage_options={"key": "...", "secret": "..."},
    partitioning="hive",                      # None | "hive"
    num_ranks=1,                              # total DDP world size (default 1 = single process)
    rank=0,                                   # this process's global DDP rank (default 0)
    show_progress=False,                      # tqdm progress bars, one per worker per file
    progress_interval_sec=120,                # how often to refresh bars (seconds)
)
```
Returns (DataLoader, dataset). Keep a reference to dataset to call set_epoch() each epoch when shuffle=True.
### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str | required | Directory, glob, or single file path |
| format | str | required | parquet, orc, csv, json, jsonl |
| num_workers | int \| None | None | DataLoader workers. None = max(1, cpu_count - 1); 0 = single process |
| batch_size | int | 1024 | Rows per batch |
| columns | list[str] \| None | None | Column projection. None = all columns |
| filters | pc.Expression \| None | None | Row-level predicate pushdown via pyarrow |
| shuffle | bool | False | Shuffle chunks before assigning them to workers |
| shuffle_seed | int | 42 | Base seed; the actual seed is shuffle_seed + epoch |
| shuffle_buffer_size | int \| None | None | Record-level shuffle buffer size (rows). None = no record shuffle |
| split_bytes | int \| str \| None | None | Target bytes per chunk. Strings like "128MiB" are accepted. None = 128 MiB |
| split_rows | int \| None | None | Target rows per chunk. Overrides split_bytes |
| split_strategy | SplitStrategy \| None | None | Custom strategy. None = auto-select |
| output_format | str | "torch" | torch, numpy, arrow, dict |
| storage_options | dict \| None | None | Forwarded to fsspec (credentials, endpoint, etc.) |
| collate_fn | Callable \| None | None | Custom DataLoader collate function |
| partitioning | str \| None | None | "hive" decodes key=value directory segments as columns |
| num_ranks | int | 1 | Total DDP world size. Default 1 = single process (V1 behaviour) |
| rank | int | 0 | This process's global DDP rank (0-indexed). Default 0 |
| show_progress | bool | False | Show tqdm progress bars (one per worker per file). Requires tqdm |
| progress_interval_sec | float | 120.0 | How often to refresh progress bars (seconds) |
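split_bytes accepts either an integer or a human-readable size string, and num_workers=None falls back to max(1, cpu_count - 1). The sketch below shows one plausible way such defaults could be resolved. The helper names parse_size and resolve_num_workers are hypothetical, and the accepted suffixes (binary KiB/MiB/GiB/TiB and decimal KB/MB/GB/TB) are an assumption rather than the library's actual parser.

```python
from __future__ import annotations

import os
import re

# Assumed unit suffixes (case-insensitive): binary (IEC) and decimal (SI).
_UNITS = {
    "": 1, "b": 1,
    "kb": 10**3, "mb": 10**6, "gb": 10**9, "tb": 10**12,
    "kib": 2**10, "mib": 2**20, "gib": 2**30, "tib": 2**40,
}
DEFAULT_SPLIT_BYTES = 128 * 2**20  # documented default: 128 MiB


def parse_size(value: int | str | None) -> int:
    """Hypothetical: resolve a split_bytes value (int, size string or None) to bytes."""
    if value is None:
        return DEFAULT_SPLIT_BYTES
    if isinstance(value, int):
        return value
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", value)
    if match is None or match.group(2).lower() not in _UNITS:
        raise ValueError(f"unrecognised size string: {value!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit.lower()])


def resolve_num_workers(num_workers: int | None) -> int:
    """Hypothetical: apply the documented default max(1, cpu_count - 1)."""
    if num_workers is None:
        return max(1, (os.cpu_count() or 1) - 1)
    return num_workers


print(parse_size("128MiB"))  # 134217728
print(parse_size("1.5GB"))   # 1500000000
```

Note the difference between "128MiB" (2^27 bytes) and "128MB" (1.28 × 10^8 bytes) under this assumed scheme.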
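## Epoch Reshuffling

```python
from torch_dataloader_utils import StructuredDataset

num_epochs = 10  # illustrative value

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    shuffle=True,
    num_workers=4,
)

for epoch in range(num_epochs):
    dataset.set_epoch(epoch)  # regenerates splits with shuffle_seed + epoch
    for batch in loader:
        ...
```

Call set_epoch() in the main process before each epoch, not inside a worker: each worker process operates on its own copy of the dataset, so an epoch set there is not seen by the main process or the other workers.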
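The documented seeding rule (actual seed = shuffle_seed + epoch) gives each epoch a different but reproducible chunk order. A minimal sketch of that idea using Python's random module follows; shuffled_chunk_order is a hypothetical helper, and the library may use a different RNG, so the exact orders will not match its output.

```python
import random


def shuffled_chunk_order(chunks: list, shuffle_seed: int, epoch: int) -> list:
    """Hypothetical helper: deterministic per-epoch shuffle of chunk order."""
    rng = random.Random(shuffle_seed + epoch)  # seed = shuffle_seed + epoch
    order = list(chunks)  # copy so the caller's list is left untouched
    rng.shuffle(order)
    return order


chunks = [f"chunk-{i}" for i in range(8)]
epoch0 = shuffled_chunk_order(chunks, shuffle_seed=42, epoch=0)
epoch0_again = shuffled_chunk_order(chunks, shuffle_seed=42, epoch=0)
epoch1 = shuffled_chunk_order(chunks, shuffle_seed=42, epoch=1)
print(epoch0 == epoch0_again)  # True: same seed and epoch give the same order
```

Because the order depends only on (shuffle_seed, epoch), every process that computes it independently arrives at the same permutation without any communication.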
## Record-Level Shuffle

shuffle=True randomises chunk order, but rows within a chunk are still read in file order. If your Parquet files are sorted by timestamp or user ID, consecutive batches will therefore still be correlated.

shuffle_buffer_size adds a per-worker reservoir buffer that mixes rows across chunks:
```python
loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    shuffle=True,                # chunk-level: reorders file chunks across epochs
    shuffle_buffer_size=50_000,  # record-level: mixes rows within each worker's stream
    num_workers=4,
)
```
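A shuffle buffer of this kind is commonly implemented as follows: fill a fixed-size buffer, then for each incoming row emit a randomly chosen buffered row and put the new row in its place, and finally drain the remainder in random order. This is a generic sketch of the technique over plain Python rows (shuffle_rows is a hypothetical name), not the library's internal code, which operates on its own batch representation.

```python
from __future__ import annotations

import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def shuffle_rows(rows: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Yield rows in a locally shuffled order using a bounded buffer."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be >= 1")
    rng = random.Random(seed)
    buffer: list[T] = []
    for row in rows:
        if len(buffer) < buffer_size:
            buffer.append(row)  # still filling the buffer
            continue
        i = rng.randrange(buffer_size)
        yield buffer[i]   # emit a random buffered row...
        buffer[i] = row   # ...and put the incoming row in its slot
    rng.shuffle(buffer)   # input exhausted: drain the rest in random order
    yield from buffer
```

With this scheme no row can be emitted more than buffer_size - 1 positions earlier than its input position, which is why a buffer that is small relative to a sorted run still leaves neighbouring batches correlated.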
The two levels are independent; use either or both:

| shuffle | shuffle_buffer_size | Effect |
|---|---|---|
| True | None | Chunk order randomised, rows in file order |
| False | 50_000 | Fixed chunk order, rows mixed within each worker |
| True | 50_000 | Chunk order randomised + rows mixed (recommended for training) |
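Buffer sizing (approximate memory per worker for float32 columns, i.e. rows × columns × 4 bytes):

| shuffle_buffer_size | 20 float32 cols | 100 float32 cols |
|---|---|---|
| 10,000 rows | 0.8 MB / worker | 4 MB / worker |
| 50,000 rows | 4 MB / worker | 20 MB / worker |
| 100,000 rows | 8 MB / worker | 40 MB / worker |

With num_workers=8 and shuffle_buffer_size=50_000 (100 cols), total buffer memory is 8 × 20 MB = 160 MB. For a full shuffle of each worker's stream, set shuffle_buffer_size to roughly dataset_size / num_workers, the number of rows each worker reads (under DDP, dataset_size / (num_ranks × num_workers)); memory grows in proportion.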
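The table values follow directly from rows × columns × 4 bytes, in decimal megabytes. The hypothetical helpers below reproduce the worked example and the "full shuffle" sizing rule; they count only the float32 values themselves and ignore Arrow/NumPy per-array overhead, non-float32 columns, and uneven split sizes.

```python
def shuffle_buffer_bytes(buffer_rows: int, num_cols: int, num_workers: int,
                         bytes_per_value: int = 4) -> int:
    """Approximate shuffle-buffer memory summed over all workers (values only)."""
    return buffer_rows * num_cols * bytes_per_value * num_workers


def full_shuffle_buffer_rows(dataset_rows: int, num_workers: int, num_ranks: int = 1) -> int:
    """Rows each worker reads, i.e. the buffer size that holds its whole stream."""
    streams = max(1, num_workers) * num_ranks  # num_workers=0 still means one reader
    return -(-dataset_rows // streams)  # ceiling division


print(shuffle_buffer_bytes(50_000, 100, num_workers=8) / 1e6, "MB")  # 160.0 MB
```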
**Memory lives in worker processes.** Each worker maintains its own independent buffer, entirely local to the worker's heap. No IPC occurs until a completed output batch crosses the DataLoader pipe.
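## Custom Split Strategy

To control how files are split into shards for the workers, pass your own object as split_strategy:

```python
from torch_dataloader_utils import StructuredDataset
from torch_dataloader_utils.splits.core import DataFileInfo, Shard


class MyStrategy:
    def generate(self, files: list[DataFileInfo], num_workers: int, epoch: int) -> list[Shard]:
        # Build and return the shards for this epoch.
        ...


loader, _ = StructuredDataset.create_dataloader(
    ..., split_strategy=MyStrategy()
)
```

No inheritance is required; any object with a generate() method of this signature works.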
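As an illustration of what a generate() implementation might do, the sketch below balances whole files across workers by size, greedily giving the largest remaining file to the least-loaded worker, and varies the read order per epoch. Because the fields of DataFileInfo and Shard are not documented here, it uses hypothetical stand-in dataclasses (FileInfo with path and size_bytes, WorkerShard with worker_id and paths); adapt it to the real types before passing it as split_strategy. Unlike split_bytes-based chunking, it never splits a large file into smaller chunks.

```python
from __future__ import annotations

import heapq
import random
from dataclasses import dataclass, field


@dataclass
class FileInfo:
    """Hypothetical stand-in for DataFileInfo."""
    path: str
    size_bytes: int


@dataclass
class WorkerShard:
    """Hypothetical stand-in for Shard: the files one worker reads."""
    worker_id: int
    paths: list[str] = field(default_factory=list)
    total_bytes: int = 0


class SizeBalancedStrategy:
    """Greedy largest-first assignment of whole files to the least-loaded worker."""

    def __init__(self, seed: int = 42):
        self.seed = seed

    def generate(self, files: list[FileInfo], num_workers: int, epoch: int) -> list[WorkerShard]:
        num_workers = max(1, num_workers)  # assumption: treat 0 (single process) as one worker
        shards = [WorkerShard(worker_id=w) for w in range(num_workers)]
        heap = [(0, w) for w in range(num_workers)]  # (bytes assigned so far, worker_id)
        for info in sorted(files, key=lambda f: f.size_bytes, reverse=True):
            load, w = heapq.heappop(heap)
            shards[w].paths.append(info.path)
            shards[w].total_bytes += info.size_bytes
            heapq.heappush(heap, (load + info.size_bytes, w))
        rng = random.Random(self.seed + epoch)  # reproducible per-epoch read order
        for shard in shards:
            rng.shuffle(shard.paths)
        return shards


files = [FileInfo("a.parquet", 300), FileInfo("b.parquet", 200),
         FileInfo("c.parquet", 100), FileInfo("d.parquet", 100)]
shards = SizeBalancedStrategy().generate(files, num_workers=2, epoch=0)
print(sorted(s.total_bytes for s in shards))  # [300, 400]
```

Largest-first greedy assignment is a simple heuristic for keeping per-worker byte totals close; it does not guarantee an optimal balance.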
## Collate Function

create_dataloader() handles collation automatically based on output_format:

| output_format | collate_fn required? | Default behaviour |
|---|---|---|
| "torch" | No | PyTorch default collate (stacks tensors) |
| "numpy" | No | Auto-generated passthrough (lambda x: x) |
| "arrow" | Yes | Raises ValueError if not provided |
| "dict" | Yes | Raises ValueError if not provided |
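```python
from torch_dataloader_utils import StructuredDataset

def passthrough(batch):
    # Named function instead of a lambda: stays picklable if workers use the "spawn" start method
    return batch

# arrow / dict: collate_fn must be provided
loader, _ = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    output_format="arrow",
    collate_fn=passthrough,
)
```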
**Direct constructor.** When you use StructuredDataset(...) directly, a missing collate_fn raises ValueError at constructor time. create_dataloader() auto-generates a passthrough for "numpy", so you never need to pass one in numpy mode.
## Distributed Training (DDP)

Pass num_ranks and rank to shard the input across DDP processes. Each rank receives an interleaved subset of all splits: rank r gets splits r, r + num_ranks, r + 2 × num_ranks, and so on. With two ranks, for example, rank 0 gets splits 0, 2, 4, … and rank 1 gets splits 1, 3, 5, ….
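The interleaving described above amounts to strided slicing. A minimal sketch, assuming splits is the ordered list of all splits for the epoch; splits_for_rank is a hypothetical helper, not part of the library's API.

```python
def splits_for_rank(splits: list, num_ranks: int, rank: int) -> list:
    """Return the interleaved subset of splits owned by one DDP rank."""
    if not 0 <= rank < num_ranks:
        raise ValueError(f"rank must be in [0, {num_ranks}), got {rank}")
    return splits[rank::num_ranks]  # rank, rank + num_ranks, rank + 2*num_ranks, ...


splits = list(range(7))
print(splits_for_rank(splits, num_ranks=2, rank=0))  # [0, 2, 4, 6]
print(splits_for_rank(splits, num_ranks=2, rank=1))  # [1, 3, 5]
```

Every split goes to exactly one rank and split counts differ by at most one between ranks; if splits vary in size, ranks may still finish an epoch with different numbers of batches.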
### PyTorch DDP

```python
import torch.distributed as dist
from torch_dataloader_utils import StructuredDataset

num_epochs = 10  # illustrative value

# Reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE from the environment (e.g. set by torchrun)
dist.init_process_group(backend="nccl")

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    num_workers=4,
    batch_size=1024,
    shuffle=True,
    num_ranks=dist.get_world_size(),
    rank=dist.get_rank(),
)

for epoch in range(num_epochs):
    dataset.set_epoch(epoch)
    for batch in loader:
        ...

dist.destroy_process_group()
```
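### HuggingFace Accelerate

```python
from accelerate import Accelerator
from torch_dataloader_utils import StructuredDataset

num_epochs = 10  # illustrative value
accelerator = Accelerator()

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    num_workers=4,
    batch_size=1024,
    shuffle=True,
    num_ranks=accelerator.num_processes,
    rank=accelerator.process_index,  # global rank, not local_process_index
)

loader = accelerator.prepare(loader)  # optional; read the note below before relying on it

for epoch in range(num_epochs):
    dataset.set_epoch(epoch)
    for batch in loader:
        ...
```

accelerator.prepare() does more than add gradient-related wrappers: it moves batches to the right device and, for a DataLoader over an IterableDataset, defaults to dispatch_batches=True, which iterates the loader on the main process only and splits each batch across processes. If StructuredDataset is an IterableDataset, combining that with the per-rank sharding from num_ranks and rank can leave the other ranks' shards unread. If you are unsure how the two interact in your setup, skip prepare() for this loader and move each batch to accelerator.device yourself.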
**Global rank vs local rank.** Use accelerator.process_index (global rank across all nodes), not accelerator.local_process_index (GPU index within one node). The same applies to PyTorch DDP: dist.get_rank() returns the correct global rank.
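Launchers such as torchrun export rank information as environment variables: RANK (global rank), LOCAL_RANK (index within the node) and WORLD_SIZE. If you need these values without going through Accelerate or an initialised process group, a small helper like the hypothetical ddp_rank_info below can read them, falling back to the single-process defaults (num_ranks=1, rank=0) when they are absent.

```python
from __future__ import annotations

import os
from typing import Mapping


def ddp_rank_info(env: Mapping[str, str] | None = None) -> tuple[int, int]:
    """Return (num_ranks, rank) from torchrun-style environment variables."""
    env = os.environ if env is None else env
    num_ranks = int(env.get("WORLD_SIZE", "1"))
    rank = int(env.get("RANK", "0"))  # global rank, NOT LOCAL_RANK
    if not 0 <= rank < num_ranks:
        raise ValueError(f"RANK={rank} is outside WORLD_SIZE={num_ranks}")
    return num_ranks, rank


# Second GPU on the second of two 4-GPU nodes: LOCAL_RANK=1, but the global RANK is 1 * 4 + 1 = 5.
print(ddp_rank_info({"WORLD_SIZE": "8", "RANK": "5", "LOCAL_RANK": "1"}))  # (8, 5)
```

The returned pair maps directly onto create_dataloader(num_ranks=..., rank=...).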
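## Advanced: Direct Constructor

```python
from torch.utils.data import DataLoader
from torch_dataloader_utils import StructuredDataset

ds = StructuredDataset(files=..., format="parquet", ...)
# batch_size=None disables DataLoader's automatic batching; the dataset yields whole batches itself
loader = DataLoader(ds, batch_size=None, num_workers=4)
# Optional, with HuggingFace Accelerate (accelerator = accelerate.Accelerator()):
# loader = accelerator.prepare(loader)
```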
## Observability
StructuredDataset emits structured logs at every stage — startup summary, split assignment table, load balance warnings, per-file logs, progress bars, and epoch summaries.
See Observability for the full reference.