ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans #10664
bkietz wants to merge 15 commits into apache:master from
Conversation
cpp/src/arrow/dataset/scanner.cc
It is a little unfortunate that we're making several heap allocations for every batch to carry effectively three integers (though I guess the overhead is probably not noticeable in the grand scheme of things and batches should be relatively large).
I guess if it is ever noticeable overhead, we could make the booleans singletons and pool the integer allocations.
I'll test this on EC2 when I get a chance and compare to 4.0/current master.

@ursabot please benchmark lang=Python

Benchmark runs are scheduled for baseline = 780e95c and contender = c1808f9156bd4df03cf8338217ffd93f519abb96. Results will be available as each benchmark for each run completes.

(Looks like we need to rebase before the Conbench benchmarks can run.)
Force-pushed 9e1fbc8 to c45af38
@ursabot please benchmark lang=Python

Benchmark runs are scheduled for baseline = 780e95c and contender = dfb10c0. Results will be available as each benchmark for each run completes.

From Conbench it seems performance isn't really affected. I'll manually run those benchmarks with more iterations as well.
auto batch_gen_gen,
FragmentsToBatches(std::move(fragment_gen), options, filter_and_project));
auto batch_gen_gen_readahead =
    MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
Ben and I spoke about this. The serial readahead wasn't actually adding anything (batch_gen_gen is really just listing fragments), and it was invalid because a serial readahead generator is not async-reentrant and should not be consumed by MakeMergedGenerator.
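To make the reentrancy hazard concrete, here is a small Python asyncio analogue (illustrative only; `SerialReadahead` is a hypothetical stand-in, not Arrow's MakeSerialReadaheadGenerator). A merged generator pulls its source reentrantly, i.e. with several pulls in flight at once, which a wrapper designed for serial consumption cannot handle:

```python
import asyncio

# Hypothetical sketch of a readahead wrapper that assumes serial
# consumption: it keeps a single prefetched future with no locking.

class SerialReadahead:
    def __init__(self, source):
        self._source = source   # an async iterator
        self._pending = None    # the one prefetched item

    async def __anext__(self):
        if self._pending is None:
            self._pending = asyncio.ensure_future(self._source.__anext__())
        fut = self._pending
        value = await fut
        # Not async-reentrant: two concurrent callers both awaited the
        # same future above, so both receive the same item, and each
        # schedules a fresh prefetch here (one of which is lost).
        self._pending = asyncio.ensure_future(self._source.__anext__())
        return value

async def _demo():
    async def numbers():
        for i in range(4):
            yield i
    gen = SerialReadahead(numbers())
    # Two reentrant pulls, as a merged generator would issue:
    return await asyncio.gather(gen.__anext__(), gen.__anext__())

first, second = asyncio.run(_demo())
```

Both concurrent pulls observe the first item, which is exactly the kind of duplication/skipping a non-reentrant generator produces under MakeMergedGenerator.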
This got stuck while I was testing it (I'll try to get a backtrace shortly). But performance indeed seems to be on par with before.

Details: 4.0.1 / 5.0.0 master / this PR, count rows, no filter, 10 iterations. (It got stuck when a filter was applied.)

Ok, actually, I'm not sure if it was stuck or just me being impatient…
Now that I'm running it again it just seems that count_rows with a filter got very slow.

Test script:

import statistics
import time

import pyarrow
import pyarrow.dataset
import pyarrow.fs

fs = pyarrow.fs.S3FileSystem(region="us-east-2")
print("PyArrow:", pyarrow.__version__)

times = []
for _ in range(10):
    start = time.monotonic()
    ds = pyarrow.dataset.dataset([
        "ursa-labs-taxi-data/2009/01/data.parquet",
        "ursa-labs-taxi-data/2009/02/data.parquet",
        "ursa-labs-taxi-data/2009/03/data.parquet",
        "ursa-labs-taxi-data/2009/04/data.parquet",
    ],
        format="parquet",
        partitioning=["year", "month"],
        filesystem=fs,
    )
    print("Rows:", ds.scanner(use_async=True, use_threads=True).count_rows())
    times.append(time.monotonic() - start)

print("No filter")
print("Min:", min(times), "s")
print("Median:", statistics.median(times), "s")
print("Mean:", statistics.mean(times), "s")
print("Max:", max(times), "s")

print("With filter")
times = []
expr = pyarrow.dataset.field("passenger_count") > 1
for _ in range(10):
    start = time.monotonic()
    ds = pyarrow.dataset.dataset([
        "ursa-labs-taxi-data/2009/01/data.parquet",
        "ursa-labs-taxi-data/2009/02/data.parquet",
        "ursa-labs-taxi-data/2009/03/data.parquet",
        "ursa-labs-taxi-data/2009/04/data.parquet",
    ],
        format="parquet",
        partitioning=["year", "month"],
        filesystem=fs,
    )
    print("Rows:", ds.scanner(use_async=True, use_threads=True, filter=expr).count_rows())
    times.append(time.monotonic() - start)

print("Min:", min(times), "s")
print("Median:", statistics.median(times), "s")
print("Mean:", statistics.mean(times), "s")
print("Max:", max(times), "s")
Well that's disappointing. I was actually expecting it to get faster.
If we correct the CountRows issue above, CountRows indeed gets much faster! |
westonpace left a comment
This review is overdue, sorry. I think my main question is whether we want to use a stop token instead of a StopProducing method.
std::unique_lock<std::mutex> lock(mutex_);
if (!finished_) {
  finished_ = true;
finished_ = Loop([this, options] {
Rather than use Loop directly, it seems you could use MakeTransferred and VisitAsyncGenerator. You might need to add ErrorVisitor and stop_callback support to VisitAsyncGenerator, but then it would be more universally supported.
Is there a reason not to use Loop, though?
No reason other than readability. The visitor would just be using Loop under the hood.
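For readers outside this thread, a rough Python asyncio sketch of what a visit-style helper buys (names like `visit_async_generator` and `stop_requested` are illustrative, not Arrow's C++ API): the pull loop, and any early-stop check, live in one shared helper instead of being rewritten by every caller of the generator.

```python
import asyncio

# Illustrative visit helper: it owns the pull loop so callers only
# supply a visitor. stop_requested plays the role of a stop_callback /
# stop token check; returning True ends consumption early.

async def visit_async_generator(gen, visitor, stop_requested=None):
    async for item in gen:
        if stop_requested is not None and stop_requested():
            break
        visitor(item)

async def _demo():
    async def numbers():
        for i in range(5):
            yield i
    seen = []
    await visit_async_generator(numbers(), seen.append,
                                stop_requested=lambda: len(seen) >= 3)
    return seen

seen = asyncio.run(_demo())
```

As the comment above says, such a visitor would just be a loop under the hood; the benefit is purely that the loop is written once.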
bool finished_{false};
int next_batch_index_{0};
Future<> finished_fut_ = Future<>::MakeFinished();
bool stop_requested_{false};
Instead of exposing a StopProducing method, you might need to take in a stop token. If you don't, then how will you handle the following:
A pyarrow user runs a to_table on some dataset. First it has to do inspection, then it has to actually do the scan. At any point the user might press Ctrl-C to cancel the thing.
We can support it with stop token by setting the Ctrl-C stop token handler and then passing that stop token down to the inspection call and the scan call.
As a minor benefit you can get rid of all the locks here because they are inside the stop token itself.
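To make the suggested flow concrete, here is a minimal plain-Python sketch (`StopSource` and `scan` are hypothetical stand-ins, not Arrow's StopToken API): one stop source is created up front, a Ctrl-C handler would request stop on it, and the same token is passed down to each step, which polls it without any extra locking of its own.

```python
import threading
import time

# Hypothetical stand-in for a StopSource/StopToken pair: the token is
# just a threading.Event, so the synchronization lives inside the token.

class StopSource:
    def __init__(self):
        self._event = threading.Event()

    def request_stop(self):
        self._event.set()

    def token(self):
        return self._event  # is_set() means "stop requested"

def scan(stop_token, n_batches):
    """Stand-in for a scan step that honors the token between batches."""
    processed = 0
    for _ in range(n_batches):
        if stop_token.is_set():
            break
        time.sleep(0.01)  # stand-in for reading one batch
        processed += 1
    return processed

source = StopSource()
# A real program would install e.g.:
#   signal.signal(signal.SIGINT, lambda *_: source.request_stop())
# Here we simulate Ctrl-C arriving shortly after the scan starts:
threading.Timer(0.03, source.request_stop).start()
batches = scan(source.token(), n_batches=100)
```

The same token would also be handed to the inspection call, so one Ctrl-C cancels whichever step is currently running.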
There's an issue (ARROW-12938) for letting a StopToken run a callback, since it's needed to support other things like gRPC (which also exposes a cancel method instead of taking a cancellable context). But that'd be more work and requires dealing with signal safety in the callback.
Replacing with a stop token sounds useful but I'd prefer to handle that in a follow-up.

A follow-up sounds fine to me.
lidavidm left a comment
LGTM. I left a couple tiny things but feel free to merge.
// Returns a matcher that matches the value of a successful Result<T> or Future<T>.
// (Future<T> will be waited upon to acquire its result for matching.)
// Returns a matcher that waits on a Future (by default for 16 seconds)
nit: kDefaultAssertFinishesWaitSeconds seconds?
~ExecPlanImpl() override {
if (started_ && !stopped_) {
if (started_ && !finished_.is_finished()) {
This is slightly race-prone/a TOCTOU, right? Though the consequences aren't big here.
Status StartProducing() override {
if (finished_) {
return Status::Invalid("Restarted SourceNode '", label(), "'");
DCHECK(!stop_requested_) << "Restarted SourceNode";
Should we not still return a Status? It might save us from odd bug reports if we error instead of letting internal state get trampled in case of a bug.
Replaces the body of AsyncScanner::ScanBatchesAsync with usage of an ExecPlan