-
Notifications
You must be signed in to change notification settings - Fork 4k
Description
Describe the bug, including details regarding any error messages, version, and platform.
Hi,
we've encountered a performance issue with reading parquet files using the Pyarrow library.
In our use case, we read some columns (e.g. 100) from a parquet file containing many more columns (e.g. 20k).
The difference in performance between Pyarrow and raw C++ is 4-5x which is way too much to be acceptable.
The repro code: https://github.com/marcin-krystianc/arrow_issue_2023-10-06
In the graph below (produced with https://github.com/marcin-krystianc/arrow_issue_2023-10-06/blob/master/plot_results.py), we can clearly see that when we read 100 columns from a Parquet file (the orange line), the column reading time in Python (on the left) is about 5x higher than in C++ (on the right).

Interestingly, when we compare the column reading time when we read all columns, the difference is much smaller:

Given that Pyarrow delegates the actual parquet reading to the native C++ library, it is quite surprising that the overhead is so high and fluctuates significantly in various scenarios.
Python code:
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import time
import polars as pl
import csv
import gc

# Benchmark: write a wide Parquet file, then time reading it back with
# pyarrow (all columns vs. only the first 100 columns), single-threaded.
# Best-of-3 timings are written to results_python.csv in microseconds.
t_write = []
t_read_100_pre_buffer = []
path = "/tmp/test_wide.parquet"
columns_list = [
    100, 200, 300, 400, 500, 600, 700, 800, 900,
    1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
    10_000, 20_000, 30_000, 40_000, 50_000,
]
chunks_list = [1000, 10_000]
rows_list = [5000]  # fixed row count; only the column count varies

with open('results_python.csv', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    # write the header
    writer.writerow(['columns','rows','chunk_size','writing(μs)','reading_all(μs)','reading_100(μs)'])
    for chunk_size in chunks_list:
        for rows in rows_list:
            for columns in columns_list:
                # Random float64 data, columns named c0..c<n-1>.
                table = pl.DataFrame(
                    data=np.random.randn(rows, columns),
                    schema=[f"c{i}" for i in range(columns)]).to_arrow()
                t = time.time()
                pq.write_table(table, path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False)
                t_writing = time.time() - t
                t_write.append(t_writing)
                del table
                gc.collect()
                t_read = []
                t_read_100 = []
                # Take the best of 3 runs to reduce timing noise.
                for i in range(0, 3):
                    t = time.time()
                    res = pq.read_table(path, use_threads=False)
                    t_read.append(time.time() - t)
                    del res
                    gc.collect()
                    t = time.time()
                    res_100 = pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
                    t_read_100.append(time.time() - t)
                    del res_100
                    gc.collect()
                t_reading = min(t_read)
                t_reading_100 = min(t_read_100)
                data = [columns, rows, chunk_size, t_writing * 1_000_000, t_reading * 1_000_000, t_reading_100 * 1_000_000]
                writer.writerow(data)
                print(str(data))
C++ code:
#include "arrow/api.h"
#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/util/type_fwd.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"

#include <algorithm>
#include <chrono>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <numeric>
#include <random>
#include <vector>
using arrow::Status;
namespace
{
const char *FILE_NAME = "/tmp/my_cpp.parquet";
// Builds an in-memory Arrow table with nColumns non-nullable float64 columns
// ("c_0".."c_<n-1>"), each holding nRows uniform random doubles in [0, 1).
// Throws std::runtime_error if any builder operation fails.
std::shared_ptr<arrow::Table> GetTable(size_t nColumns, size_t nRows)
{
    std::random_device dev;
    std::mt19937 rng(dev());
    std::uniform_real_distribution<> rand_gen(0.0, 1.0);

    std::vector<std::shared_ptr<arrow::Array>> arrays;
    std::vector<std::shared_ptr<arrow::Field>> fields;
    // Avoid repeated reallocation for wide tables (up to 50k columns).
    arrays.reserve(nColumns);
    fields.reserve(nColumns);

    // size_t loop indices: avoid signed/unsigned comparison warnings.
    for (size_t i = 0; i < nColumns; i++)
    {
        arrow::DoubleBuilder builder;
        for (size_t j = 0; j < nRows; j++)
        {
            if (!builder.Append(rand_gen(rng)).ok())
                throw std::runtime_error("builder.Append");
        }
        std::shared_ptr<arrow::Array> array;
        if (!builder.Finish(&array).ok())
            throw std::runtime_error("builder.Finish");
        arrays.push_back(std::move(array));
        fields.push_back(arrow::field("c_" + std::to_string(i), arrow::float64(), false));
    }
    return arrow::Table::Make(arrow::schema(fields), arrays);
}
// Generates an nColumns x nRows table and writes it to `filename` as Parquet
// (row groups of chunkSize, dictionary encoding disabled). *dt receives the
// elapsed open+write time in microseconds; table construction is excluded.
Status WriteTableToParquet(size_t nColumns, size_t nRows, const std::string &filename, std::chrono::microseconds *dt, int64_t chunkSize)
{
    auto table = GetTable(nColumns, nRows);
    auto begin = std::chrono::steady_clock::now();
    // ARROW_ASSIGN_OR_RAISE instead of ValueOrDie(): propagate I/O failures
    // as a Status rather than aborting the process.
    ARROW_ASSIGN_OR_RAISE(auto outfile, arrow::io::FileOutputStream::Open(filename));
    parquet::WriterProperties::Builder builder;
    auto properties = builder
                          .max_row_group_length(chunkSize)
                          ->disable_dictionary()
                          ->build();
    // ARROW_RETURN_NOT_OK, not PARQUET_THROW_NOT_OK: this function reports
    // errors through its Status return, so it must not throw.
    ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, chunkSize, properties));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
}
// Reads the whole Parquet file at `filename` into an Arrow table.
// *dt receives the elapsed open+read time in microseconds.
Status ReadEntireTable(const std::string &filename, std::chrono::microseconds *dt)
{
    auto begin = std::chrono::steady_clock::now();
    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    std::shared_ptr<arrow::Table> parquet_table;
    // ARROW_RETURN_NOT_OK, not PARQUET_THROW_NOT_OK: keep error handling
    // consistent with this function's Status return instead of throwing.
    ARROW_RETURN_NOT_OK(reader->ReadTable(&parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
}
// Reads only the columns selected by `indices` (schema field indices) from
// the Parquet file at `filename`. *dt receives the elapsed open+read time
// in microseconds.
Status ReadColumnsAsTable(const std::string &filename, const std::vector<int> &indices, std::chrono::microseconds *dt)
{
    auto begin = std::chrono::steady_clock::now();
    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    std::shared_ptr<arrow::Table> parquet_table;
    // ARROW_RETURN_NOT_OK, not PARQUET_THROW_NOT_OK: keep error handling
    // consistent with this function's Status return instead of throwing.
    ARROW_RETURN_NOT_OK(reader->ReadTable(indices, &parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
}
// Sweeps (chunk_size, rows, columns) combinations: writes a wide Parquet
// file, then times reading it back entirely and reading only the first 100
// columns (best of 3 repeats each). Per-column averages go to stderr,
// raw microsecond timings to results_cpp.csv.
Status RunMain(int argc, char **argv)
{
    std::ofstream csvFile;
    csvFile.open("results_cpp.csv", std::ios_base::out); // truncate previous results
    if (!csvFile.is_open())
        return Status::IOError("failed to open results_cpp.csv");
    csvFile << "columns, rows, chunk_size, writing(μs), reading_all(μs), reading_100(μs)" << std::endl;
    std::list<int> nColumns = {
        100, 200, 300, 400, 500, 600, 700, 800, 900,
        1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
        10000, 20000, 30000, 40000, 50000};
    // NOTE(review): the companion Python script sweeps {1000, 10_000} —
    // confirm which second value is intended for an apples-to-apples run.
    std::list<int64_t> chunk_sizes = {1000, 100000};
    std::list<int> rows_list = {5000};
    // Indices of the first 100 columns, read by ReadColumnsAsTable.
    std::vector<int> indices(100);
    std::iota(indices.begin(), indices.end(), 0);
    for (auto chunk_size : chunk_sizes)
    {
        for (int nRow : rows_list)
        {
            for (int nColumn : nColumns)
            {
                std::chrono::microseconds writing_dt;
                ARROW_RETURN_NOT_OK(WriteTableToParquet(nColumn, nRow, FILE_NAME, &writing_dt, chunk_size));

                const int repeats = 3;
                std::vector<std::chrono::microseconds> reading_all_dts(repeats);
                std::vector<std::chrono::microseconds> reading_100_dts(repeats);
                for (int i = 0; i < repeats; i++)
                {
                    ARROW_RETURN_NOT_OK(ReadEntireTable(FILE_NAME, &reading_all_dts[i]));
                    ARROW_RETURN_NOT_OK(ReadColumnsAsTable(FILE_NAME, indices, &reading_100_dts[i]));
                }
                auto reading_all_dt = *std::min_element(reading_all_dts.begin(), reading_all_dts.end());
                auto reading_100_dt = *std::min_element(reading_100_dts.begin(), reading_100_dts.end());
                // Progress log: time divided by number of columns read.
                std::cerr << "(" << nColumn << ", " << nRow << ")"
                          << ", chunk_size=" << chunk_size
                          << ", writing_dt=" << writing_dt.count() / nColumn
                          << ", reading_all_dt=" << reading_all_dt.count() / nColumn
                          << ", reading_100_dt=" << reading_100_dt.count() / 100
                          << std::endl;
                csvFile << nColumn << ","
                        << nRow << ","
                        << chunk_size << ","
                        << writing_dt.count() << ","
                        << reading_all_dt.count() << ","
                        << reading_100_dt.count()
                        << std::endl;
            }
        }
    }
    return Status::OK();
}
}
// Entry point: run the benchmark and report any failure status on stderr.
// (Scraping had fused the issue-template text onto the closing brace; this
// restores compilable code.)
int main(int argc, char **argv)
{
    Status st = RunMain(argc, argv);
    if (!st.ok())
    {
        std::cerr << st << std::endl;
        return 1;
    }
    return 0;
}
Component(s)
Parquet