Skip to content

Commit 96a5001

Browse files
committed
[SPARK-24169][SQL] JsonToStructs should not access SQLConf at executor side
## What changes were proposed in this pull request? This PR is extracted from #21190, to make it easier to backport. `JsonToStructs` can be serialized to executors and evaluated there, so we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the class body. ## How was this patch tested? Tested in #21190. Author: Wenchen Fan <[email protected]> Closes #21226 from cloud-fan/minor4.
1 parent 991b526 commit 96a5001

File tree

5 files changed

+54
-48
lines changed

5 files changed

+54
-48
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,9 @@ object FunctionRegistry {
534534
// Otherwise, find a constructor method that matches the number of arguments, and use that.
535535
val params = Seq.fill(expressions.size)(classOf[Expression])
536536
val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
537-
val validParametersCount = constructors.map(_.getParameterCount).distinct.sorted
537+
val validParametersCount = constructors
538+
.filter(_.getParameterTypes.forall(_ == classOf[Expression]))
539+
.map(_.getParameterCount).distinct.sorted
538540
val expectedNumberOfParameters = if (validParametersCount.length == 1) {
539541
validParametersCount.head.toString
540542
} else {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -514,11 +514,10 @@ case class JsonToStructs(
514514
schema: DataType,
515515
options: Map[String, String],
516516
child: Expression,
517-
timeZoneId: Option[String] = None)
517+
timeZoneId: Option[String],
518+
forceNullableSchema: Boolean)
518519
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
519520

520-
val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)
521-
522521
// The JSON input data might be missing certain fields. We force the nullability
523522
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
524523
// can generate incorrect files if values are missing in columns declared as non-nullable.
@@ -532,14 +531,21 @@ case class JsonToStructs(
532531
schema = JsonExprUtils.validateSchemaLiteral(schema),
533532
options = Map.empty[String, String],
534533
child = child,
535-
timeZoneId = None)
534+
timeZoneId = None,
535+
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
536536

537537
def this(child: Expression, schema: Expression, options: Expression) =
538538
this(
539539
schema = JsonExprUtils.validateSchemaLiteral(schema),
540540
options = JsonExprUtils.convertToMapData(options),
541541
child = child,
542-
timeZoneId = None)
542+
timeZoneId = None,
543+
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
544+
545+
// Used in `org.apache.spark.sql.functions`
546+
def this(schema: DataType, options: Map[String, String], child: Expression) =
547+
this(schema, options, child, timeZoneId = None,
548+
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
543549

544550
override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
545551
case _: StructType | ArrayType(_: StructType, _) =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
392392
val jsonData = """{"a": 1}"""
393393
val schema = StructType(StructField("a", IntegerType) :: Nil)
394394
checkEvaluation(
395-
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
395+
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
396396
InternalRow(1)
397397
)
398398
}
@@ -401,13 +401,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
401401
val jsonData = """{"a" 1}"""
402402
val schema = StructType(StructField("a", IntegerType) :: Nil)
403403
checkEvaluation(
404-
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
404+
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
405405
null
406406
)
407407

408408
// Other modes should still return `null`.
409409
checkEvaluation(
410-
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
410+
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId, true),
411411
null
412412
)
413413
}
@@ -416,70 +416,70 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
416416
val input = """[{"a": 1}, {"a": 2}]"""
417417
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
418418
val output = InternalRow(1) :: InternalRow(2) :: Nil
419-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
419+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
420420
}
421421

422422
test("from_json - input=object, schema=array, output=array of single row") {
423423
val input = """{"a": 1}"""
424424
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
425425
val output = InternalRow(1) :: Nil
426-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
426+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
427427
}
428428

429429
test("from_json - input=empty array, schema=array, output=empty array") {
430430
val input = "[ ]"
431431
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
432432
val output = Nil
433-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
433+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
434434
}
435435

436436
test("from_json - input=empty object, schema=array, output=array of single row with null") {
437437
val input = "{ }"
438438
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
439439
val output = InternalRow(null) :: Nil
440-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
440+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
441441
}
442442

443443
test("from_json - input=array of single object, schema=struct, output=single row") {
444444
val input = """[{"a": 1}]"""
445445
val schema = StructType(StructField("a", IntegerType) :: Nil)
446446
val output = InternalRow(1)
447-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
447+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
448448
}
449449

450450
test("from_json - input=array, schema=struct, output=null") {
451451
val input = """[{"a": 1}, {"a": 2}]"""
452452
val schema = StructType(StructField("a", IntegerType) :: Nil)
453453
val output = null
454-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
454+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
455455
}
456456

457457
test("from_json - input=empty array, schema=struct, output=null") {
458458
val input = """[]"""
459459
val schema = StructType(StructField("a", IntegerType) :: Nil)
460460
val output = null
461-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
461+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
462462
}
463463

