Skip to content

Commit 4fd5c28

Browse files
authored
GH-35304: [C++][ORC] Support attributes conversion (#35499)
### Rationale for this change

Apache Orc has a per-column attribute map, and Apache Iceberg depends on it to encode its field metadata. However, the C++ Orc adapter is not aware of these attributes, which makes it difficult to support this use case in pyarrow and pyiceberg.

### What changes are included in this PR?

Both the reader and the writer now support converting Orc attributes from/to Arrow field metadata.

### Are these changes tested?

Added two test cases to make sure the Orc adapter preserves the attributes correctly.

### Are there any user-facing changes?

No.

* Closes: #35304

Authored-by: Gang Wu <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
1 parent cfcff49 commit 4fd5c28

File tree

4 files changed

+200
-24
lines changed

4 files changed

+200
-24
lines changed

cpp/src/arrow/adapters/orc/adapter.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,9 @@ class ORCFileReader::Impl {
321321
std::vector<std::shared_ptr<Field>> fields;
322322
fields.reserve(size);
323323
for (int child = 0; child < size; ++child) {
324-
ARROW_ASSIGN_OR_RAISE(auto elemtype, GetArrowType(type.getSubtype(child)));
325-
std::string name = type.getFieldName(child);
326-
fields.push_back(field(std::move(name), std::move(elemtype)));
324+
const std::string& name = type.getFieldName(child);
325+
ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField(name, type.getSubtype(child)));
326+
fields.push_back(std::move(elem_field));
327327
}
328328
ARROW_ASSIGN_OR_RAISE(auto metadata, ReadMetadata());
329329
return std::make_shared<Schema>(std::move(fields), std::move(metadata));

cpp/src/arrow/adapters/orc/adapter_test.cc

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,135 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) {
506506
ASSERT_EQ(nullptr, record_batch);
507507
}
508508

