
Commit a25d00c

Author: ding
Commit message: code rebase
Merge commit (2 parents: e786838 + 0fbecc7)

File tree: 635 files changed, +20476 −9771 lines


.travis.yml

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

R/pkg/NAMESPACE

Lines changed: 21 additions & 1 deletion
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 # Imports from base R
 # Do not include stats:: "rpois", "runif" - causes error at runtime
 importFrom("methods", "setGeneric", "setMethod", "setOldClass")
@@ -47,7 +64,8 @@ exportMethods("glm",
               "spark.kstest",
               "spark.logit",
               "spark.randomForest",
-              "spark.gbt")
+              "spark.gbt",
+              "spark.bisectingKmeans")

 # Job group lifecycle management methods
 export("setJobGroup",
@@ -94,6 +112,7 @@ exportMethods("arrange",
               "freqItems",
               "gapply",
               "gapplyCollect",
+              "getNumPartitions",
               "group_by",
               "groupBy",
               "head",
@@ -306,6 +325,7 @@ exportMethods("%in%",
               "toDegrees",
               "toRadians",
               "to_date",
+              "to_timestamp",
               "to_utc_timestamp",
               "translate",
               "trim",

R/pkg/R/DataFrame.R

Lines changed: 72 additions & 18 deletions
@@ -323,10 +323,8 @@ setMethod("names",
 setMethod("names<-",
           signature(x = "SparkDataFrame"),
           function(x, value) {
-            if (!is.null(value)) {
-              sdf <- callJMethod(x@sdf, "toDF", as.list(value))
-              dataFrame(sdf)
-            }
+            colnames(x) <- value
+            x
           })

 #' @rdname columns
@@ -1717,6 +1715,23 @@ getColumn <- function(x, c) {
   column(callJMethod(x@sdf, "col", c))
 }

+setColumn <- function(x, c, value) {
+  if (class(value) != "Column" && !is.null(value)) {
+    if (isAtomicLengthOne(value)) {
+      value <- lit(value)
+    } else {
+      stop("value must be a Column, literal value as atomic in length of 1, or NULL")
+    }
+  }
+
+  if (is.null(value)) {
+    nx <- drop(x, c)
+  } else {
+    nx <- withColumn(x, c, value)
+  }
+  nx
+}
+
 #' @param name name of a Column (without being wrapped by \code{""}).
 #' @rdname select
 #' @name $
@@ -1735,19 +1750,7 @@ setMethod("$", signature(x = "SparkDataFrame"),
 #' @note $<- since 1.4.0
 setMethod("$<-", signature(x = "SparkDataFrame"),
           function(x, name, value) {
-            if (class(value) != "Column" && !is.null(value)) {
-              if (isAtomicLengthOne(value)) {
-                value <- lit(value)
-              } else {
-                stop("value must be a Column, literal value as atomic in length of 1, or NULL")
-              }
-            }
-
-            if (is.null(value)) {
-              nx <- drop(x, name)
-            } else {
-              nx <- withColumn(x, name, value)
-            }
+            nx <- setColumn(x, name, value)
             x@sdf <- nx@sdf
             x
           })
@@ -1767,6 +1770,21 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
             getColumn(x, i)
           })

+#' @rdname subset
+#' @name [[<-
+#' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
+#' @note [[<- since 2.1.1
+setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
+          function(x, i, value) {
+            if (is.numeric(i)) {
+              cols <- columns(x)
+              i <- cols[[i]]
+            }
+            nx <- setColumn(x, i, value)
+            x@sdf <- nx@sdf
+            x
+          })
+
 #' @rdname subset
 #' @name [
 #' @aliases [,SparkDataFrame-method
@@ -1811,14 +1829,19 @@ setMethod("[", signature(x = "SparkDataFrame"),
 #' Return subsets of SparkDataFrame according to given conditions
 #' @param x a SparkDataFrame.
 #' @param i,subset (Optional) a logical expression to filter on rows.
+#'                 For extract operator [[ and replacement operator [[<-, the indexing parameter for
+#'                 a single Column.
 #' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
 #' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
 #'             Otherwise, a SparkDataFrame will always be returned.
+#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
+#'              If \code{NULL}, the specified Column is dropped.
 #' @param ... currently not used.
 #' @return A new SparkDataFrame containing only the rows that meet the condition with selected columns.
 #' @export
 #' @family SparkDataFrame functions
 #' @aliases subset,SparkDataFrame-method
+#' @seealso \link{withColumn}
 #' @rdname subset
 #' @name subset
 #' @family subsetting functions
@@ -1836,6 +1859,10 @@ setMethod("[", signature(x = "SparkDataFrame"),
 #' subset(df, df$age %in% c(19, 30), 1:2)
 #' subset(df, df$age %in% c(19), select = c(1,2))
 #' subset(df, select = c(1,2))
+#' # Columns can be selected and set
+#' df[["age"]] <- 23
+#' df[[1]] <- df$age
+#' df[[2]] <- NULL # drop column
 #' }
 #' @note subset since 1.5.0
 setMethod("subset", signature(x = "SparkDataFrame"),
@@ -1960,7 +1987,7 @@ setMethod("selectExpr",
 #' @aliases withColumn,SparkDataFrame,character-method
 #' @rdname withColumn
 #' @name withColumn
-#' @seealso \link{rename} \link{mutate}
+#' @seealso \link{rename} \link{mutate} \link{subset}
 #' @export
 #' @examples
 #'\dontrun{
@@ -1971,6 +1998,10 @@ setMethod("selectExpr",
 #' # Replace an existing column
 #' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
 #' newDF3 <- withColumn(newDF, "newCol", 42)
+#' # Use extract operator to set an existing or new column
+#' df[["age"]] <- 23
+#' df[[2]] <- df$col1
+#' df[[2]] <- NULL # drop column
 #' }
 #' @note withColumn since 1.4.0
 setMethod("withColumn",
@@ -3406,3 +3437,26 @@ setMethod("randomSplit",
   }
   sapply(sdfs, dataFrame)
 })
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })
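Taken together, the DataFrame.R changes factor the old $<- body into the shared setColumn() helper and reuse it for the new [[<- replacement operator. A usage sketch, assuming a running SparkR session (column positions refer to mtcars):

    df <- createDataFrame(mtcars, numPartitions = 2)
    df[["gear"]] <- 23        # atomic length-1 value is wrapped via lit()
    df[["gear2"]] <- df$gear  # a Column expression adds or replaces a column
    df[[2]] <- NULL           # NULL drops the column (here the 2nd, "cyl")
    getNumPartitions(df)      # 2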

R/pkg/R/RDD.R

Lines changed: 15 additions & 15 deletions
@@ -313,7 +313,7 @@ setMethod("checkpoint",
 #' @rdname getNumPartitions
 #' @aliases getNumPartitions,RDD-method
 #' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
           signature(x = "RDD"),
           function(x) {
             callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             .Deprecated("getNumPartitions")
-            getNumPartitions(x)
+            getNumPartitionsRDD(x)
           })

 #' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
           signature(x = "RDD"),
           function(x) {
             ones <- lapply(x, function(item) { list(item, 1L) })
-            collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+            collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
           })

 #' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
             resList <- list()
             index <- -1
             jrdd <- getJRDD(x)
-            numPartitions <- getNumPartitions(x)
+            numPartitions <- getNumPartitionsRDD(x)
             serializedModeRDD <- getSerializedMode(x)

             # TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
 #' @noRd
 setMethod("distinctRDD",
           signature(x = "RDD"),
-          function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             identical.mapped <- lapply(x, function(x) { list(x, NULL) })
             reduced <- reduceByKey(identical.mapped,
                                    function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
-            if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+            if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
               func <- function(partIndex, part) {
                 set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
 #' @noRd
 setMethod("sortBy",
           signature(x = "RDD", func = "function"),
-          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             values(sortByKey(keyBy(x, func), ascending, numPartitions))
           })

@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
   resList <- list()
   index <- -1
   jrdd <- getJRDD(newRdd)
-  numPartitions <- getNumPartitions(newRdd)
+  numPartitions <- getNumPartitionsRDD(newRdd)
   serializedModeRDD <- getSerializedMode(newRdd)

   while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
 setMethod("zipWithUniqueId",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)

             partitionFunc <- function(partIndex, part) {
               mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
 setMethod("zipWithIndex",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
             if (n > 1) {
               nums <- collectRDD(lapplyPartition(x,
                                                  function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
 setMethod("zipRDD",
           signature(x = "RDD", other = "RDD"),
           function(x, other) {
-            n1 <- getNumPartitions(x)
-            n2 <- getNumPartitions(other)
+            n1 <- getNumPartitionsRDD(x)
+            n2 <- getNumPartitionsRDD(other)
             if (n1 != n2) {
               stop("Can only zip RDDs which have the same number of partitions.")
             }
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
 #' @noRd
 setMethod("subtract",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             mapFunction <- function(e) { list(e, NA) }
             rdd1 <- map(x, mapFunction)
             rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
 #' @noRd
 setMethod("intersection",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             rdd1 <- map(x, function(v) { list(v, NA) })
             rdd2 <- map(other, function(v) { list(v, NA) })

@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
             if (length(rrdds) == 1) {
               return(rrdds[[1]])
             }
-            nPart <- sapply(rrdds, getNumPartitions)
+            nPart <- sapply(rrdds, getNumPartitionsRDD)
             if (length(unique(nPart)) != 1) {
               stop("Can only zipPartitions RDDs which have the same number of partitions.")
             }
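The RDD.R side is a pure rename: the RDD-level method becomes getNumPartitionsRDD so that the public getNumPartitions generic is freed up for SparkDataFrame, and every internal call site (including the deprecated numPartitions alias) now routes through the new name. Since the RDD API is private (@noRd), the rename is only observable via ::: access; a sketch, assuming an initialized SparkContext handle sc and the internal parallelize(sc, coll, numSlices) helper:

    rdd <- SparkR:::parallelize(sc, 1:100, 4L)
    SparkR:::getNumPartitionsRDD(rdd)  # 4
    SparkR:::numPartitions(rdd)        # same value, plus a deprecation warning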
