Skip to content

Commit bcb2f95

Browse files
authored
fix(sql): support more Parquet field type combinations in read_parquet (#6069)
1 parent c8f236e commit bcb2f95

31 files changed

Lines changed: 267 additions & 200 deletions

core/rust/qdb-core/src/col_driver/mod.rs

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ mod string;
3131
mod varchar;
3232

3333
use crate::col_type::{ColumnType, ColumnTypeTag};
34-
use crate::error::CoreResult;
34+
use crate::error::{CoreResult, fmt_err};
3535

3636
pub use array::*;
3737
pub use binary::*;
@@ -57,32 +57,33 @@ pub trait ColumnDriver {
5757
}
5858

5959
/// Obtain a type driver from the provided column type, or an `InvalidType` error if no driver exists for it.
60-
pub fn lookup_driver(col_type: ColumnType) -> &'static dyn ColumnDriver {
60+
pub fn try_lookup_driver(col_type: ColumnType) -> CoreResult<&'static dyn ColumnDriver> {
6161
match (col_type.tag(), col_type.is_designated()) {
62-
(ColumnTypeTag::Boolean, _) => &BooleanDriver,
63-
(ColumnTypeTag::Byte, _) => &ByteDriver,
64-
(ColumnTypeTag::Short, _) => &ShortDriver,
65-
(ColumnTypeTag::Char, _) => &CharDriver,
66-
(ColumnTypeTag::Int, _) => &IntDriver,
67-
(ColumnTypeTag::Long, _) => &LongDriver,
68-
(ColumnTypeTag::Date, _) => &DateDriver,
69-
(ColumnTypeTag::Timestamp, false) => &TimestampDriver,
70-
(ColumnTypeTag::Timestamp, true) => &DesignatedTimestampDriver,
71-
(ColumnTypeTag::Float, _) => &FloatDriver,
72-
(ColumnTypeTag::Double, _) => &DoubleDriver,
73-
(ColumnTypeTag::String, _) => &StringDriver,
74-
(ColumnTypeTag::Symbol, _) => &SymbolDriver,
75-
(ColumnTypeTag::Long256, _) => &Long256Driver,
76-
(ColumnTypeTag::GeoByte, _) => &GeoByteDriver,
77-
(ColumnTypeTag::GeoShort, _) => &GeoShortDriver,
78-
(ColumnTypeTag::GeoInt, _) => &GeoIntDriver,
79-
(ColumnTypeTag::GeoLong, _) => &GeoLongDriver,
80-
(ColumnTypeTag::Binary, _) => &BinaryDriver,
81-
(ColumnTypeTag::Uuid, _) => &UuidDriver,
82-
(ColumnTypeTag::Long128, _) => &Long128Driver,
83-
(ColumnTypeTag::IPv4, _) => &IPv4Driver,
84-
(ColumnTypeTag::Varchar, _) => &VarcharDriver,
85-
(ColumnTypeTag::Array, _) => &ArrayDriver,
62+
(ColumnTypeTag::Boolean, _) => Ok(&BooleanDriver),
63+
(ColumnTypeTag::Byte, _) => Ok(&ByteDriver),
64+
(ColumnTypeTag::Short, _) => Ok(&ShortDriver),
65+
(ColumnTypeTag::Char, _) => Ok(&CharDriver),
66+
(ColumnTypeTag::Int, _) => Ok(&IntDriver),
67+
(ColumnTypeTag::Long, _) => Ok(&LongDriver),
68+
(ColumnTypeTag::Date, _) => Ok(&DateDriver),
69+
(ColumnTypeTag::Timestamp, false) => Ok(&TimestampDriver),
70+
(ColumnTypeTag::Timestamp, true) => Ok(&DesignatedTimestampDriver),
71+
(ColumnTypeTag::Float, _) => Ok(&FloatDriver),
72+
(ColumnTypeTag::Double, _) => Ok(&DoubleDriver),
73+
(ColumnTypeTag::String, _) => Ok(&StringDriver),
74+
(ColumnTypeTag::Symbol, _) => Ok(&SymbolDriver),
75+
(ColumnTypeTag::Long256, _) => Ok(&Long256Driver),
76+
(ColumnTypeTag::GeoByte, _) => Ok(&GeoByteDriver),
77+
(ColumnTypeTag::GeoShort, _) => Ok(&GeoShortDriver),
78+
(ColumnTypeTag::GeoInt, _) => Ok(&GeoIntDriver),
79+
(ColumnTypeTag::GeoLong, _) => Ok(&GeoLongDriver),
80+
(ColumnTypeTag::Binary, _) => Ok(&BinaryDriver),
81+
(ColumnTypeTag::Uuid, _) => Ok(&UuidDriver),
82+
(ColumnTypeTag::Long128, _) => Ok(&Long128Driver),
83+
(ColumnTypeTag::IPv4, _) => Ok(&IPv4Driver),
84+
(ColumnTypeTag::Varchar, _) => Ok(&VarcharDriver),
85+
(ColumnTypeTag::Array, _) => Ok(&ArrayDriver),
86+
_ => Err(fmt_err!(InvalidType, "unexpected column type {}", col_type,)),
8687
}
8788
}
8889

