
 package org.apache.spark.sql.execution.datasources.json

-import scala.reflect.ClassTag
-
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

 import org.apache.spark.TaskContext
 import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.execution.datasources.{CodecStreams, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, DataSource, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

 /**
  * Common functions for parsing JSON files
- * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or [[String]]
  */
-abstract class JsonDataSource[T] extends Serializable {
+abstract class JsonDataSource extends Serializable {
   def isSplitable: Boolean

   /**
@@ -53,35 +51,24 @@ abstract class JsonDataSource[T] extends Serializable {
       file: PartitionedFile,
       parser: JacksonParser): Iterator[InternalRow]

-  /**
-   * Create an [[RDD]] that handles the preliminary parsing of [[T]] records
-   */
-  protected def createBaseRdd(
-      sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[T]
-
-  /**
-   * A generic wrapper to invoke the correct [[JsonFactory]] method to allocate a [[JsonParser]]
-   * for an instance of [[T]]
-   */
-  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
-
-  final def infer(
+  final def inferSchema(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): Option[StructType] = {
     if (inputPaths.nonEmpty) {
-      val jsonSchema = JsonInferSchema.infer(
-        createBaseRdd(sparkSession, inputPaths),
-        parsedOptions,
-        createParser)
+      val jsonSchema = infer(sparkSession, inputPaths, parsedOptions)
       checkConstraints(jsonSchema)
       Some(jsonSchema)
     } else {
       None
     }
   }

+  protected def infer(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType
+
   /** Constraints to be imposed on schema to be stored. */
   private def checkConstraints(schema: StructType): Unit = {
     if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
@@ -95,53 +82,46 @@ abstract class JsonDataSource[T] extends Serializable {
 }

 object JsonDataSource {
-  def apply(options: JSONOptions): JsonDataSource[_] = {
+  def apply(options: JSONOptions): JsonDataSource = {
     if (options.wholeFile) {
       WholeFileJsonDataSource
     } else {
       TextInputJsonDataSource
     }
   }
-
-  /**
-   * Create a new [[RDD]] via the supplied callback if there is at least one file to process,
-   * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned.
-   */
-  def createBaseRdd[T : ClassTag](
-      sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus])(
-      fn: (Configuration, String) => RDD[T]): RDD[T] = {
-    val paths = inputPaths.map(_.getPath)
-
-    if (paths.nonEmpty) {
-      val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
-      FileInputFormat.setInputPaths(job, paths: _*)
-      fn(job.getConfiguration, paths.mkString(","))
-    } else {
-      sparkSession.sparkContext.emptyRDD[T]
-    }
-  }
 }

-object TextInputJsonDataSource extends JsonDataSource[Text] {
+object TextInputJsonDataSource extends JsonDataSource {
   override val isSplitable: Boolean = {
     // splittable if the underlying source is
     true
   }

-  override protected def createBaseRdd(
+  override def infer(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[Text] = {
-    JsonDataSource.createBaseRdd(sparkSession, inputPaths) {
-      case (conf, name) =>
-        sparkSession.sparkContext.newAPIHadoopRDD(
-          conf,
-          classOf[TextInputFormat],
-          classOf[LongWritable],
-          classOf[Text])
-          .setName(s"JsonLines: $name")
-          .values // get the text column
-    }
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+    inferFromDataset(json, parsedOptions)
+  }
+
+  def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
+    val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
+    val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
+    JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
+  }
+
+  private def createBaseDataset(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus]): Dataset[String] = {
+    val paths = inputPaths.map(_.getPath.toString)
+    sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        className = classOf[TextFileFormat].getName
+      ).resolveRelation(checkFilesExist = false))
+      .select("value").as(Encoders.STRING)
   }

   override def readFile(
@@ -150,41 +130,48 @@ object TextInputJsonDataSource extends JsonDataSource[Text] {
       parser: JacksonParser): Iterator[InternalRow] = {
     val linesReader = new HadoopFileLinesReader(file, conf)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
-    linesReader.flatMap(parser.parse(_, createParser, textToUTF8String))
+    linesReader.flatMap(parser.parse(_, CreateJacksonParser.text, textToUTF8String))
   }

   private def textToUTF8String(value: Text): UTF8String = {
     UTF8String.fromBytes(value.getBytes, 0, value.getLength)
   }
-
-  override def createParser(jsonFactory: JsonFactory, value: Text): JsonParser = {
-    CreateJacksonParser.text(jsonFactory, value)
-  }
 }

-object WholeFileJsonDataSource extends JsonDataSource[PortableDataStream] {
+object WholeFileJsonDataSource extends JsonDataSource {
   override val isSplitable: Boolean = {
     false
   }

-  override protected def createBaseRdd(
+  override def infer(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): StructType = {
+    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
+    val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
+    JsonInferSchema.infer(sampled, parsedOptions, createParser)
+  }
+
+  private def createBaseRdd(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
-    JsonDataSource.createBaseRdd(sparkSession, inputPaths) {
-      case (conf, name) =>
-        new BinaryFileRDD(
-          sparkSession.sparkContext,
-          classOf[StreamInputFormat],
-          classOf[String],
-          classOf[PortableDataStream],
-          conf,
-          sparkSession.sparkContext.defaultMinPartitions)
-          .setName(s"JsonFile: $name")
-          .values
-    }
+    val paths = inputPaths.map(_.getPath)
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val conf = job.getConfiguration
+    val name = paths.mkString(",")
+    FileInputFormat.setInputPaths(job, paths: _*)
+    new BinaryFileRDD(
+      sparkSession.sparkContext,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      conf,
+      sparkSession.sparkContext.defaultMinPartitions)
+      .setName(s"JsonFile: $name")
+      .values
   }

-  override def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+  private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
     CreateJacksonParser.inputStream(
       jsonFactory,
       CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, record.getPath()))
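
For reference, a minimal sketch of how the refactored entry points fit together when called from Spark's own JSON code path. The names `spark`, `parsedOptions`, and `inputFiles` are illustrative assumptions (a `SparkSession`, a pre-built `JSONOptions`, and the input `Seq[FileStatus]`), not part of this commit:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.execution.datasources.json.{JsonDataSource, TextInputJsonDataSource}
import org.apache.spark.sql.types.StructType

// Line-delimited JSON already loaded as a Dataset[String]; the new
// inferFromDataset entry point infers the schema directly from it.
val jsonLines: Dataset[String] = spark.read.textFile("/path/to/input.json")  // illustrative path
val lineSchema: StructType = TextInputJsonDataSource.inferFromDataset(jsonLines, parsedOptions)

// The factory picks the whole-file or line-based implementation from the options;
// inferSchema returns None when there are no input files to sample.
val maybeSchema: Option[StructType] =
  JsonDataSource(parsedOptions).inferSchema(spark, inputFiles, parsedOptions)
```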