
Commit ff5b75a

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into enable_codegen

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala

2 parents: f4cf2c2 + 7d669a5

126 files changed: +2323 −1151 lines


.rat-excludes

Lines changed: 1 addition & 0 deletions

@@ -28,6 +28,7 @@ spark-env.sh
 spark-env.cmd
 spark-env.sh.template
 log4j-defaults.properties
+log4j-defaults-repl.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 d3.min.js

R/pkg/R/serialize.R

Lines changed: 8 additions & 0 deletions

@@ -37,6 +37,14 @@ writeObject <- function(con, object, writeType = TRUE) {
   # passing in vectors as arrays and instead require arrays to be passed
   # as lists.
   type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
+  # Checking types is needed here, since 'is.na' only handles atomic vectors,
+  # lists and pairlists
+  if (type %in% c("integer", "character", "logical", "double", "numeric")) {
+    if (is.na(object)) {
+      object <- NULL
+      type <- "NULL"
+    }
+  }
   if (writeType) {
     writeType(con, type)
   }

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 37 additions & 0 deletions

@@ -101,6 +101,43 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
 })
 
+test_that("convert NAs to null type in DataFrames", {
+  rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
+  df <- createDataFrame(sqlContext, rdd, list("a", "b"))
+  expect_true(is.na(collect(df)[2, "a"]))
+  expect_equal(collect(df)[2, "b"], 4L)
+
+  l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
+  df <- createDataFrame(sqlContext, l)
+  expect_equal(collect(df)[2, "x"], 1L)
+  expect_true(is.na(collect(df)[2, "y"]))
+
+  rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
+  df <- createDataFrame(sqlContext, rdd, list("a", "b"))
+  expect_true(is.na(collect(df)[2, "a"]))
+  expect_equal(collect(df)[2, "b"], 4)
+
+  l <- data.frame(x = 1, y = c(1, NA_real_, 3))
+  df <- createDataFrame(sqlContext, l)
+  expect_equal(collect(df)[2, "x"], 1)
+  expect_true(is.na(collect(df)[2, "y"]))
+
+  l <- list("a", "b", NA, "d")
+  df <- createDataFrame(sqlContext, l)
+  expect_true(is.na(collect(df)[3, "_1"]))
+  expect_equal(collect(df)[4, "_1"], "d")
+
+  l <- list("a", "b", NA_character_, "d")
+  df <- createDataFrame(sqlContext, l)
+  expect_true(is.na(collect(df)[3, "_1"]))
+  expect_equal(collect(df)[4, "_1"], "d")
+
+  l <- list(TRUE, FALSE, NA, TRUE)
+  df <- createDataFrame(sqlContext, l)
+  expect_true(is.na(collect(df)[3, "_1"]))
+  expect_equal(collect(df)[4, "_1"], TRUE)
+})
+
 test_that("toDF", {
   rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
   df <- toDF(rdd, list("a", "b"))

core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 1 addition & 1 deletion

@@ -235,7 +235,7 @@ function renderDagVizForJob(svgContainer) {
     // them separately later. Note that we cannot draw them now because we need to
     // put these edges in a separate container that is on top of all stage graphs.
     metadata.selectAll(".incoming-edge").each(function(v) {
-      var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
+      var edge = d3.select(this).text().trim().split(","); // e.g. 3,4 => [3, 4]
       crossStageEdges.push(edge);
     });
   });

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 19 additions & 7 deletions

@@ -121,13 +121,25 @@ trait Logging {
     if (usingLog4j12) {
       val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
       if (!log4j12Initialized) {
-        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
-        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
-          case Some(url) =>
-            PropertyConfigurator.configure(url)
-            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
-          case None =>
-            System.err.println(s"Spark was unable to load $defaultLogProps")
+        if (Utils.isInInterpreter) {
+          val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
+          Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
+            case Some(url) =>
+              PropertyConfigurator.configure(url)
+              System.err.println(s"Using Spark's repl log4j profile: $replDefaultLogProps")
+              System.err.println("To adjust logging level use sc.setLogLevel(\"INFO\")")
+            case None =>
+              System.err.println(s"Spark was unable to load $replDefaultLogProps")
+          }
+        } else {
+          val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
+          Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
+            case Some(url) =>
+              PropertyConfigurator.configure(url)
+              System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
+            case None =>
+              System.err.println(s"Spark was unable to load $defaultLogProps")
+          }
         }
       }
     }
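
With this change, an interactive shell session that has not configured log4j itself picks up the quieter repl profile (root level WARN), and the startup message points at sc.setLogLevel for turning verbosity back up. A minimal usage sketch from spark-shell, where `sc` is the SparkContext the shell provides:

// In spark-shell (Scala REPL), after the quieter repl log4j profile is loaded:
sc.setLogLevel("INFO")   // raise verbosity back to the previous default
sc.setLogLevel("WARN")   // return to the repl profile's root level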

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 48 additions & 1 deletion

@@ -21,7 +21,7 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, Map}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
+  /**
+   * Return a list of locations that each have fraction of map output greater than the specified
+   * threshold.
+   *
+   * @param shuffleId id of the shuffle
+   * @param reducerId id of the reduce task
+   * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
+   *
+   * This method is not thread-safe.
+   */
+  def getLocationsWithLargestOutputs(
+      shuffleId: Int,
+      reducerId: Int,
+      numReducers: Int,
+      fractionThreshold: Double)
+    : Option[Array[BlockManagerId]] = {
+
+    if (mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.nonEmpty) {
+        // HashMap to add up sizes of all blocks at the same location
+        val locs = new HashMap[BlockManagerId, Long]
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val status = statuses(mapIdx)
+          val blockSize = status.getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+            totalOutputSize += blockSize
+          }
+          mapIdx = mapIdx + 1
+        }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Return if we have any locations which satisfy the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
+        }
+      }
+    }
+    None
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
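
The new getLocationsWithLargestOutputs makes a single pass over the MapStatus array, sums block sizes per location for the given reducer, and keeps only the locations whose share of the total meets fractionThreshold. A small self-contained sketch of that thresholding arithmetic, using made-up sizes and an illustrative 0.2 threshold (the host names and numbers are not from the commit):

object FractionThresholdSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-location map-output sizes (bytes) feeding one reducer.
    val sizesByLocation = Map("host-a" -> 700L, "host-b" -> 200L, "host-c" -> 100L)
    val totalOutputSize = sizesByLocation.values.sum

    // Keep locations holding at least 20% of the reducer's input,
    // mirroring the locs.filter step in getLocationsWithLargestOutputs.
    val fractionThreshold = 0.2
    val topLocs = sizesByLocation.filter { case (_, size) =>
      size.toDouble / totalOutputSize >= fractionThreshold
    }

    println(topLocs.keys.toList.sorted.mkString(", ")) // host-a, host-b
  }
}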

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 1 deletion

@@ -20,6 +20,8 @@ package org.apache.spark
 import java.io.File
 import java.net.Socket
 
+import akka.actor.ActorSystem
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Properties
@@ -75,7 +77,8 @@ class SparkEnv (
     val conf: SparkConf) extends Logging {
 
   // TODO Remove actorSystem
-  val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
+  @deprecated("Actor system is no longer supported as of 1.4")
+  val actorSystem: ActorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
 
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
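
Since actorSystem is now marked @deprecated, callers that still reach into SparkEnv for the Akka ActorSystem keep compiling but get a warning nudging them off the API. Illustrative only, as typed in a spark-shell session where a SparkContext has already created the env:

// In spark-shell: compiles, but after this change the compiler flags the
// actorSystem access as deprecated.
import akka.actor.ActorSystem
import org.apache.spark.SparkEnv
val system: ActorSystem = SparkEnv.get.actorSystem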

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 3 additions & 2 deletions

@@ -29,7 +29,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 
 /**
  * Netty-based backend server that is used to communicate between R and Java.
@@ -41,7 +41,8 @@ private[spark] class RBackend {
   private[this] var bossGroup: EventLoopGroup = null
 
   def init(): Int = {
-    bossGroup = new NioEventLoopGroup(2)
+    val conf = new SparkConf()
+    bossGroup = new NioEventLoopGroup(conf.getInt("spark.r.numRBackendThreads", 2))
     val workerGroup = bossGroup
     val handler = new RBackendHandler(this)

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

Lines changed: 13 additions & 4 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
 
 import scala.collection.JavaConversions._
 
@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }
 
-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
       case "java.sql.Time" =>
        writeType(dos, "time")
        writeTime(dos, value.asInstanceOf[Time])
+      case "java.sql.Timestamp" =>
+        writeType(dos, "time")
+        writeTime(dos, value.asInstanceOf[Timestamp])
       case "[B" =>
        writeType(dos, "raw")
        writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }
 
+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+  }
 
   // NOTE: Only works for ASCII right now
  def writeString(out: DataOutputStream, value: String): Unit = {
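
Times now cross the R/JVM boundary as a double of epoch seconds with a fractional part: writeTime flattens a Timestamp into that form, and readTime splits it back into whole seconds plus nanoseconds. A small round-trip sketch of the same arithmetic without the stream plumbing (the instant is arbitrary):

import java.sql.Timestamp

object TimestampSecondsSketch {
  def main(args: Array[String]): Unit = {
    // Encode: milliseconds and nanos -> seconds-as-double, as writeTime does.
    val original = new Timestamp(1433376000500L) // arbitrary instant with a .5s fraction
    val seconds: Double =
      (original.getTime / 1000).toDouble + original.getNanos.toDouble / 1e9

    // Decode: the readTime arithmetic from the diff.
    val sec = Math.floor(seconds).toLong
    val decoded = new Timestamp(sec * 1000L)
    decoded.setNanos(((seconds - sec) * 1e9).toInt)

    println(seconds)          // 1.4333760005E9, i.e. 1433376000.5 s since the epoch
    println(decoded.getTime)  // 1433376000500, matching the original millisecond value
    println(decoded.getNanos) // 500000000
  }
}

Note that a double carries roughly microsecond precision for 2015-era epoch values, so millisecond timestamps survive the round trip but nanosecond detail may be rounded.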
