Implement RecordBatch <--> FlightData encode/decode + tests#3391
Implement RecordBatch <--> FlightData encode/decode + tests#3391alamb merged 18 commits intoapache:masterfrom
RecordBatch <--> FlightData encode/decode + tests#3391Conversation
| /// calling [`Self::into_inner`] and using the [`FlightDataStream`] | ||
| /// directly. | ||
| #[derive(Debug)] | ||
| pub struct FlightRecordBatchStream { |
There was a problem hiding this comment.
This is moved / renamed / tested in decode.rs
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll}; |
There was a problem hiding this comment.
This is code based on https://github.com/influxdata/influxdb_iox/pull/6460,
It handles encoding RecordBatches into FlightData and the details of dictionaries, etc.
| /// Ipc writer options | ||
| options: IpcWriteOptions, | ||
| /// Metadata to add to the schema message | ||
| app_metadata: Bytes, |
There was a problem hiding this comment.
I hope eventually there can be options related to dictionary encoding here -- like "try and match dictionaries" for example
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Tests for round trip encoding / decoding |
There was a problem hiding this comment.
Here are the round trip tests for encode/decode (that ensure that RecordBatches sent via Flight get through correctly
I expect that we can use this framework as we sort out how to properly encode dictionaries (e.g. #3389)
| use crate::CONTINUATION_MARKER; | ||
|
|
||
| /// IPC write options used to control the behaviour of the writer | ||
| /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] |
|
|
||
| // TODO test for do_get | ||
| #[tokio::test] | ||
| async fn test_do_get() { |
There was a problem hiding this comment.
here are the tests for do_get promised in #3378
|
cc @avantgardnerio @nevi-me and @tustvold |
| DecodedPayload::Schema(_) => { | ||
| self.got_schema = true; | ||
| // Need next message, poll inner again | ||
| } | ||
| DecodedPayload::RecordBatch(batch) => { | ||
| return Poll::Ready(Some(Ok(batch))); | ||
| } |
There was a problem hiding this comment.
Do we need to verify the schema of record batch against received schema?
There was a problem hiding this comment.
I actually would like to avoid verifying the schema at the lower level decoder so that clients can send other schema messages if they so wish (we actually do this in IOx).
The higher level record batch decoder, however, should verify the I think
I added tests to illustrate these behaviors in test_chained_streams_batch_decoder and test_chained_streams_data_decoder
There was a problem hiding this comment.
I also added test_mismatched_schema_message to test sending an incorrect schema message. Currently it panics but we can probably turn that into a useful message at some point I improved the message as well
Co-authored-by: Liang-Chi Hsieh <[email protected]>
…s into alamb/flight_data_transfer
| make_array(data) | ||
| } | ||
| _ => { | ||
| if nodes.len() <= node_index { |
There was a problem hiding this comment.
The code panic's at nodes.get(node_index) below -- so this change just makes an error rather than a panic, which is a slightly better user experience
|
@tustvold if you have a chance, I would appreciate a review of this PR. I am particularly interested in your opinion of how we should handle sending RecordBatch'es and retain the dictionary encoding. One way might be to have the encode stream match / assign dictionary ids prior to sending. I realize this would be less efficient than if the creators of the dictionaries were to set the ids, but it would likely be more efficient than hydrating the dictionaries into actual data. |
I think we could just assign a unique ID to each dictionary encoded field, and then send a new DictionaryBatch for each field for each RecordBatch. We could then potentially retain the previous dictionary and use |
tustvold
left a comment
There was a problem hiding this comment.
Mostly just some minor nits, I personally would avoid upstreaming the hydration of dictionaries so that we aren't committed to supporting it long term, I think it should be relatively straightforward to properly support dictionaries in a somewhat sane manner
arrow-flight/tests/encode_decode.rs
Outdated
| if i == i / 2 { | ||
| None | ||
| } else { | ||
| // repeat some values for low cardinality | ||
| let v = i / 3; | ||
| Some(format!("value{v}")) | ||
| } |
There was a problem hiding this comment.
| if i == i / 2 { | |
| None | |
| } else { | |
| // repeat some values for low cardinality | |
| let v = i / 3; | |
| Some(format!("value{v}")) | |
| } | |
| // repeat some values for low cardinality | |
| (i != i / 2).then(|| format!("value{}", i / 3)) |
I also wonder if i == i / 2 is what you meant to write, as that will only be true for 0
There was a problem hiding this comment.
you are right -- I meant num_rows / 2 -- thank you
There was a problem hiding this comment.
As above, I think the then formulation is harder to read so I would like to leave the current less concise formulaton
| fn make_primative_batch(num_rows: usize) -> RecordBatch { | ||
| let i: UInt8Array = (0..num_rows) | ||
| .map(|i| { | ||
| if i == num_rows / 2 { |
There was a problem hiding this comment.
This could be rewritten with Option::then as below, to make it significantly more concise
There was a problem hiding this comment.
It is true -- this would be more concise. However after trying it (see below) I think the current formulation is easier to read, so I plan to leave it as is.
Thank you for the suggestion
let i: UInt8Array = (0..num_rows)
- .map(|i| {
- if i == num_rows / 2 {
- None
- } else {
- Some(i.try_into().unwrap())
- }
- })
+ .map(|i| (i != num_rows / 2).then(|| i.try_into().unwrap()))
.collect();
arrow-flight/src/encode.rs
Outdated
| /// 1. Hydrates any dictionaries to its underlying type. See | ||
| /// hydrate_dictionary for more information. | ||
| /// | ||
| pub fn prepare_batch_for_flight( |
There was a problem hiding this comment.
This method is stateless which means it necessarily will never be able to properly handle dictionaries, given we just deprecated a similar method in the IPC reader, perhaps we should avoid doing this in favour of the stateful encoder above?
There was a problem hiding this comment.
I am not quite sure what you are suggesting here. I removed the pub in 2267d13 so that we can have more flexibility with non breaking API changes in the future.
| /// * <https://github.com/apache/arrow-rs/issues/1206> | ||
| /// | ||
| /// For now we just hydrate the dictionaries to their underlying type | ||
| fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> { |
There was a problem hiding this comment.
I wonder if we should keep this as a workaround in IOx, and only upstream the proper dictionary support once it is ready? I just wonder if this is a mode we really want to support long-term, especially if we have it the default behaviour?
There was a problem hiding this comment.
See #3391 (comment)
I will attempt to get rid of this workaround prior to the arrow 31 release
Thank you -- I will investigate how much effort it will take to do so. I am almost out of time today so I will likely not get a chance to work on this until later in the week |
| None => { | ||
| // inner is done | ||
| self.done = true; | ||
| } |
There was a problem hiding this comment.
Once reaching here, I guess the queue is guaranteed to be empty? If so, seems we can just return Poll::Ready(None); here?
There was a problem hiding this comment.
Yes that is correct. This case is also handled on the next loop iteration, but I think making it explicit is good too. I did so in 4f30ab7
I have pondered this for a while and my plan is to proceed with merging this PR (with the temporary dictionary hydration) and work on proper dictionary support as a follow on PR for the following reasons:
In order to minimize the API churn however, I plan to hold this PR until after we have released 30.0.0 (#3336 ) so we can minimize the chance of releasing the dictionary hydration code |
Co-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Raphael Taylor-Davies <[email protected]>
|
Benchmark runs are scheduled for baseline = 9398af6 and contender = dc09b0b. dc09b0b is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
|
The follow on PR is ready: #3402 |
Which issue does this PR close?
Closes #3371
(note a large amount of this PR is test code)
Rationale for this change
The details of encoding / sending / receiving / reconstructiing RecordBatches over flight is common and somewhat duplicated across every implementation of flight. The mid level flight client aims to automate much of the common piece of this
What changes are included in this PR?
encode.rslogic inFlightDataEncoderBuilder(based on the code from IOx in https://github.com/influxdata/influxdb_iox/pull/6460)decode.rsFlightClient::do_getAre there any user-facing changes?
yes, encoders and decoders
Next planned PRs: