Commit 0654409

Make timezone aware expression without timezone unresolved.
1 parent 7536e28 commit 0654409
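
For orientation, here is a minimal, self-contained sketch of the idea behind the change, using simplified stand-in types (TimestampLiteral and resolveTimeZone are hypothetical illustrations, not the actual Catalyst classes): a time zone aware expression only reports itself as resolved once a zone has been assigned, and a resolution pass copies the session-local zone into any expression that lacks one.

object TimeZoneResolutionSketch {
  // Hypothetical stand-in for TimeZoneAwareExpression: "resolved" only once a zone is set.
  final case class TimestampLiteral(value: String, timeZoneId: Option[String] = None) {
    def resolved: Boolean = timeZoneId.isDefined
    def withTimeZone(tz: String): TimestampLiteral = copy(timeZoneId = Some(tz))
  }

  // Hypothetical stand-in for the ResolveTimeZone rule: fill in the session-local zone
  // wherever it is missing.
  def resolveTimeZone(e: TimestampLiteral, sessionLocalTimeZone: String): TimestampLiteral =
    if (e.timeZoneId.isEmpty) e.withTimeZone(sessionLocalTimeZone) else e

  def main(args: Array[String]): Unit = {
    val raw = TimestampLiteral("1991-12-06 00:00:00.0")
    assert(!raw.resolved)                                         // no zone assigned yet
    assert(resolveTimeZone(raw, "America/Los_Angeles").resolved)  // zone supplied by the rule
  }
}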

File tree

11 files changed: 112 additions, 84 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 22 additions & 18 deletions

@@ -150,19 +150,19 @@ class Analyzer(
       ResolveAggregateFunctions ::
       TimeWindowing ::
       ResolveInlineTables(conf) ::
+      ResolveTimeZone(conf) ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
     Batch("View", Once,
-      AliasViewChild(conf)),
+      AliasViewChild(conf),
+      ResolveTimeZone(conf)),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
     Batch("FixNullability", Once,
       FixNullability),
-    Batch("ResolveTimeZone", Once,
-      ResolveTimeZone),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
@@ -2347,23 +2347,27 @@ class Analyzer(
       }
     }
   }
+}
 
-  /**
-   * Replace [[TimeZoneAwareExpression]] without timezone id by its copy with session local
-   * time zone.
-   */
-  object ResolveTimeZone extends Rule[LogicalPlan] {
-
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
-      case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
-        e.withTimeZone(conf.sessionLocalTimeZone)
-      // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
-      // the types between the value expression and list query expression of IN expression.
-      // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
-      // information for time zone aware expressions.
-      case e: ListQuery => e.withNewPlan(apply(e.plan))
-    }
+/**
+ * Replace [[TimeZoneAwareExpression]] without timezone id by its copy with session local
+ * time zone.
+ */
+case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] {
+  private val transformTimeZoneExprs: PartialFunction[Expression, Expression] = {
+    case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
+      e.withTimeZone(conf.sessionLocalTimeZone)
+    // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
+    // the types between the value expression and list query expression of IN expression.
+    // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
+    // information for time zone aware expressions.
+    case e: ListQuery => e.withNewPlan(apply(e.plan))
   }
+
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan.resolveExpressions(transformTimeZoneExprs)
+
+  def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
 }
 
 /**
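
A short usage sketch of the reworked rule (assumes spark-catalyst on the classpath; the object name is illustrative): ResolveTimeZone is now instantiated with a SQLConf and applied like any other analyzer rule, either to a whole plan via apply or to a single expression via resolveTimeZones.

import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, UnresolvedInlineTable}
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimestampType

object ResolveTimeZoneUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf
    val table = UnresolvedInlineTable(
      Seq("c1"), Seq(Seq(Cast(Literal("1991-12-06 00:00:00.0"), TimestampType))))
    // The rule copies conf.sessionLocalTimeZone into every time zone aware expression
    // in the plan that does not yet carry a zone.
    val withTimeZone = ResolveTimeZone(conf).apply(table)
    println(withTimeZone)
  }
}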

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala

Lines changed: 3 additions & 6 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, TimeZoneAwareExpression}
+import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -99,12 +99,9 @@ case class ResolveInlineTables(conf: SQLConf) extends Rule[LogicalPlan] {
         val castedExpr = if (e.dataType.sameType(targetType)) {
           e
         } else {
-          Cast(e, targetType)
+          Cast(e, targetType, Some(conf.sessionLocalTimeZone))
         }
-        castedExpr.transform {
-          case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
-            e.withTimeZone(conf.sessionLocalTimeZone)
-        }.eval()
+        castedExpr.eval()
       } catch {
         case NonFatal(ex) =>
           table.failAnalysis(s"failed to evaluate expression ${e.sql}: ${ex.getMessage}")
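
For context, a small sketch of the simplification above (again assuming spark-catalyst on the classpath; the object name is illustrative): the session-local zone can now be passed directly to the Cast constructor, so the expression can be evaluated eagerly without a post-hoc transform.

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimestampType

object EagerCastEvalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf
    // Supplying the zone at construction time keeps the Cast resolved and evaluable.
    val casted =
      Cast(Literal("1991-12-06 00:00:00.0"), TimestampType, Some(conf.sessionLocalTimeZone))
    println(casted.eval()) // Spark timestamps evaluate to microseconds since the epoch
  }
}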

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala

Lines changed: 3 additions & 1 deletion

@@ -24,7 +24,6 @@ import java.util.{Calendar, TimeZone}
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
@@ -34,6 +33,9 @@ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
  * Common base class for time zone aware expressions.
  */
 trait TimeZoneAwareExpression extends Expression {
+  /** The expression is only resolved when the time zone has been set. */
+  override lazy val resolved: Boolean =
+    childrenResolved && checkInputDataTypes().isSuccess && timeZoneId.isDefined
 
   /** the timezone ID to be used to evaluate value. */
   def timeZoneId: Option[String]
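
The new resolved definition is the heart of the commit: a time zone aware expression now stays unresolved until a zone is set. A minimal sketch of what that means in practice (assumes spark-catalyst on the classpath; the object name is illustrative), using the rule's new resolveTimeZones helper:

import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimestampType

object ResolvedSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val cast = Cast(Literal("1991-12-06 00:00:00.0"), TimestampType)
    println(cast.resolved) // false after this change: Cast is time zone aware and has no timeZoneId

    val withZone = ResolveTimeZone(new SQLConf).resolveTimeZones(cast)
    println(withZone.resolved) // true: the session-local zone has been filled in
  }
}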

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala

Lines changed: 6 additions & 4 deletions

@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, Rand}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
 
 /**
@@ -91,12 +92,13 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
   test("convert TimeZoneAwareExpression") {
     val table = UnresolvedInlineTable(Seq("c1"),
       Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
-    val converted = ResolveInlineTables(conf).convert(table)
+    val withTimeZone = ResolveTimeZone(conf).apply(table)
+    val LocalRelation(output, data) = ResolveInlineTables(conf).apply(withTimeZone)
     val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
       .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
-    assert(converted.output.map(_.dataType) == Seq(TimestampType))
-    assert(converted.data.size == 1)
-    assert(converted.data(0).getLong(0) == correct)
+    assert(output.map(_.dataType) == Seq(TimestampType))
+    assert(data.size == 1)
+    assert(data.head.getLong(0) == correct)
   }
 
   test("nullability inference in convert") {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala

Lines changed: 19 additions & 16 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -787,6 +788,12 @@ class TypeCoercionSuite extends PlanTest {
     }
   }
 
+  private val timeZoneResolver = ResolveTimeZone(new SQLConf)
+
+  private def widenSetOperationTypes(plan: LogicalPlan): LogicalPlan = {
+    timeZoneResolver(TypeCoercion.WidenSetOperationTypes(plan))
+  }
+
   test("WidenSetOperationTypes for except and intersect") {
     val firstTable = LocalRelation(
       AttributeReference("i", IntegerType)(),
@@ -799,11 +806,10 @@ class TypeCoercionSuite extends PlanTest {
       AttributeReference("f", FloatType)(),
       AttributeReference("l", LongType)())
 
-    val wt = TypeCoercion.WidenSetOperationTypes
     val expectedTypes = Seq(StringType, DecimalType.SYSTEM_DEFAULT, FloatType, DoubleType)
 
-    val r1 = wt(Except(firstTable, secondTable)).asInstanceOf[Except]
-    val r2 = wt(Intersect(firstTable, secondTable)).asInstanceOf[Intersect]
+    val r1 = widenSetOperationTypes(Except(firstTable, secondTable)).asInstanceOf[Except]
+    val r2 = widenSetOperationTypes(Intersect(firstTable, secondTable)).asInstanceOf[Intersect]
     checkOutput(r1.left, expectedTypes)
     checkOutput(r1.right, expectedTypes)
     checkOutput(r2.left, expectedTypes)
@@ -838,10 +844,9 @@ class TypeCoercionSuite extends PlanTest {
       AttributeReference("p", ByteType)(),
      AttributeReference("q", DoubleType)())
 
-    val wt = TypeCoercion.WidenSetOperationTypes
     val expectedTypes = Seq(StringType, DecimalType.SYSTEM_DEFAULT, FloatType, DoubleType)
 
-    val unionRelation = wt(
+    val unionRelation = widenSetOperationTypes(
       Union(firstTable :: secondTable :: thirdTable :: forthTable :: Nil)).asInstanceOf[Union]
     assert(unionRelation.children.length == 4)
     checkOutput(unionRelation.children.head, expectedTypes)
@@ -862,17 +867,15 @@ class TypeCoercionSuite extends PlanTest {
       }
     }
 
-    val dp = TypeCoercion.WidenSetOperationTypes
-
     val left1 = LocalRelation(
       AttributeReference("l", DecimalType(10, 8))())
     val right1 = LocalRelation(
       AttributeReference("r", DecimalType(5, 5))())
    val expectedType1 = Seq(DecimalType(10, 8))
 
-    val r1 = dp(Union(left1, right1)).asInstanceOf[Union]
-    val r2 = dp(Except(left1, right1)).asInstanceOf[Except]
-    val r3 = dp(Intersect(left1, right1)).asInstanceOf[Intersect]
+    val r1 = widenSetOperationTypes(Union(left1, right1)).asInstanceOf[Union]
+    val r2 = widenSetOperationTypes(Except(left1, right1)).asInstanceOf[Except]
+    val r3 = widenSetOperationTypes(Intersect(left1, right1)).asInstanceOf[Intersect]
 
     checkOutput(r1.children.head, expectedType1)
     checkOutput(r1.children.last, expectedType1)
@@ -891,17 +894,17 @@ class TypeCoercionSuite extends PlanTest {
     val plan2 = LocalRelation(
       AttributeReference("r", rType)())
 
-    val r1 = dp(Union(plan1, plan2)).asInstanceOf[Union]
-    val r2 = dp(Except(plan1, plan2)).asInstanceOf[Except]
-    val r3 = dp(Intersect(plan1, plan2)).asInstanceOf[Intersect]
+    val r1 = widenSetOperationTypes(Union(plan1, plan2)).asInstanceOf[Union]
+    val r2 = widenSetOperationTypes(Except(plan1, plan2)).asInstanceOf[Except]
+    val r3 = widenSetOperationTypes(Intersect(plan1, plan2)).asInstanceOf[Intersect]
 
     checkOutput(r1.children.last, Seq(expectedType))
     checkOutput(r2.right, Seq(expectedType))
     checkOutput(r3.right, Seq(expectedType))
 
-    val r4 = dp(Union(plan2, plan1)).asInstanceOf[Union]
-    val r5 = dp(Except(plan2, plan1)).asInstanceOf[Except]
-    val r6 = dp(Intersect(plan2, plan1)).asInstanceOf[Intersect]
+    val r4 = widenSetOperationTypes(Union(plan2, plan1)).asInstanceOf[Union]
+    val r5 = widenSetOperationTypes(Except(plan2, plan1)).asInstanceOf[Except]
+    val r6 = widenSetOperationTypes(Intersect(plan2, plan1)).asInstanceOf[Intersect]
 
     checkOutput(r4.children.last, Seq(expectedType))
     checkOutput(r5.left, Seq(expectedType))
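
The suite-level helper added above exists because, after this change, the Casts that WidenSetOperationTypes inserts carry no time zone and therefore stay unresolved; wrapping the rule keeps the widened plans resolved before the assertions run. A sketch of the same composition outside the suite (assumes spark-catalyst on the classpath; the object name is illustrative):

import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, TypeCoercion}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

object WidenThenResolveSketch {
  private val timeZoneResolver = ResolveTimeZone(new SQLConf)

  // Widen set-operation children first, then assign the session-local time zone to any
  // Casts the widening introduced.
  def widenSetOperationTypes(plan: LogicalPlan): LogicalPlan =
    timeZoneResolver(TypeCoercion.WidenSetOperationTypes(plan))
}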