509+
TEST(TestAdapterRead, ReadFieldAttributes) {
510+
const std::string id_key = "iceberg.id";
511+
const std::string required_key = "iceberg.required";
512+
513+
auto set_attributes = [&](liborc::Type* type, const std::string& id,
514+
const std::string& required) {
515+
type->setAttribute(id_key, id);
516+
type->setAttribute(required_key, required);
517+
};
518+
519+
auto check_attributes = [&](const std::shared_ptr<arrow::Field>& field,
520+
const std::string& expect_id,
521+
const std::string& expect_required) {
522+
auto field_metadata = field->metadata();
523+
ASSERT_NE(field_metadata, nullptr);
524+
ASSERT_EQ(expect_id, field_metadata->Get(id_key));
525+
ASSERT_EQ(expect_required, field_metadata->Get(required_key));
526+
};
527+
528+
auto c1_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
529+
set_attributes(c1_type.get(), "1", "true");
530+
531+
auto c2_elem_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
532+
set_attributes(c2_elem_type.get(), "3", "false");
533+
auto c2_type = liborc::createListType(std::move(c2_elem_type));
534+
set_attributes(c2_type.get(), "2", "false");
535+
536+
auto c3_key_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
537+
set_attributes(c3_key_type.get(), "5", "true");
538+
auto c3_value_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
539+
set_attributes(c3_value_type.get(), "6", "false");
540+
auto c3_type = liborc::createMapType(std::move(c3_key_type), std::move(c3_value_type));
541+
set_attributes(c3_type.get(), "4", "false");
542+
543+
auto c4_sub_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
544+
set_attributes(c4_sub_type.get(), "8", "false");
545+
auto c4_type = liborc::createStructType();
546+
c4_type->addStructField("c4_1", std::move(c4_sub_type));
547+
set_attributes(c4_type.get(), "7", "false");
548+
549+
auto orc_type = liborc::createStructType();
550+
orc_type->addStructField("c1", std::move(c1_type));
551+
orc_type->addStructField("c2", std::move(c2_type));
552+
orc_type->addStructField("c3", std::move(c3_type));
553+
orc_type->addStructField("c4", std::move(c4_type));
554+
555+
MemoryOutputStream mem_stream(kDefaultMemStreamSize);
556+
auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream);
557+
writer->close();
558+
559+
std::shared_ptr<io::RandomAccessFile> in_stream(std::make_shared<io::BufferReader>(
560+
reinterpret_cast<const uint8_t*>(mem_stream.getData()),
561+
static_cast<int64_t>(mem_stream.getLength())));
562+
ASSERT_OK_AND_ASSIGN(
563+
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
564+
ASSERT_EQ(0, reader->NumberOfRows());
565+
566+
ASSERT_OK_AND_ASSIGN(auto schema, reader->ReadSchema());
567+
ASSERT_EQ(4, schema->num_fields());
568+
569+
// check top level fields
570+
check_attributes(schema->field(0), "1", "true");
571+
check_attributes(schema->field(1), "2", "false");
572+
check_attributes(schema->field(2), "4", "false");
573+
check_attributes(schema->field(3), "7", "false");
574+
575+
// check list element type
576+
auto list_type = checked_pointer_cast<arrow::ListType>(schema->field(1)->type());
577+
check_attributes(list_type->value_field(), "3", "false");
578+
579+
// check map key/value types
580+
auto map_type = checked_pointer_cast<arrow::MapType>(schema->field(2)->type());
581+
check_attributes(map_type->key_field(), "5", "true");
582+
check_attributes(map_type->item_field(), "6", "false");
583+
584+
// check struct sub-field type
585+
auto struct_type = checked_pointer_cast<arrow::StructType>(schema->field(3)->type());
586+
check_attributes(struct_type->field(0), "8", "false");
587+
}
588+
589+
TEST(TestAdapterReadWrite, FieldAttributesRoundTrip) {
590+
EXPECT_OK_AND_ASSIGN(auto buffer_output_stream, io::BufferOutputStream::Create(1024));
591+
auto write_options = adapters::orc::WriteOptions();
592+
write_options.compression = Compression::UNCOMPRESSED;
593+
EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open(
594+
buffer_output_stream.get(), write_options));
595+
596+
auto schema = ::arrow::schema(
597+
{::arrow::field("c0", ::arrow::int64(), /*nullable=*/true,
598+
key_value_metadata({"k0"}, {"v0"})),
599+
::arrow::field("c1", ::arrow::utf8(), /*nullable=*/true,
600+
key_value_metadata({"k1"}, {"v1"})),
601+
::arrow::field(
602+
"c2", ::arrow::list(::arrow::field("item", ::arrow::int64(), /*nullable=*/true,
603+
key_value_metadata({"k2"}, {"v2"})))),
604+
::arrow::field("c3",
605+
std::make_shared<MapType>(
606+
::arrow::field("key", ::arrow::utf8(), /*nullable=*/false,
607+
key_value_metadata({"k3"}, {"v3"})),
608+
::arrow::field("value", ::arrow::int64(), /*nullable=*/true,
609+
key_value_metadata({"k4"}, {"v4"})))),
610+
::arrow::field("c4", ::arrow::struct_({::arrow::field(
611+
"sub", ::arrow::int64(),
612+
/*nullable=*/true, key_value_metadata({"k5"}, {"v5"}))})),
613+
::arrow::field("c5",
614+
::arrow::sparse_union(
615+
{::arrow::field("_union_0", ::arrow::int64(), /*nullable=*/true,
616+
key_value_metadata({"k6"}, {"v6"})),
617+
::arrow::field("_union_1", ::arrow::utf8(), /*nullable=*/true,
618+
key_value_metadata({"k7"}, {"v7"}))},
619+
{0, 1}))});
620+
auto expected_output_table = ::arrow::TableFromJSON(
621+
schema, {R"([[1, "a", [1, 2], [["a", 1]], {"sub": 1}, null]])"});
622+
ARROW_EXPECT_OK(writer->Write(*expected_output_table));
623+
ARROW_EXPECT_OK(writer->Close());
624+
625+
EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish());
626+
std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(buffer));
627+
EXPECT_OK_AND_ASSIGN(
628+
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
629+
EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read());
630+
ASSERT_OK(actual_output_table->ValidateFull());
631+
AssertTablesEqual(*expected_output_table, *actual_output_table);
632+
633+
// Check schema equality with metadata.
634+
EXPECT_OK_AND_ASSIGN(auto read_schema, reader->ReadSchema());
635+
AssertSchemaEqual(schema, read_schema, /*check_metadata=*/true);
636+
}
637+
509638
// Trivial
510639

511640
class TestORCWriterTrivialNoWrite : public ::testing::Test {};

