Commit 871c303

Read JDBC table use custom schema
1 parent 88a23d3 commit 871c303

4 files changed: +56 -6 lines changed

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala

Lines changed: 44 additions & 4 deletions
@@ -70,10 +70,17 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
       """.stripMargin.replaceAll("\n", " ")).executeUpdate()
     conn.commit()
 
-    conn.prepareStatement("CREATE TABLE ts_with_timezone (id NUMBER(10), t TIMESTAMP WITH TIME ZONE)")
-      .executeUpdate()
-    conn.prepareStatement("INSERT INTO ts_with_timezone VALUES (1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS TZR'))")
-      .executeUpdate()
+    conn.prepareStatement(
+      "CREATE TABLE ts_with_timezone (id NUMBER(10), t TIMESTAMP WITH TIME ZONE)").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO ts_with_timezone VALUES " +
+        "(1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
+    conn.commit()
+
+    conn.prepareStatement(
+      "CREATE TABLE custom_column_types (id NUMBER, n1 number(1), n2 number(1))").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO custom_column_types values(12312321321321312312312312123, 1, 0)").executeUpdate()
     conn.commit()
 
     sql(
@@ -198,4 +205,37 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     val types = rows(0).toSeq.map(x => x.getClass.toString)
     assert(types(1).equals("class java.sql.Timestamp"))
   }
+
+  test("SPARK-20427/SPARK-20921: read table use custom schema") {
+
+    // default will throw IllegalArgumentException
+    val e = intercept[org.apache.spark.SparkException] {
+      spark.read.jdbc(jdbcUrl, "custom_column_types", new Properties()).collect()
+    }
+    assert(e.getMessage.contains(
+      "requirement failed: Decimal precision 39 exceeds max precision 38"))
+
+    // custom schema can read data
+    val schema = StructType(Seq(
+      StructField("ID", DecimalType(DecimalType.MAX_PRECISION, 0), true,
+        new MetadataBuilder().putName("ID").build()),
+      StructField("N1", IntegerType, true, new MetadataBuilder().putName("N1").build()),
+      StructField("N2", BooleanType, true, new MetadataBuilder().putName("N2").build())))
+
+    val dfRead = spark.read.schema(schema).jdbc(jdbcUrl, "custom_column_types", new Properties())
+    val rows = dfRead.collect()
+
+    // verify the data type inserted
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types(0).equals("class java.math.BigDecimal"))
+    assert(types(1).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Boolean"))
+
+    // verify the value inserted
+    val values = rows(0)
+    assert(values.getDecimal(0).equals(new java.math.BigDecimal("12312321321321312312312312123")))
+    assert(values.getInt(1).equals(1))
+    assert(values.getBoolean(2).equals(false))
+  }
 }
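
The new test doubles as a usage example. Outside the test harness the same read looks roughly like the sketch below; the SparkSession setup and the connection URL are placeholders, and it assumes the Oracle JDBC driver is on the classpath and that the custom_column_types table created above exists.

    import java.util.Properties

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    // Placeholder session and connection details; any reachable Oracle instance
    // holding the custom_column_types table from the setup above would do.
    val spark = SparkSession.builder().appName("jdbc-custom-schema").getOrCreate()
    val jdbcUrl = "jdbc:oracle:thin:system/oracle@//localhost:1521/xe"

    // Reading with the resolved default schema fails ("Decimal precision 39
    // exceeds max precision 38"), so supply a schema that overrides the
    // resolved column types, exactly as the test does.
    val customSchema = StructType(Seq(
      StructField("ID", DecimalType(DecimalType.MAX_PRECISION, 0), true,
        new MetadataBuilder().putName("ID").build()),
      StructField("N1", IntegerType, true, new MetadataBuilder().putName("N1").build()),
      StructField("N2", BooleanType, true, new MetadataBuilder().putName("N2").build())))

    val df = spark.read.schema(customSchema).jdbc(jdbcUrl, "custom_column_types", new Properties())
    df.printSchema()
    df.show()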

sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala

Lines changed: 3 additions & 0 deletions
@@ -273,6 +273,9 @@ class MetadataBuilder {
   /** Puts a [[Metadata]] array. */
   def putMetadataArray(key: String, value: Array[Metadata]): this.type = put(key, value)
 
+  /** Puts a name. */
+  def putName(name: String): this.type = put("name", name)
+
   /** Builds the [[Metadata]] instance. */
   def build(): Metadata = {
     new Metadata(map.toMap)
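
The new putName builder method is what the test above uses when declaring its StructFields. A small sketch, not part of the patch, of what it stores and how it can be read back:

    import org.apache.spark.sql.types._

    // putName("ID") records the string under the "name" key, the same effect
    // as putString("name", "ID").
    val idMeta = new MetadataBuilder().putName("ID").build()
    val field = StructField("ID", DecimalType(38, 0), true, idMeta)

    assert(field.metadata.getString("name") == "ID")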

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 4 additions & 1 deletion
@@ -197,11 +197,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
-    assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
+    if (!userSpecifiedSchema.isEmpty) {
+      this.extraOptions +=
+        (JDBCOptions.JDBC_CREATE_TABLE_COLUMN_TYPES -> userSpecifiedSchema.get.json)
+    }
     format("jdbc").load()
   }
 
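With assertNoSpecifiedSchema("jdbc") removed, a schema passed via .schema(...) is now accepted by this overload and forwarded, as JSON, in the JDBC options. A rough options-based equivalent is sketched below; it reuses the spark and jdbcUrl placeholders from the earlier sketch, and the "createTableColumnTypes" key is my reading of JDBCOptions.JDBC_CREATE_TABLE_COLUMN_TYPES, so treat both as assumptions.

    import org.apache.spark.sql.types._

    // Roughly what spark.read.schema(s).jdbc(jdbcUrl, "custom_column_types", props)
    // amounts to after this change: the schema travels as JSON in an option.
    val s = StructType(Seq(
      StructField("ID", DecimalType(38, 0)),
      StructField("N1", IntegerType),
      StructField("N2", BooleanType)))

    val df = spark.read
      .format("jdbc")
      .option("url", jdbcUrl)                    // JDBCOptions.JDBC_URL
      .option("dbtable", "custom_column_types")  // JDBCOptions.JDBC_TABLE_NAME
      .option("createTableColumnTypes", s.json)  // assumed key for JDBC_CREATE_TABLE_COLUMN_TYPES
      .load()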

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 5 additions & 1 deletion
@@ -110,7 +110,11 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(jdbcOptions)
+  override val schema: StructType = if (!jdbcOptions.createTableColumnTypes.isEmpty) {
+    StructType.fromString(jdbcOptions.createTableColumnTypes.get)
+  } else {
+    JDBCRDD.resolveTable(jdbcOptions)
+  }
 
   // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
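
On the read side, JDBCRelation now rebuilds the StructType from that JSON (via the internal StructType.fromString helper) instead of resolving the table over JDBC. The round trip itself can be sketched with the public JSON API, with DataType.fromJson standing in for the internal helper:

    import org.apache.spark.sql.types._

    // Schema -> JSON -> schema round trip that the forwarded option relies on.
    val original = StructType(Seq(
      StructField("ID", DecimalType(38, 0)),
      StructField("N1", IntegerType),
      StructField("N2", BooleanType)))

    val json = original.json
    val restored = DataType.fromJson(json).asInstanceOf[StructType]

    assert(restored == original)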

0 commit comments
