
Commit 6b7f2ce

Davies Liu committed

[SPARK-8307] [SQL] improve timestamp from parquet
This PR changes the Parquet timestamp read path to convert the Julian day to a Unix timestamp directly (without going through Calendar and Timestamp). cc adrian-wang rxin

Author: Davies Liu <[email protected]>

Closes #6759 from davies/improve_ts and squashes the following commits:

849e301 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
b0e4cad [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
8e2d56f [Davies Liu] address comments
634b9f5 [Davies Liu] fix mima
4891efb [Davies Liu] address comment
bfc437c [Davies Liu] fix build
ae5979c [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
602b969 [Davies Liu] remove jodd
2f2e48c [Davies Liu] fix test
8ace611 [Davies Liu] fix mima
212143b [Davies Liu] fix mina
c834108 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
a3171b8 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
5233974 [Davies Liu] fix scala style
361fd62 [Davies Liu] address comments
ea196d4 [Davies Liu] improve timestamp from parquet
1 parent 860a49e commit 6b7f2ce
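
For context: Parquet's INT96 timestamps encode a Julian day number plus the nanoseconds within that day, and Julian days begin at noon UTC (the Unix epoch is Julian day 2440587.5). This commit replaces the Calendar/Timestamp-based decoding with plain Long arithmetic. A minimal standalone sketch of that arithmetic — the constants mirror the DateTimeUtils changes in the diff below; the object name here is ours for illustration:

object JulianDaySketch {
  final val JULIAN_DAY_OF_EPOCH = 2440587          // the epoch falls halfway through this day
  final val SECONDS_PER_DAY = 60 * 60 * 24L
  final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L

  // 100ns ticks since the Unix epoch, with no Calendar and no intermediate Timestamp.
  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
    // subtract half a day: Julian days start at noon, Unix days at midnight
    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
    seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
  }
}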

File tree

24 files changed: +175 -252 lines changed

pom.xml

Lines changed: 0 additions & 1 deletion

@@ -156,7 +156,6 @@
     <scala.binary.version>2.10</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
     <jline.groupid>org.scala-lang</jline.groupid>
-    <jodd.version>3.6.3</jodd.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
     <snappy.version>1.1.1.7</snappy.version>

project/MimaExcludes.scala

Lines changed: 11 additions & 1 deletion

@@ -54,7 +54,17 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
             // SQL execution is considered private.
-            excludePackage("org.apache.spark.sql.execution")
+            excludePackage("org.apache.spark.sql.execution"),
+            // NanoTime and CatalystTimestampConverter is only used inside catalyst,
+            // not needed anymore
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.timestamp.NanoTime"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.timestamp.NanoTime$"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
           )
         case v if v.startsWith("1.4") =>
           Seq(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala

Lines changed: 7 additions & 7 deletions

@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst

 import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
-import java.sql.{Timestamp, Date}
+import java.sql.{Date, Timestamp}
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable

 import scala.collection.mutable.HashMap

-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -272,18 +272,18 @@ object CatalystTypeConverters {
   }

   private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
-    override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
+    override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
     override def toScala(catalystValue: Any): Date =
-      if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
+      if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
     override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
   }

   private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
     override def toCatalystImpl(scalaValue: Timestamp): Long =
-      DateUtils.fromJavaTimestamp(scalaValue)
+      DateTimeUtils.fromJavaTimestamp(scalaValue)
     override def toScala(catalystValue: Any): Timestamp =
       if (catalystValue == null) null
-      else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+      else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
     override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
       toScala(row.getLong(column))
   }
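
As a usage sketch of what these converters delegate to: a Catalyst date is an Int counting days since 1970-01-01, and a Catalyst timestamp is a Long counting 100ns ticks since the epoch. The round-trips below follow from the DateTimeUtils definitions in this commit; exact tick values depend on the JVM time zone (java.sql conversions are local-time based), so only round-trips are asserted:

import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Dates: Int days since the epoch (computed relative to the local time zone).
val days: Int = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
assert(days == 1)  // one day after the epoch, in any standard time zone
assert(DateTimeUtils.toJavaDate(days) == Date.valueOf("1970-01-02"))

// Timestamps: Long 100ns ticks since the epoch; the round-trip is lossless
// here because the Timestamp's nanos are a multiple of 100.
val ts = Timestamp.valueOf("2015-05-08 08:10:25")
val ticks: Long = DateTimeUtils.fromJavaTimestamp(ts)
assert(DateTimeUtils.toJavaTimestamp(ticks) == ts)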

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 8 additions & 8 deletions

@@ -24,7 +24,7 @@ import java.text.{DateFormat, SimpleDateFormat}
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -115,9 +115,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
-    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
+    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
     case TimestampType => buildCast[Long](_,
-      t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
+      t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
     case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
   }

@@ -162,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
        if (periodIdx != -1 && n.length() - periodIdx > 9) {
          n = n.substring(0, periodIdx + 10)
        }
-        try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+        try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
        catch { case _: java.lang.IllegalArgumentException => null }
      })
    case BooleanType =>
@@ -176,7 +176,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
    case ByteType =>
      buildCast[Byte](_, b => longToTimestamp(b.toLong))
    case DateType =>
-      buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
+      buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000)
    // TimestampWritable.decimalToTimestamp
    case DecimalType() =>
      buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -225,13 +225,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
  private[this] def castToDate(from: DataType): Any => Any = from match {
    case StringType =>
      buildCast[UTF8String](_, s =>
-        try DateUtils.fromJavaDate(Date.valueOf(s.toString))
+        try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
        catch { case _: java.lang.IllegalArgumentException => null }
      )
    case TimestampType =>
      // throw valid precision more than seconds, according to Hive.
      // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
+      buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L))
    // Hive throws this exception as a Semantic Exception
    // It is never possible to compare result when hive return with exception,
    // so we can return null
@@ -442,7 +442,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
    case (DateType, StringType) =>
      defineCodeGen(ctx, ev, c =>
        s"""${ctx.stringType}.fromString(
-          org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""")
+          org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
    // Special handling required for timestamps in hive test cases since the toString function
    // does not match the expected output.
    case (TimestampType, StringType) =>
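
A note on the magic numbers in the timestamp casts above: the internal unit is 100 ns, so converting a date's milliseconds-since-epoch into a timestamp multiplies by 10,000, and recovering milliseconds from ticks divides by 10,000. A quick standalone sanity check (names here are ours):

val hundredNanosPerMilli = 10000L                   // 1 ms = 1,000,000 ns = 10,000 x 100 ns
val hundredNanosPerSecond = 1000L * hundredNanosPerMilli
assert(hundredNanosPerSecond == 10000000L)          // matches HUNDRED_NANOS_PER_SECOND
// So toMillisSinceEpoch(d) * 10000 yields 100ns ticks,
// and t / 10000L recovers milliseconds from ticks t.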

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala

Lines changed: 3 additions & 3 deletions

@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -39,8 +39,8 @@ object Literal {
     case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: Decimal => Literal(d, DecimalType.Unlimited)
-    case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
-    case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
+    case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
+    case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
     case a: Array[Byte] => Literal(a, BinaryType)
     case null => Literal(null, NullType)
     case _ =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 36 additions & 5 deletions

@@ -17,18 +17,28 @@

 package org.apache.spark.sql.catalyst.util

-import java.sql.{Timestamp, Date}
+import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, TimeZone}

 import org.apache.spark.sql.catalyst.expressions.Cast

 /**
- * Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
+ * Helper functions for converting between internal and external date and time representations.
+ * Dates are exposed externally as java.sql.Date and are represented internally as the number of
+ * dates since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp
+ * and are stored internally as longs, which are capable of storing timestamps with 100 nanosecond
+ * precision.
  */
-object DateUtils {
-  private val MILLIS_PER_DAY = 86400000
-  private val HUNDRED_NANOS_PER_SECOND = 10000000L
+object DateTimeUtils {
+  final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
+
+  // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
+  final val JULIAN_DAY_OF_EPOCH = 2440587  // and .5
+  final val SECONDS_PER_DAY = 60 * 60 * 24L
+  final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L
+  final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100
+

   // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
   private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
@@ -117,4 +127,25 @@ object DateUtils {
       0L
     }
   }
+
+  /**
+   * Return the number of 100ns (hundred of nanoseconds) since epoch from Julian day
+   * and nanoseconds in a day
+   */
+  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
+    // use Long to avoid rounding errors
+    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
+    seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
+  }
+
+  /**
+   * Return Julian day and nanoseconds in a day from the number of 100ns (hundred of nanoseconds)
+   */
+  def toJulianDay(num100ns: Long): (Int, Long) = {
+    val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
+    val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
+    val secondsInDay = seconds % SECONDS_PER_DAY
+    val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L
+    (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
+  }
 }
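
To make the constants concrete: Julian day numbers increment at noon UTC, and day 2440587.5 is exactly the Unix epoch, which is why fromJulianDay subtracts half a day of seconds. A worked round-trip whose values follow directly from the definitions above:

import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Noon UTC on 1970-01-01 starts Julian day 2440588; it is 43,200 s after the epoch,
// i.e. 43200 * 10^7 = 432,000,000,000 ticks of 100 ns.
assert(DateTimeUtils.fromJulianDay(2440588, 0L) == 432000000000L)

// The epoch itself sits halfway through Julian day 2440587.
val (day, nanos) = DateTimeUtils.toJulianDay(0L)
assert(day == 2440587)
assert(nanos == 43200L * 1000000000L)  // half a day, in nanoseconds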

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala

Lines changed: 6 additions & 5 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Timestamp, Date}

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._

 /**
@@ -156,7 +156,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(cast(cast(sd, DateType), StringType), sd)
     checkEvaluation(cast(cast(d, StringType), DateType), 0)
     checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
-    checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))

     // all convert to string type to check
     checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -301,9 +301,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(cast(ts, LongType), 15.toLong)
     checkEvaluation(cast(ts, FloatType), 15.002f)
     checkEvaluation(cast(ts, DoubleType), 15.002)
-    checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
-    checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
-    checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+      DateTimeUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
     checkEvaluation(
       cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
       millis.toFloat / 1000)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala

Lines changed: 3 additions & 3 deletions

@@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.{IntegerType, BooleanType}


@@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row)
     checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row)

-    val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
-    val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
+    val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))
+    val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
     checkEvaluation(Literal(d1) < Literal(d2), true)

     val ts1 = new Timestamp(12)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala

Lines changed: 5 additions & 5 deletions

@@ -23,7 +23,7 @@ import java.util.Arrays
 import org.scalatest.Matchers

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.PlatformDependent
 import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -83,8 +83,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val row = new SpecificMutableRow(fieldTypes)
     row.setLong(0, 0)
     row.setString(1, "Hello")
-    row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01")))
-    row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
+    row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
+    row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))

     val sizeRequired: Int = converter.getSizeRequirement(row)
     sizeRequired should be (8 + (8 * 4) +
@@ -98,9 +98,9 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     unsafeRow.getLong(0) should be (0)
     unsafeRow.getString(1) should be ("Hello")
     // Date is represented as Int in unsafeRow
-    DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
+    DateTimeUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
     // Timestamp is represented as Long in unsafeRow
-    DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
+    DateTimeUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
       (Timestamp.valueOf("2015-05-08 08:10:25"))
   }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala renamed to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala

Lines changed: 20 additions & 8 deletions

@@ -21,19 +21,31 @@ import java.sql.Timestamp

 import org.apache.spark.SparkFunSuite

-class DateUtilsSuite extends SparkFunSuite {
+class DateTimeUtilsSuite extends SparkFunSuite {

-  test("timestamp") {
+  test("timestamp and 100ns") {
     val now = new Timestamp(System.currentTimeMillis())
     now.setNanos(100)
-    val ns = DateUtils.fromJavaTimestamp(now)
-    assert(ns % 10000000L == 1)
-    assert(DateUtils.toJavaTimestamp(ns) == now)
+    val ns = DateTimeUtils.fromJavaTimestamp(now)
+    assert(ns % 10000000L === 1)
+    assert(DateTimeUtils.toJavaTimestamp(ns) === now)

     List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
-      val ts = DateUtils.toJavaTimestamp(t)
-      assert(DateUtils.fromJavaTimestamp(ts) == t)
-      assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
+      val ts = DateTimeUtils.toJavaTimestamp(t)
+      assert(DateTimeUtils.fromJavaTimestamp(ts) === t)
+      assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) === ts)
     }
   }
+
+  test("100ns and julian day") {
+    val (d, ns) = DateTimeUtils.toJulianDay(0)
+    assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH)
+    assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)
+    assert(DateTimeUtils.fromJulianDay(d, ns) == 0L)
+
+    val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
+    val (d1, ns1) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(t))
+    val t2 = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d1, ns1))
+    assert(t.equals(t2))
+  }
 }
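
One subtlety behind the first test: setNanos(100) is exactly one 100ns tick, which is why the assertion ns % 10000000L === 1 holds. Nanosecond components that are not multiples of 100 are dropped by the / 100 in fromJavaTimestamp, so round-trips are only exact at 100ns granularity. A small illustration of that behavior, assuming toJavaTimestamp restores nanos from the remaining ticks as the round-trip tests above imply:

import java.sql.Timestamp
import org.apache.spark.sql.catalyst.util.DateTimeUtils

val t = new Timestamp(0L)
t.setNanos(150)                                 // not a whole number of 100ns ticks
val ticks = DateTimeUtils.fromJavaTimestamp(t)
assert(ticks == 1L)                             // the extra 50 ns is dropped
assert(DateTimeUtils.toJavaTimestamp(ticks).getNanos == 100)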
