
Commit bf8b42d

Commit message: fix

1 parent 2ecabe4 commit bf8b42d

5 files changed: +56 −50 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala

Lines changed: 13 additions & 0 deletions
@@ -78,4 +78,17 @@ private[sql] object CreateJacksonParser extends Serializable {
   def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
     jsonFactory.createParser(new InputStreamReader(is, enc))
   }
+
+  def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
+    val ba = row.getBinary(0)
+
+    jsonFactory.createParser(ba, 0, ba.length)
+  }
+
+  def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
+    val binary = row.getBinary(0)
+    val sd = getStreamDecoder(enc, binary, binary.length)
+
+    jsonFactory.createParser(sd)
+  }
 }
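
Note: for orientation, a minimal sketch of how the two new overloads might be exercised, assuming a single-column row whose binary field holds the raw JSON bytes. CreateJacksonParser is private[sql], so code like this would have to live under the org.apache.spark.sql package; the object name is hypothetical.

import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.CreateJacksonParser

// Hypothetical usage sketch, not part of the commit.
object InternalRowParserSketch {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory()

    // Default overload: the binary column is parsed directly as bytes (UTF-8 here).
    val utf8Row = InternalRow("""{"a": 1}""".getBytes(StandardCharsets.UTF_8))
    val utf8Parser = CreateJacksonParser.internalRow(factory, utf8Row)

    // Encoding-aware overload: the bytes go through getStreamDecoder first,
    // so charsets other than UTF-8 can be handled.
    val utf16Row = InternalRow("""{"a": 1}""".getBytes(StandardCharsets.UTF_16BE))
    val utf16Parser = CreateJacksonParser.internalRow("UTF-16BE", factory, utf16Row)

    println(utf8Parser.nextToken())  // START_OBJECT
    println(utf16Parser.nextToken()) // START_OBJECT
  }
}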

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 23 additions & 23 deletions
@@ -70,24 +70,15 @@ object SQLExecution {
     // streaming queries would give us call site like "run at <unknown>:0"
     val callSite = sc.getCallSite()

-    // Set all the specified SQL configs to local properties, so that they can be available at
-    // the executor side.
-    val allConfigs = sparkSession.sessionState.conf.getAllConfs
-    allConfigs.foreach {
-      // Excludes external configs defined by users.
-      case (key, value) if key.startsWith("spark") => sc.setLocalProperty(key, value)
-    }
-
-    sc.listenerBus.post(SparkListenerSQLExecutionStart(
-      executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
-    try {
-      body
-    } finally {
-      sc.listenerBus.post(SparkListenerSQLExecutionEnd(
-        executionId, System.currentTimeMillis()))
-      allConfigs.foreach {
-        case (key, _) => sc.setLocalProperty(key, null)
+    withSQLConfPropagated(sparkSession) {
+      sc.listenerBus.post(SparkListenerSQLExecutionStart(
+        executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
+        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+      try {
+        body
+      } finally {
+        sc.listenerBus.post(SparkListenerSQLExecutionEnd(
+          executionId, System.currentTimeMillis()))
       }
     }
   } finally {
@@ -104,21 +95,30 @@
   def withExecutionId[T](sparkSession: SparkSession, executionId: String)(body: => T): T = {
     val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    withSQLConfPropagated(sparkSession) {
+      try {
+        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
+        body
+      } finally {
+        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
+      }
+    }
+  }
+
+  def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
     // Set all the specified SQL configs to local properties, so that they can be available at
     // the executor side.
     val allConfigs = sparkSession.sessionState.conf.getAllConfs
-    allConfigs.foreach {
+    for ((key, value) <- allConfigs) {
       // Excludes external configs defined by users.
-      case (key, value) if key.startsWith("spark") => sc.setLocalProperty(key, value)
+      if (key.startsWith("spark")) sparkSession.sparkContext.setLocalProperty(key, value)
     }
     try {
-      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
       body
     } finally {
       allConfigs.foreach {
-        case (key, _) => sc.setLocalProperty(key, null)
+        case (key, _) => sparkSession.sparkContext.setLocalProperty(key, null)
       }
-      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
     }
   }
 }
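
Note: both execution entry points now share one set-then-restore helper. withSQLConfPropagated copies every "spark"-prefixed session conf into SparkContext local properties (which travel to executors with each task), runs the body, and clears the keys in a finally block. A self-contained sketch of the same loan pattern, with illustrative names rather than Spark's API:

object ConfPropagationSketch {
  // Stand-in for SparkContext local properties (per-thread, task-visible).
  private val localProps = new java.util.concurrent.ConcurrentHashMap[String, String]()

  def setLocalProperty(key: String, value: String): Unit =
    if (value == null) localProps.remove(key) else localProps.put(key, value)

  // The loan pattern: set the properties, run the body, always clean up.
  def withConfPropagated[T](confs: Map[String, String])(body: => T): T = {
    for ((key, value) <- confs) {
      if (key.startsWith("spark")) setLocalProperty(key, value)
    }
    try {
      body
    } finally {
      confs.keys.foreach(setLocalProperty(_, null))
    }
  }

  def main(args: Array[String]): Unit = {
    val confs = Map("spark.sql.shuffle.partitions" -> "10", "user.custom" -> "x")
    withConfPropagated(confs) {
      println(localProps) // only the spark-prefixed key was set
    }
    println(localProps) // empty again once the body finishes
  }
}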

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

Lines changed: 13 additions & 13 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -93,32 +94,30 @@ object TextInputJsonDataSource extends JsonDataSource {
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
     val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
+
     inferFromDataset(json, parsedOptions)
   }

   def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
     val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
-    if (parsedOptions.encoding.isDefined) {
-      // TODO: We should be able to parse the input string directly. Remove this hack when we
-      // support setting encoding when reading text files.
-      val encoding = parsedOptions.encoding.get
-      val textDS = sampled.map(new Text(_))(Encoders.javaSerialization)
-      val parser = CreateJacksonParser.text(encoding, _: JsonFactory, _: Text)
-      JsonInferSchema.infer(textDS, parsedOptions, parser)
-    } else {
-      JsonInferSchema.infer(sampled, parsedOptions, CreateJacksonParser.string)
+    val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
+    val rowParser = parsedOptions.encoding.map { enc =>
+      CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
+    }.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))
+
+    SQLExecution.withSQLConfPropagated(json.sparkSession) {
+      JsonInferSchema.infer(rdd, parsedOptions, rowParser)
     }
   }

   private def createBaseDataset(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): Dataset[String] = {
-    val paths = inputPaths.map(_.getPath.toString)
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
-        paths = paths,
+        paths = inputPaths.map(_.getPath.toString),
         className = classOf[TextFileFormat].getName,
         options = parsedOptions.parameters
       ).resolveRelation(checkFilesExist = false))
@@ -164,8 +163,9 @@ object MultiLineJsonDataSource extends JsonDataSource {
       .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
       .getOrElse(createParser(_: JsonFactory, _: PortableDataStream))

-    JsonInferSchema.infer[PortableDataStream](
-      sparkSession.createDataset(sampled)(Encoders.javaSerialization), parsedOptions, parser)
+    SQLExecution.withSQLConfPropagated(sparkSession) {
+      JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
+    }
   }

   private def createBaseRdd(
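
Note: the parser selection above works by partially applying an overloaded factory method into a plain function value, picking the encoding-aware overload only when an encoding is configured. A standalone illustration of the same trick (names and the string-length "parsing" are invented for the example):

object OverloadSelectionSketch {
  // Two overloads, analogous to CreateJacksonParser.internalRow.
  def makeParser(input: String): Int = input.length
  def makeParser(enc: String, input: String): Int = s"[$enc]$input".length

  def main(args: Array[String]): Unit = {
    val encoding: Option[String] = Some("UTF-16BE")

    // Eta-expand whichever overload applies; both branches yield a String => Int,
    // so downstream code never needs to know which one was picked.
    val parser: String => Int = encoding
      .map(enc => makeParser(enc, _: String))
      .getOrElse(makeParser(_: String))

    println(parser("{}")) // 12 here; 2 when encoding is None
  }
}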

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala

Lines changed: 5 additions & 12 deletions
@@ -22,7 +22,7 @@ import java.util.Comparator
 import com.fasterxml.jackson.core._

 import org.apache.spark.SparkException
-import org.apache.spark.sql.{Dataset, Encoders}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.json.JSONOptions
@@ -39,14 +39,14 @@ private[sql] object JsonInferSchema {
    * 3. Replace any remaining null fields with string, the top type
    */
   def infer[T](
-      json: Dataset[T],
+      json: RDD[T],
       configOptions: JSONOptions,
       createParser: (JsonFactory, T) => JsonParser): StructType = {
     val parseMode = configOptions.parseMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord

     // perform schema inference on each row and merge afterwards
-    val inferredTypes = json.mapPartitions { iter =>
+    val rootType = json.mapPartitions { iter =>
       val factory = new JsonFactory()
       configOptions.setJacksonOptions(factory)
       iter.flatMap { row =>
@@ -67,15 +67,8 @@
           }
         }
       }
-    }(Encoders.javaSerialization)
-
-    // TODO: use `Dataset.fold` once we have it.
-    val rootType = try {
-      inferredTypes.reduce(compatibleRootType(columnNameOfCorruptRecord, parseMode))
-    } catch {
-      case e: UnsupportedOperationException if e.getMessage == "empty collection" =>
-        StructType(Nil)
-    }
+    }.fold(StructType(Nil))(
+      compatibleRootType(columnNameOfCorruptRecord, parseMode))

     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
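
Note: switching from reduce to fold removes the empty-input special case. RDD.reduce throws UnsupportedOperationException("empty collection") when no rows survive sampling, while RDD.fold simply returns its zero element, StructType(Nil) here. A minimal demonstration:

import org.apache.spark.sql.SparkSession

object FoldVsReduceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("fold-vs-reduce")
      .getOrCreate()
    val empty = spark.sparkContext.emptyRDD[Int]

    // empty.reduce(_ max _) would throw:
    //   java.lang.UnsupportedOperationException: empty collection

    // fold returns the zero value on an empty RDD; no try/catch needed.
    println(empty.fold(0)(_ max _)) // prints 0

    spark.stop()
  }
}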

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -1374,7 +1374,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
     // This is really a test that it doesn't throw an exception
     val emptySchema = JsonInferSchema.infer(
-      empty,
+      empty.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
       CreateJacksonParser.string)
     assert(StructType(Seq()) === emptySchema)
@@ -1401,7 +1401,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

   test("SPARK-8093 Erase empty structs") {
     val emptySchema = JsonInferSchema.infer(
-      emptyRecords,
+      emptyRecords.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
       CreateJacksonParser.string)
     assert(StructType(Seq()) === emptySchema)
