@@ -140,12 +140,35 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       InsertIntoParquetTable(relation, planLater(child), overwrite = true)(sparkContext) :: Nil
     case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
       InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-    case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
-      // TODO: Should be pushing down filters as well.
+    case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+      val remainingFilters =
+        if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+          filters.filter {
+            // Note: filters cannot be pushed down to Parquet if they contain more complex
+            // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
+            // all filters that have been pushed down. Note that a predicate such as
+            // "(A AND B) OR C" can result in "A OR C" being pushed down.
+            filter =>
+              val recordFilter = ParquetFilters.createFilter(filter)
+              if (!recordFilter.isDefined) {
+                // First case: the pushdown did not result in any record filter.
+                true
+              } else {
+                // Second case: a record filter was created; here we are conservative in
+                // the sense that even if "A" was pushed and we check for "A AND B" we
+                // still want to keep "A AND B" in the higher-level filter, not just "B".
+                !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
+              }
+          }
+        } else {
+          filters
+        }
       pruneFilterProject(
         projectList,
-        filters,
-        ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
+        remainingFilters,
+        ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
+    }
+
     case _ => Nil
   }
 }
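In plain terms, the new branch partitions the predicate list: any filter that `ParquetFilters.createFilter` can turn into a record filter is handed to the scan, and only filters whose pushdown was absent or merely partial are kept for Spark to re-evaluate above the scan. Here is a minimal self-contained sketch of that split, using a toy `Pred` type and a hypothetical `tryPushDown` as stand-ins for Catalyst's `Expression` and the real `ParquetFilters.createFilter`/`findExpression` machinery (none of these toy names exist in Spark):

```scala
// Toy model of the residual-filter computation in the diff above.
sealed trait Pred
case class Cmp(attr: String, value: Int) extends Pred // simple "Attribute cmp Literal"
case class And(left: Pred, right: Pred) extends Pred
case class Or(left: Pred, right: Pred) extends Pred

object PushdownSketch {
  // Hypothetical pushdown: simple comparisons are handled exactly; an AND is
  // deliberately pushed only partially (one pushable half), so that, e.g.,
  // "(A AND B) OR C" weakens to "A OR C", mirroring the comment in the diff.
  def tryPushDown(p: Pred): Option[Pred] = p match {
    case c: Cmp    => Some(c)
    case And(l, r) => tryPushDown(l).orElse(tryPushDown(r))
    case Or(l, r)  => for (pl <- tryPushDown(l); pr <- tryPushDown(r)) yield Or(pl, pr)
  }

  // Keep every filter whose pushdown is missing or only partial; these must be
  // re-checked by Spark on the rows the scan returns.
  def remainingFilters(filters: Seq[Pred], pushdownEnabled: Boolean): Seq[Pred] =
    if (!pushdownEnabled) filters
    else filters.filter(f => !tryPushDown(f).contains(f))

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      Cmp("a", 1),                                     // pushed exactly -> dropped
      Or(And(Cmp("a", 1), Cmp("b", 2)), Cmp("c", 3))) // pushed only as "a OR c" -> kept
    println(remainingFilters(filters, pushdownEnabled = true))
    // List(Or(And(Cmp(a,1),Cmp(b,2)),Cmp(c,3)))
  }
}
```

The conservative choice mirrors the diff: a filter is dropped from the residual list only when its pushdown is exact, so a partially pushed predicate such as "(A AND B) OR C" (weakened to "A OR C" at the scan) is still re-applied in full by `pruneFilterProject`.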