Roadmap

V2 (Released)

Rank-Aware DDP Sharding

TargetSizeSplitStrategy and RoundRobinSplitStrategy now accept num_ranks and rank parameters. StructuredDataset and IcebergDataset expose these on both the constructor and create_dataloader(). Splits are assigned using interleaved slicing (all_splits[rank::num_ranks]) so each DDP rank reads a disjoint subset of files. Works with PyTorch DDP, Accelerate, and Horovod.
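
As a minimal sketch of the interleaved slicing described above: the helper name shard_for_rank and the file names are hypothetical and are not part of the library's API. Only the all_splits[rank::num_ranks] expression comes from the text.

```python
def shard_for_rank(all_splits, rank, num_ranks):
    """Return the splits owned by one rank using interleaved slicing."""
    if num_ranks < 1:
        raise ValueError("num_ranks must be >= 1")
    if not 0 <= rank < num_ranks:
        raise ValueError("rank must be in [0, num_ranks)")
    # Rank r takes elements r, r + num_ranks, r + 2 * num_ranks, ...
    return all_splits[rank::num_ranks]


# Hypothetical split list: ten files shared by four ranks.
splits = [f"part-{i:03d}.parquet" for i in range(10)]
shards = [shard_for_rank(splits, r, 4) for r in range(4)]
```

Every split lands on exactly one rank, and shard sizes differ by at most one.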

Deterministic Distributed Shuffling

shuffle=True with set_epoch(epoch) produces a globally consistent shuffle across all ranks. The full chunk list is shuffled before rank slicing — if each rank shuffled independently, ranks could overlap. All ranks call set_epoch(epoch) with the same value and get the same globally shuffled assignment, with no duplicates and no missed samples. Epoch number is used as a seed offset for full reproducibility.
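
The shuffle-then-slice order can be sketched as follows. This is an illustration under assumptions, not the library's implementation: epoch_assignment is a hypothetical helper, and seeding with shuffle_seed + epoch is one plausible reading of "seed offset".

```python
import random


def epoch_assignment(all_chunks, rank, num_ranks, shuffle_seed, epoch):
    """Shuffle the full chunk list with a shared seed, then slice by rank."""
    order = list(all_chunks)
    # Every rank builds the same generator state, so every rank sees the
    # same global permutation for a given epoch.
    random.Random(shuffle_seed + epoch).shuffle(order)
    # Slicing happens after the shuffle, so rank subsets stay disjoint.
    return order[rank::num_ranks]


chunks = list(range(12))  # hypothetical chunk ids
per_rank = [epoch_assignment(chunks, r, 3, shuffle_seed=42, epoch=5) for r in range(3)]
```

Had each rank shuffled its own copy with a different seed before slicing, the subsets would no longer be guaranteed disjoint.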

Row-Group-Aware Split Scheduling

TargetSizeSplitStrategy reads Parquet row group metadata (byte sizes, row counts) once in the main process and packs consecutive row groups into target-sized chunks. A single large file can be split across multiple workers at row group boundaries — each worker reads only its assigned row groups via pq.ParquetFile.read_row_groups(). Assignment uses greedy LPT (longest-processing-time) heap scheduling for near-optimal load balance.
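
The two steps above, packing consecutive row groups and then assigning chunks with greedy LPT, can be sketched in plain Python. The function names and the byte sizes are hypothetical. The real strategy works from Parquet metadata, which this sketch replaces with plain integer lists.

```python
import heapq


def pack_row_groups(row_group_bytes, target_bytes):
    """Pack consecutive row groups into chunks of roughly target_bytes."""
    chunks, current, current_bytes = [], [], 0
    for rg_index, nbytes in enumerate(row_group_bytes):
        # Close the current chunk if adding this row group would overshoot.
        if current and current_bytes + nbytes > target_bytes:
            chunks.append(current)
            current, current_bytes = [], 0
        current.append(rg_index)
        current_bytes += nbytes
    if current:
        chunks.append(current)
    return chunks


def lpt_assign(chunk_sizes, num_workers):
    """Greedy LPT: largest chunk first, always onto the least-loaded worker."""
    heap = [(0, w) for w in range(num_workers)]  # (current load, worker id)
    heapq.heapify(heap)
    assignment = [[] for _ in range(num_workers)]
    by_size = sorted(range(len(chunk_sizes)), key=lambda i: chunk_sizes[i], reverse=True)
    for idx in by_size:
        load, worker = heapq.heappop(heap)
        assignment[worker].append(idx)
        heapq.heappush(heap, (load + chunk_sizes[idx], worker))
    loads = [sum(chunk_sizes[i] for i in a) for a in assignment]
    return assignment, loads


packed = pack_row_groups([40, 40, 40, 100, 10], target_bytes=100)
assignment, loads = lpt_assign([90, 70, 60, 40, 30, 10], num_workers=2)
```

With these inputs, the two workers end up with loads of 160 and 140 out of a total of 300.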

Projection and Predicate Pushdown

columns= projects down to only the required columns before data leaves storage. filters= applies predicate pushdown at both the file level (via Iceberg partition pruning) and row group level (via Parquet statistics). Arrow never reads columns or row groups that the filter eliminates.
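
To illustrate how row-group statistics let a reader skip data, here is a simplified sketch. It does not call Parquet or Arrow: the min/max dictionaries and the row_group_may_match helper are hypothetical stand-ins for the statistics stored in a Parquet footer.

```python
def row_group_may_match(stats, op, value):
    """Return False only when min/max statistics prove no row can match."""
    lo, hi = stats["min"], stats["max"]
    if op == "==":
        return lo <= value <= hi
    if op == ">":
        return hi > value
    if op == ">=":
        return hi >= value
    if op == "<":
        return lo < value
    if op == "<=":
        return lo <= value
    return True  # unknown operator: keep the row group to stay correct


# Hypothetical statistics for one column across three row groups.
row_groups = [
    {"min": 0, "max": 99},
    {"min": 100, "max": 199},
    {"min": 200, "max": 299},
]
kept = [i for i, s in enumerate(row_groups) if row_group_may_match(s, ">=", 150)]
```

Statistics can only rule a row group out. A kept row group may still contain rows that fail the predicate, so the filter is applied again after reading.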

ORC Sub-File Splitting

ORC files are now split at stripe boundaries, matching the fine-grained load balancing that Parquet has had since V1. Because PyArrow does not expose per-stripe row counts, the row count is approximated uniformly as nrows / nstripes. The reader calls ORCFile.read_stripe() for assigned stripes — true random access, no full scan.
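
The uniform row-count approximation can be sketched as follows. The stripe_splits helper and its stripes_per_split parameter are hypothetical. Only the nrows / nstripes estimate is taken from the description above.

```python
def stripe_splits(nrows, nstripes, stripes_per_split):
    """Group stripe indices into splits with an estimated row count each."""
    # Per-stripe row counts are not available, so assume uniform stripes.
    est_rows_per_stripe = nrows / nstripes
    splits = []
    for start in range(0, nstripes, stripes_per_split):
        stripes = list(range(start, min(start + stripes_per_split, nstripes)))
        splits.append({
            "stripes": stripes,
            "est_rows": est_rows_per_stripe * len(stripes),
        })
    return splits


# A hypothetical file with 1000 rows in 5 stripes, two stripes per split.
orc_splits = stripe_splits(nrows=1000, nstripes=5, stripes_per_split=2)
```

The estimate is only as good as the uniformity assumption. A file with a short final stripe will have that split's row count overestimated.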

ORC Support for Iceberg Tables

_detect_format now handles ORC-backed Iceberg tables. Iceberg tables storing data in ORC format are fully supported.

Arrow-Native Zero-Copy Batching

The entire pipeline is Parquet / ORC → Arrow RecordBatch → tensor — no pandas, no intermediate Python objects. DataLoader is constructed with batch_size=None so PyTorch never re-batches already-batched Arrow output. Numeric columns convert to tensors with a single memory copy; non-numeric columns (strings, timestamps) become Python lists without materializing intermediate arrays.

Multi-Worker Integration Tests on Linux CI

The full multi-worker integration test suite now runs on Linux in CI. macOS spawn mode caused deadlocks with pyarrow generators; Linux fork mode works correctly.

Mid-Epoch Checkpoint and Resume

Persists which splits have been fully consumed, so that after a crash or restart the DataLoader skips already-processed splits and resumes from the partially read one. The epoch number is checkpointed alongside model weights so that the deterministic shuffle resumes in the same order.

Integrates with PyTorch's state_dict() / load_state_dict() protocol. CheckpointMismatchError is raised with a specific diagnosis if num_workers, shuffle_seed, or the file list changed since the checkpoint was saved. At most one in-progress shard re-reads from scratch on resume.
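
The shape of such a checkpoint can be sketched as below. SplitProgress is a hypothetical class written for illustration, and the CheckpointMismatchError defined here is a stand-in for the library's exception of the same name. The actual state layout may differ.

```python
class CheckpointMismatchError(RuntimeError):
    """Stand-in for the library's exception of the same name."""


class SplitProgress:
    """Hypothetical tracker for fully consumed splits within an epoch."""

    def __init__(self, file_list, num_workers, shuffle_seed):
        self.config = {
            "file_list": list(file_list),
            "num_workers": num_workers,
            "shuffle_seed": shuffle_seed,
        }
        self.epoch = 0
        self.completed = set()

    def mark_done(self, split_id):
        self.completed.add(split_id)

    def state_dict(self):
        return {
            "config": dict(self.config),
            "epoch": self.epoch,
            "completed": sorted(self.completed),
        }

    def load_state_dict(self, state):
        # Refuse to resume if the run configuration no longer matches.
        for key, current in self.config.items():
            saved = state["config"][key]
            if saved != current:
                raise CheckpointMismatchError(
                    f"{key} changed since checkpoint: saved={saved!r}, current={current!r}"
                )
        self.epoch = state["epoch"]
        self.completed = set(state["completed"])

    def remaining(self, all_split_ids):
        """Splits still to be read, in their original order."""
        return [s for s in all_split_ids if s not in self.completed]
```

Comparing the saved configuration before restoring progress is what lets the error name the exact field that changed.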

Record-Level Shuffle Buffer

shuffle_buffer_size=N adds a per-worker reservoir buffer holding N rows as an Arrow Table. Each output batch is drawn uniformly at random from the buffer, which is refilled from the source stream to maintain depth. Buffer is in-memory per worker — no IPC until the completed batch crosses the DataLoader pipe. Deterministic per shuffle_seed + epoch + worker_id.
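
A simplified version of the buffer logic is sketched below. It uses a Python list rather than an Arrow Table. The function names and the way the seed string is derived from shuffle_seed, epoch, and worker_id are assumptions made for illustration.

```python
import random


def worker_seed(shuffle_seed, epoch, worker_id):
    """Hypothetical seed derivation: one distinct stream per worker and epoch."""
    return f"{shuffle_seed}:{epoch}:{worker_id}"


def shuffle_buffer(rows, buffer_size, batch_size, seed):
    """Yield batches drawn uniformly at random from a bounded buffer."""
    rng = random.Random(seed)
    source = iter(rows)
    buf = []
    exhausted = False
    while True:
        # Refill from the source stream to maintain buffer depth.
        while not exhausted and len(buf) < buffer_size:
            try:
                buf.append(next(source))
            except StopIteration:
                exhausted = True
        if not buf:
            return
        batch = []
        for _ in range(min(batch_size, len(buf))):
            # Swap a random element to the end and pop it: O(1) removal.
            j = rng.randrange(len(buf))
            buf[j], buf[-1] = buf[-1], buf[j]
            batch.append(buf.pop())
        yield batch


seed = worker_seed(shuffle_seed=7, epoch=0, worker_id=0)
batches = list(shuffle_buffer(range(20), buffer_size=8, batch_size=4, seed=seed))
```

A row can only move as far as the buffer depth allows, so a small buffer leaves much of the source ordering intact.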

Observability

WorkerMetrics exposes rows read, bytes read (estimated), files read, and elapsed seconds per worker. dataset.get_metrics() drains and returns results after each epoch. Optional tqdm progress bars via show_progress=True. Structured log lines at INFO level for split assignment, per-file completion, and epoch summary. Load-balance warning emitted when the max/min worker byte ratio exceeds 2×.
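
The load-balance check can be sketched as follows. The logger name, the function names, and the handling of idle workers are assumptions. Only the max/min byte ratio and the 2× threshold come from the description above.

```python
import logging

logger = logging.getLogger("loader.metrics")  # hypothetical logger name


def load_imbalance_ratio(bytes_per_worker):
    """Ratio of the busiest worker's bytes to the least busy worker's bytes."""
    lo, hi = min(bytes_per_worker), max(bytes_per_worker)
    if hi == 0:
        return 1.0  # nothing was read anywhere, so nothing is imbalanced
    return float("inf") if lo == 0 else hi / lo


def warn_if_imbalanced(bytes_per_worker, threshold=2.0):
    """Log a warning and return True when the ratio exceeds the threshold."""
    ratio = load_imbalance_ratio(bytes_per_worker)
    if ratio > threshold:
        logger.warning(
            "load imbalance: max/min worker byte ratio %.2f exceeds %.1fx",
            ratio, threshold,
        )
        return True
    return False
```

In this sketch a worker that read nothing while others read data counts as an infinite ratio and always triggers the warning.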

GCS and Azure Integration Tests

In-process MemoryFileSystem mock patches fsspec.url_to_fs for gs:// and az:// URIs, exercising the full PyFileSystem(FSSpecHandler(fs)) wrapping path in reader.py — the same code path taken by real gcsfs and adlfs, without requiring external infrastructure in CI.


V2 (Pending)

Row-Level Interleaving Across Files

Instead of finishing one file before starting the next, yield one batch from each open file in round-robin rotation. Breaks the within-split file ordering correlation that the shuffle buffer alone cannot eliminate when a worker has many small files assigned to it.
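
Since this feature is still pending, the following is only one way the rotation could work. The interleave helper is hypothetical, and plain lists stand in for per-file batch iterators.

```python
def interleave(batch_iterators):
    """Yield one batch from each open source in rotation until all are drained."""
    active = [iter(it) for it in batch_iterators]
    while active:
        still_open = []
        for it in active:
            try:
                batch = next(it)
            except StopIteration:
                continue  # this source is drained; drop it from the rotation
            still_open.append(it)
            yield batch
        active = still_open


# Three hypothetical files with different numbers of batches.
file_a = ["a1", "a2", "a3"]
file_b = ["b1"]
file_c = ["c1", "c2"]
order = list(interleave([file_a, file_b, file_c]))
```

Short files drop out of the rotation early, so the tail of the stream is still dominated by the longest file.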


V3

Adaptive Dynamic Splitting

Rebalance splits across workers during iteration if some workers finish significantly faster than others. Useful for heterogeneous file sizes where static LPT scheduling still leaves some workers idle near the end of an epoch.

Memory-Aware Batching

Adaptive batch sizing based on available memory. Useful for workloads with variable row widths (e.g. embeddings with variable-length fields) where a fixed batch_size can trigger OOM on wide batches.

Schema Validation and Evolution

Enforce an expected schema at create_dataloader() time — validate column names, types, and nullability against a user-provided schema before iteration begins. Emit a clear error rather than a cryptic Arrow type mismatch mid-epoch. Also handle schema evolution across files in the same dataset (e.g. new columns added to later partitions).


Considered and Declined

Rust Backend

A Rust rewrite of the file-reading layer (decompression, Arrow conversion, scheduling) would add enormous implementation complexity. The bottleneck in cloud training is almost always network I/O latency, not CPU throughput: PyArrow's internals are already C++ and are not meaningfully slower than a Rust equivalent for I/O-bound workloads. Decompressing a Snappy-encoded row group in 1 ms instead of 2 ms does not move GPU utilization when the preceding S3 GET took 80 ms. A Rust backend is the right answer if and only if decompression or Arrow conversion shows up as a CPU bottleneck in profiling, which it has not so far. Revisit if benchmarks show otherwise.

GPU-Aware Prefetch Scheduling

Dynamically adjusting prefetch depth based on GPU utilization feedback would require a tight coupling between the DataLoader and the training loop, turning a clean data-loading library into a training-loop framework. PyTorch's DataLoader prefetch queue already overlaps I/O with compute for the common case. Defer until there is evidence that static prefetch depth is a measurable bottleneck.

Query-Planner-Style Execution (loader.filter().select().batch())

An execution-plan interface would require building an expression tree, an optimizer, and a physical plan evaluator — essentially writing a small query engine. Arrow and Parquet already support projection and predicate pushdown natively via columns= and filters=; there is no missing capability, only a more fluent API surface. The additional abstraction layer adds complexity without improving performance or expressiveness for the target workload. Not planned.