
Commit ca9a160

[SPARK-14557][SQL]: Resolving merge conflicts

2 parents: 6bd529c + 8012793
1,357 files changed: 24,686 additions, 15,033 deletions


R/pkg/NAMESPACE

Lines changed: 8 additions & 1 deletion

@@ -24,6 +24,9 @@ export("setJobGroup",
        "clearJobGroup",
        "cancelJobGroup")
 
+# Export Utility methods
+export("setLogLevel")
+
 exportClasses("DataFrame")
 
 exportMethods("arrange",
@@ -101,6 +104,7 @@ exportMethods("arrange",
               "withColumn",
               "withColumnRenamed",
               "write.df",
+              "write.jdbc",
               "write.json",
               "write.parquet",
               "write.text")
@@ -125,6 +129,7 @@ exportMethods("%in%",
              "between",
              "bin",
              "bitwiseNOT",
+             "bround",
              "cast",
              "cbrt",
              "ceil",
@@ -284,6 +289,7 @@ export("as.DataFrame",
        "loadDF",
        "parquetFile",
        "read.df",
+       "read.jdbc",
        "read.json",
        "read.parquet",
        "read.text",
@@ -292,7 +298,8 @@ export("as.DataFrame",
        "tableToDF",
        "tableNames",
        "tables",
-       "uncacheTable")
+       "uncacheTable",
+       "print.summary.GeneralizedLinearRegressionModel")
 
 export("structField",
        "structField.jobj",

R/pkg/R/DataFrame.R

Lines changed: 40 additions & 7 deletions

@@ -2296,12 +2296,8 @@ setMethod("fillna",
 #' }
 setMethod("as.data.frame",
           signature(x = "DataFrame"),
-          function(x, ...) {
-            # Check if additional parameters have been passed
-            if (length(list(...)) > 0) {
-              stop(paste("Unused argument(s): ", paste(list(...), collapse = ", ")))
-            }
-            collect(x)
+          function(x, row.names = NULL, optional = FALSE, ...) {
+            as.data.frame(collect(x), row.names, optional, ...)
           })
 
 #' The specified DataFrame is attached to the R search path. This means that
@@ -2363,7 +2359,7 @@ setMethod("with",
 #' @examples \dontrun{
 #' # Create a DataFrame from the Iris dataset
 #' irisDF <- createDataFrame(sqlContext, iris)
-#' 
+#'
 #' # Show the structure of the DataFrame
 #' str(irisDF)
 #' }
@@ -2468,3 +2464,40 @@ setMethod("drop",
           function(x) {
             base::drop(x)
           })
+
+#' Saves the content of the DataFrame to an external database table via JDBC
+#'
+#' Additional JDBC database connection properties can be set (...)
+#'
+#' Also, mode is used to specify the behavior of the save operation when
+#' data already exists in the data source. There are four modes: \cr
+#'  append: Contents of this DataFrame are expected to be appended to existing data. \cr
+#'  overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
+#'  error: An exception is expected to be thrown. \cr
+#'  ignore: The save operation is expected to not save the contents of the DataFrame
+#'    and to not change the existing data. \cr
+#'
+#' @param x A SparkSQL DataFrame
+#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
+#' @param tableName The name of the table in the external database
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @family DataFrame functions
+#' @rdname write.jdbc
+#' @name write.jdbc
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
+#' write.jdbc(df, jdbcUrl, "table", user = "username", password = "password")
+#' }
+setMethod("write.jdbc",
+          signature(x = "DataFrame", url = "character", tableName = "character"),
+          function(x, url, tableName, mode = "error", ...) {
+            jmode <- convertToJSaveMode(mode)
+            jprops <- varargsToJProperties(...)
+            write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "mode", jmode)
+            invisible(callJMethod(write, "jdbc", url, tableName, jprops))
+          })
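
For reference, a minimal sketch of how the new writer is meant to be called, assuming a running SparkR session and a reachable MySQL instance; the URL, table name, and credentials below are placeholders:

# Placeholders throughout; any JDBC-accessible database would do.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)
jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"

# Default mode is "error": the call fails if the table already exists.
write.jdbc(df, jdbcUrl, "faithful", user = "username", password = "password")

# "append" adds to an existing table, "overwrite" replaces it, and
# "ignore" leaves existing data untouched without raising an error.
write.jdbc(df, jdbcUrl, "faithful", mode = "append",
           user = "username", password = "password")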

R/pkg/R/SQLContext.R

Lines changed: 58 additions & 0 deletions

@@ -583,3 +583,61 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL,
   sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
   dataFrame(sdf)
 }
+
+#' Create a DataFrame representing the database table accessible via JDBC URL
+#'
+#' Additional JDBC database connection properties can be set (...)
+#'
+#' Only one of partitionColumn or predicates should be set. Partitions of the table will be
+#' retrieved in parallel based on the `numPartitions` or by the predicates.
+#'
+#' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+#' your external database systems.
+#'
+#' @param sqlContext SQLContext to use
+#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
+#' @param tableName the name of the table in the external database
+#' @param partitionColumn the name of a column of integral type that will be used for partitioning
+#' @param lowerBound the minimum value of `partitionColumn` used to decide partition stride
+#' @param upperBound the maximum value of `partitionColumn` used to decide partition stride
+#' @param numPartitions the number of partitions. This, along with `lowerBound` (inclusive) and
+#'                      `upperBound` (exclusive), forms partition strides for generated WHERE
+#'                      clause expressions used to split the column `partitionColumn` evenly.
+#'                      This defaults to SparkContext.defaultParallelism when unset.
+#' @param predicates a list of conditions in the where clause; each one defines one partition
+#' @return DataFrame
+#' @rdname read.jdbc
+#' @name read.jdbc
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
+#' df <- read.jdbc(sqlContext, jdbcUrl, "table", predicates = list("field<=123"), user = "username")
+#' df2 <- read.jdbc(sqlContext, jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0,
+#'                  upperBound = 10000, user = "username", password = "password")
+#' }
+
+read.jdbc <- function(sqlContext, url, tableName,
+                      partitionColumn = NULL, lowerBound = NULL, upperBound = NULL,
+                      numPartitions = 0L, predicates = list(), ...) {
+  jprops <- varargsToJProperties(...)
+
+  read <- callJMethod(sqlContext, "read")
+  if (!is.null(partitionColumn)) {
+    if (is.null(numPartitions) || numPartitions == 0) {
+      sc <- callJMethod(sqlContext, "sparkContext")
+      numPartitions <- callJMethod(sc, "defaultParallelism")
+    } else {
+      numPartitions <- numToInt(numPartitions)
+    }
+    sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
+                       numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
+  } else if (length(predicates) > 0) {
+    sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
+  } else {
+    sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
+  }
+  dataFrame(sdf)
+}
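
The two partitioning paths in read.jdbc above are mutually exclusive, so a usage sketch may help; this is hypothetical, with placeholder tables and credentials:

# One partition per predicate: this read produces exactly two partitions.
sqlContext <- sparkRSQL.init(sparkR.init())
jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
df <- read.jdbc(sqlContext, jdbcUrl, "logs",
                predicates = list("day <= 15", "day > 15"),
                user = "username", password = "password")

# Stride-based partitioning on an integral column; since numPartitions
# is left at 0, it falls back to SparkContext.defaultParallelism.
df2 <- read.jdbc(sqlContext, jdbcUrl, "events", partitionColumn = "id",
                 lowerBound = 0, upperBound = 10000,
                 user = "username", password = "password")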

R/pkg/R/context.R

Lines changed: 17 additions & 0 deletions

@@ -225,3 +225,20 @@ broadcast <- function(sc, object) {
 setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
+
+#' Set new log level
+#'
+#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
+#'
+#' @rdname setLogLevel
+#' @param sc Spark Context to use
+#' @param level New log level
+#' @export
+#' @examples
+#'\dontrun{
+#' setLogLevel(sc, "ERROR")
+#'}
+
+setLogLevel <- function(sc, level) {
+  callJMethod(sc, "setLogLevel", level)
+}
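
The helper is a thin wrapper around the JVM SparkContext's setLogLevel; a one-line sketch of the typical use, quieting a chatty shell (assumes an initialized sc):

sc <- sparkR.init()
setLogLevel(sc, "WARN")  # any of "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"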

R/pkg/R/functions.R

Lines changed: 21 additions & 1 deletion

@@ -994,7 +994,7 @@ setMethod("rint",
 
 #' round
 #'
-#' Returns the value of the column `e` rounded to 0 decimal places.
+#' Returns the value of the column `e` rounded to 0 decimal places using HALF_UP rounding mode.
 #'
 #' @rdname round
 #' @name round
@@ -1008,6 +1008,26 @@ setMethod("round",
             column(jc)
           })
 
+#' bround
+#'
+#' Returns the value of the column `e` rounded to `scale` decimal places using HALF_EVEN rounding
+#' mode if `scale` >= 0 or at integral part when `scale` < 0.
+#' Also known as Gaussian rounding or bankers' rounding that rounds to the nearest even number.
+#' bround(2.5, 0) = 2, bround(3.5, 0) = 4.
+#'
+#' @rdname bround
+#' @name bround
+#' @family math_funcs
+#' @export
+#' @examples \dontrun{bround(df$c, 0)}
+setMethod("bround",
+          signature(x = "Column"),
+          function(x, scale = 0) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "bround", x@jc, as.integer(scale))
+            column(jc)
+          })
+
+
 #' rtrim
 #'
 #' Trim the spaces from right end for the specified string value.
R/pkg/R/generics.R

Lines changed: 14 additions & 1 deletion

@@ -397,7 +397,10 @@ setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })
 
 #' @rdname as.data.frame
 #' @export
-setGeneric("as.data.frame")
+setGeneric("as.data.frame",
+           function(x, row.names = NULL, optional = FALSE, ...) {
+             standardGeneric("as.data.frame")
+           })
 
 #' @rdname attach
 #' @export
@@ -577,6 +580,12 @@ setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) {
   standardGeneric("saveDF")
 })
 
+#' @rdname write.jdbc
+#' @export
+setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
+  standardGeneric("write.jdbc")
+})
+
 #' @rdname write.json
 #' @export
 setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
@@ -751,6 +760,10 @@ setGeneric("bin", function(x) { standardGeneric("bin") })
 #' @export
 setGeneric("bitwiseNOT", function(x) { standardGeneric("bitwiseNOT") })
 
+#' @rdname bround
+#' @export
+setGeneric("bround", function(x, ...) { standardGeneric("bround") })
+
 #' @rdname cbrt
 #' @export
 setGeneric("cbrt", function(x) { standardGeneric("cbrt") })