cpp/src/arrow/adapters/orc/util.cc

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "arrow/util/bitmap_ops.h"
3131
#include "arrow/util/checked_cast.h"
3232
#include "arrow/util/decimal.h"
33+
#include "arrow/util/key_value_metadata.h"
3334
#include "arrow/util/range.h"
3435
#include "arrow/util/string.h"
3536
#include "arrow/visit_data_inline.h"
@@ -951,6 +952,15 @@ Status WriteBatch(const Array& array, int64_t orc_offset,
951952
}
952953
}
953954

955+
void SetAttributes(const std::shared_ptr<arrow::Field>& field, liborc::Type* type) {
956+
if (field->HasMetadata()) {
957+
const auto& metadata = field->metadata();
958+
for (int64_t i = 0; i < metadata->size(); i++) {
959+
type->setAttribute(metadata->key(i), metadata->value(i));
960+
}
961+
}
962+
}
963+
954964
Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
955965
Type::type kind = type.id();
956966
switch (kind) {
@@ -1000,9 +1010,9 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
10001010
case Type::type::LIST:
10011011
case Type::type::FIXED_SIZE_LIST:
10021012
case Type::type::LARGE_LIST: {
1003-
std::shared_ptr<DataType> arrow_child_type =
1004-
checked_cast<const BaseListType&>(type).value_type();
1005-
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
1013+
const auto& value_field = checked_cast<const BaseListType&>(type).value_field();
1014+
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*value_field->type()));
1015+
SetAttributes(value_field, orc_subtype.get());
10061016
return liborc::createListType(std::move(orc_subtype));
10071017
}
10081018
case Type::type::STRUCT: {
@@ -1011,19 +1021,19 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
10111021
checked_cast<const StructType&>(type).fields();
10121022
for (auto it = arrow_fields.begin(); it != arrow_fields.end(); ++it) {
10131023
std::string field_name = (*it)->name();
1014-
std::shared_ptr<DataType> arrow_child_type = (*it)->type();
1015-
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
1024+
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*(*it)->type()));
1025+
SetAttributes(*it, orc_subtype.get());
10161026
out_type->addStructField(field_name, std::move(orc_subtype));
10171027
}
10181028
return std::move(out_type);
10191029
}
10201030
case Type::type::MAP: {
1021-
std::shared_ptr<DataType> key_arrow_type =
1022-
checked_cast<const MapType&>(type).key_type();
1023-
std::shared_ptr<DataType> item_arrow_type =
1024-
checked_cast<const MapType&>(type).item_type();
1025-
ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_arrow_type));
1026-
ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_arrow_type));
1031+
const auto& key_field = checked_cast<const MapType&>(type).key_field();
1032+
const auto& item_field = checked_cast<const MapType&>(type).item_field();
1033+
ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_field->type()));
1034+
ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_field->type()));
1035+
SetAttributes(key_field, key_orc_type.get());
1036+
SetAttributes(item_field, item_orc_type.get());
10271037
return liborc::createMapType(std::move(key_orc_type), std::move(item_orc_type));
10281038
}
10291039
case Type::type::DENSE_UNION:
@@ -1034,6 +1044,7 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
10341044
for (const auto& arrow_field : arrow_fields) {
10351045
std::shared_ptr<DataType> arrow_child_type = arrow_field->type();
10361046
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
1047+
SetAttributes(arrow_field, orc_subtype.get());
10371048
out_type->addUnionChild(std::move(orc_subtype));
10381049
}
10391050
return std::move(out_type);
@@ -1132,23 +1143,26 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type) {
11321143
if (subtype_count != 1) {
11331144
return Status::TypeError("Invalid Orc List type");
11341145
}
1135-
ARROW_ASSIGN_OR_RAISE(auto elemtype, GetArrowType(type->getSubtype(0)));
1136-
return list(std::move(elemtype));
1146+
ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField("item", type->getSubtype(0)));
1147+
return list(std::move(elem_field));
11371148
}
11381149
case liborc::MAP: {
11391150
if (subtype_count != 2) {
11401151
return Status::TypeError("Invalid Orc Map type");
11411152
}
1142-
ARROW_ASSIGN_OR_RAISE(auto key_type, GetArrowType(type->getSubtype(0)));
1143-
ARROW_ASSIGN_OR_RAISE(auto item_type, GetArrowType(type->getSubtype(1)));
1144-
return map(std::move(key_type), std::move(item_type));
1153+
ARROW_ASSIGN_OR_RAISE(
1154+
auto key_field, GetArrowField("key", type->getSubtype(0), /*nullable=*/false));
1155+
ARROW_ASSIGN_OR_RAISE(auto value_field,
1156+
GetArrowField("value", type->getSubtype(1)));
1157+
return std::make_shared<MapType>(std::move(key_field), std::move(value_field));
11451158
}
11461159
case liborc::STRUCT: {
11471160
FieldVector fields(subtype_count);
11481161
for (int child = 0; child < subtype_count; ++child) {
1149-
ARROW_ASSIGN_OR_RAISE(auto elem_type, GetArrowType(type->getSubtype(child)));
1150-
std::string name = type->getFieldName(child);
1151-
fields[child] = field(std::move(name), std::move(elem_type));
1162+
const auto& name = type->getFieldName(child);
1163+
ARROW_ASSIGN_OR_RAISE(auto elem_field,
1164+
GetArrowField(name, type->getSubtype(child)));
1165+
fields[child] = std::move(elem_field);
11521166
}
11531167
return struct_(std::move(fields));
11541168
}
@@ -1159,8 +1173,9 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type) {
11591173
FieldVector fields(subtype_count);
11601174
std::vector<int8_t> type_codes(subtype_count);
11611175
for (int child = 0; child < subtype_count; ++child) {
1162-
ARROW_ASSIGN_OR_RAISE(auto elem_type, GetArrowType(type->getSubtype(child)));
1163-
fields[child] = field("_union_" + ToChars(child), std::move(elem_type));
1176+
ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField("_union_" + ToChars(child),
1177+
type->getSubtype(child)));
1178+
fields[child] = std::move(elem_field);
11641179
type_codes[child] = static_cast<int8_t>(child);
11651180
}
11661181
return sparse_union(std::move(fields), std::move(type_codes));
@@ -1176,11 +1191,35 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const Schema& schema) {
11761191
for (int i = 0; i < numFields; i++) {
11771192
const auto& field = schema.field(i);
11781193
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*field->type()));
1194+
SetAttributes(field, orc_subtype.get());
11791195
out_type->addStructField(field->name(), std::move(orc_subtype));
11801196
}
11811197
return std::move(out_type);
11821198
}
11831199

