
Commit b32dcd1

Merge remote-tracking branch 'apache-github/master' into mapWithState
2 parents: 7a39eaf + 7a0a630


118 files changed: +1319 / -951 lines


R/pkg/R/DataFrame.R

Lines changed: 2 additions & 4 deletions
@@ -323,10 +323,8 @@ setMethod("names",
 setMethod("names<-",
           signature(x = "SparkDataFrame"),
           function(x, value) {
-            if (!is.null(value)) {
-              sdf <- callJMethod(x@sdf, "toDF", as.list(value))
-              dataFrame(sdf)
-            }
+            colnames(x) <- value
+            x
           })

 #' @rdname columns

R/pkg/R/mllib_clustering.R

Lines changed: 10 additions & 6 deletions
@@ -375,10 +375,13 @@ setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"

 #' @param object a fitted k-means model.
 #' @return \code{summary} returns summary information of the fitted model, which is a list.
-#'         The list includes the model's \code{k} (number of cluster centers),
+#'         The list includes the model's \code{k} (the configured number of cluster centers),
 #'         \code{coefficients} (model cluster centers),
-#'         \code{size} (number of data points in each cluster), and \code{cluster}
-#'         (cluster centers of the transformed data).
+#'         \code{size} (number of data points in each cluster), \code{cluster}
+#'         (cluster centers of the transformed data), {is.loaded} (whether the model is loaded
+#'         from a saved file), and \code{clusterSize}
+#'         (the actual number of cluster centers. When using initMode = "random",
+#'         \code{clusterSize} may not equal to \code{k}).
 #' @rdname spark.kmeans
 #' @export
 #' @note summary(KMeansModel) since 2.0.0
@@ -390,16 +393,17 @@ setMethod("summary", signature(object = "KMeansModel"),
             coefficients <- callJMethod(jobj, "coefficients")
             k <- callJMethod(jobj, "k")
             size <- callJMethod(jobj, "size")
-            coefficients <- t(matrix(unlist(coefficients), ncol = k))
+            clusterSize <- callJMethod(jobj, "clusterSize")
+            coefficients <- t(matrix(unlist(coefficients), ncol = clusterSize))
             colnames(coefficients) <- unlist(features)
-            rownames(coefficients) <- 1:k
+            rownames(coefficients) <- 1:clusterSize
             cluster <- if (is.loaded) {
               NULL
             } else {
               dataFrame(callJMethod(jobj, "cluster"))
             }
             list(k = k, coefficients = coefficients, size = size,
-                 cluster = cluster, is.loaded = is.loaded)
+                 cluster = cluster, is.loaded = is.loaded, clusterSize = clusterSize)
           })

 # Predicted values based on a k-means model
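
To make the k-versus-clusterSize distinction documented above concrete, here is a small hedged sketch using the standard Scala spark.ml API rather than the SparkR wrapper touched by this diff (the data and settings are made up for illustration): the fitted model can end up with fewer centers than the k you requested, which is the value the new clusterSize entry surfaces in summary().

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object KMeansClusterSizeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kmeans-clusterSize").getOrCreate()
    import spark.implicits._

    // Five points forming roughly three groups, but we still request k = 5.
    val df = Seq(
      Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
      Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1),
      Vectors.dense(5.0, 5.0)
    ).map(Tuple1.apply).toDF("features")

    val model = new KMeans().setK(5).setInitMode("random").setSeed(22222L).fit(df)

    // The configured k and the number of centers actually fitted can differ;
    // SparkR's summary() now reports the latter as `clusterSize`.
    println(s"requested k = ${model.getK}, fitted centers = ${model.clusterCenters.length}")

    spark.stop()
  }
}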

R/pkg/inst/tests/testthat/test_mllib_clustering.R

Lines changed: 11 additions & 4 deletions
@@ -196,13 +196,20 @@ test_that("spark.kmeans", {
   model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
                          initMode = "random", seed = 22222, tol = 1E-5)

-  fitted.model1 <- fitted(model1)
-  fitted.model2 <- fitted(model2)
+  summary.model1 <- summary(model1)
+  summary.model2 <- summary(model2)
+  cluster1 <- summary.model1$cluster
+  cluster2 <- summary.model2$cluster
+  clusterSize1 <- summary.model1$clusterSize
+  clusterSize2 <- summary.model2$clusterSize
+
   # The predicted clusters are different
-  expect_equal(sort(collect(distinct(select(fitted.model1, "prediction")))$prediction),
+  expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction),
                c(0, 1, 2, 3))
-  expect_equal(sort(collect(distinct(select(fitted.model2, "prediction")))$prediction),
+  expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction),
                c(0, 1, 2))
+  expect_equal(clusterSize1, 4)
+  expect_equal(clusterSize2, 3)
 })

 test_that("spark.lda with libsvm", {

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 8 additions & 0 deletions
@@ -869,6 +869,14 @@ test_that("names() colnames() set the column names", {
   colnames(df) <- c("col3", "col4")
   expect_equal(names(df)[1], "col3")

+  expect_error(names(df) <- NULL, "Invalid column names.")
+  expect_error(names(df) <- c("sepal.length", "sepal_width"),
+               "Column names cannot contain the '.' symbol.")
+  expect_error(names(df) <- c(1, 2), "Invalid column names.")
+  expect_error(names(df) <- c("a"),
+               "Column names must have the same length as the number of columns in the dataset.")
+  expect_error(names(df) <- c("1", NA), "Column names cannot be NA.")
+
   expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
                "Column names cannot contain the '.' symbol.")
   expect_error(colnames(df) <- c(1, 2), "Invalid column names.")

R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 14 additions & 0 deletions
@@ -488,6 +488,8 @@ SparkR supports the following machine learning models and algorithms.

 #### Clustering

+* Bisecting $k$-means
+
 * Gaussian Mixture Model (GMM)

 * $k$-means Clustering
@@ -738,6 +740,18 @@ summary(rfModel)
 predictions <- predict(rfModel, df)
 ```

+#### Bisecting k-Means
+
+`spark.bisectingKmeans` is a kind of [hierarchical clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering) using a divisive (or "top-down") approach: all observations start in one cluster, and splits are performed recursively as one moves down the hierarchy.
+
+```{r, warning=FALSE}
+df <- createDataFrame(iris)
+model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
+summary(model)
+fitted <- predict(model, df)
+head(select(fitted, "Sepal_Length", "prediction"))
+```
+
 #### Gaussian Mixture Model

 `spark.gaussianMixture` fits multivariate [Gaussian Mixture Model](https://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) (GMM) against a `SparkDataFrame`. [Expectation-Maximization](https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) (EM) is used to approximate the maximum likelihood estimator (MLE) of the model.

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 43 additions & 13 deletions
@@ -20,8 +20,12 @@
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;

 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -144,23 +148,49 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            long key = c.getUsed();
+            List<MemoryConsumer> list = sortedConsumers.get(key);
+            if (list == null) {
+              list = new ArrayList<>(1);
+              sortedConsumers.put(key, list);
+            }
+            list.add(c);
+          }
+        }
+        while (!sortedConsumers.isEmpty()) {
+          // Get the consumer using the least memory more than the remaining required memory.
+          Map.Entry<Long, List<MemoryConsumer>> currentEntry =
+            sortedConsumers.ceilingEntry(required - got);
+          // No consumer has used memory more than the remaining required memory.
+          // Get the consumer of largest used memory.
+          if (currentEntry == null) {
+            currentEntry = sortedConsumers.lastEntry();
+          }
+          List<MemoryConsumer> cList = currentEntry.getValue();
+          MemoryConsumer c = cList.remove(cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
+          }
+          try {
+            long released = c.spill(required - got, consumer);
+            if (released > 0) {
+              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), c, consumer);
+              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
+              if (got >= required) {
+                break;
              }
-            } catch (IOException e) {
-              logger.error("error while calling spill() on " + c, e);
-              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
-                + e.getMessage());
            }
+          } catch (IOException e) {
+            logger.error("error while calling spill() on " + c, e);
+            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+              + e.getMessage());
          }
        }
      }
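
The core of this change is how a spill victim is chosen. Below is a minimal standalone Scala sketch of that selection rule; the Consumer case class and pickSpillCandidate helper are invented here purely for illustration, while Spark's real code works on MemoryConsumer objects inside acquireExecutionMemory. The idea: index candidates by bytes used in a TreeMap, take the smallest consumer that still covers the remaining need via ceilingEntry, and fall back to lastEntry (the largest) when none is big enough.

object SpillCandidateSketch {
  import java.util.{ArrayList, List => JList, TreeMap}

  // Invented stand-in for MemoryConsumer: just a name plus the bytes it currently holds.
  final case class Consumer(name: String, used: Long)

  // Same selection rule as the patch: prefer the consumer using the least memory that is
  // still >= the remaining requirement; if no single consumer is that large, pick the biggest.
  def pickSpillCandidate(consumers: Seq[Consumer], stillNeeded: Long): Option[Consumer] = {
    val byUsage = new TreeMap[Long, JList[Consumer]]()
    for (c <- consumers if c.used > 0) {
      var bucket = byUsage.get(c.used)
      if (bucket == null) {
        bucket = new ArrayList[Consumer](1)
        byUsage.put(c.used, bucket)
      }
      bucket.add(c)
    }
    if (byUsage.isEmpty) {
      None
    } else {
      val entry = Option(byUsage.ceilingEntry(stillNeeded)).getOrElse(byUsage.lastEntry())
      val bucket = entry.getValue
      Some(bucket.get(bucket.size - 1))
    }
  }

  def main(args: Array[String]): Unit = {
    // Needing 300 more bytes picks the 512-byte consumer instead of re-spilling the huge sorter.
    println(pickSpillCandidate(
      Seq(Consumer("sorter", 1L << 30), Consumer("shuffle-map", 512), Consumer("aggregator", 128)),
      stillNeeded = 300))
  }
}

Because ceilingEntry is a single O(log n) lookup, a task avoids repeatedly spilling a consumer it just drained and producing many tiny spill files, which is the motivation stated in the patch's inline comment.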

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 6 deletions
@@ -262,7 +262,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /**
    * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then seconds are assumed.
-   * @throws java.util.NoSuchElementException
+   * @throws java.util.NoSuchElementException If the time parameter is not set
    */
   def getTimeAsSeconds(key: String): Long = {
     Utils.timeStringAsSeconds(get(key))
@@ -279,7 +279,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /**
    * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then milliseconds are assumed.
-   * @throws java.util.NoSuchElementException
+   * @throws java.util.NoSuchElementException If the time parameter is not set
    */
   def getTimeAsMs(key: String): Long = {
     Utils.timeStringAsMs(get(key))
@@ -296,7 +296,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /**
    * Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then bytes are assumed.
-   * @throws java.util.NoSuchElementException
+   * @throws java.util.NoSuchElementException If the size parameter is not set
    */
   def getSizeAsBytes(key: String): Long = {
     Utils.byteStringAsBytes(get(key))
@@ -320,7 +320,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /**
    * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then Kibibytes are assumed.
-   * @throws java.util.NoSuchElementException
+   * @throws java.util.NoSuchElementException If the size parameter is not set
    */
   def getSizeAsKb(key: String): Long = {
     Utils.byteStringAsKb(get(key))
@@ -337,7 +337,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /**
    * Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then Mebibytes are assumed.
-   * @throws java.util.NoSuchElementException
+   * @throws java.util.NoSuchElementException If the size parameter is not set
    */
   def getSizeAsMb(key: String): Long = {
     Utils.byteStringAsMb(get(key))
@@ -354,7 +354,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /**
    * Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then Gibibytes are assumed.
-   * @throws java.util.NoSuchElementException
+   * @throws java.util.NoSuchElementException If the size parameter is not set
    */
   def getSizeAsGb(key: String): Long = {
     Utils.byteStringAsGb(get(key))
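
As a quick illustration of the behavior these @throws tags now describe, here is a hedged sketch (the config keys are arbitrary examples): the single-argument getters throw java.util.NoSuchElementException when the key is unset, while the overloads that take a default do not.

import java.util.NoSuchElementException
import org.apache.spark.SparkConf

object ConfGetterSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false).set("spark.network.timeout", "120s")

    // Set key: parsed with the documented suffix semantics.
    println(conf.getTimeAsSeconds("spark.network.timeout"))        // 120

    // Unset key with an explicit default: no exception.
    println(conf.getSizeAsMb("spark.driver.maxResultSize", "1g"))  // 1024

    // Unset key without a default: throws, as the updated scaladoc states.
    try {
      conf.getTimeAsSeconds("spark.executor.heartbeatInterval")
    } catch {
      case e: NoSuchElementException => println(s"not set: ${e.getMessage}")
    }
  }
}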

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 0 additions & 3 deletions
@@ -163,7 +163,4 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
-
-  /** Whether we are using a direct output committer */
-  def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct")
 }

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala

Lines changed: 0 additions & 15 deletions
@@ -83,17 +83,6 @@ object SparkHadoopMapReduceWriter extends Logging {
       isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
     committer.setupJob(jobContext)

-    // When speculation is on and output committer class name contains "Direct", we should warn
-    // users that they may loss data if they are using a direct output committer.
-    if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) {
-      val warningMessage =
-        s"$committer may be an output committer that writes data directly to " +
-          "the final location. Because speculation is enabled, this output committer may " +
-          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
-          "committer that does not have this behavior (e.g. FileOutputCommitter)."
-      logWarning(warningMessage)
-    }
-
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
@@ -230,10 +219,6 @@ object SparkHadoopWriterUtils {
     enabledInConf && !validationDisabled
   }

-  def isSpeculationEnabled(conf: SparkConf): Boolean = {
-    conf.getBoolean("spark.speculation", false)
-  }
-
   // TODO: these don't seem like the right abstractions.
   // We should abstract the duplicate code in a less awkward way.

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 2 deletions
@@ -496,7 +496,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * or `PairRDDFunctions.reduceByKey` will provide much better performance.
    *
    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
-   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
+   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
    */
   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
     // groupByKey shouldn't use map side combine because map side combine does not
@@ -520,7 +520,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * or `PairRDDFunctions.reduceByKey` will provide much better performance.
    *
    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
-   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
+   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
    */
   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
     groupByKey(new HashPartitioner(numPartitions))
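
The scaladoc note touched here recommends aggregation over groupByKey when only a combined value per key is needed. A small hedged sketch of that advice against the plain RDD API (the data is made up):

import org.apache.spark.sql.SparkSession

object GroupVsReduceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("group-vs-reduce").getOrCreate()
    val sc = spark.sparkContext

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

    // groupByKey materializes every value of a key in memory before we sum, which is the
    // pattern the note warns can end in OutOfMemoryError for heavily skewed keys.
    val viaGroup = pairs.groupByKey().mapValues(_.sum)

    // reduceByKey combines map-side first, so only partial sums are shuffled.
    val viaReduce = pairs.reduceByKey(_ + _)

    println(viaGroup.collect().toMap)   // Map(a -> 4, b -> 6)
    println(viaReduce.collect().toMap)  // Map(a -> 4, b -> 6)

    spark.stop()
  }
}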

0 commit comments
