@@ -414,98 +414,113 @@ case class DataSource(
   }
 
   /**
-   * Writes the given [[DataFrame]] out to this [[DataSource]].
-   *
-   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
    */
-  def write(
-      mode: SaveMode,
-      data: DataFrame,
-      isForWriteOnly: Boolean = false): Option[BaseRelation] = {
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path. The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up. If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column). This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+    sparkSession.sessionState.executePlan(plan).toRdd
+  }
+
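Aside (not part of the patch): writeInFileFormat accepts exactly one output path and qualifies it against its file system before building InsertIntoHadoopFsRelationCommand. A minimal standalone sketch of that qualification step, assuming a default local Hadoop Configuration in place of the session's Hadoop conf and a made-up path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // A scheme-less path is resolved against the file system's URI and working
    // directory, so downstream code always receives a fully qualified location.
    val path = new Path("warehouse/out")
    val fs = path.getFileSystem(new Configuration())
    val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
    // e.g. file:/home/user/warehouse/out on a local file system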
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path. The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-
-        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up. If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
+        writeInFileFormat(format, mode, data)
+        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
+        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+  }
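Aside (not part of the patch): the Append-mode partitioning check in writeInFileFormat above is what surfaces through the public DataFrameWriter API when a requested partitioning disagrees with what already exists at the target path. A rough illustration, assuming df is an existing DataFrame, a made-up path, and the usual imports:

    // The first write establishes partitioning by "date".
    df.write.partitionBy("date").parquet("/tmp/events")

    // Appending with different partition columns trips the check in writeInFileFormat:
    df.write.mode(SaveMode.Append).partitionBy("country").parquet("/tmp/events")
    // => AnalysisException: Requested partitioning does not match existing partitioning.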
 
-        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val partitionAttributes = partitionColumns.map { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
-          sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-            case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
-          }.head
-        }
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column). This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitions = Map.empty,
-            partitionColumns = partitionAttributes,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable,
-            fileIndex = fileIndex)
-        sparkSession.sessionState.executePlan(plan).toRdd
-        if (isForWriteOnly) {
-          None
-        } else {
-          // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
-          Some(copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation())
-        }
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
 
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
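Aside (not part of the patch): after this refactoring the two public entry points split by whether the caller needs a relation back: write for plain saves, writeAndRead for create-table-as-select style code paths that read the data back afterwards. A hedged usage sketch against this internal API; the DataSource arguments below are illustrative:

    val ds = DataSource(
      sparkSession,
      className = "parquet",
      options = Map("path" -> "/tmp/out"))

    // Plain save: nothing needs to be read back afterwards.
    ds.write(SaveMode.Overwrite, df)

    // CTAS-style: write the data, then hand back a BaseRelation over what was written.
    val relation: BaseRelation = ds.writeAndRead(SaveMode.Overwrite, df)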