-
Notifications
You must be signed in to change notification settings - Fork 29k
SPARK-1345 adding missing dependency on avro for hadoop 0.23 to the new ... #263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Merged build triggered. Build is starting -or- tests failed to complete. |
|
Merged build started. Build is starting -or- tests failed to complete. |
|
Merged build finished. All automated tests passed. |
|
All automated tests passed. |
|
LGTM - feel free to merge this! |
|
Okay just went ahead and merged this. |
…ew ... ...sql pom files Author: Thomas Graves <[email protected]> Closes apache#263 from tgravescs/SPARK-1345 and squashes the following commits: b43a2a0 [Thomas Graves] SPARK-1345 adding missing dependency on avro for hadoop 0.23 to the new sql pom files
Merge upstream apache
…runing ### What changes were proposed in this pull request? Remove `OptimizeSubqueries` from batch of `PartitionPruning` to make DPP support more cases. For example: ```sql SELECT date_id, product_id FROM fact_sk f JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s ON f.store_id = s.new_store_id ``` Before this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` After this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] ``` This is because `OptimizeSubqueries` will infer more filters, so we cannot reuse broadcasts. The following is the plan if disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- Subquery subquery#4009, [id=#284] : +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[]) : +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280] : +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[]) : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` ### Why are the changes needed? Improve DPP to support more cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test: SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- TPC-DS q58 | 40 | 20 TPC-DS q83 | 18 | 14 Closes #33664 from wangyum/SPARK-36444. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]>
…runing ### What changes were proposed in this pull request? Remove `OptimizeSubqueries` from batch of `PartitionPruning` to make DPP support more cases. For example: ```sql SELECT date_id, product_id FROM fact_sk f JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s ON f.store_id = s.new_store_id ``` Before this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` After this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] ``` This is because `OptimizeSubqueries` will infer more filters, so we cannot reuse broadcasts. The following is the plan if disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- Subquery subquery#4009, [id=#284] : +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[]) : +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280] : +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[]) : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` ### Why are the changes needed? Improve DPP to support more cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test: SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- TPC-DS q58 | 40 | 20 TPC-DS q83 | 18 | 14 Closes #33664 from wangyum/SPARK-36444. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 2310b99) Signed-off-by: Yuming Wang <[email protected]>
...sql pom files