464464
test("from_json - input=empty object, schema=struct, output=single row with null") {
465465
val input = """{ }"""
466466
val schema = StructType(StructField("a", IntegerType) :: Nil)
467467
val output = InternalRow(null)
468-
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
468+
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
469469
}
470470

471471
test("from_json null input column") {
472472
val schema = StructType(StructField("a", IntegerType) :: Nil)
473473
checkEvaluation(
474-
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
474+
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId, true),
475475
null
476476
)
477477
}
478478

479479
test("SPARK-20549: from_json bad UTF-8") {
480480
val schema = StructType(StructField("a", IntegerType) :: Nil)
481481
checkEvaluation(
482-
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
482+
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId, true),
483483
null)
484484
}
485485

@@ -491,14 +491,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
491491
c.set(2016, 0, 1, 0, 0, 0)
492492
c.set(Calendar.MILLISECOND, 123)
493493
checkEvaluation(
494-
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
494+
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId, true),
495495
InternalRow(c.getTimeInMillis * 1000L)
496496
)
497497
// The result doesn't change because the json string includes timezone string ("Z" here),
498498
// which means the string represents the timestamp string in the timezone regardless of
499499
// the timeZoneId parameter.
500500
checkEvaluation(
501-
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
501+
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST"), true),
502502
InternalRow(c.getTimeInMillis * 1000L)
503503
)
504504

@@ -512,7 +512,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
512512
schema,
513513
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
514514
Literal(jsonData2),
515-
Option(tz.getID)),
515+
Option(tz.getID),
516+
true),
516517
InternalRow(c.getTimeInMillis * 1000L)
517518
)
518519
checkEvaluation(
@@ -521,7 +522,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
521522
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
522523
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
523524
Literal(jsonData2),
524-
gmtId),
525+
gmtId,
526+
true),
525527
InternalRow(c.getTimeInMillis * 1000L)
526528
)
527529
}
@@ -530,7 +532,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
530532
test("SPARK-19543: from_json empty input column") {
531533
val schema = StructType(StructField("a", IntegerType) :: Nil)
532534
checkEvaluation(
533-
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
535+
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId, true),
534536
null
535537
)
536538
}
@@ -685,27 +687,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
685687

686688
test("from_json missing fields") {
687689
for (forceJsonNullableSchema <- Seq(false, true)) {
688-
withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString) {
689-
val input =
690-
"""{
691-
| "a": 1,
692-
| "c": "foo"
693-
|}
694-
|""".stripMargin
695-
val jsonSchema = new StructType()
696-
.add("a", LongType, nullable = false)
697-
.add("b", StringType, nullable = false)
698-
.add("c", StringType, nullable = false)
699-
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
700-
checkEvaluation(
701-
JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId),
702-
output
703-
)
704-
val schema = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
705-
.dataType
706-
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
707-
assert(schemaToCompare == schema)
708-
}
690+
val input =
691+
"""{
692+
| "a": 1,
693+
| "c": "foo"
694+
|}
695+
|""".stripMargin
696+
val jsonSchema = new StructType()
697+
.add("a", LongType, nullable = false)
698+
.add("b", StringType, nullable = false)
699+
.add("c", StringType, nullable = false)
700+
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
701+
val expr = JsonToStructs(
702+
jsonSchema, Map.empty, Literal.create(input, StringType), gmtId, forceJsonNullableSchema)
703+
checkEvaluation(expr, output)
704+
val schema = expr.dataType
705+
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
706+
assert(schemaToCompare == schema)
709707
}
710708
}
711709
}

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3179,7 +3179,7 @@ object functions {
31793179
* @since 2.2.0
31803180
*/
31813181
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
3182-
JsonToStructs(schema, options, e.expr)
3182+
new JsonToStructs(schema, options, e.expr)
31833183
}
31843184

31853185
/**

sql/core/src/test/resources/sql-tests/results/json-functions.sql.out

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ select to_json()
129129
struct<>
130130
-- !query 12 output
131131
org.apache.spark.sql.AnalysisException
132-
Invalid number of arguments for function to_json. Expected: one of 1, 2 and 3; Found: 0; line 1 pos 7
132+
Invalid number of arguments for function to_json. Expected: one of 1 and 2; Found: 0; line 1 pos 7
133133

134134

135135
-- !query 13
@@ -225,7 +225,7 @@ select from_json()
225225
struct<>
226226
-- !query 21 output
227227
org.apache.spark.sql.AnalysisException
228-
Invalid number of arguments for function from_json. Expected: one of 2, 3 and 4; Found: 0; line 1 pos 7
228+
Invalid number of arguments for function from_json. Expected: one of 2 and 3; Found: 0; line 1 pos 7
229229

230230

231231
-- !query 22

0 commit comments

Comments (0)