@@ -34,61 +34,82 @@ class CollapseRepartitionSuite extends PlanTest {
3434
3535
3636 test(" collapse two adjacent coalesces into one" ) {
37- val query = testRelation
37+ // Always respects the top coalesces amd removes useless coalesce below coalesce
38+ val query1 = testRelation
3839 .coalesce(10 )
3940 .coalesce(20 )
41+ val query2 = testRelation
42+ .coalesce(30 )
43+ .coalesce(20 )
44+
45+ val optimized1 = Optimize .execute(query1.analyze)
46+ val optimized2 = Optimize .execute(query2.analyze)
4047
41- val optimized = Optimize .execute(query.analyze)
4248 val correctAnswer = testRelation.coalesce(20 ).analyze
4349
44- comparePlans(optimized, correctAnswer)
50+ comparePlans(optimized1, correctAnswer)
51+ comparePlans(optimized2, correctAnswer)
4552 }
4653
4754 test(" collapse two adjacent repartitions into one" ) {
48- val query = testRelation
55+ // Always respects the top repartition amd removes useless repartition below repartition
56+ val query1 = testRelation
4957 .repartition(10 )
5058 .repartition(20 )
59+ val query2 = testRelation
60+ .repartition(30 )
61+ .repartition(20 )
5162
52- val optimized = Optimize .execute(query.analyze)
63+ val optimized1 = Optimize .execute(query1.analyze)
64+ val optimized2 = Optimize .execute(query2.analyze)
5365 val correctAnswer = testRelation.repartition(20 ).analyze
5466
55- comparePlans(optimized, correctAnswer)
67+ comparePlans(optimized1, correctAnswer)
68+ comparePlans(optimized2, correctAnswer)
5669 }
5770
58- test(" collapse one coalesce and one repartition into one " ) {
59- // Remove useless coalesce below repartition
71+ test(" coalesce above repartition" ) {
72+ // Remove useless coalesce above repartition
6073 val query1 = testRelation
74+ .repartition(10 )
6175 .coalesce(20 )
62- .repartition(5 )
6376
6477 val optimized1 = Optimize .execute(query1.analyze)
65- val correctAnswer1 = testRelation.repartition(5 ).analyze
78+ val correctAnswer1 = testRelation.repartition(10 ).analyze
6679
6780 comparePlans(optimized1, correctAnswer1)
6881
69- // Remove useless coalesce above repartition when its numPartitions is larger than or equal to
70- // the child's numPartitions
82+ // No change in this case
7183 val query2 = testRelation
72- .repartition(5 )
84+ .repartition(30 )
7385 .coalesce(20 )
7486
7587 val optimized2 = Optimize .execute(query2.analyze)
76- val correctAnswer2 = testRelation.repartition( 5 ) .analyze
88+ val correctAnswer2 = query2 .analyze
7789
7890 comparePlans(optimized2, correctAnswer2)
91+ }
92+
93+ test(" repartition above coalesce" ) {
94+ // Always respects the top repartition amd removes useless coalesce below repartition
95+ val query1 = testRelation
96+ .coalesce(10 )
97+ .repartition(20 )
98+ // Remove useless coalesce above repartition
99+ val query2 = testRelation
100+ .coalesce(30 )
101+ .repartition(20 )
79102
80- // Keep coalesce above repartition unchanged when its numPartitions is smaller than the child
81- val query3 = testRelation
82- .repartition(5 )
83- .coalesce(3 )
103+ val optimized1 = Optimize .execute(query1.analyze)
104+ val optimized2 = Optimize .execute(query2.analyze)
84105
85- val optimized3 = Optimize .execute(query3.analyze)
86- val correctAnswer3 = testRelation.repartition(5 ).coalesce(3 ).analyze
106+ val correctAnswer = testRelation.repartition(20 ).analyze
87107
88- comparePlans(optimized3, correctAnswer3)
108+ comparePlans(optimized1, correctAnswer)
109+ comparePlans(optimized2, correctAnswer)
89110 }
90111
91- test(" collapse repartition and repartitionBy into one " ) {
112+ test(" repartitionBy above repartition " ) {
92113 val query1 = testRelation
93114 .repartition(10 )
94115 .distribute(' a )(20 )
@@ -99,7 +120,7 @@ class CollapseRepartitionSuite extends PlanTest {
99120 comparePlans(optimized1, correctAnswer1)
100121
101122 val query2 = testRelation
102- .coalesce( 10 )
123+ .repartition( 30 )
103124 .distribute(' a )(20 )
104125
105126 val optimized2 = Optimize .execute(query2.analyze)
@@ -108,7 +129,27 @@ class CollapseRepartitionSuite extends PlanTest {
108129 comparePlans(optimized2, correctAnswer2)
109130 }
110131
111- test(" collapse repartitionBy and repartition into one" ) {
132+ test(" repartitionBy above coalesce" ) {
133+ val query1 = testRelation
134+ .coalesce(10 )
135+ .distribute(' a )(20 )
136+
137+ val optimized1 = Optimize .execute(query1.analyze)
138+ val correctAnswer1 = testRelation.distribute(' a )(20 ).analyze
139+
140+ comparePlans(optimized1, correctAnswer1)
141+
142+ val query2 = testRelation
143+ .coalesce(20 )
144+ .distribute(' a )(30 )
145+
146+ val optimized2 = Optimize .execute(query2.analyze)
147+ val correctAnswer2 = testRelation.distribute(' a )(30 ).analyze
148+
149+ comparePlans(optimized2, correctAnswer2)
150+ }
151+
152+ test(" repartition above repartitionBy" ) {
112153 val query1 = testRelation
113154 .distribute(' a )(20 )
114155 .repartition(10 )
@@ -123,30 +164,48 @@ class CollapseRepartitionSuite extends PlanTest {
123164 .repartition(30 )
124165
125166 val optimized2 = Optimize .execute(query2.analyze)
126- val correctAnswer2 = testRelation.distribute(' a )(20 ).analyze
167+ val correctAnswer2 = testRelation.distribute(' a )(30 ).analyze
127168
128169 comparePlans(optimized2, correctAnswer2)
129170 }
130171
131172 test(" coalesce above repartitionBy" ) {
132- val query = testRelation
173+ val query1 = testRelation
133174 .distribute(' a )(20 )
134175 .coalesce(10 )
135176
136- val optimized = Optimize .execute(query.analyze)
137- val correctAnswer = testRelation.distribute(' a )(20 ).coalesce(10 ).analyze
177+ val optimized1 = Optimize .execute(query1.analyze)
178+ val correctAnswer1 = testRelation.distribute(' a )(20 ).coalesce(10 ).analyze
179+
180+ comparePlans(optimized1, correctAnswer1)
181+
182+ val query2 = testRelation
183+ .distribute(' a )(20 )
184+ .coalesce(30 )
138185
139- comparePlans(optimized, correctAnswer)
186+ val optimized2 = Optimize .execute(query2.analyze)
187+ val correctAnswer2 = testRelation.distribute(' a )(20 ).analyze
188+
189+ comparePlans(optimized2, correctAnswer2)
140190 }
141191
142192 test(" collapse two adjacent repartitionBys into one" ) {
143- val query = testRelation
193+ val query1 = testRelation
144194 .distribute(' b )(10 )
145195 .distribute(' a )(20 )
146196
147- val optimized = Optimize .execute(query.analyze)
148- val correctAnswer = testRelation.distribute(' a )(20 ).analyze
197+ val optimized1 = Optimize .execute(query1.analyze)
198+ val correctAnswer1 = testRelation.distribute(' a )(20 ).analyze
199+
200+ comparePlans(optimized1, correctAnswer1)
201+
202+ val query2 = testRelation
203+ .distribute(' b )(30 )
204+ .distribute(' a )(20 )
205+
206+ val optimized2 = Optimize .execute(query2.analyze)
207+ val correctAnswer2 = testRelation.distribute(' a )(20 ).analyze
149208
150- comparePlans(optimized, correctAnswer )
209+ comparePlans(optimized2, correctAnswer2 )
151210 }
152211}
0 commit comments