Scripts and reusable functions for implementing common data operations (tokenization, splitting, subsampling, packing, ...). Built with special support for Mosaic Streaming Datasets (MDS).
Clone this repo and install via `pip install -e .`, or install from PyPI via `pip install datatools-py`.
datatools provides some core libraries that can be used to easily build custom data pipelines, specifically `from datatools import load, process`.
`load` loads the dataset at the given path and tries to infer its format (e.g., compressed JSON, pyarrow, MDS, ...) based on clues from the file extensions and directory structure.
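For example, a minimal sketch of loading and inspecting a dataset (the path is a placeholder, and we assume the loaded dataset exposes `len(...)` and `[...]` access like the per-process subsets described below):

```python
from datatools import load

# load() infers the on-disk format (e.g. compressed JSON, pyarrow, MDS)
# from the file extensions and directory structure at this path.
dataset = load("path/to/dataset")

# Assumption: the returned dataset supports len() and item access,
# analogous to the subsets passed to processing functions below.
print(len(dataset))
print(dataset[0])
```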
`process` processes an input dataset and writes the results to disk. It supports:

- Multi-processing with many CPUs, e.g. `ProcessOptions(num_proc=16)` (or as flag `-w 16`)
- Slurm array parallelization, e.g. `ProcessOptions(slurm_array=True)` (or `--slurm_array`), which automatically sets up `job_id` and `num_jobs` using Slurm environment variables
- Custom indexing, e.g. only working on a subset (`--index_range 0 30`) or using a custom index file (`--index_path path/to/index.npy`); see `ProcessOptions` for details
- By default we write the output as mosaic-streaming MDS shards, which we merge into a single MDS dataset when the job finishes. However, the code also supports writing to JSONL files (`--jsonl`) and ndarray files for each column (`--ndarray`). The shards for these output formats are not automatically merged.
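As a sketch of how these options can be set from Python (the pass-through function and paths are placeholders; only `num_proc` and `slurm_array` are shown, since those are the options documented above):

```python
from datatools import load, process, ProcessOptions

# Placeholder processing function that passes every item through unchanged.
def identity(data_subset):
    for item in data_subset:
        yield item

dataset = load("path/to/dataset")

# Run with 16 local worker processes (equivalent to the -w 16 flag) ...
options = ProcessOptions(num_proc=16)
# ... or, inside a Slurm array job, let datatools derive job_id and num_jobs
# from the Slurm environment variables (equivalent to --slurm_array):
# options = ProcessOptions(slurm_array=True)

process(dataset, identity, "path/to/output", process_options=options)
```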
The `process_fn` should be a function that takes one to three arguments:

- A subset of the data with `len(...)` and `[...]` access
- (optional) The global indices corresponding to the subset
- (optional) The `process_id`, for logging or sharding purposes
```python
from datatools import load, process, ProcessOptions
from transformers import AutoTokenizer

# Load dataset (can be json, parquet, MDS, etc.)
dataset = load("path/to/dataset")

# Setup tokenizer and processing function
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B")

def tokenize_docs(data_subset):
    for item in data_subset:
        # Tokenize text and return dict with tokens and length
        tokens = tokenizer.encode(item["text"], add_special_tokens=False)
        # Chunk the text into 1024 token chunks
        for i in range(0, len(tokens), 1024):
            yield {
                "input_ids": tokens[i:i+1024],
                "length": len(tokens[i:i+1024])
            }

# Process dataset with 4 workers and write to disk
process(dataset, tokenize_docs, "path/to/output", process_options=ProcessOptions(num_proc=4))
```
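The function above uses only the first argument. Below is a sketch of the optional three-argument form; `tag_with_provenance`, the output columns, and the output path are illustrative, and the `"text"` column follows the example above:

```python
def tag_with_provenance(data_subset, indices, process_id):
    # indices[i] is the global index of data_subset[i] in the full dataset;
    # process_id identifies the worker (useful for logging or sharding).
    for item, global_idx in zip(data_subset, indices):
        yield {
            "text": item["text"],
            "global_index": int(global_idx),
            "process_id": process_id,
        }

process(dataset, tag_with_provenance, "path/to/tagged_output",
        process_options=ProcessOptions(num_proc=4))
```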
datatools comes with the following default scripts:

- `tokenize`: tokenize datasets per document
- `pack`: pack tokenized documents into fixed-length sequences
- `peek`: print datasets as JSON to stdout
- `wrangle`: subsample datasets, merge datasets, make random splits (e.g., train/test/validation), etc.
- `merge_index`: merge mosaic streaming datasets in subfolders into one big dataset
Run `<script> --help` for detailed arguments! Many scripts automatically add all arguments from `ProcessOptions` (e.g., the number of processes via `-w <processes>`) and `LoadOptions`.