Skip to content

Commit a47af49

Browse files
authored
fix flaky test cases with a bit more robust waiting logic (#8154)
1 parent e4e5c6d commit a47af49

File tree

1 file changed

+50
-30
lines changed

1 file changed

+50
-30
lines changed

pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
package org.apache.pinot.controller.helix.core.minion;
2020

2121
import com.google.common.collect.ImmutableMap;
22-
import com.google.common.collect.Lists;
2322
import java.util.List;
2423
import java.util.Map;
2524
import java.util.Set;
25+
import java.util.function.Predicate;
2626
import org.apache.pinot.controller.ControllerConf;
2727
import org.apache.pinot.controller.helix.ControllerTest;
2828
import org.apache.pinot.core.common.MinionConstants;
@@ -33,10 +33,12 @@
3333
import org.apache.pinot.spi.data.Schema;
3434
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
3535
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
36+
import org.apache.pinot.util.TestUtils;
3637
import org.quartz.CronTrigger;
3738
import org.quartz.JobDetail;
3839
import org.quartz.JobKey;
3940
import org.quartz.Scheduler;
41+
import org.quartz.SchedulerException;
4042
import org.quartz.Trigger;
4143
import org.quartz.impl.matchers.GroupMatcher;
4244
import org.testng.annotations.AfterClass;
@@ -49,6 +51,7 @@
4951
public class PinotTaskManagerStatelessTest extends ControllerTest {
5052
private static final String RAW_TABLE_NAME = "myTable";
5153
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
54+
private static final long TIMEOUT_IN_MS = 10_000L;
5255

5356
@BeforeClass
5457
public void setUp()
@@ -87,46 +90,44 @@ public void testPinotTaskManagerSchedulerWithUpdate()
8790
new TableTaskConfig(
8891
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
8992
addTableConfig(tableConfig);
90-
Thread.sleep(2000);
91-
List<String> jobGroupNames = scheduler.getJobGroupNames();
92-
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
93+
waitForJobGroupNames(_controllerStarter.getTaskManager(),
94+
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
95+
"JobGroupNames should have SegmentGenerationAndPushTask only");
9396
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");
9497

9598
// 2. Update table to new schedule
9699
tableConfig.setTaskConfig(new TableTaskConfig(
97100
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
98101
updateTableConfig(tableConfig);
99-
Thread.sleep(2000);
100-
jobGroupNames = scheduler.getJobGroupNames();
101-
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
102+
waitForJobGroupNames(_controllerStarter.getTaskManager(),
103+
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
104+
"JobGroupNames should have SegmentGenerationAndPushTask only");
102105
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */20 * ? * * *");
103106

104107
// 3. Update table to new task and schedule
105108
tableConfig.setTaskConfig(new TableTaskConfig(
106109
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */30 * ? * * *"),
107110
"MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
108111
updateTableConfig(tableConfig);
109-
Thread.sleep(2000);
110-
jobGroupNames = scheduler.getJobGroupNames();
111-
assertEquals(jobGroupNames.size(), 2);
112-
assertTrue(jobGroupNames.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
113-
assertTrue(jobGroupNames.contains(MinionConstants.MergeRollupTask.TASK_TYPE));
112+
waitForJobGroupNames(_controllerStarter.getTaskManager(),
113+
jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn
114+
.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
115+
"JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");
114116
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */30 * ? * * *");
115117
validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");
116118

117119
// 4. Remove one task from the table
118120
tableConfig.setTaskConfig(
119121
new TableTaskConfig(ImmutableMap.of("MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
120122
updateTableConfig(tableConfig);
121-
Thread.sleep(2000);
122-
jobGroupNames = scheduler.getJobGroupNames();
123-
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.MergeRollupTask.TASK_TYPE));
123+
waitForJobGroupNames(_controllerStarter.getTaskManager(),
124+
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
125+
"JobGroupNames should have MergeRollupTask only");
124126
validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");
125127

126128
// 4. Drop table
127129
dropOfflineTable(RAW_TABLE_NAME);
128-
jobGroupNames = scheduler.getJobGroupNames();
129-
assertTrue(jobGroupNames.isEmpty());
130+
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
130131

131132
stopFakeInstances();
132133
stopController();
@@ -154,9 +155,9 @@ public void testPinotTaskManagerSchedulerWithRestart()
154155
new TableTaskConfig(
155156
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
156157
addTableConfig(tableConfig);
157-
Thread.sleep(2000);
158-
List<String> jobGroupNames = scheduler.getJobGroupNames();
159-
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
158+
waitForJobGroupNames(_controllerStarter.getTaskManager(),
159+
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
160+
"JobGroupNames should have SegmentGenerationAndPushTask only");
160161
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");
161162

162163
// Restart controller
@@ -168,7 +169,6 @@ public void testPinotTaskManagerSchedulerWithRestart()
168169
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"),
169170
"MergeRollupTask", ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
170171
updateTableConfig(tableConfig);
171-
Thread.sleep(2000);
172172

173173
// Task is put into table config.
174174
TableConfig tableConfigAfterRestart =
@@ -179,21 +179,31 @@ public void testPinotTaskManagerSchedulerWithRestart()
179179

180180
// The new MergeRollup task wouldn't be scheduled if not eagerly checking table configs
181181
// after setting up subscriber on ChildChanges zk event when controller gets restarted.
182-
taskManager = _controllerStarter.getTaskManager();
183-
scheduler = taskManager.getScheduler();
184-
jobGroupNames = scheduler.getJobGroupNames();
185-
assertEquals(jobGroupNames.size(), 2);
186-
assertTrue(jobGroupNames.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
187-
assertTrue(jobGroupNames.contains(MinionConstants.MergeRollupTask.TASK_TYPE));
182+
waitForJobGroupNames(_controllerStarter.getTaskManager(),
183+
jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn
184+
.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
185+
"JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");
188186

189187
dropOfflineTable(RAW_TABLE_NAME);
190-
jobGroupNames = scheduler.getJobGroupNames();
191-
assertTrue(jobGroupNames.isEmpty());
188+
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
192189

193190
stopFakeInstances();
194191
stopController();
195192
}
196193

194+
private void waitForJobGroupNames(PinotTaskManager taskManager, Predicate<List<String>> predicate,
195+
String errorMessage) {
196+
TestUtils.waitForCondition(aVoid -> {
197+
try {
198+
Scheduler scheduler = taskManager.getScheduler();
199+
List<String> jobGroupNames = scheduler.getJobGroupNames();
200+
return predicate.test(jobGroupNames);
201+
} catch (SchedulerException e) {
202+
throw new RuntimeException(e);
203+
}
204+
}, TIMEOUT_IN_MS, errorMessage);
205+
}
206+
197207
private void validateJob(String taskType, String cronExpression)
198208
throws Exception {
199209
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
@@ -208,8 +218,18 @@ private void validateJob(String taskType, String cronExpression)
208218
assertEquals(jobDetail.getKey().getGroup(), taskType);
209219
assertSame(jobDetail.getJobDataMap().get("PinotTaskManager"), taskManager);
210220
assertSame(jobDetail.getJobDataMap().get("LeadControllerManager"), _controllerStarter.getLeadControllerManager());
221+
// jobDetail and jobTrigger are not added atomically by the scheduler,
222+
// the jobDetail is added to an internal map firstly, and jobTrigger
223+
// is added to another internal map afterwards, so we check for the existence
224+
// of jobTrigger with some waits to be more defensive.
225+
TestUtils.waitForCondition(aVoid -> {
226+
try {
227+
return scheduler.getTriggersOfJob(jobKey).size() == 1;
228+
} catch (SchedulerException e) {
229+
throw new RuntimeException(e);
230+
}
231+
}, TIMEOUT_IN_MS, "JobDetail exiting but missing JobTrigger");
211232
List<? extends Trigger> triggersOfJob = scheduler.getTriggersOfJob(jobKey);
212-
assertEquals(triggersOfJob.size(), 1);
213233
Trigger trigger = triggersOfJob.iterator().next();
214234
assertTrue(trigger instanceof CronTrigger);
215235
assertEquals(((CronTrigger) trigger).getCronExpression(), cronExpression);

0 commit comments

Comments
 (0)