Skip to content

Commit 9ad2c85

Browse files
committed
Merge branch 'master' of git://git.apache.org/spark into SPARK-17154
2 parents 437ac99 + c8c0906 commit 9ad2c85

File tree

422 files changed

+11485
-3252
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

422 files changed

+11485
-3252
lines changed

R/pkg/R/DataFrame.R

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
5555
.Object
5656
})
5757

58+
#' Set options/mode and then return the write object
59+
#' @noRd
60+
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
61+
options <- varargsToStrEnv(...)
62+
if (!is.null(path)) {
63+
options[["path"]] <- path
64+
}
65+
jmode <- convertToJSaveMode(mode)
66+
write <- callJMethod(write, "mode", jmode)
67+
write <- callJMethod(write, "options", options)
68+
write
69+
}
70+
5871
#' @export
5972
#' @param sdf A Java object reference to the backing Scala DataFrame
6073
#' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
727740
#'
728741
#' @param x A SparkDataFrame
729742
#' @param path The directory where the file is saved
743+
#' @param mode the save mode: one of 'append', 'overwrite', 'error' or 'ignore' ('error' by default).
744+
#' @param ... additional argument(s) passed to the method.
730745
#'
731746
#' @family SparkDataFrame functions
732747
#' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
743758
#' @note write.json since 1.6.0
744759
setMethod("write.json",
745760
signature(x = "SparkDataFrame", path = "character"),
746-
function(x, path) {
761+
function(x, path, mode = "error", ...) {
747762
write <- callJMethod(x@sdf, "write")
763+
write <- setWriteOptions(write, mode = mode, ...)
748764
invisible(callJMethod(write, "json", path))
749765
})
750766

@@ -755,6 +771,8 @@ setMethod("write.json",
755771
#'
756772
#' @param x A SparkDataFrame
757773
#' @param path The directory where the file is saved
774+
#' @param mode the save mode: one of 'append', 'overwrite', 'error' or 'ignore' ('error' by default).
775+
#' @param ... additional argument(s) passed to the method.
758776
#'
759777
#' @family SparkDataFrame functions
760778
#' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
771789
#' @note write.orc since 2.0.0
772790
setMethod("write.orc",
773791
signature(x = "SparkDataFrame", path = "character"),
774-
function(x, path) {
792+
function(x, path, mode = "error", ...) {
775793
write <- callJMethod(x@sdf, "write")
794+
write <- setWriteOptions(write, mode = mode, ...)
776795
invisible(callJMethod(write, "orc", path))
777796
})
778797

@@ -783,6 +802,8 @@ setMethod("write.orc",
783802
#'
784803
#' @param x A SparkDataFrame
785804
#' @param path The directory where the file is saved
805+
#' @param mode the save mode: one of 'append', 'overwrite', 'error' or 'ignore' ('error' by default).
806+
#' @param ... additional argument(s) passed to the method.
786807
#'
787808
#' @family SparkDataFrame functions
788809
#' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
800821
#' @note write.parquet since 1.6.0
801822
setMethod("write.parquet",
802823
signature(x = "SparkDataFrame", path = "character"),
803-
function(x, path) {
824+
function(x, path, mode = "error", ...) {
804825
write <- callJMethod(x@sdf, "write")
826+
write <- setWriteOptions(write, mode = mode, ...)
805827
invisible(callJMethod(write, "parquet", path))
806828
})
807829

@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
825847
#'
826848
#' @param x A SparkDataFrame
827849
#' @param path The directory where the file is saved
850+
#' @param mode the save mode: one of 'append', 'overwrite', 'error' or 'ignore' ('error' by default).
851+
#' @param ... additional argument(s) passed to the method.
828852
#'
829853
#' @family SparkDataFrame functions
830854
#' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
841865
#' @note write.text since 2.0.0
842866
setMethod("write.text",
843867
signature(x = "SparkDataFrame", path = "character"),
844-
function(x, path) {
868+
function(x, path, mode = "error", ...) {
845869
write <- callJMethod(x@sdf, "write")
870+
write <- setWriteOptions(write, mode = mode, ...)
846871
invisible(callJMethod(write, "text", path))
847872
})
848873

@@ -2608,7 +2633,7 @@ setMethod("except",
26082633
#' @param ... additional argument(s) passed to the method.
26092634
#'
26102635
#' @family SparkDataFrame functions
2611-
#' @aliases write.df,SparkDataFrame,character-method
2636+
#' @aliases write.df,SparkDataFrame-method
26122637
#' @rdname write.df
26132638
#' @name write.df
26142639
#' @export
@@ -2622,21 +2647,25 @@ setMethod("except",
26222647
#' }
26232648
#' @note write.df since 1.4.0
26242649
setMethod("write.df",
2625-
signature(df = "SparkDataFrame", path = "character"),
2626-
function(df, path, source = NULL, mode = "error", ...) {
2650+
signature(df = "SparkDataFrame"),
2651+
function(df, path = NULL, source = NULL, mode = "error", ...) {
2652+
if (!is.null(path) && !is.character(path)) {
2653+
stop("path should be charactor, NULL or omitted.")
2654+
}
2655+
if (!is.null(source) && !is.character(source)) {
2656+
stop("source should be character, NULL or omitted. It is the datasource specified ",
2657+
"in 'spark.sql.sources.default' configuration by default.")
2658+
}
2659+
if (!is.character(mode)) {
2660+
stop("mode should be charactor or omitted. It is 'error' by default.")
2661+
}
26272662
if (is.null(source)) {
26282663
source <- getDefaultSqlSource()
26292664
}
2630-
jmode <- convertToJSaveMode(mode)
2631-
options <- varargsToEnv(...)
2632-
if (!is.null(path)) {
2633-
options[["path"]] <- path
2634-
}
26352665
write <- callJMethod(df@sdf, "write")
26362666
write <- callJMethod(write, "format", source)
2637-
write <- callJMethod(write, "mode", jmode)
2638-
write <- callJMethod(write, "options", options)
2639-
write <- callJMethod(write, "save", path)
2667+
write <- setWriteOptions(write, path = path, mode = mode, ...)
2668+
write <- handledCallJMethod(write, "save")
26402669
})
26412670

26422671
#' @rdname write.df
@@ -2691,7 +2720,7 @@ setMethod("saveAsTable",
26912720
source <- getDefaultSqlSource()
26922721
}
26932722
jmode <- convertToJSaveMode(mode)
2694-
options <- varargsToEnv(...)
2723+
options <- varargsToStrEnv(...)
26952724

26962725
write <- callJMethod(df@sdf, "write")
26972726
write <- callJMethod(write, "format", source)

R/pkg/R/SQLContext.R

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
328328
#' It goes through the entire dataset once to determine the schema.
329329
#'
330330
#' @param path Path of file to read. A vector of multiple paths is allowed.
331+
#' @param ... additional external data source specific named properties.
331332
#' @return SparkDataFrame
332333
#' @rdname read.json
333334
#' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
341342
#' @name read.json
342343
#' @method read.json default
343344
#' @note read.json since 1.6.0
344-
read.json.default <- function(path) {
345+
read.json.default <- function(path, ...) {
345346
sparkSession <- getSparkSession()
347+
options <- varargsToStrEnv(...)
346348
# Allow the user to have a more flexible definition of the text file path
347349
paths <- as.list(suppressWarnings(normalizePath(path)))
348350
read <- callJMethod(sparkSession, "read")
351+
read <- callJMethod(read, "options", options)
349352
sdf <- callJMethod(read, "json", paths)
350353
dataFrame(sdf)
351354
}
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
405408
#' Loads an ORC file, returning the result as a SparkDataFrame.
406409
#'
407410
#' @param path Path of file to read.
411+
#' @param ... additional external data source specific named properties.
408412
#' @return SparkDataFrame
409413
#' @rdname read.orc
410414
#' @export
411415
#' @name read.orc
412416
#' @note read.orc since 2.0.0
413-
read.orc <- function(path) {
417+
read.orc <- function(path, ...) {
414418
sparkSession <- getSparkSession()
419+
options <- varargsToStrEnv(...)
415420
# Allow the user to have a more flexible definition of the ORC file path
416421
path <- suppressWarnings(normalizePath(path))
417422
read <- callJMethod(sparkSession, "read")
423+
read <- callJMethod(read, "options", options)
418424
sdf <- callJMethod(read, "orc", path)
419425
dataFrame(sdf)
420426
}
@@ -430,11 +436,13 @@ read.orc <- function(path) {
430436
#' @name read.parquet
431437
#' @method read.parquet default
432438
#' @note read.parquet since 1.6.0
433-
read.parquet.default <- function(path) {
439+
read.parquet.default <- function(path, ...) {
434440
sparkSession <- getSparkSession()
441+
options <- varargsToStrEnv(...)
435442
# Allow the user to have a more flexible definition of the Parquet file path
436443
paths <- as.list(suppressWarnings(normalizePath(path)))
437444
read <- callJMethod(sparkSession, "read")
445+
read <- callJMethod(read, "options", options)
438446
sdf <- callJMethod(read, "parquet", paths)
439447
dataFrame(sdf)
440448
}
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
467475
#' Each line in the text file is a new row in the resulting SparkDataFrame.
468476
#'
469477
#' @param path Path of file to read. A vector of multiple paths is allowed.
478+
#' @param ... additional external data source specific named properties.
470479
#' @return SparkDataFrame
471480
#' @rdname read.text
472481
#' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
479488
#' @name read.text
480489
#' @method read.text default
481490
#' @note read.text since 1.6.1
482-
read.text.default <- function(path) {
491+
read.text.default <- function(path, ...) {
483492
sparkSession <- getSparkSession()
493+
options <- varargsToStrEnv(...)
484494
# Allow the user to have a more flexible definition of the text file path
485495
paths <- as.list(suppressWarnings(normalizePath(path)))
486496
read <- callJMethod(sparkSession, "read")
497+
read <- callJMethod(read, "options", options)
487498
sdf <- callJMethod(read, "text", paths)
488499
dataFrame(sdf)
489500
}
@@ -771,8 +782,15 @@ dropTempView <- function(viewName) {
771782
#' @method read.df default
772783
#' @note read.df since 1.4.0
773784
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
785+
if (!is.null(path) && !is.character(path)) {
786+
stop("path should be charactor, NULL or omitted.")
787+
}
788+
if (!is.null(source) && !is.character(source)) {
789+
stop("source should be character, NULL or omitted. It is the datasource specified ",
790+
"in 'spark.sql.sources.default' configuration by default.")
791+
}
774792
sparkSession <- getSparkSession()
775-
options <- varargsToEnv(...)
793+
options <- varargsToStrEnv(...)
776794
if (!is.null(path)) {
777795
options[["path"]] <- path
778796
}
@@ -784,16 +802,16 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
784802
}
785803
if (!is.null(schema)) {
786804
stopifnot(class(schema) == "structType")
787-
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source,
788-
schema$jobj, options)
805+
sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
806+
source, schema$jobj, options)
789807
} else {
790-
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
791-
"loadDF", sparkSession, source, options)
808+
sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
809+
source, options)
792810
}
793811
dataFrame(sdf)
794812
}
795813

796-
read.df <- function(x, ...) {
814+
read.df <- function(x = NULL, ...) {
797815
dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
798816
}
799817

@@ -805,7 +823,7 @@ loadDF.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
805823
read.df(path, source, schema, ...)
806824
}
807825

808-
loadDF <- function(x, ...) {
826+
loadDF <- function(x = NULL, ...) {
809827
dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
810828
}
811829

@@ -835,7 +853,7 @@ loadDF <- function(x, ...) {
835853
#' @note createExternalTable since 1.4.0
836854
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
837855
sparkSession <- getSparkSession()
838-
options <- varargsToEnv(...)
856+
options <- varargsToStrEnv(...)
839857
if (!is.null(path)) {
840858
options[["path"]] <- path
841859
}

R/pkg/R/generics.R

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })
633633

634634
#' @rdname write.df
635635
#' @export
636-
setGeneric("write.df", function(df, path, source = NULL, mode = "error", ...) {
636+
setGeneric("write.df", function(df, path = NULL, source = NULL, mode = "error", ...) {
637637
standardGeneric("write.df")
638638
})
639639

@@ -651,23 +651,25 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
651651

652652
#' @rdname write.json
653653
#' @export
654-
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
654+
setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })
655655

656656
#' @rdname write.orc
657657
#' @export
658-
setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
658+
setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })
659659

660660
#' @rdname write.parquet
661661
#' @export
662-
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
662+
setGeneric("write.parquet", function(x, path, ...) {
663+
standardGeneric("write.parquet")
664+
})
663665

664666
#' @rdname write.parquet
665667
#' @export
666668
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
667669

668670
#' @rdname write.text
669671
#' @export
670-
setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
672+
setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
671673

672674
#' @rdname schema
673675
#' @export
@@ -732,7 +734,7 @@ setGeneric("withColumnRenamed",
732734

733735
#' @rdname write.df
734736
#' @export
735-
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
737+
setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })
736738

737739
#' @rdname randomSplit
738740
#' @export

R/pkg/R/mllib.R

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,9 @@ setMethod("predict", signature(object = "KMeansModel"),
696696
setMethod("spark.mlp", signature(data = "SparkDataFrame"),
697697
function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
698698
tol = 1E-6, stepSize = 0.03, seed = NULL) {
699+
if (is.null(layers)) {
700+
stop ("layers must be a integer vector with length > 1.")
701+
}
699702
layers <- as.integer(na.omit(layers))
700703
if (length(layers) <= 1) {
701704
stop ("layers must be a integer vector with length > 1.")

0 commit comments

Comments
 (0)