Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions c_glib/arrow-glib/stream-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ G_BEGIN_DECLS
*/

typedef struct GArrowStreamReaderPrivate_ {
std::shared_ptr<arrow::ipc::StreamReader> stream_reader;
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> stream_reader;
} GArrowStreamReaderPrivate;

enum {
Expand Down Expand Up @@ -85,7 +85,7 @@ garrow_stream_reader_set_property(GObject *object,
switch (prop_id) {
case PROP_STREAM_READER:
priv->stream_reader =
*static_cast<std::shared_ptr<arrow::ipc::StreamReader> *>(g_value_get_pointer(value));
*static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
Expand Down Expand Up @@ -124,8 +124,8 @@ garrow_stream_reader_class_init(GArrowStreamReaderClass *klass)
gobject_class->get_property = garrow_stream_reader_get_property;

spec = g_param_spec_pointer("stream-reader",
"ipc::StreamReader",
"The raw std::shared<arrow::ipc::StreamReader> *",
"ipc::RecordBatchStreamReader",
"The raw std::shared<arrow::ipc::RecordBatchStreamReader> *",
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_STREAM_READER, spec);
Expand All @@ -143,10 +143,10 @@ GArrowStreamReader *
garrow_stream_reader_new(GArrowInputStream *stream,
GError **error)
{
std::shared_ptr<arrow::ipc::StreamReader> arrow_stream_reader;
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> arrow_stream_reader;
auto status =
arrow::ipc::StreamReader::Open(garrow_input_stream_get_raw(stream),
&arrow_stream_reader);
arrow::ipc::RecordBatchStreamReader::Open(garrow_input_stream_get_raw(stream),
&arrow_stream_reader);
if (garrow_error_check(error, status, "[ipc][stream-reader][open]")) {
return garrow_stream_reader_new_raw(&arrow_stream_reader);
} else {
Expand Down Expand Up @@ -179,7 +179,7 @@ garrow_stream_reader_get_schema(GArrowStreamReader *stream_reader)
*/
GArrowRecordBatch *
garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader,
GError **error)
GError **error)
{
auto arrow_stream_reader =
garrow_stream_reader_get_raw(stream_reader);
Expand All @@ -202,7 +202,7 @@ garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader,
G_END_DECLS

GArrowStreamReader *
garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader)
garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader)
{
auto stream_reader =
GARROW_STREAM_READER(g_object_new(GARROW_TYPE_STREAM_READER,
Expand All @@ -211,7 +211,7 @@ garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_st
return stream_reader;
}

std::shared_ptr<arrow::ipc::StreamReader>
std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader)
{
GArrowStreamReaderPrivate *priv;
Expand Down
2 changes: 1 addition & 1 deletion c_glib/arrow-glib/stream-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ typedef struct _GArrowStreamReaderClass GArrowStreamReaderClass;
/**
* GArrowStreamReader:
*
* It wraps `arrow::ipc::StreamReader`.
* It wraps `arrow::ipc::InputStreamReader`.
*/
struct _GArrowStreamReader
{
Expand Down
4 changes: 2 additions & 2 deletions c_glib/arrow-glib/stream-reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@

#include <arrow-glib/stream-reader.h>

GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader);
std::shared_ptr<arrow::ipc::StreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader);
GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader);
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader);
16 changes: 8 additions & 8 deletions c_glib/arrow-glib/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ G_BEGIN_DECLS
*/

typedef struct GArrowStreamWriterPrivate_ {
std::shared_ptr<arrow::ipc::StreamWriter> stream_writer;
std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> stream_writer;
} GArrowStreamWriterPrivate;

enum {
Expand Down Expand Up @@ -89,7 +89,7 @@ garrow_stream_writer_set_property(GObject *object,
switch (prop_id) {
case PROP_STREAM_WRITER:
priv->stream_writer =
*static_cast<std::shared_ptr<arrow::ipc::StreamWriter> *>(g_value_get_pointer(value));
*static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
Expand Down Expand Up @@ -128,8 +128,8 @@ garrow_stream_writer_class_init(GArrowStreamWriterClass *klass)
gobject_class->get_property = garrow_stream_writer_get_property;

spec = g_param_spec_pointer("stream-writer",
"ipc::StreamWriter",
"The raw std::shared<arrow::ipc::StreamWriter> *",
"ipc::RecordBatchStreamWriter",
"The raw std::shared<arrow::ipc::RecordBatchStreamWriter> *",
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_STREAM_WRITER, spec);
Expand All @@ -149,11 +149,11 @@ garrow_stream_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
std::shared_ptr<arrow::ipc::StreamWriter> arrow_stream_writer;
std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> arrow_stream_writer;
auto status =
arrow::ipc::StreamWriter::Open(garrow_output_stream_get_raw(sink).get(),
garrow_schema_get_raw(schema),
&arrow_stream_writer);
arrow::ipc::RecordBatchStreamWriter::Open(garrow_output_stream_get_raw(sink).get(),
garrow_schema_get_raw(schema),
&arrow_stream_writer);
if (garrow_error_check(error, status, "[ipc][stream-writer][open]")) {
return garrow_stream_writer_new_raw(&arrow_stream_writer);
} else {
Expand Down
2 changes: 1 addition & 1 deletion c_glib/arrow-glib/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ typedef struct _GArrowStreamWriterClass GArrowStreamWriterClass;
/**
* GArrowStreamWriter:
*
* It wraps `arrow::ipc::StreamWriter`.
* It wraps `arrow::ipc::RecordBatchStreamWriter`.
*/
struct _GArrowStreamWriter
{
Expand Down
8 changes: 4 additions & 4 deletions c_glib/arrow-glib/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

#include <arrow-glib/writer.h>

GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::StreamWriter> *arrow_stream_writer);
std::shared_ptr<arrow::ipc::StreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer);
GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_stream_writer);
std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer);

GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::FileWriter> *arrow_file_writer);
arrow::ipc::FileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer);
GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_file_writer);
arrow::ipc::RecordBatchFileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer);
2 changes: 2 additions & 0 deletions ci/travis_before_script_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then
cmake -DARROW_TEST_MEMCHECK=on \
$CMAKE_COMMON_FLAGS \
-DARROW_CXXFLAGS="-Wconversion -Werror" \
-DARROW_NO_DEPRECATED_API=on \
$ARROW_CPP_DIR
else
cmake $CMAKE_COMMON_FLAGS \
-DARROW_CXXFLAGS=-Werror \
-DARROW_NO_DEPRECATED_API=on \
$ARROW_CPP_DIR
fi

