Splits & Workers¶
What Is a Worker Here?¶
num_workers refers to DataLoader I/O workers — not training processes (DDP ranks).
PyTorch's DataLoader spawns background processes to prefetch data while the GPU trains. Each of these is an I/O worker: it reads files, decodes batches, and places them in a shared queue that the training process consumes.
Training process (GPU)
↑ consumes batches from shared queue
│
├── DataLoader Worker 0 ← reads its assigned files, decodes, batches
├── DataLoader Worker 1 ← reads its assigned files, decodes, batches
├── DataLoader Worker 2 ← reads its assigned files, decodes, batches
└── DataLoader Worker 3 ← reads its assigned files, decodes, batches
These workers are not DDP training ranks. In a multi-GPU setup, each DDP rank has its own DataLoader with its own num_workers I/O workers. Use num_ranks and rank on create_dataloader() for rank-level file sharding — see Rank-Aware DDP Sharding below.
How Splits Work¶
Files are pre-partitioned into Shard objects — one per DataLoader worker — before iteration begins. This happens once at create_dataloader() time.
File Discovery     → Scan-Level Pruning → Split Generation → Shard Assignment
(fsspec/pyiceberg)   (scan_filter)        (strategy)         (to DataLoader workers)
Each worker owns exactly one shard and reads its assigned files sequentially. Workers never communicate or share data.
Split Strategies¶
| Strategy | When | Balances by |
|---|---|---|
| TargetSizeSplitStrategy | Default (non-empty file list) | Row count via LPT scheduling |
| RoundRobinSplitStrategy | Fallback (empty file list) | File count |
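Balancing by file count can be pictured as dealing files to workers in turn. The following is a minimal sketch, assuming files are plain path strings; `round_robin` is a hypothetical helper for illustration, not the library's RoundRobinSplitStrategy.

```python
def round_robin(files: list[str], num_workers: int) -> list[list[str]]:
    """Deal files to workers in turn: file i goes to worker i % num_workers."""
    if num_workers < 1:
        raise ValueError("num_workers must be >= 1")
    # A strided slice picks every num_workers-th file, starting at the worker index.
    return [files[w::num_workers] for w in range(num_workers)]
```

File counts per worker differ by at most one, but row counts can be badly skewed when file sizes vary, which is why the size-aware strategy is the default.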
TargetSizeSplitStrategy¶
Chunks are assigned to workers with a greedy min-heap (LPT scheduling): each chunk goes to the currently least-loaded worker. Row counts end up near-perfectly balanced even when file sizes are very unequal.
# Tune chunk size
loader, _ = StructuredDataset.create_dataloader(
    ...,
    split_bytes="64MiB",   # string or int bytes
    # or
    split_rows=50_000,     # overrides split_bytes
)
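The greedy assignment described above can be sketched in a few lines. This is an illustration of LPT (longest processing time first) scheduling under the assumption that each chunk is represented only by its row count; `lpt_assign` is a hypothetical name, and the library's internals may differ.

```python
import heapq


def lpt_assign(chunk_rows: list[int], num_workers: int) -> list[list[int]]:
    """Assign chunk indices to workers: largest chunk first, least-loaded worker next."""
    # Heap entries are (current_row_total, worker_id); the smallest load pops first.
    heap = [(0, w) for w in range(num_workers)]
    heapq.heapify(heap)
    shards: list[list[int]] = [[] for _ in range(num_workers)]
    # LPT: visit chunks in descending row count so large chunks are placed early.
    order = sorted(range(len(chunk_rows)), key=lambda i: chunk_rows[i], reverse=True)
    for i in order:
        load, w = heapq.heappop(heap)
        shards[w].append(i)
        heapq.heappush(heap, (load + chunk_rows[i], w))
    return shards
```

Smaller chunks give the heap more freedom to even out the totals, which is the practical reason to lower split_bytes or split_rows when shards look unbalanced.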
Behavior by format¶
| Format | Chunk granularity | Sub-file splitting | How row count is known |
|---|---|---|---|
| Parquet | Row group | Yes | Footer metadata (no data read) |
| Iceberg | Row group (resolves to Parquet files) | Yes | Footer metadata via pyiceberg scan |
| ORC | Stripe (uniform approximation) | Yes — stripe-boundary chunks | Approximate (nrows/nstripes) |
| CSV | Whole file | No | File size only |
| JSON / JSONL | Whole file | No | File size only |
Parquet and Iceberg are first-class: row group metadata is read once in the main process at split-generation time (no data scan, just the file footer — typically a few KB per file). Row groups are packed into split_bytes-sized chunks. A single large file can produce many chunks assigned to different workers.
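As a rough illustration of the packing step, the sketch below groups consecutive row groups until the next one would push a chunk past the target size. It assumes the per-row-group byte sizes have already been read from the footer (with PyArrow, for instance, from `pq.ParquetFile(path).metadata.row_group(i).total_byte_size`); `pack_row_groups` is a hypothetical helper, not the library's API.

```python
def pack_row_groups(row_group_bytes: list[int], split_bytes: int) -> list[list[int]]:
    """Group consecutive row-group indices into chunks of roughly split_bytes."""
    chunks: list[list[int]] = []
    current: list[int] = []
    current_bytes = 0
    for idx, size in enumerate(row_group_bytes):
        # Close the current chunk if adding this row group would exceed the target.
        if current and current_bytes + size > split_bytes:
            chunks.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += size
    if current:
        chunks.append(current)
    return chunks
```

A row group larger than the target still becomes its own chunk, since a row group is the smallest unit that can be read independently.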
ORC files are split at stripe boundaries. Because PyArrow does not expose per-stripe row counts in the footer, the row count is approximated uniformly as nrows / nstripes. The reader calls ORCFile.read_stripe() for each assigned stripe — true random access, no full scan.
CSV, JSON, JSONL are treated as unsplittable: each file becomes one chunk. For good parallelism with these formats, partition your data into many smaller files before training.
Large CSV or JSONL files
If you have a few very large CSV or JSONL files, sub-file splitting is not available. Split them into smaller files (e.g. 128–512 MiB each) to give the strategy enough chunks to balance across workers.
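One way to pre-split a large JSONL file is sketched below. It is a standalone helper that uses only the standard library, and it splits by line count rather than by bytes for simplicity; the function name and the output naming scheme are illustrative assumptions.

```python
from pathlib import Path


def split_jsonl(src: Path, out_dir: Path, lines_per_file: int) -> list[Path]:
    """Split one JSONL file into numbered parts of at most lines_per_file lines."""
    if lines_per_file < 1:
        raise ValueError("lines_per_file must be >= 1")
    out_dir.mkdir(parents=True, exist_ok=True)
    parts: list[Path] = []
    out = None
    try:
        with src.open("r", encoding="utf-8") as f:
            for n, line in enumerate(f):
                # Start a new part every lines_per_file lines.
                if n % lines_per_file == 0:
                    if out is not None:
                        out.close()
                    part = out_dir / f"{src.stem}-part-{len(parts):05d}.jsonl"
                    parts.append(part)
                    out = part.open("w", encoding="utf-8")
                out.write(line)
    finally:
        if out is not None:
            out.close()
    return parts
```

Pick lines_per_file so that each part lands in the suggested 128–512 MiB range for your average record size.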
Sub-File Splitting¶
For large Parquet and ORC files, TargetSizeSplitStrategy generates multiple Split objects with disjoint RowRange values — a single file can be distributed across multiple workers.
Split(file=DataFileInfo, row_range=None) # whole file
Split(file=DataFileInfo, row_range=RowRange(0, 250_000)) # rows 0–250k
Split(file=DataFileInfo, row_range=RowRange(250_000, 250_000)) # rows 250k–500k
For Parquet, the reader uses pq.ParquetFile.read_row_groups() — only the assigned row groups are read (true random access, row-group granularity).
For ORC, the reader uses ORCFile.read_stripe() — only the assigned stripes are read (stripe granularity). Row counts are approximated uniformly since PyArrow does not expose per-stripe row counts.
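To make the disjoint ranges concrete, here is a small sketch that derives row ranges aligned to row-group boundaries. The `RowRange` dataclass is a stand-in, and its (offset, length) field layout is an assumption inferred from the example above; `row_ranges` and `groups_per_split` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowRange:
    # Stand-in for the library's RowRange; (offset, length) is an assumed layout.
    offset: int
    length: int


def row_ranges(row_group_rows: list[int], groups_per_split: int) -> list[RowRange]:
    """Build disjoint, contiguous row ranges aligned to row-group boundaries."""
    ranges: list[RowRange] = []
    offset = 0
    for start in range(0, len(row_group_rows), groups_per_split):
        # Each range covers a run of whole row groups, so no group is ever split.
        length = sum(row_group_rows[start:start + groups_per_split])
        ranges.append(RowRange(offset, length))
        offset += length
    return ranges
```

For a file with four row groups of 125,000 rows each and two groups per split, this yields the two 250k-row ranges shown in the example.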
Shuffle¶
Shuffle operates at the chunk level before shard assignment:
loader, dataset = StructuredDataset.create_dataloader(
    ..., shuffle=True, shuffle_seed=42
)
for epoch in range(num_epochs):
    dataset.set_epoch(epoch)  # seed = shuffle_seed + epoch
    for batch in loader:
        ...
- Call set_epoch() in the main process before each epoch.
- With shuffle=False, set_epoch() can be omitted: splits are always generated in the same deterministic order regardless of epoch number.
- For record-level shuffle (mixing rows across chunks), add shuffle_buffer_size=N; see the StructuredDataset docs.
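The seed rule (seed = shuffle_seed + epoch) can be illustrated with a small standalone sketch. It assumes chunks are represented as strings and uses Python's `random.Random`; the library's actual random number generator and chunk type are not specified here, and `shuffled_chunks` is a hypothetical name.

```python
import random


def shuffled_chunks(chunks: list[str], shuffle_seed: int, epoch: int) -> list[str]:
    """Return a new chunk order that depends only on shuffle_seed + epoch."""
    # A private RNG keeps the result reproducible and leaves global random state alone.
    rng = random.Random(shuffle_seed + epoch)
    order = list(chunks)
    rng.shuffle(order)
    return order
```

Because the order is a pure function of the seed and the epoch, every process that calls it with the same arguments sees the same permutation, which is what makes shard assignment after the shuffle consistent.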
Rank-Aware DDP Sharding¶
num_ranks and rank add a second level of partitioning above the existing worker-level splits. The hierarchy is:
Rank partitioning  → Worker partitioning (within each rank)
(num_ranks / rank)   (num_workers / split strategy)
All splits are generated first, then interleaved by rank: each rank takes every num_ranks-th split, starting at its own rank index. With two ranks, rank 0 gets splits 0, 2, 4, … and rank 1 gets splits 1, 3, 5, …. Each rank then further divides its own rank_splits across its num_workers I/O workers.
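The interleaving pattern corresponds to a strided slice. The sketch below is an assumption about how such an assignment could be written, not a quote from the library; `splits_for_rank` is a hypothetical helper.

```python
def splits_for_rank(all_splits: list, rank: int, num_ranks: int) -> list:
    """Return every num_ranks-th split, starting at index `rank`."""
    if not 0 <= rank < num_ranks:
        raise ValueError("rank must satisfy 0 <= rank < num_ranks")
    return all_splits[rank::num_ranks]
```

Interleaving, rather than handing each rank one contiguous block, spreads neighbouring splits (often from the same file) across ranks.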
Usage¶
import torch.distributed as dist

dist.init_process_group(backend="nccl")

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    num_workers=4,
    batch_size=1024,
    shuffle=True,
    num_ranks=dist.get_world_size(),  # total DDP ranks
    rank=dist.get_rank(),             # this process's rank
)

for epoch in range(num_epochs):
    dataset.set_epoch(epoch)
    for batch in loader:
        ...
With Accelerate¶
from accelerate import Accelerator

accelerator = Accelerator()

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    num_workers=4,
    batch_size=1024,
    shuffle=True,
    num_ranks=accelerator.num_processes,
    rank=accelerator.process_index,
)

# optional: lets Accelerate move batches to the device
# (gradient sync comes from preparing the model, not the loader)
loader = accelerator.prepare(loader)
With Horovod¶
import horovod.torch as hvd

hvd.init()

loader, dataset = StructuredDataset.create_dataloader(
    path="s3://bucket/data/",
    format="parquet",
    num_workers=4,
    batch_size=1024,
    num_ranks=hvd.size(),
    rank=hvd.rank(),
)
Default behaviour (V1 compatible)
num_ranks=1, rank=0 are the defaults — all splits go to the single rank, identical to V1 behaviour.
Global rank vs local rank
Use the global rank and world size (across all nodes), not the per-node local rank. In PyTorch DDP: dist.get_rank() is global. In Accelerate: accelerator.process_index is global. Local rank (LOCAL_RANK env var or accelerator.local_process_index) is the GPU index within a single node and should not be used here.
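When launching with torchrun, the global values are also available as the RANK and WORLD_SIZE environment variables, alongside LOCAL_RANK. The sketch below shows which ones to read; `global_rank_info` is a hypothetical helper, and the single-process fallback values are an assumption chosen to match the defaults described above.

```python
from collections.abc import Mapping


def global_rank_info(env: Mapping[str, str]) -> tuple[int, int]:
    """Return (rank, num_ranks) from torchrun-style variables, ignoring LOCAL_RANK."""
    # RANK is unique across all nodes; LOCAL_RANK repeats on every node.
    rank = int(env.get("RANK", "0"))
    num_ranks = int(env.get("WORLD_SIZE", "1"))
    return rank, num_ranks
```

In a training script this would typically be called as `global_rank_info(os.environ)`. On the second of two 4-GPU nodes, the process on GPU 2 has LOCAL_RANK=2 but RANK=6; passing 2 would make it read the same splits as rank 2 on the first node.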
num_workers¶
num_workers controls how many background DataLoader I/O processes are spawned to prefetch data. It has nothing to do with GPU count or DDP rank count.
num_workers=None # auto-detect: max(1, os.cpu_count() - 1), logged at INFO
num_workers=0 # single process — no forking, useful for debugging
num_workers=4 # 4 background I/O processes
A good starting point is num_workers=4. For cloud storage (S3, GCS), more workers overlap network latency well — try 8–16. For local NVMe, 2–4 is usually enough before CPU becomes the bottleneck.
Debugging
Use num_workers=0 when debugging — all reads happen in the main process, making stack traces and breakpoints work normally.
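The auto-detect rule for num_workers=None can be written out as a short sketch. It follows the formula given above and additionally guards against `os.cpu_count()` returning None, which the standard library permits; `resolve_num_workers` is a hypothetical name, and the logging mentioned above is omitted.

```python
import os
from typing import Optional


def resolve_num_workers(num_workers: Optional[int]) -> int:
    """Resolve None to max(1, cpu_count - 1); pass explicit values through."""
    if num_workers is None:
        # os.cpu_count() may return None when the count cannot be determined.
        return max(1, (os.cpu_count() or 1) - 1)
    if num_workers < 0:
        raise ValueError("num_workers must be >= 0")
    return num_workers
```

Leaving one core free keeps the main training process from competing with its own I/O workers.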
Custom Strategy¶
from torch_dataloader_utils.splits.core import DataFileInfo, Shard


class MyStrategy:
    def generate(
        self,
        files: list[DataFileInfo],
        num_workers: int,
        epoch: int,
        num_ranks: int = 1,  # optional: V1 strategies without these params still work
        rank: int = 0,
    ) -> list[Shard]:
        ...


loader, _ = StructuredDataset.create_dataloader(
    ..., split_strategy=MyStrategy()
)
No inheritance required — implement generate() and you're done. V1 strategies that do not accept num_ranks / rank continue to work — the library inspects the signature and omits those arguments if not present.
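Signature inspection of this kind can be done with the standard `inspect` module. The sketch below illustrates the idea under the assumption that the rank arguments are passed by keyword; `call_generate` is a hypothetical wrapper, not the library's internal function.

```python
import inspect


def call_generate(strategy, files, num_workers, epoch, num_ranks=1, rank=0):
    """Call strategy.generate(), passing rank arguments only if it declares them."""
    params = inspect.signature(strategy.generate).parameters
    # A **kwargs parameter means the strategy can accept any keyword argument.
    accepts_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    extra = {}
    for name, value in (("num_ranks", num_ranks), ("rank", rank)):
        if name in params or accepts_kwargs:
            extra[name] = value
    return strategy.generate(files, num_workers, epoch, **extra)
```

Note that a V1 strategy called this way never learns about ranks, so it will hand every rank the same shards unless the caller partitions by rank separately.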
File Metadata¶
The split layer operates on two file metadata types:
DataFileInfo # plain files: path, file_size, record_count
IcebergDataFileInfo # extends DataFileInfo: + partition, snapshot_id
Both satisfy the same interface — SplitStrategy.generate() accepts list[DataFileInfo] and works with either. Iceberg's richer metadata (partition values, column-level statistics from the snapshot manifest) is used by IcebergDataset during the scan phase for predicate pushdown and file pruning — the split strategy itself only sees path, file_size, and record_count from the base class.
Custom strategies can isinstance-check for IcebergDataFileInfo if they want partition-aware assignment logic:
from torch_dataloader_utils.splits.core import DataFileInfo, IcebergDataFileInfo, Shard


class PartitionAwareStrategy:
    def generate(self, files: list[DataFileInfo], num_workers: int, epoch: int) -> list[Shard]:
        for f in files:
            if isinstance(f, IcebergDataFileInfo):
                partition = f.partition  # e.g. {"region": "us-east-1"}
                ...
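A first step toward partition-aware assignment is usually to bucket files by a partition value. The sketch below uses stand-in dataclasses so that it runs on its own; the real DataFileInfo and IcebergDataFileInfo may have different constructors, and `group_by_partition` is a hypothetical helper.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class DataFileInfo:  # stand-in for the library class
    path: str
    file_size: int = 0
    record_count: int = 0


@dataclass
class IcebergDataFileInfo(DataFileInfo):  # stand-in for the library class
    partition: dict = field(default_factory=dict)


def group_by_partition(files: list[DataFileInfo], key: str) -> dict:
    """Bucket file paths by one partition field; plain files land under None."""
    groups = defaultdict(list)
    for f in files:
        value = f.partition.get(key) if isinstance(f, IcebergDataFileInfo) else None
        groups[value].append(f.path)
    return dict(groups)
```

From these buckets a strategy could, for example, keep each partition on a single worker or spread every partition evenly across all workers before building its Shard objects.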