Commit 3250f33

Use text file sources in CSV's schema inference for consistency
1 parent e6e43cb commit 3250f33
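
For context, a minimal sketch of the user-facing path this commit touches: CSV schema inference over a file in a non-UTF-8 charset. The file path and option values below are illustrative, not taken from the commit.

import org.apache.spark.sql.SparkSession

object CsvInferenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-charset-inference")
      .master("local[*]")
      .getOrCreate()

    // With inferSchema enabled, the CSV source first loads the input as
    // plain text to sample lines and infer column types; this commit
    // routes both charset branches of that step through the text file
    // source instead of a raw Hadoop RDD in the non-UTF-8 case.
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("charset", "ISO-8859-1")
      .csv("/tmp/data/latin1.csv")

    df.printSchema()
    spark.stop()
  }
}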

1 file changed: +14 −12 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

Lines changed: 14 additions & 12 deletions
@@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val df = sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        className = classOf[TextFileFormat].getName,
+        options = options.parameters
+      ).resolveRelation(checkFilesExist = false))
+      .select("value").as[String](Encoders.STRING)
+
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.baseRelationToDataFrame(
-        DataSource.apply(
-          sparkSession,
-          paths = paths,
-          className = classOf[TextFileFormat].getName,
-          options = options.parameters
-        ).resolveRelation(checkFilesExist = false))
-        .select("value").as[String](Encoders.STRING)
+      df
     } else {
       val charset = options.charset
-      val rdd = sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
-        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
-      sparkSession.createDataset(rdd)(Encoders.STRING)
+      sparkSession.createDataset(df.queryExecution.toRdd.map { row =>
+        val bytes = row.getBinary(0)
+        new String(bytes, 0, bytes.length, charset)
+      })(Encoders.STRING)
     }
   }
 }
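
The non-UTF-8 branch now decodes the raw bytes produced by the text file source rather than re-reading the input through a Hadoop RDD. Below is a self-contained sketch of that decoding step only; the demo bytes and charset are illustrative, not from the commit.

import java.nio.charset.StandardCharsets

object CharsetDecodeDemo {
  def main(args: Array[String]): Unit = {
    val charset = "ISO-8859-1"
    // "héllo" encoded in ISO-8859-1: 'é' is the single byte 0xE9.
    val bytes = Array[Byte]('h'.toByte, 0xE9.toByte, 'l'.toByte, 'l'.toByte, 'o'.toByte)

    // Same call shape as the patch: new String(bytes, 0, bytes.length, charset).
    println(new String(bytes, 0, bytes.length, charset)) // héllo

    // Decoding the same bytes as UTF-8 mangles the 0xE9 byte, which is
    // why the user-specified charset must be applied during inference.
    println(new String(bytes, StandardCharsets.UTF_8)) // h�llo
  }
}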