Expand Down
8 changes: 8 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
"Build the Arrow micro benchmarks"
OFF)

option(ARROW_NO_DEPRECATED_API
"Exclude deprecated APIs from build"
OFF)

option(ARROW_IPC
"Build the Arrow IPC extensions"
ON)
Expand Down Expand Up @@ -154,6 +158,10 @@ include(BuildUtils)

include(SetupCxxFlags)

if (ARROW_NO_DEPRECATED_API)
add_definitions(-DARROW_NO_DEPRECATED_API)
endif()

# Add common flags
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_COMMON_FLAGS}")
set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/arrow/ipc/file-to-stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@
#include "arrow/util/io-util.h"

namespace arrow {
namespace ipc {

// Reads a file on the file system and prints to stdout the stream version of it.
Status ConvertToStream(const char* path) {
std::shared_ptr<io::ReadableFile> in_file;
std::shared_ptr<ipc::FileReader> reader;
std::shared_ptr<RecordBatchFileReader> reader;

RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));

io::StdoutStream sink;
std::shared_ptr<ipc::StreamWriter> writer;
RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer));
std::shared_ptr<RecordBatchStreamWriter> writer;
RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> chunk;
RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
Expand All @@ -44,14 +45,15 @@ Status ConvertToStream(const char* path) {
return writer->Close();
}

} // namespace ipc
} // namespace arrow

int main(int argc, char** argv) {
if (argc != 2) {
std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
return 1;
}
arrow::Status status = arrow::ConvertToStream(argv[1]);
arrow::Status status = arrow::ipc::ConvertToStream(argv[1]);
if (!status.ok()) {
std::cerr << "Could not convert to stream: " << status.ToString() << std::endl;
return 1;
Expand Down
25 changes: 13 additions & 12 deletions cpp/src/arrow/ipc/ipc-read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture {
if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
RETURN_NOT_OK(mmap_->Seek(0));

std::shared_ptr<FileWriter> file_writer;
RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
std::shared_ptr<RecordBatchFileWriter> file_writer;
RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
RETURN_NOT_OK(file_writer->Close());

int64_t offset;
RETURN_NOT_OK(mmap_->Tell(&offset));

std::shared_ptr<FileReader> file_reader;
RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader));
std::shared_ptr<RecordBatchFileReader> file_reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));

return file_reader->GetRecordBatch(0, result);
}
Expand Down Expand Up @@ -487,8 +487,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {

Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
// Write the file
std::shared_ptr<FileWriter> writer;
RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
std::shared_ptr<RecordBatchFileWriter> writer;
RETURN_NOT_OK(
RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));

const int num_batches = static_cast<int>(in_batches.size());

Expand All @@ -504,8 +505,8 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {

// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
std::shared_ptr<FileReader> reader;
RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
std::shared_ptr<RecordBatchFileReader> reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader));

EXPECT_EQ(num_batches, reader->num_record_batches());
for (int i = 0; i < num_batches; ++i) {
Expand Down Expand Up @@ -553,8 +554,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
Status RoundTripHelper(
const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
// Write the file
std::shared_ptr<StreamWriter> writer;
RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
std::shared_ptr<RecordBatchStreamWriter> writer;
RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer));
int num_batches = 5;
for (int i = 0; i < num_batches; ++i) {
RETURN_NOT_OK(writer->WriteRecordBatch(batch));
Expand All @@ -565,8 +566,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);

std::shared_ptr<StreamReader> reader;
RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
std::shared_ptr<RecordBatchStreamReader> reader;
RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader));

std::shared_ptr<RecordBatch> chunk;
while (true) {
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/ipc/json-integration-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ static Status ConvertJsonToArrow(
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
}

std::shared_ptr<ipc::FileWriter> writer;
RETURN_NOT_OK(ipc::FileWriter::Open(out_file.get(), reader->schema(), &writer));
std::shared_ptr<ipc::RecordBatchFileWriter> writer;
RETURN_NOT_OK(
ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer));

for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
Expand All @@ -96,8 +97,8 @@ static Status ConvertArrowToJson(
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file));
RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file));

std::shared_ptr<ipc::FileReader> reader;
RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
std::shared_ptr<ipc::RecordBatchFileReader> reader;
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));

if (FLAGS_verbose) {
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
Expand Down Expand Up @@ -137,8 +138,8 @@ static Status ValidateArrowVsJson(
std::shared_ptr<io::ReadableFile> arrow_file;
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file));

std::shared_ptr<ipc::FileReader> arrow_reader;
RETURN_NOT_OK(ipc::FileReader::Open(arrow_file, &arrow_reader));
std::shared_ptr<ipc::RecordBatchFileReader> arrow_reader;
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader));

auto json_schema = json_reader->schema();
auto arrow_schema = arrow_reader->schema();
Expand Down
Loading