use arrow::csv::ReaderBuilder;
use clap::{Parser, ValueHint};
use parquet::{
    arrow::ArrowWriter,
    basic::{Compression, Encoding},
    errors::ParquetError,
    file::properties::{EnabledStatistics, WriterProperties},
};
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
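
/// Compression codecs accepted by the `--compression` flag.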
#[derive(clap::ValueEnum, Clone)]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
enum ParquetCompression {
    UNCOMPRESSED,
    SNAPPY,
    GZIP,
    LZO,
    BROTLI,
    LZ4,
    ZSTD,
}
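
/// Column encodings accepted by the `--encoding` flag.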
#[derive(clap::ValueEnum, Clone)]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
enum ParquetEncoding {
    PLAIN,
    RLE,
    BIT_PACKED,
    DELTA_BINARY_PACKED,
    DELTA_LENGTH_BYTE_ARRAY,
    DELTA_BYTE_ARRAY,
    RLE_DICTIONARY,
}
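
/// Statistics granularity accepted by the `--statistics` flag.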
#[derive(clap::ValueEnum, Clone)]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
enum ParquetEnabledStatistics {
    None,
    Chunk,
    Page,
}
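
// Command-line arguments; clap derives the parser from this struct.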
#[derive(Parser)]
#[clap(version = env!("CARGO_PKG_VERSION"), author = "Dominik Moritz <[email protected]>")]
struct Opts {
    #[clap(name = "CSV", value_parser, value_hint = ValueHint::AnyPath)]
    input: PathBuf,
    #[clap(name = "PARQUET", value_parser, value_hint = ValueHint::AnyPath)]
    output: PathBuf,
    #[clap(short = 's', long, value_parser, value_hint = ValueHint::AnyPath)]
    schema_file: Option<PathBuf>,
    #[clap(long)]
    max_read_records: Option<usize>,
    #[clap(long)]
    header: Option<bool>,
    #[clap(short, long, default_value = ",")]
    delimiter: char,
    #[clap(short, long, value_enum)]
    compression: Option<ParquetCompression>,
    #[clap(short, long, value_enum)]
    encoding: Option<ParquetEncoding>,
    #[clap(long)]
    data_pagesize_limit: Option<usize>,
    #[clap(long)]
    dictionary_pagesize_limit: Option<usize>,
    #[clap(long)]
    write_batch_size: Option<usize>,
    #[clap(long)]
    max_row_group_size: Option<usize>,
    #[clap(long)]
    created_by: Option<String>,
    #[clap(long)]
    dictionary: bool,
    #[clap(long, value_enum)]
    statistics: Option<ParquetEnabledStatistics>,
    #[clap(long)]
    max_statistics_size: Option<usize>,
    #[clap(short, long)]
    print_schema: bool,
    #[clap(short = 'n', long)]
    dry: bool,
}
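
// Entry point: convert the CSV input into a Parquet file according to the CLI options.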
fn main() -> Result<(), ParquetError> {
    let opts: Opts = Opts::parse();
    let mut input = File::open(opts.input)?;
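
    // Resolve the Arrow schema: read it from the JSON schema file if one was
    // given, otherwise infer it by sampling the CSV input.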
    let schema = match opts.schema_file {
        Some(schema_def_file_path) => {
            let schema_file = match File::open(&schema_def_file_path) {
                Ok(file) => Ok(file),
                Err(error) => Err(ParquetError::General(format!(
                    "Error opening schema file: {:?}, message: {}",
                    schema_def_file_path, error
                ))),
            }?;
            let schema: Result<arrow::datatypes::Schema, serde_json::Error> =
                serde_json::from_reader(schema_file);
            match schema {
                Ok(schema) => Ok(schema),
                Err(err) => Err(ParquetError::General(format!(
                    "Error reading schema json: {}",
                    err
                ))),
            }
        }
        _ => {
            match arrow::csv::reader::infer_file_schema(
                &mut input,
                opts.delimiter as u8,
                opts.max_read_records,
                opts.header.unwrap_or(true),
            ) {
                Ok((schema, _)) => Ok(schema),
                Err(error) => Err(ParquetError::General(format!(
                    "Error inferring schema: {}",
                    error
                ))),
            }
        }
    }?;
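
    // Optionally print the resolved schema; in dry-run mode, stop after printing.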
    if opts.print_schema || opts.dry {
        let json = serde_json::to_string_pretty(&schema).unwrap();
        eprintln!("Schema:");
        println!("{}", json);
        if opts.dry {
            return Ok(());
        }
    }
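
    // Build a CSV reader that decodes records using the resolved schema.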
    let schema_ref = Arc::new(schema);
    let builder = ReaderBuilder::new()
        .has_header(opts.header.unwrap_or(true))
        .with_delimiter(opts.delimiter as u8)
        .with_schema(schema_ref);
    let reader = builder.build(input)?;
    let output = File::create(opts.output)?;
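
    // Translate the CLI flags into Parquet writer properties.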
    let mut props = WriterProperties::builder().set_dictionary_enabled(opts.dictionary);

    if let Some(statistics) = opts.statistics {
        let statistics = match statistics {
            ParquetEnabledStatistics::Chunk => EnabledStatistics::Chunk,
            ParquetEnabledStatistics::Page => EnabledStatistics::Page,
            ParquetEnabledStatistics::None => EnabledStatistics::None,
        };
        props = props.set_statistics_enabled(statistics);
    }

    if let Some(compression) = opts.compression {
        let compression = match compression {
            ParquetCompression::UNCOMPRESSED => Compression::UNCOMPRESSED,
            ParquetCompression::SNAPPY => Compression::SNAPPY,
            ParquetCompression::GZIP => Compression::GZIP,
            ParquetCompression::LZO => Compression::LZO,
            ParquetCompression::BROTLI => Compression::BROTLI,
            ParquetCompression::LZ4 => Compression::LZ4,
            ParquetCompression::ZSTD => Compression::ZSTD,
        };
        props = props.set_compression(compression);
    }

    if let Some(encoding) = opts.encoding {
        let encoding = match encoding {
            ParquetEncoding::PLAIN => Encoding::PLAIN,
            ParquetEncoding::RLE => Encoding::RLE,
            ParquetEncoding::BIT_PACKED => Encoding::BIT_PACKED,
            ParquetEncoding::DELTA_BINARY_PACKED => Encoding::DELTA_BINARY_PACKED,
            ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY => Encoding::DELTA_LENGTH_BYTE_ARRAY,
            ParquetEncoding::DELTA_BYTE_ARRAY => Encoding::DELTA_BYTE_ARRAY,
            ParquetEncoding::RLE_DICTIONARY => Encoding::RLE_DICTIONARY,
        };
        props = props.set_encoding(encoding);
    }

    if let Some(size) = opts.write_batch_size {
        props = props.set_write_batch_size(size);
    }
    if let Some(size) = opts.data_pagesize_limit {
        props = props.set_data_pagesize_limit(size);
    }
    if let Some(size) = opts.dictionary_pagesize_limit {
        props = props.set_dictionary_pagesize_limit(size);
    }
    if let Some(size) = opts.max_row_group_size {
        props = props.set_max_row_group_size(size);
    }
    if let Some(created_by) = opts.created_by {
        props = props.set_created_by(created_by);
    }
    if let Some(size) = opts.max_statistics_size {
        props = props.set_max_statistics_size(size);
    }
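
    // Stream record batches from the CSV reader into the Parquet writer.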
    let mut writer = ArrowWriter::try_new(output, reader.schema(), Some(props.build()))?;
    for batch in reader {
        match batch {
            Ok(batch) => writer.write(&batch)?,
            Err(error) => return Err(error.into()),
        }
    }
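
    // Closing the writer flushes remaining data and writes the Parquet footer;
    // the returned file metadata is not needed here.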
    writer.close()?;
    Ok(())
}