G-Research/JollyJack

JollyJack

JollyJack is a high-performance Parquet reader designed to load data directly into NumPy arrays and PyTorch tensors with minimal overhead.

Features

  • Load Parquet straight into NumPy arrays or PyTorch tensors (fp16, fp32, fp64, int32, int64)
  • Up to 6× faster than vanilla PyArrow, with lower memory use
  • Compatibility with PalletJack
  • Optional io_uring + O_DIRECT backend for I/O-bound workloads

Known limitations

  • Data must not contain null values
  • Destination NumPy arrays and PyTorch tensors must be column-major (Fortran-style)

Selecting a reader backend

By default, the reader uses the regular file API via parquet::ParquetFileReader. In most cases, this is the recommended choice.

An alternative reader backend based on io_uring is also available. It may provide better performance for some workloads, particularly when used together with O_DIRECT.

To enable the alternative backend, set the JJ_READER_BACKEND environment variable to one of the following values:

  • io_uring - Uses io_uring for async I/O with the page cache
  • io_uring_odirect - Uses io_uring with O_DIRECT (bypasses the page cache)
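
For example, to select the io_uring + O_DIRECT backend for subsequent JollyJack calls in the current shell session:

```shell
# Select the io_uring + O_DIRECT reader backend
export JJ_READER_BACKEND=io_uring_odirect
```

You can also prefix a single command with the assignment to scope it to one run.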

Performance tuning tips

JollyJack performance is primarily determined by I/O, threading, and memory allocation behavior. The optimal configuration depends on whether your workload is I/O-bound or memory-/CPU-bound.

Threading strategy

  • JollyJack can be safely called concurrently from multiple threads.
  • Parallel reads usually improve throughput, but oversubscribing threads can cause contention and degrade performance.
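
A common pattern is one worker per row group, each writing into a disjoint view of a shared Fortran-order array. The sketch below uses a stand-in fill function in place of jj.read_into_numpy so it runs without a Parquet file; the threading structure is what matters.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

n_row_groups, chunk_size, n_columns = 4, 3, 5
np_array = np.zeros((n_row_groups * chunk_size, n_columns), dtype="f", order="F")

def read_row_group(rg):
    # Stand-in for jj.read_into_numpy(source=path, np_array=view,
    #                                 row_group_indices=[rg], ...)
    view = np_array[rg * chunk_size:(rg + 1) * chunk_size, :]
    view[:] = rg + 1  # each worker writes only its own rows

# A handful of threads; oversubscribing can hurt more than it helps
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(read_row_group, range(n_row_groups)))
```

Each worker touches a disjoint row range, so no locking is needed around the destination array.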

Reuse destination arrays

  • Reusing NumPy arrays or PyTorch tensors avoids repeated memory allocation.
  • Allocation itself is fast, but repeatedly allocating and freeing large buffers can cause contention in the kernel's memory-management paths and degrade performance.
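
A sketch of the pattern: allocate the destination once, then refill it for each file. The read function below is a stand-in for jj.read_into_numpy, and the file names are placeholders.

```python
import numpy as np

n_rows, n_columns = 6, 5
file_paths = ["a.parquet", "b.parquet", "c.parquet"]  # placeholder names

# Allocate once, outside the loop
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")

def read_file(path, out):
    # Stand-in for jj.read_into_numpy(source=path, np_array=out, ...)
    out[:] = float(len(path))

for path in file_paths:
    read_file(path, np_array)  # overwrites in place, no new allocation
    # process(np_array) ...
```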

Large datasets (exceeding filesystem cache)

For datasets larger than the available page cache, performance is typically I/O-bound. Enabling either pre_buffer=True or prefetch_page_cache=True brings throughput close to the raw I/O ceiling, but prefetch_page_cache avoids the increased LLC miss rate caused by pre_buffer (see Page cache prefetching below).

Recommended configuration:

  • use_threads = True, prefetch_page_cache = True, pre_buffer = False, with the default reader backend.

Small datasets (fitting in filesystem cache)

For datasets that comfortably fit in RAM, performance is typically CPU- or memory-bound. Using pre_buffer is not recommended because it leads to an increased LLC miss rate and suboptimal performance (see Page cache prefetching below).

Recommended configuration:

  • use_threads = True, prefetch_page_cache = True, pre_buffer = False, with the default reader backend.

Page cache prefetching with prefetch_page_cache

The prefetch_page_cache option calls posix_fadvise(POSIX_FADV_WILLNEED) to tell the kernel to start loading the relevant byte ranges into the page cache. Each worker thread then reads directly via pread into its own locally-allocated buffer, keeping data hot in its local CPU caches.

This avoids the LLC (Last Level Cache) miss problem with pre_buffer=True, where Arrow's IO thread pool fills temporary buffers on one core and worker threads on different cores later consume cold data.

This is only useful for local or network-mounted file systems that have a page cache. Remote object stores such as S3 will not benefit from this.
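
The underlying mechanism can be illustrated with the standard library alone. This is a minimal sketch (not JollyJack's actual code) that advises the kernel about a byte range and then reads it via pread; it creates a small temporary file so it is self-contained.

```python
import os
import tempfile

# Any local file works; create a small one for illustration
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"x" * 65536)
    path = f.name

fd = os.open(path, os.O_RDONLY)
try:
    length = os.fstat(fd).st_size
    # Advise the kernel to start pulling the byte range into the page cache
    os.posix_fadvise(fd, 0, length, os.POSIX_FADV_WILLNEED)
    # A later pread of the same range should then find the data cache-hot
    data = os.pread(fd, length, 0)
finally:
    os.close(fd)
    os.unlink(path)
```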

The Linux kernel's force_page_cache_ra caps the number of pages read per posix_fadvise call to the block device's readahead window. Any bytes beyond this cap are silently ignored. The readahead window is typically 128 KB or higher. Check the value for your device:

cat /sys/block/<device>/queue/read_ahead_kb

If range_size_limit exceeds this value, most of each coalesced range will not be prefetched. Set range_size_limit to match or stay below the device's read_ahead_kb:

cache_options = pa.CacheOptions(
    hole_size_limit=8192,
    range_size_limit=128*1024,  # must not exceed read_ahead_kb
    lazy=False,
)
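
If you'd rather not hard-code the limit, the cap can be read from sysfs. The helper below is our own (not part of JollyJack); it takes the smallest read_ahead_kb across block devices and falls back to a conservative default where /sys/block is unavailable.

```python
from pathlib import Path

def min_read_ahead_kb(default_kb=128):
    """Smallest read_ahead_kb across block devices, or default_kb as fallback."""
    values = []
    for p in Path("/sys/block").glob("*/queue/read_ahead_kb"):
        try:
            values.append(int(p.read_text().strip()))
        except (OSError, ValueError):
            pass
    return min(values) if values else default_kb

range_size_limit = min_read_ahead_kb() * 1024  # bytes, at or below the kernel's cap
```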

There are two ways to enable page cache prefetching:

As a parameter on read_into_numpy:

jj.read_into_numpy(
    source=path,
    metadata=pr.metadata,
    np_array=np_array,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    prefetch_page_cache=True,
    cache_options=cache_options,
)

As a standalone call:

jj.prefetch_page_cache(
    source=path,
    metadata=pr.metadata,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    cache_options=cache_options,
)

The standalone call is useful for sliding-window prefetching, where you prefetch the next files while processing the current one:

# Prime the pump
for path in file_paths[:PREFETCH_DEPTH]:
    jj.prefetch_page_cache(source=path, ...)

# Main loop
for i, path in enumerate(file_paths):
    # Slide the window
    ahead_index = i + PREFETCH_DEPTH
    if ahead_index < len(file_paths):
        jj.prefetch_page_cache(source=file_paths[ahead_index], ...)

    # Page cache should already be warm
    jj.read_into_numpy(source=path, np_array=np_array, ...)

    process(np_array)

Column ordering and use_threads

When using prefetch_page_cache, the order in which columns are read matters. Sorting column_indices by source column index produces a sequential I/O pattern, which can significantly improve throughput. The effect depends on the storage device and kernel readahead settings. Use a dict to preserve the original target mapping:

# column_indices_to_read is an unsorted list, e.g. [5, 2, 8]
# Sort by source column index, preserving the original target mapping.
col_indices = {
    src: dst
    for src, dst in sorted(
        zip(column_indices_to_read, range(len(column_indices_to_read)))
    )
}
# Result: {2: 1, 5: 0, 8: 2} — reads columns in file order,
# writes each to the same target column as the unsorted list.

For similar reasons, avoid setting use_threads=True with prefetch_page_cache. Arrow's internal thread pool dispatches column reads across cores in an unpredictable order, breaking the sequential I/O pattern that makes prefetching effective. Use multiple worker threads at the application level instead, each reading its own file or row group with use_threads=False.

Pre-buffering and cache_options

If you use pre_buffer=True instead of prefetch_page_cache, the following tuning applies.

When pre_buffer=True, Arrow merges nearby column ranges and reads them into temporary buffers. The default maximum merged range is 32 MB (range_size_limit).

Arrow supports several memory allocators (mimalloc, jemalloc, system). With mimalloc (the default on most platforms), allocations above ~16 MB go straight to the OS (mmap/munmap) instead of the internal arena. This means the memory cannot be reused between calls, and each call pays the cost of mapping and zeroing fresh pages. Other allocators may behave similarly.

To avoid this, lower range_size_limit so that merged ranges fit inside the allocator's arena:

cache_options = pa.CacheOptions(
    hole_size_limit=8192,           # default
    range_size_limit=16*1024*1024,  # 16 MB, fits in mimalloc arena
    lazy=False,
)
jj.read_into_numpy(
    source=path,
    metadata=None,
    np_array=np_array,
    row_group_indices=[0],
    column_indices=range(n_columns),
    pre_buffer=True,
    cache_options=cache_options,
)

To debug allocator issues with mimalloc, run with MIMALLOC_SHOW_STATS=1 and MIMALLOC_VERBOSE=1. This prints allocation statistics at process exit.

Pre-buffering and ARROW_IO_THREADS

When pre_buffer=True, Arrow dispatches reads to its IO thread pool, configured via the ARROW_IO_THREADS environment variable (default: 8). Tuning this value may improve performance.
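
For example, to try a larger IO pool for high-latency storage (16 is an illustrative value; the variable must be set before Arrow first creates its IO thread pool):

```shell
# Widen Arrow's IO thread pool from the default of 8
export ARROW_IO_THREADS=16
```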

Requirements

  • pyarrow ~= 24.0.0

JollyJack builds on top of PyArrow. While the source package may work with newer versions, the prebuilt binary wheels are built and tested against pyarrow 24.x.

Installation

pip install jollyjack

How to use

Generating a sample Parquet file

import jollyjack as jj
import pyarrow.parquet as pq
import pyarrow as pa
import numpy as np

from pyarrow import fs

chunk_size = 3
n_row_groups = 2
n_columns = 5
n_rows = n_row_groups * chunk_size
path = "my.parquet"

data = np.random.rand(n_rows, n_columns).astype(np.float32)
pa_arrays = [pa.array(data[:, i]) for i in range(n_columns)]
schema = pa.schema([(f"column_{i}", pa.float32()) for i in range(n_columns)])
table = pa.Table.from_arrays(pa_arrays, schema=schema)
pq.write_table(
    table,
    path,
    row_group_size=chunk_size,
    use_dictionary=False,
    write_statistics=True,
    store_schema=False,
    write_page_index=True,
)

Generating a NumPy array to read into

# Create an array of zeros
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")

Reading an entire file into a NumPy array

pr = pq.ParquetReader()
pr.open(path)

row_begin = 0
row_end = 0

for rg in range(pr.metadata.num_row_groups):
    row_begin = row_end
    row_end = row_begin + pr.metadata.row_group(rg).num_rows

    # To define which subset of the NumPy array we want to read into,
    # we create a view that shares underlying memory with the target array
    subset_view = np_array[row_begin:row_end, :]
    jj.read_into_numpy(
        source=path,
        metadata=pr.metadata,
        np_array=subset_view,
        row_group_indices=[rg],
        column_indices=range(pr.metadata.num_columns),
    )

# Alternatively
with fs.LocalFileSystem().open_input_file(path) as f:
    jj.read_into_numpy(
        source=f,
        metadata=None,
        np_array=np_array,
        row_group_indices=range(pr.metadata.num_row_groups),
        column_indices=range(pr.metadata.num_columns),
    )

Reading columns in reverse order

with fs.LocalFileSystem().open_input_file(path) as f:
    jj.read_into_numpy(
        source=f,
        metadata=None,
        np_array=np_array,
        row_group_indices=range(pr.metadata.num_row_groups),
        column_indices={
            i: pr.metadata.num_columns - i - 1 for i in range(pr.metadata.num_columns)
        },
    )

Reading column 3 into multiple destination columns

with fs.LocalFileSystem().open_input_file(path) as f:
    jj.read_into_numpy(
        source=f,
        metadata=None,
        np_array=np_array,
        row_group_indices=range(pr.metadata.num_row_groups),
        column_indices=((3, 0), (3, 1)),
    )

Sparse reading

np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
with fs.LocalFileSystem().open_input_file(path) as f:
    jj.read_into_numpy(
        source=f,
        metadata=None,
        np_array=np_array,
        row_group_indices=[0],
        row_ranges=[slice(0, 1), slice(4, 6)],
        column_indices=range(pr.metadata.num_columns),
    )
print(np_array)

Using cache options

np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
cache_options = pa.CacheOptions(
    hole_size_limit=8192,           # default
    range_size_limit=16*1024*1024,  # 16 MB, fits in mimalloc arena
    lazy=False,
)
with fs.LocalFileSystem().open_input_file(path) as f:
    jj.read_into_numpy(
        source=f,
        metadata=None,
        np_array=np_array,
        row_group_indices=[0],
        row_ranges=[slice(0, 1), slice(4, 6)],
        column_indices=range(pr.metadata.num_columns),
        cache_options=cache_options,
        pre_buffer=True,
    )
print(np_array)

Using page cache prefetching

np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
pr = pq.ParquetReader()
pr.open(path)

# cache_options controls which byte ranges are prefetched into the page cache.
# range_size_limit should not exceed the device's read_ahead_kb,
# because the kernel silently ignores readahead beyond that cap.
cache_options = pa.CacheOptions(
    hole_size_limit=8192,
    range_size_limit=128*1024,  # must not exceed read_ahead_kb
    lazy=False,
)

# Prefetch and read in one call
jj.read_into_numpy(
    source=path,
    metadata=pr.metadata,
    np_array=np_array,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    cache_options=cache_options,
    prefetch_page_cache=True,
)

# Or prefetch separately, then read
jj.prefetch_page_cache(
    source=path,
    metadata=pr.metadata,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    cache_options=cache_options,
)
jj.read_into_numpy(
    source=path,
    metadata=pr.metadata,
    np_array=np_array,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    pre_buffer=False,
)

Generating a PyTorch tensor to read into

import torch

# Create a tensor and transpose it to get Fortran-style order
tensor = torch.zeros(n_columns, n_rows, dtype=torch.float32).transpose(0, 1)
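
The same trick in NumPy terms, for intuition: transposing a C-contiguous array allocated with the axes swapped yields a Fortran-contiguous view over the desired shape, with no copy.

```python
import numpy as np

n_rows, n_columns = 6, 5
# Allocate with axes swapped, then transpose: the result is
# column-major (Fortran-style) over (n_rows, n_columns)
arr = np.zeros((n_columns, n_rows), dtype=np.float32).T
```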

Reading an entire file into a PyTorch tensor

pr = pq.ParquetReader()
pr.open(path)

jj.read_into_torch(
    source=path,
    metadata=pr.metadata,
    tensor=tensor,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    pre_buffer=True,
    use_threads=True,
)

print(tensor)