1200+
Result<std::shared_ptr<const KeyValueMetadata>> GetFieldMetadata(
1201+
const liborc::Type* type) {
1202+
if (type == nullptr) {
1203+
return nullptr;
1204+
}
1205+
const auto keys = type->getAttributeKeys();
1206+
if (keys.empty()) {
1207+
return nullptr;
1208+
}
1209+
auto metadata = std::make_shared<KeyValueMetadata>();
1210+
for (const auto& key : keys) {
1211+
metadata->Append(key, type->getAttributeValue(key));
1212+
}
1213+
return std::const_pointer_cast<const KeyValueMetadata>(metadata);
1214+
}
1215+
1216+
Result<std::shared_ptr<Field>> GetArrowField(const std::string& name,
1217+
const liborc::Type* type, bool nullable) {
1218+
ARROW_ASSIGN_OR_RAISE(auto arrow_type, GetArrowType(type));
1219+
ARROW_ASSIGN_OR_RAISE(auto metadata, GetFieldMetadata(type));
1220+
return field(name, std::move(arrow_type), nullable, std::move(metadata));
1221+
}
1222+
11841223
} // namespace orc
11851224
} // namespace adapters
11861225
} // namespace arrow

cpp/src/arrow/adapters/orc/util.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "arrow/array/builder_base.h"
2424
#include "arrow/result.h"
2525
#include "arrow/status.h"
26+
#include "arrow/type_fwd.h"
2627
#include "orc/OrcFile.hh"
2728

2829
namespace liborc = orc;
@@ -35,6 +36,13 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type);
3536

3637
Result<std::unique_ptr<liborc::Type>> GetOrcType(const Schema& schema);
3738

39+
Result<std::shared_ptr<const KeyValueMetadata>> GetFieldMetadata(
40+
const liborc::Type* type);
41+
42+
Result<std::shared_ptr<Field>> GetArrowField(const std::string& name,
43+
const liborc::Type* type,
44+
bool nullable = true);
45+
3846
ARROW_EXPORT Status AppendBatch(const liborc::Type* type,
3947
liborc::ColumnVectorBatch* batch, int64_t offset,
4048
int64_t length, arrow::ArrayBuilder* builder);

0 commit comments

Comments
 (0)