ARROW-1808: [C++] Make RecordBatch, Table virtual interfaces for column access #1337
wesm wants to merge 14 commits into apache:master from
Conversation
Change-Id: I0a252a1c7606b0d98827765029aaa4dce3e445bb
…efactoring Change-Id: I10e2b777beae1adc7dbbe2120d9331c9b37eb4ff
…o set build type in Travis CI Change-Id: I8fe9a8e155c6c37a7a4a5f921eb411d45bd8c5d8
Change-Id: I180f4c0d2af218b9fca4ff115be19c9dc6c7d9f7
Change-Id: Ia41fd3d634bb7d0a9090f7758e230b1665df21a3
…at define Change-Id: I81e9f9a11cd0c43e0d629e404643f95b16a1f75a
Change-Id: I996c45e66a4b835b2c46003a6bed84da7d446eee
Change-Id: I87885c88e09617dceee6be3b801bdb5442081955
@kou I fixed the glib compilation. I added DCHECKs to the record batch constructor to assert that the schema is the same size as the columns, but it seems this isn't being checked in the GLib bindings:
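For reference, a constructor check of that kind might look roughly like the sketch below; the member names (schema_, columns_) are assumptions for illustration, not the actual implementation.

// Sketch only: assumed member names, not the actual arrow::RecordBatch code.
RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
                         const std::vector<std::shared_ptr<Array>>& columns)
    : schema_(schema), num_rows_(num_rows), columns_(columns) {
  // Abort in debug builds if the column count does not match the schema.
  DCHECK_EQ(schema->num_fields(), static_cast<int>(columns.size()));
}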
Change-Id: If3a168a02443007946dd80f47a4aaa4f9d60b132
    const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;

    /// \brief Name in i-th column
    const std::string& column_name(int i) const;

The name is coming from the schema, so a copy is not strictly necessary.
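A possible implementation along those lines, as a sketch (the schema_ member name is an assumption; Field::name() already returns a const reference, so no copy is needed):

// Sketch: delegate to the schema and return the name by const reference.
const std::string& RecordBatch::column_name(int i) const {
  return schema_->field(i)->name();
}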
Change-Id: I36b935cc45ec2280ab221be14dccfda44f9ccab6
@kou I removed the DCHECKs from the ctor that I mentioned, in favor of validating in
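A Validate-style check of that kind might look roughly like this sketch; the member names are assumptions, the second error message is illustrative, and the first matches the one asserted in the GLib test further down.

// Sketch: return Status instead of aborting, so bindings can surface errors.
Status RecordBatch::Validate() const {
  if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
    return Status::Invalid("Number of columns did not match schema");
  }
  for (size_t i = 0; i < columns_.size(); ++i) {
    if (columns_[i]->length() != num_rows_) {
      return Status::Invalid("Column length did not match record batch length");
    }
  }
  return Status::OK();
}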
@wesm I confirmed. The test creates a 0-row record batch with empty columns, which causes the segmentation fault. The following patch fixes this:

diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb
index 3de8e5cf..67aed85f 100644
--- a/c_glib/test/test-file-writer.rb
+++ b/c_glib/test/test-file-writer.rb
@@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase
include Helper::Buildable
def test_write_record_batch
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-ipc-file-writer")
output = Arrow::FileOutputStream.new(tempfile.path, false)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
begin
- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)])
file_writer.write_record_batch(record_batch)
ensure
file_writer.close
@@ -38,8 +42,12 @@ class TestFileWriter < Test::Unit::TestCase
input = Arrow::MemoryMappedInputStream.new(tempfile.path)
begin
file_reader = Arrow::RecordBatchFileReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
file_reader.schema.fields.collect(&:name))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ file_reader.read_record_batch(0))
ensure
input.close
end
diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb
index a71a3704..2adf25b3 100644
--- a/c_glib/test/test-gio-input-stream.rb
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -16,15 +16,21 @@
# under the License.
class TestGIOInputStream < Test::Unit::TestCase
+ include Helper::Buildable
+
def test_reader_backend
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-gio-input-stream")
output = Arrow::FileOutputStream.new(tempfile.path, false)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
begin
- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)])
file_writer.write_record_batch(record_batch)
ensure
file_writer.close
@@ -38,8 +44,12 @@ class TestGIOInputStream < Test::Unit::TestCase
input = Arrow::GIOInputStream.new(input_stream)
begin
file_reader = Arrow::RecordBatchFileReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
file_reader.schema.fields.collect(&:name))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ file_reader.read_record_batch(0))
ensure
input.close
end
diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb
index adaa8c1b..c77598ed 100644
--- a/c_glib/test/test-gio-output-stream.rb
+++ b/c_glib/test/test-gio-output-stream.rb
@@ -16,17 +16,23 @@
# under the License.
class TestGIOOutputStream < Test::Unit::TestCase
+ include Helper::Buildable
+
def test_writer_backend
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-gio-output-stream")
file = Gio::File.new_for_path(tempfile.path)
output_stream = file.append_to(:none)
output = Arrow::GIOOutputStream.new(output_stream)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
begin
- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)])
file_writer.write_record_batch(record_batch)
ensure
file_writer.close
@@ -38,8 +44,12 @@ class TestGIOOutputStream < Test::Unit::TestCase
input = Arrow::MemoryMappedInputStream.new(tempfile.path)
begin
file_reader = Arrow::RecordBatchFileReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
file_reader.schema.fields.collect(&:name))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ file_reader.read_record_batch(0))
ensure
input.close
end
diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb
index c3d0e149..32754e20 100644
--- a/c_glib/test/test-stream-writer.rb
+++ b/c_glib/test/test-stream-writer.rb
@@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase
include Helper::Buildable
def test_write_record_batch
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-ipc-stream-writer")
output = Arrow::FileOutputStream.new(tempfile.path, false)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema)
begin
columns = [
- build_boolean_array([true]),
+ build_boolean_array(data),
]
- record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+ record_batch = Arrow::RecordBatch.new(schema, data.size, columns)
stream_writer.write_record_batch(record_batch)
ensure
stream_writer.close
@@ -41,10 +43,12 @@ class TestStreamWriter < Test::Unit::TestCase
input = Arrow::MemoryMappedInputStream.new(tempfile.path)
begin
stream_reader = Arrow::RecordBatchStreamReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
stream_reader.schema.fields.collect(&:name))
- assert_equal(true,
- stream_reader.read_next.get_column(0).get_value(0))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ stream_reader.read_next)
assert_nil(stream_reader.read_next)
ensure
input.close
If we allow an empty record batch (a record batch with no columns), the following check is needed:

diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 3c1db061..6af9bc4b 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -139,9 +139,11 @@ class RecordBatchSerializer : public ArrayVisitor {
buffers_.clear();
}
- // Perform depth-first traversal of the row-batch
- for (int i = 0; i < batch.num_columns(); ++i) {
- RETURN_NOT_OK(VisitArray(*batch.column(i)));
+ if (batch.num_rows() > 0) {
+ // Perform depth-first traversal of the row-batch
+ for (int i = 0; i < batch.num_columns(); ++i) {
+ RETURN_NOT_OK(VisitArray(*batch.column(i)));
+ }
}
  // The position for the start of a buffer relative to the passed frame of

I'm OK with denying record batches that have no columns. It'll simplify our code.
@kou it seems like there are two different issues here. Here, a schema with 1 field was passed along with a list of 0 columns:

- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+                                       data.size,
+                                       [build_boolean_array(data)])

I believe this would result in segfaults even if the number of rows is non-zero. So having empty / length-0 record batches in the IPC writer code path is fine so long as the columns match the schema. The reason this bug was not caught before was that the
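To make that concrete, validating such a mismatched batch should surface the error up front rather than segfaulting later in the writer; a sketch using RecordBatch::Make and Validate, the APIs referenced elsewhere in this thread:

// Sketch: one schema field but zero columns should fail validation up front.
auto field = arrow::field("enabled", arrow::boolean());
auto schema = arrow::schema({field});
auto batch = arrow::RecordBatch::Make(schema, /*num_rows=*/0, {});
arrow::Status st = batch->Validate();
// Expected: an Invalid status ("Number of columns did not match schema"),
// instead of a crash when the batch reaches the IPC writer.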
Change-Id: I23c701d8246faf8669d70a1bf6ce5ce0bc170591
Change-Id: Ie205d1fb8c3e05a2b01ecf4c317cfdc1d3acbd24
Change-Id: I345dfe5aaf7e5adb49ceccf5663d950c21c03558
Change-Id: I7511a0b63c3f540d4dc688eef5a86f80f09228d0
Merging, since the Linux build now fails only when reaching parquet-cpp. I will update the parquet-cpp patch and then merge that once its build passes.
I agree with you.
I think it's better to do this in a higher layer such as the GLib bindings layer, and not do needless checks in the C++ layer, for simplicity and performance. I'll add bounds checking in the GLib bindings later. For now, I think it's better that we always validate a newly created record batch. If we always validate it, we can assume that all record batches have valid data.

From d9260c09765b1cd337cda5a09497ee1b985ef623 Mon Sep 17 00:00:00 2001
From: Kouhei Sutou <[email protected]>
Date: Wed, 22 Nov 2017 09:09:30 +0900
Subject: [PATCH] [GLib] Always validate on creating new record batch
---
c_glib/arrow-glib/record-batch.cpp | 13 ++++++++---
c_glib/arrow-glib/record-batch.h | 3 ++-
c_glib/example/go/write-batch.go | 10 +++++++--
c_glib/example/go/write-stream.go | 10 +++++++--
c_glib/test/test-record-batch.rb | 46 +++++++++++++++++++++++++-------------
5 files changed, 58 insertions(+), 24 deletions(-)
diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index f23a0cf7..73de6eeb 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -135,13 +135,15 @@ garrow_record_batch_class_init(GArrowRecordBatchClass *klass)
* @schema: The schema of the record batch.
* @n_rows: The number of the rows in the record batch.
* @columns: (element-type GArrowArray): The columns in the record batch.
+ * @error: (nullable): Return location for a #GError or %NULL.
*
- * Returns: A newly created #GArrowRecordBatch.
+ * Returns: (nullable): A newly created #GArrowRecordBatch or %NULL on error.
*/
GArrowRecordBatch *
garrow_record_batch_new(GArrowSchema *schema,
guint32 n_rows,
- GList *columns)
+ GList *columns,
+ GError **error)
{
std::vector<std::shared_ptr<arrow::Array>> arrow_columns;
for (GList *node = columns; node; node = node->next) {
@@ -152,7 +154,12 @@ garrow_record_batch_new(GArrowSchema *schema,
auto arrow_record_batch =
arrow::RecordBatch::Make(garrow_schema_get_raw(schema),
n_rows, arrow_columns);
- return garrow_record_batch_new_raw(&arrow_record_batch);
+  auto status = arrow_record_batch->Validate();
+  if (garrow_error_check(error, status, "[record-batch][new]")) {
+    return garrow_record_batch_new_raw(&arrow_record_batch);
+  } else {
+    return NULL;
+  }
}
/**
diff --git a/c_glib/arrow-glib/record-batch.h b/c_glib/arrow-glib/record-batch.h
index 021f894f..823a42bb 100644
--- a/c_glib/arrow-glib/record-batch.h
+++ b/c_glib/arrow-glib/record-batch.h
@@ -68,7 +68,8 @@ GType garrow_record_batch_get_type(void) G_GNUC_CONST;
GArrowRecordBatch *garrow_record_batch_new(GArrowSchema *schema,
guint32 n_rows,
- GList *columns);
+ GList *columns,
+ GError **error);
gboolean garrow_record_batch_equal(GArrowRecordBatch *record_batch,
GArrowRecordBatch *other_record_batch);
diff --git a/c_glib/example/go/write-batch.go b/c_glib/example/go/write-batch.go
index 9dbc3c00..f4d03ed9 100644
--- a/c_glib/example/go/write-batch.go
+++ b/c_glib/example/go/write-batch.go
@@ -188,7 +188,10 @@ func main() {
BuildDoubleArray(),
}
- recordBatch := arrow.NewRecordBatch(schema, 4, columns)
+ recordBatch, err := arrow.NewRecordBatch(schema, 4, columns)
+ if err != nil {
+ log.Fatalf("Failed to create record batch #1: %v", err)
+ }
_, err = writer.WriteRecordBatch(recordBatch)
if err != nil {
log.Fatalf("Failed to write record batch #1: %v", err)
@@ -198,7 +201,10 @@ func main() {
for i, column := range columns {
slicedColumns[i] = column.Slice(1, 3)
}
- recordBatch = arrow.NewRecordBatch(schema, 3, slicedColumns)
+ recordBatch, err = arrow.NewRecordBatch(schema, 3, slicedColumns)
+ if err != nil {
+ log.Fatalf("Failed to create record batch #2: %v", err)
+ }
_, err = writer.WriteRecordBatch(recordBatch)
if err != nil {
log.Fatalf("Failed to write record batch #2: %v", err)
diff --git a/c_glib/example/go/write-stream.go b/c_glib/example/go/write-stream.go
index 244741e8..7225156a 100644
--- a/c_glib/example/go/write-stream.go
+++ b/c_glib/example/go/write-stream.go
@@ -188,7 +188,10 @@ func main() {
BuildDoubleArray(),
}
- recordBatch := arrow.NewRecordBatch(schema, 4, columns)
+ recordBatch, err := arrow.NewRecordBatch(schema, 4, columns)
+ if err != nil {
+ log.Fatalf("Failed to create record batch #1: %v", err)
+ }
_, err = writer.WriteRecordBatch(recordBatch)
if err != nil {
log.Fatalf("Failed to write record batch #1: %v", err)
@@ -198,7 +201,10 @@ func main() {
for i, column := range columns {
slicedColumns[i] = column.Slice(1, 3)
}
- recordBatch = arrow.NewRecordBatch(schema, 3, slicedColumns)
+ recordBatch, err = arrow.NewRecordBatch(schema, 3, slicedColumns)
+ if err != nil {
+ log.Fatalf("Failed to create record batch #2: %v", err)
+ }
writer.WriteRecordBatch(recordBatch)
_, err = writer.WriteRecordBatch(recordBatch)
if err != nil {
diff --git a/c_glib/test/test-record-batch.rb b/c_glib/test/test-record-batch.rb
index 9fd34b7d..325944b8 100644
--- a/c_glib/test/test-record-batch.rb
+++ b/c_glib/test/test-record-batch.rb
@@ -18,18 +18,32 @@
class TestTable < Test::Unit::TestCase
include Helper::Buildable
- def test_new
- fields = [
- Arrow::Field.new("visible", Arrow::BooleanDataType.new),
- Arrow::Field.new("valid", Arrow::BooleanDataType.new),
- ]
- schema = Arrow::Schema.new(fields)
- columns = [
- build_boolean_array([true]),
- build_boolean_array([false]),
- ]
- record_batch = Arrow::RecordBatch.new(schema, 1, columns)
- assert_equal(1, record_batch.n_rows)
+ sub_test_case(".new") do
+ def test_valid
+ fields = [
+ Arrow::Field.new("visible", Arrow::BooleanDataType.new),
+ Arrow::Field.new("valid", Arrow::BooleanDataType.new),
+ ]
+ schema = Arrow::Schema.new(fields)
+ columns = [
+ build_boolean_array([true]),
+ build_boolean_array([false]),
+ ]
+ record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+ assert_equal(1, record_batch.n_rows)
+ end
+
+ def test_no_columns
+ fields = [
+ Arrow::Field.new("visible", Arrow::BooleanDataType.new),
+ ]
+ schema = Arrow::Schema.new(fields)
+ message = "[record-batch][new]: " +
+ "Invalid: Number of columns did not match schema"
+ assert_raise(Arrow::Error::Invalid.new(message)) do
+ Arrow::RecordBatch.new(schema, 0, [])
+ end
+ end
end
sub_test_case("instance methods") do
@@ -40,7 +54,7 @@ class TestTable < Test::Unit::TestCase
]
schema = Arrow::Schema.new(fields)
columns = [
- build_boolean_array([true, false, true, false, true, false]),
+ build_boolean_array([true, false, true, false, true]),
build_boolean_array([false, true, false, true, false]),
]
@record_batch = Arrow::RecordBatch.new(schema, 5, columns)
@@ -53,7 +67,7 @@ class TestTable < Test::Unit::TestCase
]
schema = Arrow::Schema.new(fields)
columns = [
- build_boolean_array([true, false, true, false, true, false]),
+ build_boolean_array([true, false, true, false, true]),
build_boolean_array([false, true, false, true, false]),
]
other_record_batch = Arrow::RecordBatch.new(schema, 5, columns)
@@ -71,7 +85,7 @@ class TestTable < Test::Unit::TestCase
end
def test_columns
- assert_equal([6, 5],
+ assert_equal([5, 5],
@record_batch.columns.collect(&:length))
end
@@ -94,7 +108,7 @@ class TestTable < Test::Unit::TestCase
def test_to_s
assert_equal(<<-PRETTY_PRINT, @record_batch.to_s)
-visible: [true, false, true, false, true, false]
+visible: [true, false, true, false, true]
valid: [false, true, false, true, false]
PRETTY_PRINT
end
--
2.15.0
While this will cause some minor API breakage in parquet-cpp and some other downstream users, this is reasonably long overdue. It will permit implementations of the RecordBatch or Table interface that do lazy IO / data loading or lazy materialization of columns.
I will write a patch to fix up parquet-cpp, and will look to see whether glib is easy to fix. There's no good way to go about merging this patch since a green build is not possible, so once we're happy with the patch, I can merge it and then work on getting a green build in parquet-cpp so we don't have a broken build there for too long.
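To illustrate the kind of implementation the virtual interface makes possible, here is a self-contained sketch of lazy column materialization. The types here (RecordBatchLike, LazyRecordBatch, the Loader callback) are hypothetical stand-ins, not Arrow's actual classes; the point is only that a virtual column() accessor lets a subclass defer IO until a column is first requested.

#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for an Arrow array; just enough for the sketch.
struct Array {
  std::vector<bool> values;
};

// An abstract record-batch-like interface with virtual column access.
class RecordBatchLike {
 public:
  virtual ~RecordBatchLike() = default;
  virtual int num_columns() const = 0;
  virtual std::shared_ptr<Array> column(int i) const = 0;
};

// A lazy implementation: a column is materialized on first access only.
class LazyRecordBatch : public RecordBatchLike {
 public:
  using Loader = std::function<std::shared_ptr<Array>(int)>;
  LazyRecordBatch(int num_columns, Loader loader)
      : cache_(num_columns), loader_(std::move(loader)) {}

  int num_columns() const override { return static_cast<int>(cache_.size()); }

  std::shared_ptr<Array> column(int i) const override {
    if (!cache_[i]) {
      cache_[i] = loader_(i);  // e.g. read from disk or decode on demand
    }
    return cache_[i];
  }

 private:
  mutable std::vector<std::shared_ptr<Array>> cache_;
  Loader loader_;
};

A Parquet- or IPC-backed implementation could plug a file reader in as the loader, so that only the columns actually touched get read and decoded.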