
Commit cee1c8c

resolve the comments
1 parent a22b804 commit cee1c8c

2 files changed, +12 −4 lines


core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 3 additions & 1 deletion
@@ -758,7 +758,7 @@ private[spark] class MapOutputTrackerMaster(
       endMapIndex: Int,
       startPartition: Int,
       endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex - $endMapIndex" +
+    logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
       s"partitions $startPartition-$endPartition")
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
@@ -822,6 +822,8 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
       endMapIndex: Int,
       startPartition: Int,
       endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
+      s"partitions $startPartition-$endPartition")
     val statuses = getStatuses(shuffleId, conf)
     try {
       MapOutputTracker.convertMapStatuses(
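
In both hunks the debug message is built from two concatenated interpolated strings with no separator at the join point, so the rendered line runs the map-index range straight into "partitions". A minimal, self-contained sketch (made-up values, not from this commit) of what actually gets logged:

object LogLineDemo extends App {
  val shuffleId = 0
  val (startMapIndex, endMapIndex) = (0, 5)
  val (startPartition, endPartition) = (0, 10)
  // Mirrors the concatenation in the diff; note no space before "partitions".
  val msg = s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
    s"partitions $startPartition-$endPartition"
  println(msg) // Fetching outputs for shuffle 0, mappers 0-5partitions 0-10
}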

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala

Lines changed: 9 additions & 3 deletions
@@ -74,7 +74,7 @@ case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
         SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
       val avgPartitionSize = mapPartitionSizes.sum / maxSplits
       val advisoryPartitionSize = math.max(avgPartitionSize,
-        conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE))
+        conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
       val partitionIndices = mapPartitionSizes.indices
       val partitionStartIndices = ArrayBuffer[Int]()
       var postMapPartitionSize = mapPartitionSizes(0)
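
The only change in this hunk is the floor of the advisory split size: it is now the skewed-partition size threshold rather than the generic post-shuffle target input size. A worked sketch with made-up byte counts (the threshold value stands in for the SQLConf entry):

object AdvisorySizeDemo extends App {
  val mapPartitionSizes = Seq(400L, 100L, 300L, 200L) // per-map output sizes, bytes
  val maxSplits = math.min(5, mapPartitionSizes.length) // configured cap vs. map count = 4
  val avgPartitionSize = mapPartitionSizes.sum / maxSplits // 1000 / 4 = 250
  val skewedPartitionSizeThreshold = 300L // stand-in for the config value
  // A split is never advised to be smaller than the threshold.
  val advisoryPartitionSize = math.max(avgPartitionSize, skewedPartitionSizeThreshold)
  println(advisoryPartitionSize) // 300
}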
@@ -168,11 +168,13 @@ case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
           // obtaining the raw data size of per partition,
           val leftSkewedReader = SkewedShufflePartitionReader(
             left, partitionId, leftMapIdStartIndices(i), leftEndMapId)
+          val leftSort = smj.left.asInstanceOf[SortExec].copy(child = leftSkewedReader)

           val rightSkewedReader = SkewedShufflePartitionReader(right, partitionId,
             rightMapIdStartIndices(j), rightEndMapId)
-          subJoins += SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
-            leftSkewedReader, rightSkewedReader)
+          val rightSort = smj.right.asInstanceOf[SortExec].copy(child = rightSkewedReader)
+          subJoins += SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
+            leftSort, rightSort)
         }
       }
     }
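
The substantive fix in this hunk: SortMergeJoinExec requires both children to be sorted on the join keys, and the previous code fed the unsorted skewed readers into the join directly. The new code copies the join's original SortExec nodes, substituting each per-split reader as the new child. A simplified sketch of that copy pattern with stand-in types (the real SortExec and SparkPlan live in Spark's execution package):

object SortRewrapDemo extends App {
  sealed trait Plan
  case class Reader(partitionId: Int, startMapIndex: Int, endMapIndex: Int) extends Plan
  case class Sort(sortOrder: Seq[String], child: Plan) extends Plan

  // The sort that originally sat above the full shuffle read.
  val originalSort = Sort(Seq("join_key ASC"), Reader(0, 0, 100))
  // A reader covering one slice of the skewed partition's map outputs.
  val skewedReader = Reader(partitionId = 0, startMapIndex = 0, endMapIndex = 10)
  // copy(child = ...) keeps the sort spec while swapping in the new child --
  // the same move as smj.left.asInstanceOf[SortExec].copy(child = leftSkewedReader).
  val rewrapped = originalSort.copy(child = skewedReader)
  println(rewrapped)
}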
@@ -240,6 +242,10 @@ case class SkewedShufflePartitionReader(
   }
   private var cachedSkewedShuffleRDD: SkewedShuffledRowRDD = null

+  override def nodeName: String = s"SkewedShuffleReader SkewedShuffleQueryStage: ${child}" +
+    s" SkewedPartition: ${partitionIndex} startMapIndex: ${startMapIndex}" +
+    s" endMapIndex: ${endMapIndex}"
+
   override def doExecute(): RDD[InternalRow] = {
     if (cachedSkewedShuffleRDD == null) {
       cachedSkewedShuffleRDD = child match {
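
The added nodeName override controls the label this operator prints with in plan strings, which is what surfaces the skew-split metadata (partition index and map-index range) in explain output. A stand-in sketch of the mechanism, not Spark's actual TreeNode hierarchy:

object NodeNameDemo extends App {
  abstract class PlanNode {
    def nodeName: String = getClass.getSimpleName // default label for a plan node
  }
  case class SkewedReaderDemo(partitionIndex: Int, startMapIndex: Int, endMapIndex: Int)
      extends PlanNode {
    override def nodeName: String =
      s"SkewedShuffleReader SkewedPartition: $partitionIndex" +
        s" startMapIndex: $startMapIndex endMapIndex: $endMapIndex"
  }
  println(SkewedReaderDemo(3, 0, 10).nodeName)
  // SkewedShuffleReader SkewedPartition: 3 startMapIndex: 0 endMapIndex: 10
}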
