ARROW-10251: [Rust] [DataFusion] MemTable::load() now loads partitions in parallel#8428

Closed
andygrove wants to merge 2 commits into apache:master from andygrove:ARROW-10251
@andygrove
Member

No description provided.

@andygrove
Member Author

andygrove commented Oct 10, 2020

For the TPCH benchmark with --mem-table this gave me ~10x speedup in load times. fyi @jhorstmann

Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 24, batch_size: 4096, path: "/mnt/tpch/s1/parquet", file_format: "parquet", mem_table: true }
Loading data into memory
Loaded data into memory in 486 ms
Query 1 iteration 0 took 166 ms
Query 1 iteration 1 took 154 ms
Query 1 iteration 2 took 156 ms

Member

@jorgecarleitao jorgecarleitao left a comment

Good idea!

Left two small suggestions, but with a 10x, LGTM anyways xD (pending checks).

let exec = exec.clone();
let task: JoinHandle<Result<Vec<RecordBatch>>> = task::spawn(async move {
    let it = exec.execute(partition).await?;
    Ok(it.into_iter().collect::<ArrowResult<Vec<RecordBatch>>>()?)
Suggested change
- Ok(it.into_iter().collect::<ArrowResult<Vec<RecordBatch>>>()?)
+ it.into_iter().collect::<ArrowResult<Vec<RecordBatch>>>()

(I have not tested this)
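
Whether the suggested change compiles depends on the error types involved. The sketch below uses hypothetical stand-in types (`ArrowError`, `DataFusionError`, `DfResult`; these are not the real Arrow or DataFusion definitions) and assumes the task's `Result` and `ArrowResult` carry different error types. Under that assumption, `Ok(expr?)` is not redundant: the `?` operator converts the error through `From`, whereas returning the expression directly needs the conversion spelled out.

```rust
// Hypothetical stand-ins for the two error types; assumptions for illustration only.
#[derive(Debug, PartialEq)]
struct ArrowError(String);

#[derive(Debug, PartialEq)]
enum DataFusionError {
    Arrow(ArrowError),
}

impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::Arrow(e)
    }
}

type ArrowResult<T> = Result<T, ArrowError>;
type DfResult<T> = Result<T, DataFusionError>;

// Original shape: `?` converts ArrowError into DataFusionError, then `Ok` rewraps.
fn collect_batches(items: Vec<ArrowResult<i32>>) -> DfResult<Vec<i32>> {
    Ok(items.into_iter().collect::<ArrowResult<Vec<i32>>>()?)
}

// Without `Ok(..?)`, the error conversion has to be explicit.
fn collect_batches_map_err(items: Vec<ArrowResult<i32>>) -> DfResult<Vec<i32>> {
    items
        .into_iter()
        .collect::<ArrowResult<Vec<i32>>>()
        .map_err(DataFusionError::from)
}

fn main() {
    let ok = collect_batches(vec![Ok(1), Ok(2)]);
    let err = collect_batches_map_err(vec![Ok(1), Err(ArrowError("bad batch".to_string()))]);
    println!("{:?} {:?}", ok, err);
}
```

If the two aliases did share one error type, the bare expression from the suggestion would compile as written.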

data.push(partition_batches);
let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partition_count);
for task in tasks {
    let result = task.await.unwrap()?;
Suggested change
- let result = task.await.unwrap()?;
+ let result = task.await.expect("To have some data on every recordBatch")?;
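
For context, here is a minimal sketch of the spawn-everything-then-collect-in-order pattern that the snippets above come from. It swaps the PR's async `task::spawn` for `std::thread` so that it runs without dependencies; `load_partition`, `LoadError`, and `MAX_PARTITIONS` are hypothetical placeholders, not names from DataFusion. Joining a worker gives a nested result: the outer layer reports whether the worker itself panicked, and the inner layer carries the load error. That is why the line under review needs both `unwrap()` (or `expect`) and `?`.

```rust
use std::thread;

// Hypothetical error type standing in for the PR's error type.
#[derive(Debug, PartialEq)]
struct LoadError(String);

// Hypothetical limit, used only so the sketch has a failure path.
const MAX_PARTITIONS: usize = 8;

// Hypothetical placeholder for `exec.execute(partition)`: returns one partition's batches.
fn load_partition(partition: usize) -> Result<Vec<u64>, LoadError> {
    if partition >= MAX_PARTITIONS {
        return Err(LoadError(format!("no such partition: {}", partition)));
    }
    Ok((0..3).map(|i| (partition * 10 + i) as u64).collect())
}

fn load_all(partition_count: usize) -> Result<Vec<Vec<u64>>, LoadError> {
    // Start every partition before waiting on any of them.
    let handles: Vec<thread::JoinHandle<Result<Vec<u64>, LoadError>>> = (0..partition_count)
        .map(|partition| thread::spawn(move || load_partition(partition)))
        .collect();

    // Join in spawn order so that data[i] holds partition i.
    let mut data: Vec<Vec<u64>> = Vec::with_capacity(partition_count);
    for handle in handles {
        // Outer Err: the worker panicked. Inner Err: the load itself failed.
        let batches = handle.join().expect("partition loader thread panicked")?;
        data.push(batches);
    }
    Ok(data)
}

fn main() {
    let data = load_all(4).expect("load failed");
    println!("loaded {} partitions", data.len());
}
```

In this sketch, an `expect` message that describes the worker failing matches what the outer result actually signals.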

3 participants