
Commit a8a6a83

Merge remote-tracking branch 'apache-github/master' into SPARK-16406
2 parents: b4c8cdb + 44c7c62

323 files changed: 11,547 additions, 2,850 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -77,3 +77,4 @@ spark-warehouse/
 # For R session data
 .RData
 .RHistory
+.Rhistory

R/pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
@@ -69,6 +69,7 @@ exportMethods("arrange",
               "first",
               "freqItems",
               "gapply",
+              "gapplyCollect",
               "group_by",
               "groupBy",
               "head",
@@ -234,6 +235,7 @@ exportMethods("%in%",
               "over",
               "percent_rank",
               "pmod",
+              "posexplode",
               "quarter",
               "rand",
               "randn",

R/pkg/R/DataFrame.R

Lines changed: 112 additions & 14 deletions
@@ -176,8 +176,8 @@ setMethod("isLocal",
 #' @param x A SparkDataFrame
 #' @param numRows The number of rows to print. Defaults to 20.
 #' @param truncate Whether truncate long strings. If true, strings more than 20 characters will be
-#'                 truncated and all cells will be aligned right
-#'
+#'                 truncated. However, if set greater than zero, truncates strings longer than `truncate`
+#'                 characters and all cells will be aligned right.
 #' @family SparkDataFrame functions
 #' @rdname showDF
 #' @name showDF
@@ -193,7 +193,12 @@ setMethod("isLocal",
 setMethod("showDF",
           signature(x = "SparkDataFrame"),
           function(x, numRows = 20, truncate = TRUE) {
-            s <- callJMethod(x@sdf, "showString", numToInt(numRows), truncate)
+            if (is.logical(truncate) && truncate) {
+              s <- callJMethod(x@sdf, "showString", numToInt(numRows), numToInt(20))
+            } else {
+              truncate2 <- as.numeric(truncate)
+              s <- callJMethod(x@sdf, "showString", numToInt(numRows), numToInt(truncate2))
+            }
             cat(s)
           })
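A minimal sketch of the new truncate behavior (assumes a running SparkR session started with sparkR.session(); the data here is hypothetical):

df <- createDataFrame(data.frame(s = c("short",
                                       "a string well over twenty characters"),
                                 stringsAsFactors = FALSE))
showDF(df)                    # truncate = TRUE: cells cut at 20 characters
showDF(df, truncate = 3)      # numeric: cells cut at 3 characters, right-aligned
showDF(df, truncate = FALSE)  # as.numeric(FALSE) is 0, i.e. no truncation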

@@ -466,7 +471,7 @@ setMethod("createOrReplaceTempView",
 
 #' (Deprecated) Register Temporary Table
 #'
-#' Registers a SparkDataFrame as a Temporary Table in the SQLContext
+#' Registers a SparkDataFrame as a Temporary Table in the SparkSession
 #' @param x A SparkDataFrame
 #' @param tableName A character vector containing the name of the table
 #'
@@ -493,7 +498,7 @@ setMethod("registerTempTable",
 
 #' insertInto
 #'
-#' Insert the contents of a SparkDataFrame into a table registered in the current SQL Context.
+#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession.
 #'
 #' @param x A SparkDataFrame
 #' @param tableName A character vector containing the name of the table
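A short usage sketch of the two APIs whose docs changed above (hedged: assumes a SparkR session and that a saved table "people" already exists, e.g. one created with saveAsTable; all names here are hypothetical):

df <- createDataFrame(data.frame(name = c("Ann", "Bob"), age = c(28L, 35L)))
createOrReplaceTempView(df, "people_view")   # the replacement for registerTempTable
insertInto(df, "people", overwrite = FALSE)  # appends into the existing saved table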
@@ -1339,7 +1344,7 @@ setMethod("dapplyCollect",
 
 #' gapply
 #'
-#' Group the SparkDataFrame using the specified columns and apply the R function to each
+#' Groups the SparkDataFrame using the specified columns and applies the R function to each
 #' group.
 #'
 #' @param x A SparkDataFrame
@@ -1351,9 +1356,11 @@ setMethod("dapplyCollect",
 #' @param schema The schema of the resulting SparkDataFrame after the function is applied.
 #'               The schema must match to output of `func`. It has to be defined for each
 #'               output column with preferred output column name and corresponding data type.
+#' @return a SparkDataFrame
 #' @family SparkDataFrame functions
 #' @rdname gapply
 #' @name gapply
+#' @seealso \link{gapplyCollect}
 #' @export
 #' @examples
 #'
@@ -1369,14 +1376,22 @@ setMethod("dapplyCollect",
 #' columns with data types integer and string and the mean which is a double.
 #' schema <- structType(structField("a", "integer"), structField("c", "string"),
 #'   structField("avg", "double"))
-#' df1 <- gapply(
+#' result <- gapply(
 #'   df,
-#'   list("a", "c"),
+#'   c("a", "c"),
 #'   function(key, x) {
 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-#'   },
-#'   schema)
-#' collect(df1)
+#'   }, schema)
+#'
+#' We can also group the data and afterwards call gapply on GroupedData.
+#' For Example:
+#' gdf <- group_by(df, "a", "c")
+#' result <- gapply(
+#'   gdf,
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   }, schema)
+#' collect(result)
 #'
 #' Result
 #' ------
@@ -1394,16 +1409,16 @@ setMethod("dapplyCollect",
 #'   structField("Petal_Width", "double"))
 #' df1 <- gapply(
 #'   df,
-#'   list(df$"Species"),
+#'   df$"Species",
 #'   function(key, x) {
 #'     m <- suppressWarnings(lm(Sepal_Length ~
 #'     Sepal_Width + Petal_Length + Petal_Width, x))
 #'     data.frame(t(coef(m)))
 #'   }, schema)
 #' collect(df1)
 #'
-#'Result
-#'---------
+#' Result
+#' ---------
 #' Model  (Intercept)  Sepal_Width  Petal_Length  Petal_Width
 #' 1        0.699883    0.3303370    0.9455356    -0.1697527
 #' 2        1.895540    0.3868576    0.9083370    -0.6792238
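The roxygen example above, condensed into a standalone runnable sketch (assumes a SparkR session); note the grouping columns are now passed as c("a", "c") rather than list("a", "c"):

df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))
schema <- structType(structField("a", "integer"), structField("c", "string"),
                     structField("avg", "double"))
result <- gapply(
  df,
  c("a", "c"),
  function(key, x) {
    data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  }, schema)
collect(result)  # rows (3, "3", 3.0) and (1, "1", 1.5)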
@@ -1418,6 +1433,89 @@ setMethod("gapply",
             gapply(grouped, func, schema)
           })
 
+#' gapplyCollect
+#'
+#' Groups the SparkDataFrame using the specified columns, applies the R function to each
+#' group and collects the result back to R as data.frame.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified by grouping
+#'             column of the SparkDataFrame. The function `func` takes as argument
+#'             a key - grouping columns and a data frame - a local R data.frame.
+#'             The output of `func` is a local R data.frame.
+#' @return a data.frame
+#' @family SparkDataFrame functions
+#' @rdname gapplyCollect
+#' @name gapplyCollect
+#' @seealso \link{gapply}
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' result <- gapplyCollect(
+#'   df,
+#'   c("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'     colnames(y) <- c("key_a", "key_c", "mean_b")
+#'     y
+#'   })
+#'
+#' We can also group the data and afterwards call gapply on GroupedData.
+#' For Example:
+#' gdf <- group_by(df, "a", "c")
+#' result <- gapplyCollect(
+#'   gdf,
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'     colnames(y) <- c("key_a", "key_c", "mean_b")
+#'     y
+#'   })
+#'
+#' Result
+#' ------
+#' key_a key_c mean_b
+#' 3     3     3.0
+#' 1     1     1.5
+#'
+#' Fits linear models on iris dataset by grouping on the 'Species' column and
+#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#'
+#' df <- createDataFrame (iris)
+#' result <- gapplyCollect(
+#'   df,
+#'   df$"Species",
+#'   function(key, x) {
+#'     m <- suppressWarnings(lm(Sepal_Length ~
+#'     Sepal_Width + Petal_Length + Petal_Width, x))
+#'     data.frame(t(coef(m)))
+#'   })
+#'
+#' Result
+#'---------
+#' Model  X.Intercept.  Sepal_Width  Petal_Length  Petal_Width
+#' 1        0.699883    0.3303370    0.9455356    -0.1697527
+#' 2        1.895540    0.3868576    0.9083370    -0.6792238
+#' 3        2.351890    0.6548350    0.2375602     0.2521257
+#'
+#'}
+#' @note gapplyCollect(SparkDataFrame) since 2.0.0
+setMethod("gapplyCollect",
+          signature(x = "SparkDataFrame"),
+          function(x, cols, func) {
+            grouped <- do.call("groupBy", c(x, cols))
+            gapplyCollect(grouped, func)
+          })
+
 ############################## RDD Map Functions ##################################
 # All of the following functions mirror the existing RDD map functions,           #
 # but allow for use with DataFrames by first converting to an RRDD before calling #
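For contrast with gapply, a sketch of what the new method saves the caller (reusing the hypothetical df from the sketch above): no schema argument, because each group's data.frame is serialized back to the driver and rbind-ed there:

means <- gapplyCollect(
  df,
  c("a", "c"),
  function(key, x) {
    y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
    colnames(y) <- c("key_a", "key_c", "mean_b")
    y
  })
class(means)  # "data.frame" - collected locally, no output schema required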

R/pkg/R/functions.R

Lines changed: 17 additions & 0 deletions
@@ -2934,3 +2934,20 @@ setMethod("sort_array",
             jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", x@jc, asc)
             column(jc)
           })
+
+#' posexplode
+#'
+#' Creates a new row for each element with position in the given array or map column.
+#'
+#' @rdname posexplode
+#' @name posexplode
+#' @family collection_funcs
+#' @export
+#' @examples \dontrun{posexplode(df$c)}
+#' @note posexplode since 2.1.0
+setMethod("posexplode",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", x@jc)
+            column(jc)
+          })
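A sketch of what posexplode produces (assumes a SparkR session; expr() is used here to build an array column, since df$c in the roxygen example is only schematic):

df  <- createDataFrame(data.frame(id = 1L))
adf <- select(df, df$id, expr("array('a', 'b', 'c') AS arr"))
showDF(select(adf, adf$id, posexplode(adf$arr)))
# Expected shape - one row per element, plus its 0-based position:
# +---+---+---+
# | id|pos|col|
# +---+---+---+
# |  1|  0|  a|
# |  1|  1|  b|
# |  1|  2|  c|
# +---+---+---+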

R/pkg/R/generics.R

Lines changed: 10 additions & 0 deletions
@@ -469,6 +469,10 @@ setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect")
 #' @export
 setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
 
+#' @rdname gapplyCollect
+#' @export
+setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
@@ -1050,6 +1054,10 @@ setGeneric("percent_rank", function(x) { standardGeneric("percent_rank") })
 #' @export
 setGeneric("pmod", function(y, x) { standardGeneric("pmod") })
 
+#' @rdname posexplode
+#' @export
+setGeneric("posexplode", function(x) { standardGeneric("posexplode") })
+
 #' @rdname quarter
 #' @export
 setGeneric("quarter", function(x) { standardGeneric("quarter") })
@@ -1247,6 +1255,7 @@ setGeneric("spark.glm", function(data, formula, ...) { standardGeneric("spark.gl
 #' @export
 setGeneric("glm")
 
+#' predict
 #' @rdname predict
 #' @export
 setGeneric("predict", function(object, ...) { standardGeneric("predict") })
@@ -1271,6 +1280,7 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s
 #' @export
 setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })
 
+#' write.ml
 #' @rdname write.ml
 #' @export
 setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })

R/pkg/R/group.R

Lines changed: 40 additions & 53 deletions
@@ -196,64 +196,51 @@ createMethods()
 
 #' gapply
 #'
-#' Applies a R function to each group in the input GroupedData
-#'
-#' @param x a GroupedData
-#' @param func A function to be applied to each group partition specified by GroupedData.
-#'             The function `func` takes as argument a key - grouping columns and
-#'             a data frame - a local R data.frame.
-#'             The output of `func` is a local R data.frame.
-#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
-#'               The schema must match to output of `func`. It has to be defined for each
-#'               output column with preferred output column name and corresponding data type.
-#' @return a SparkDataFrame
+#' @param x A GroupedData
 #' @rdname gapply
 #' @name gapply
 #' @export
-#' @examples
-#' \dontrun{
-#' Computes the arithmetic mean of the second column by grouping
-#' on the first and third columns. Output the grouping values and the average.
-#'
-#' df <- createDataFrame (
-#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-#'   c("a", "b", "c", "d"))
-#'
-#' Here our output contains three columns, the key which is a combination of two
-#' columns with data types integer and string and the mean which is a double.
-#' schema <- structType(structField("a", "integer"), structField("c", "string"),
-#'   structField("avg", "double"))
-#' df1 <- gapply(
-#'   df,
-#'   list("a", "c"),
-#'   function(key, x) {
-#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-#'   },
-#'   schema)
-#' collect(df1)
-#'
-#' Result
-#' ------
-#' a c avg
-#' 3 3 3.0
-#' 1 1 1.5
-#' }
 #' @note gapply(GroupedData) since 2.0.0
 setMethod("gapply",
           signature(x = "GroupedData"),
           function(x, func, schema) {
-            try(if (is.null(schema)) stop("schema cannot be NULL"))
-            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
-                                         connection = NULL)
-            broadcastArr <- lapply(ls(.broadcastNames),
-                                   function(name) { get(name, .broadcastNames) })
-            sdf <- callJStatic(
-                     "org.apache.spark.sql.api.r.SQLUtils",
-                     "gapply",
-                     x@sgd,
-                     serialize(cleanClosure(func), connection = NULL),
-                     packageNamesArr,
-                     broadcastArr,
-                     schema$jobj)
-            dataFrame(sdf)
+            if (is.null(schema)) stop("schema cannot be NULL")
+            gapplyInternal(x, func, schema)
+          })
+
+#' gapplyCollect
+#'
+#' @param x A GroupedData
+#' @rdname gapplyCollect
+#' @name gapplyCollect
+#' @export
+#' @note gapplyCollect(GroupedData) since 2.0.0
+setMethod("gapplyCollect",
+          signature(x = "GroupedData"),
+          function(x, func) {
+            gdf <- gapplyInternal(x, func, NULL)
+            content <- callJMethod(gdf@sdf, "collect")
+            # content is a list of items of struct type. Each item has a single field
+            # which is a serialized data.frame corresponds to one group of the
+            # SparkDataFrame.
+            ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
+            ldf <- do.call(rbind, ldfs)
+            row.names(ldf) <- NULL
+            ldf
           })
+
+gapplyInternal <- function(x, func, schema) {
+  packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+                               connection = NULL)
+  broadcastArr <- lapply(ls(.broadcastNames),
+                         function(name) { get(name, .broadcastNames) })
+  sdf <- callJStatic(
+           "org.apache.spark.sql.api.r.SQLUtils",
+           "gapply",
+           x@sgd,
+           serialize(cleanClosure(func), connection = NULL),
+           packageNamesArr,
+           broadcastArr,
+           if (class(schema) == "structType") { schema$jobj } else { NULL })
+  dataFrame(sdf)
+}
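A usage sketch of the refactored path (reusing the hypothetical df and schema from the DataFrame.R sketches above): both methods now funnel through gapplyInternal, and gapplyCollect passes schema = NULL so SQLUtils.gapply returns serialized per-group data.frames instead of typed rows:

gdf <- group_by(df, "a", "c")

# Distributed result; schema describes the output SparkDataFrame.
out_df <- gapply(gdf, function(key, x) {
  data.frame(key, mean(x$b), stringsAsFactors = FALSE)
}, schema)

# Local result; no schema - groups are unserialized and rbind-ed on the driver.
out_local <- gapplyCollect(gdf, function(key, x) {
  y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  colnames(y) <- c("key_a", "key_c", "mean_b")
  y
})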
