@@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val df = sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        className = classOf[TextFileFormat].getName,
+        options = options.parameters
+      ).resolveRelation(checkFilesExist = false))
+      .select("value").as[String](Encoders.STRING)
+
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.baseRelationToDataFrame(
-        DataSource.apply(
-          sparkSession,
-          paths = paths,
-          className = classOf[TextFileFormat].getName,
-          options = options.parameters
-        ).resolveRelation(checkFilesExist = false))
-        .select("value").as[String](Encoders.STRING)
+      df
     } else {
       val charset = options.charset
-      val rdd = sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
-        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
-      sparkSession.createDataset(rdd)(Encoders.STRING)
+      sparkSession.createDataset(df.queryExecution.toRdd.map { row =>
+        val bytes = row.getBinary(0)
+        new String(bytes, 0, bytes.length, charset)
+      })(Encoders.STRING)
     }
   }
 }