@@ -74,7 +74,7 @@ case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
7474 SQLConf .ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS ), mapPartitionSizes.length)
7575 val avgPartitionSize = mapPartitionSizes.sum / maxSplits
7676 val advisoryPartitionSize = math.max(avgPartitionSize,
77- conf.getConf(SQLConf .SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE ))
77+ conf.getConf(SQLConf .ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD ))
7878 val partitionIndices = mapPartitionSizes.indices
7979 val partitionStartIndices = ArrayBuffer [Int ]()
8080 var postMapPartitionSize = mapPartitionSizes(0 )
@@ -168,11 +168,13 @@ case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
168168 // obtaining the raw data size of per partition,
169169 val leftSkewedReader = SkewedShufflePartitionReader (
170170 left, partitionId, leftMapIdStartIndices(i), leftEndMapId)
171+ val leftSort = smj.left.asInstanceOf [SortExec ].copy(child = leftSkewedReader)
171172
172173 val rightSkewedReader = SkewedShufflePartitionReader (right, partitionId,
173174 rightMapIdStartIndices(j), rightEndMapId)
174- subJoins += SortMergeJoinExec (leftKeys, rightKeys, joinType, condition,
175- leftSkewedReader, rightSkewedReader)
175+ val rightSort = smj.right.asInstanceOf [SortExec ].copy(child = rightSkewedReader)
176+ subJoins += SortMergeJoinExec (leftKeys, rightKeys, joinType, condition,
177+ leftSort, rightSort)
176178 }
177179 }
178180 }
@@ -240,6 +242,10 @@ case class SkewedShufflePartitionReader(
240242 }
241243 private var cachedSkewedShuffleRDD : SkewedShuffledRowRDD = null
242244
// Overrides the node's display name with a single string built from the values of
// `child`, `partitionIndex`, `startMapIndex` and `endMapIndex`.
// NOTE(review): `${child}` interpolates child.toString; if that renders a whole plan
// tree, the resulting name may be long or multi-line — confirm in explain/UI output.
// NOTE(review): this text comes from a rendered diff whose token spacing looks altered
// (e.g. `s "`), so verify the exact whitespace inside the literals against the real file.
245+ override def nodeName : String = s " SkewedShuffleReader SkewedShuffleQueryStage: ${child}" +
246+ s " SkewedPartition: ${partitionIndex} startMapIndex: ${startMapIndex}" +
247+ s " endMapIndex: ${endMapIndex}"
248+
243249 override def doExecute (): RDD [InternalRow ] = {
244250 if (cachedSkewedShuffleRDD == null ) {
245251 cachedSkewedShuffleRDD = child match {
0 commit comments