@@ -122,11 +123,19 @@ mod tests {
122123
(ColumnTypeTag::Long128.into_type(), "long128"),
123124
(ColumnTypeTag::IPv4.into_type(), "ipv4"),
124125
(ColumnTypeTag::Varchar.into_type(), "varchar"),
126+
(ColumnTypeTag::Array.into_type(), "array"),
125127
];
126128
for (col_type, exp_descr) in cases.iter().copied() {
127-
let driver = lookup_driver(col_type);
129+
let driver = try_lookup_driver(col_type).unwrap();
128130
let actual_descr = driver.descr();
129131
assert_eq!(actual_descr, exp_descr);
130132
}
131133
}
134+
135+
#[test]
136+
fn test_lookup_driver_undefined_errors() {
137+
// Undefined via code 0 should error
138+
let undefined = ColumnType::new(ColumnTypeTag::Undefined, 0);
139+
assert!(try_lookup_driver(undefined).is_err());
140+
}
132141
}

core/rust/qdb-core/src/col_type.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
use crate::error::{CoreError, CoreErrorExt, CoreResult, fmt_err};
2525
use serde::{Deserialize, Deserializer, Serialize};
2626
use std::fmt::{Debug, Display, Formatter};
27-
use std::num::NonZeroI32;
2827

2928
// Don't forget to update VALUES when modifying this list.
3029
#[repr(u8)]
3130
#[derive(Debug, Copy, Clone, PartialEq)]
3231
pub enum ColumnTypeTag {
32+
/// Placeholder for unsupported/unknown types.
33+
/// Note: ColumnType::try_from(i32) rejects code <= 0, so Undefined is not a valid serialized code.
34+
Undefined = 0,
3335
Boolean = 1,
3436
Byte = 2,
3537
Short = 3,
@@ -57,7 +59,8 @@ pub enum ColumnTypeTag {
5759

5860
impl ColumnTypeTag {
5961
#[cfg(test)]
60-
const VALUES: [Self; 23] = [
62+
const VALUES: [Self; 24] = [
63+
Self::Undefined,
6164
Self::Boolean,
6265
Self::Byte,
6366
Self::Short,
@@ -113,15 +116,13 @@ impl ColumnTypeTag {
113116

114117
ColumnTypeTag::Long256 => Some(32),
115118

116-
ColumnTypeTag::Binary
117-
| ColumnTypeTag::String
118-
| ColumnTypeTag::Varchar
119-
| ColumnTypeTag::Array => None,
119+
_ => None,
120120
}
121121
}
122122

123123
pub const fn name(self) -> &'static str {
124124
match self {
125+
ColumnTypeTag::Undefined => "undefined",
125126
ColumnTypeTag::Boolean => "boolean",
126127
ColumnTypeTag::Byte => "byte",
127128
ColumnTypeTag::Short => "short",
@@ -162,6 +163,7 @@ impl TryFrom<u8> for ColumnTypeTag {
162163

163164
fn try_from(col_tag_num: u8) -> Result<Self, Self::Error> {
164165
match col_tag_num {
166+
0 => Ok(ColumnTypeTag::Undefined),
165167
1 => Ok(ColumnTypeTag::Boolean),
166168
2 => Ok(ColumnTypeTag::Byte),
167169
3 => Ok(ColumnTypeTag::Short),
@@ -181,7 +183,6 @@ impl TryFrom<u8> for ColumnTypeTag {
181183
17 => Ok(ColumnTypeTag::GeoLong),
182184
18 => Ok(ColumnTypeTag::Binary),
183185
19 => Ok(ColumnTypeTag::Uuid),
184-
21 => Ok(ColumnTypeTag::IPv4),
185186
24 => Ok(ColumnTypeTag::Long128),
186187
25 => Ok(ColumnTypeTag::IPv4),
187188
26 => Ok(ColumnTypeTag::Varchar),
@@ -211,24 +212,23 @@ const ARRAY_NDIMS_FIELD_POS: i32 = 14;
211212
#[serde(transparent)]
212213
pub struct ColumnType {
213214
// Raw type code: the tag in the low 8 bits, extra type info shifted above it. May be zero (`Undefined`), so `Option<ColumnType>` has no niche to reuse.
214-
code: NonZeroI32,
215+
code: i32,
215216
}
216217

217218
impl ColumnType {
218219
pub fn new(tag: ColumnTypeTag, extra_type_info: i32) -> Self {
219220
let shifted_extra_type_info = extra_type_info << 8;
220-
let code = NonZeroI32::new(tag as i32 | shifted_extra_type_info)
221-
.expect("column type code should never be zero");
221+
let code = tag as i32 | shifted_extra_type_info;
222222
Self { code }
223223
}
224224

225225
pub fn code(&self) -> i32 {
226-
self.code.get()
226+
self.code
227227
}
228228

229229
pub fn is_designated(&self) -> bool {
230230
(self.tag() == ColumnTypeTag::Timestamp)
231-
&& ((self.code.get() & TYPE_FLAG_DESIGNATED_TIMESTAMP) > 0)
231+
&& ((self.code & TYPE_FLAG_DESIGNATED_TIMESTAMP) > 0)
232232
}
233233

234234
pub fn into_designated(self) -> CoreResult<ColumnType> {
@@ -239,7 +239,7 @@ impl ColumnType {
239239
self
240240
));
241241
}
242-
let code = NonZeroI32::new(self.code() | TYPE_FLAG_DESIGNATED_TIMESTAMP).unwrap();
242+
let code = self.code() | TYPE_FLAG_DESIGNATED_TIMESTAMP;
243243
Ok(Self { code })
244244
}
245245

@@ -305,8 +305,7 @@ impl TryFrom<i32> for ColumnType {
305305
let _tag: ColumnTypeTag = col_tag_num
306306
.try_into()
307307
.with_context(|_| format!("could not parse {v} to a valid ColumnType"))?;
308-
let code = NonZeroI32::new(v).expect("column type code should never be zero");
309-
Ok(Self { code })
308+
Ok(Self { code: v })
310309
}
311310
}
312311

core/rust/qdbr/src/parquet_read/decode.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ pub fn decode_page(
864864
| (PhysicalType::ByteArray, _, Some(PrimitiveConvertedType::Utf8)) => {
865865
let encoding = page.encoding();
866866
match (encoding, dict, column_type.tag()) {
867-
(Encoding::DeltaLengthByteArray, None, ColumnTypeTag::String) => {
867+
(Encoding::DeltaLengthByteArray, _, ColumnTypeTag::String) => {
868868
let mut slicer =
869869
DeltaLengthArraySlicer::try_new(values_buffer, row_hi, row_count)?;
870870
decode_page0(
@@ -875,7 +875,7 @@ pub fn decode_page(
875875
)?;
876876
Ok(())
877877
}
878-
(Encoding::DeltaLengthByteArray, None, ColumnTypeTag::Varchar) => {
878+
(Encoding::DeltaLengthByteArray, _, ColumnTypeTag::Varchar) => {
879879
let mut slicer =
880880
DeltaLengthArraySlicer::try_new(values_buffer, row_hi, row_count)?;
881881
decode_page0(
@@ -907,7 +907,7 @@ pub fn decode_page(
907907
)?;
908908
Ok(())
909909
}
910-
(Encoding::Plain, None, ColumnTypeTag::String) => {
910+
(Encoding::Plain, _, ColumnTypeTag::String) => {
911911
let mut slicer = PlainVarSlicer::new(values_buffer, row_count);
912912
decode_page0(
913913
page,
@@ -962,10 +962,10 @@ pub fn decode_page(
962962
_ => Err(encoding_error),
963963
}
964964
}
965-
(PhysicalType::ByteArray, None, _) => {
965+
(PhysicalType::ByteArray, _, _) => {
966966
let encoding = page.encoding();
967967
match (encoding, dict, column_type.tag()) {
968-
(Encoding::Plain, None, ColumnTypeTag::Binary) => {
968+
(Encoding::Plain, _, ColumnTypeTag::Binary) => {
969969
let mut slicer = PlainVarSlicer::new(values_buffer, row_count);
970970
decode_page0(
971971
page,
@@ -975,7 +975,7 @@ pub fn decode_page(
975975
)?;
976976
Ok(())
977977
}
978-
(Encoding::DeltaLengthByteArray, None, ColumnTypeTag::Binary) => {
978+
(Encoding::DeltaLengthByteArray, _, ColumnTypeTag::Binary) => {
979979
let mut slicer =
980980
DeltaLengthArraySlicer::try_new(values_buffer, row_hi, row_count)?;
981981
decode_page0(
@@ -1007,7 +1007,7 @@ pub fn decode_page(
10071007
)?;
10081008
Ok(())
10091009
}
1010-
(Encoding::Plain, None, ColumnTypeTag::Array) => {
1010+
(Encoding::Plain, _, ColumnTypeTag::Array) => {
10111011
// raw array encoding
10121012
let mut slicer = PlainVarSlicer::new(values_buffer, row_count);
10131013
decode_page0(
@@ -1018,7 +1018,7 @@ pub fn decode_page(
10181018
)?;
10191019
Ok(())
10201020
}
1021-
(Encoding::DeltaLengthByteArray, None, ColumnTypeTag::Array) => {
1021+
(Encoding::DeltaLengthByteArray, _, ColumnTypeTag::Array) => {
10221022
let mut slicer =
10231023
DeltaLengthArraySlicer::try_new(values_buffer, row_hi, row_count)?;
10241024
decode_page0(
@@ -1073,8 +1073,8 @@ pub fn decode_page(
10731073
_ => Err(encoding_error),
10741074
}
10751075
}
1076-
(PhysicalType::Double, None, _) => match (page.encoding(), dict, column_type.tag()) {
1077-
(Encoding::Plain, None, ColumnTypeTag::Double) => {
1076+
(PhysicalType::Double, _, _) => match (page.encoding(), dict, column_type.tag()) {
1077+
(Encoding::Plain, _, ColumnTypeTag::Double) => {
10781078
bufs.aux_vec.clear();
10791079
bufs.aux_ptr = ptr::null_mut();
10801080

@@ -1114,7 +1114,7 @@ pub fn decode_page(
11141114
)?;
11151115
Ok(())
11161116
}
1117-
(Encoding::Plain, None, ColumnTypeTag::Array) => {
1117+
(Encoding::Plain, _, ColumnTypeTag::Array) => {
11181118
let mut slicer = DataPageFixedSlicer::<8>::new(values_buffer, row_count);
11191119
decode_array_page(page, row_lo, row_hi, &mut slicer, bufs)?;
11201120
Ok(())
@@ -1137,8 +1137,8 @@ pub fn decode_page(
11371137
}
11381138
_ => Err(encoding_error),
11391139
},
1140-
// fixed-size types only
1141-
(typ, None, _) => {
1140+
// check remaining fixed-size types
1141+
(typ, _, _) => {
11421142
bufs.aux_vec.clear();
11431143
bufs.aux_ptr = ptr::null_mut();
11441144

@@ -1165,7 +1165,7 @@ pub fn decode_page(
11651165
)?;
11661166
Ok(())
11671167
}
1168-
(Encoding::Plain, None, PhysicalType::Float, ColumnTypeTag::Float) => {
1168+
(Encoding::Plain, _, PhysicalType::Float, ColumnTypeTag::Float) => {
11691169
decode_page0(
11701170
page,
11711171
row_lo,
@@ -1178,7 +1178,7 @@ pub fn decode_page(
11781178
)?;
11791179
Ok(())
11801180
}
1181-
(Encoding::Plain, None, PhysicalType::Boolean, ColumnTypeTag::Boolean) => {
1181+
(Encoding::Plain, _, PhysicalType::Boolean, ColumnTypeTag::Boolean) => {
11821182
decode_page0(
11831183
page,
11841184
row_lo,
@@ -1191,7 +1191,7 @@ pub fn decode_page(
11911191
)?;
11921192
Ok(())
11931193
}
1194-
(Encoding::Rle, None, PhysicalType::Boolean, ColumnTypeTag::Boolean) => {
1194+
(Encoding::Rle, _, PhysicalType::Boolean, ColumnTypeTag::Boolean) => {
11951195
decode_page0(
11961196
page,
11971197
row_lo,
@@ -1207,7 +1207,6 @@ pub fn decode_page(
12071207
_ => Err(encoding_error),
12081208
}
12091209
}
1210-
_ => Err(encoding_error),
12111210
};
12121211

12131212
match decoding_result {

0 commit comments

Comments
 (0)