Skip to content

Commit 1fdf002

Browse files
committed
parseUserSpecifiedColumnTypes -> getCustomSchema
1 parent 7fc97b4 commit 1fdf002

File tree

4 files changed

+31
-35
lines changed

4 files changed

+31
-35
lines changed

examples/src/main/python/sql/datasource.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,7 @@ def jdbc_dataset_example(spark):
184184
.option("dbtable", "schema.tablename") \
185185
.option("user", "username") \
186186
.option("password", "password") \
187-
.option("customDataFrameColumnTypes", "id DECIMAL(38, 0), name STRING") \
187+
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
188188
.load()
189189

190190
# Saving data to a JDBC source

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -112,13 +112,11 @@ private[sql] case class JDBCRelation(
112112
override val needConversion: Boolean = false
113113

114114
override val schema: StructType = {
115-
val schema = JDBCRDD.resolveTable(jdbcOptions)
116-
val customSchema = jdbcOptions.customSchema
117-
if (customSchema.isDefined) {
118-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, customSchema.get,
119-
sqlContext.sessionState.conf.resolver)
120-
} else {
121-
schema
115+
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
116+
jdbcOptions.customSchema match {
117+
case Some(customSchema) => JdbcUtils.getCustomSchema(
118+
tableSchema, customSchema, sparkSession.sessionState.conf.resolver)
119+
case None => tableSchema
122120
}
123121
}
124122

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 11 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -302,7 +302,6 @@ object JdbcUtils extends Logging {
302302
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
303303
}
304304
val metadata = new MetadataBuilder()
305-
.putString("name", columnName)
306305
.putLong("scale", fieldScale)
307306
val columnType =
308307
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
@@ -772,26 +771,25 @@ object JdbcUtils extends Logging {
772771
* Parses the user-specified customSchema option value into a DataFrame schema,
773772
* and returns it if all of its columns match the columns of the default schema.
774773
*/
775-
def parseUserSpecifiedColumnTypes(
776-
schema: StructType,
777-
columnTypes: String,
778-
nameEquality: Resolver): StructType = {
779-
val userSchema = CatalystSqlParser.parseTableSchema(columnTypes)
774+
def getCustomSchema(
775+
tableSchema: StructType,
776+
customSchema: String,
777+
nameEquality: Resolver): StructType = {
778+
val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
780779

781780
SchemaUtils.checkColumnNameDuplication(
782781
userSchema.map(_.name), "in the customSchema option value", nameEquality)
783782

784-
if (userSchema.size != schema.size) {
785-
throw new AnalysisException("Please provide all the columns, " +
786-
s"all columns are: ${schema.fields.map(_.name).mkString(",")}")
783+
val colNames = tableSchema.fieldNames.mkString(",")
784+
val errorMsg = s"Please provide all the columns, all columns are: $colNames"
785+
if (userSchema.size != tableSchema.size) {
786+
throw new AnalysisException(errorMsg)
787787
}
788788

789789
// This is resolved by names, only check the column names.
790790
userSchema.fieldNames.foreach { col =>
791-
schema.find(f => nameEquality(f.name, col)).getOrElse {
792-
throw new AnalysisException(
793-
s"${JDBCOptions.JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES} option column $col not found in " +
794-
s"schema ${schema.catalogString}")
791+
tableSchema.find(f => nameEquality(f.name, col)).getOrElse {
792+
throw new AnalysisException(errorMsg)
795793
}
796794
}
797795
userSchema

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -24,62 +24,62 @@ import org.apache.spark.sql.types._
2424

2525
class JdbcUtilsSuite extends SparkFunSuite {
2626

27-
val schema = StructType(Seq(
27+
val tableSchema = StructType(Seq(
2828
StructField("C1", StringType, false), StructField("C2", IntegerType, false)))
2929
val caseSensitive = org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
3030
val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
3131

3232
test("Parse user specified column types") {
3333
assert(
34-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "C1 DATE, C2 STRING", caseInsensitive) ===
34+
JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseInsensitive) ===
3535
StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
36-
assert(JdbcUtils.parseUserSpecifiedColumnTypes(schema, "C1 DATE, C2 STRING", caseSensitive) ===
36+
assert(JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseSensitive) ===
3737
StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
3838
assert(
39-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "c1 DATE, C2 STRING", caseInsensitive) ===
39+
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
4040
StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true))))
41-
assert(JdbcUtils.parseUserSpecifiedColumnTypes(
42-
schema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) ===
41+
assert(JdbcUtils.getCustomSchema(
42+
tableSchema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) ===
4343
StructType(Seq(StructField("c1", DecimalType(38, 0), true),
4444
StructField("C2", StringType, true))))
4545

4646
// Throw AnalysisException
4747
val duplicate = intercept[AnalysisException]{
48-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "c1 DATE, c1 STRING", caseInsensitive) ===
48+
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) ===
4949
StructType(Seq(StructField("c1", DateType, true), StructField("c1", StringType, true)))
5050
}
5151
assert(duplicate.getMessage.contains(
5252
"Found duplicate column(s) in the customSchema option value"))
5353

5454
val allColumns = intercept[AnalysisException]{
55-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "C1 STRING", caseSensitive) ===
55+
JdbcUtils.getCustomSchema(tableSchema, "C1 STRING", caseSensitive) ===
5656
StructType(Seq(StructField("C1", DateType, true)))
5757
}
5858
assert(allColumns.getMessage.contains("Please provide all the columns,"))
5959

6060
val caseSensitiveColumnNotFound = intercept[AnalysisException]{
61-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "c1 DATE, C2 STRING", caseSensitive) ===
61+
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
6262
StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true)))
6363
}
6464
assert(caseSensitiveColumnNotFound.getMessage.contains(
65-
s"${JDBCOptions.JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES} option column c1 not found in schema"))
65+
"Please provide all the columns, all columns are: C1,C2;"))
6666

6767
val caseInsensitiveColumnNotFound = intercept[AnalysisException]{
68-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "c3 DATE, C2 STRING", caseInsensitive) ===
68+
JdbcUtils.getCustomSchema(tableSchema, "c3 DATE, C2 STRING", caseInsensitive) ===
6969
StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
7070
}
7171
assert(caseInsensitiveColumnNotFound.getMessage.contains(
72-
s"${JDBCOptions.JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES} option column c3 not found in schema"))
72+
"Please provide all the columns, all columns are: C1,C2;"))
7373

7474
// Throw ParseException
7575
val dataTypeNotSupported = intercept[ParseException]{
76-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "c3 DATEE, C2 STRING", caseInsensitive) ===
76+
JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
7777
StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
7878
}
7979
assert(dataTypeNotSupported.getMessage.contains("DataType datee is not supported"))
8080

8181
val mismatchedInput = intercept[ParseException]{
82-
JdbcUtils.parseUserSpecifiedColumnTypes(schema, "c3 DATE. C2 STRING", caseInsensitive) ===
82+
JdbcUtils.getCustomSchema(tableSchema, "c3 DATE. C2 STRING", caseInsensitive) ===
8383
StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
8484
}
8585
assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))

0 commit comments

Comments (0)