1919package org .apache .pinot .controller .helix .core .minion ;
2020
2121import com .google .common .collect .ImmutableMap ;
22- import com .google .common .collect .Lists ;
2322import java .util .List ;
2423import java .util .Map ;
2524import java .util .Set ;
25+ import java .util .function .Predicate ;
2626import org .apache .pinot .controller .ControllerConf ;
2727import org .apache .pinot .controller .helix .ControllerTest ;
2828import org .apache .pinot .core .common .MinionConstants ;
3333import org .apache .pinot .spi .data .Schema ;
3434import org .apache .pinot .spi .utils .builder .TableConfigBuilder ;
3535import org .apache .pinot .spi .utils .builder .TableNameBuilder ;
36+ import org .apache .pinot .util .TestUtils ;
3637import org .quartz .CronTrigger ;
3738import org .quartz .JobDetail ;
3839import org .quartz .JobKey ;
3940import org .quartz .Scheduler ;
41+ import org .quartz .SchedulerException ;
4042import org .quartz .Trigger ;
4143import org .quartz .impl .matchers .GroupMatcher ;
4244import org .testng .annotations .AfterClass ;
4951public class PinotTaskManagerStatelessTest extends ControllerTest {
5052 private static final String RAW_TABLE_NAME = "myTable" ;
5153 private static final String OFFLINE_TABLE_NAME = TableNameBuilder .OFFLINE .tableNameWithType (RAW_TABLE_NAME );
54+ private static final long TIMEOUT_IN_MS = 10_000L ;
5255
5356 @ BeforeClass
5457 public void setUp ()
@@ -87,46 +90,44 @@ public void testPinotTaskManagerSchedulerWithUpdate()
8790 new TableTaskConfig (
8891 ImmutableMap .of ("SegmentGenerationAndPushTask" , ImmutableMap .of ("schedule" , "0 */10 * ? * * *" )))).build ();
8992 addTableConfig (tableConfig );
90- Thread . sleep ( 2000 );
91- List < String > jobGroupNames = scheduler . getJobGroupNames ();
92- assertEquals ( jobGroupNames , Lists . newArrayList ( MinionConstants . SegmentGenerationAndPushTask . TASK_TYPE ) );
93+ waitForJobGroupNames ( _controllerStarter . getTaskManager (),
94+ jgn -> jgn . size () == 1 && jgn . contains ( MinionConstants . SegmentGenerationAndPushTask . TASK_TYPE ),
95+ "JobGroupNames should have SegmentGenerationAndPushTask only" );
9396 validateJob (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE , "0 */10 * ? * * *" );
9497
9598 // 2. Update table to new schedule
9699 tableConfig .setTaskConfig (new TableTaskConfig (
97100 ImmutableMap .of ("SegmentGenerationAndPushTask" , ImmutableMap .of ("schedule" , "0 */20 * ? * * *" ))));
98101 updateTableConfig (tableConfig );
99- Thread . sleep ( 2000 );
100- jobGroupNames = scheduler . getJobGroupNames ();
101- assertEquals ( jobGroupNames , Lists . newArrayList ( MinionConstants . SegmentGenerationAndPushTask . TASK_TYPE ) );
102+ waitForJobGroupNames ( _controllerStarter . getTaskManager (),
103+ jgn -> jgn . size () == 1 && jgn . contains ( MinionConstants . SegmentGenerationAndPushTask . TASK_TYPE ),
104+ "JobGroupNames should have SegmentGenerationAndPushTask only" );
102105 validateJob (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE , "0 */20 * ? * * *" );
103106
104107 // 3. Update table to new task and schedule
105108 tableConfig .setTaskConfig (new TableTaskConfig (
106109 ImmutableMap .of ("SegmentGenerationAndPushTask" , ImmutableMap .of ("schedule" , "0 */30 * ? * * *" ),
107110 "MergeRollupTask" , ImmutableMap .of ("schedule" , "0 */10 * ? * * *" ))));
108111 updateTableConfig (tableConfig );
109- Thread .sleep (2000 );
110- jobGroupNames = scheduler .getJobGroupNames ();
111- assertEquals (jobGroupNames .size (), 2 );
112- assertTrue (jobGroupNames .contains (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE ));
113- assertTrue (jobGroupNames .contains (MinionConstants .MergeRollupTask .TASK_TYPE ));
112+ waitForJobGroupNames (_controllerStarter .getTaskManager (),
113+ jgn -> jgn .size () == 2 && jgn .contains (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE ) && jgn
114+ .contains (MinionConstants .MergeRollupTask .TASK_TYPE ),
115+ "JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask" );
114116 validateJob (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE , "0 */30 * ? * * *" );
115117 validateJob (MinionConstants .MergeRollupTask .TASK_TYPE , "0 */10 * ? * * *" );
116118
117119 // 4. Remove one task from the table
118120 tableConfig .setTaskConfig (
119121 new TableTaskConfig (ImmutableMap .of ("MergeRollupTask" , ImmutableMap .of ("schedule" , "0 */10 * ? * * *" ))));
120122 updateTableConfig (tableConfig );
121- Thread . sleep ( 2000 );
122- jobGroupNames = scheduler . getJobGroupNames ();
123- assertEquals ( jobGroupNames , Lists . newArrayList ( MinionConstants . MergeRollupTask . TASK_TYPE ) );
123+ waitForJobGroupNames ( _controllerStarter . getTaskManager (),
124+ jgn -> jgn . size () == 1 && jgn . contains ( MinionConstants . MergeRollupTask . TASK_TYPE ),
125+ "JobGroupNames should have MergeRollupTask only" );
124126 validateJob (MinionConstants .MergeRollupTask .TASK_TYPE , "0 */10 * ? * * *" );
125127
126128 // 4. Drop table
127129 dropOfflineTable (RAW_TABLE_NAME );
128- jobGroupNames = scheduler .getJobGroupNames ();
129- assertTrue (jobGroupNames .isEmpty ());
130+ waitForJobGroupNames (_controllerStarter .getTaskManager (), List ::isEmpty , "JobGroupNames should be empty" );
130131
131132 stopFakeInstances ();
132133 stopController ();
@@ -154,9 +155,9 @@ public void testPinotTaskManagerSchedulerWithRestart()
154155 new TableTaskConfig (
155156 ImmutableMap .of ("SegmentGenerationAndPushTask" , ImmutableMap .of ("schedule" , "0 */10 * ? * * *" )))).build ();
156157 addTableConfig (tableConfig );
157- Thread . sleep ( 2000 );
158- List < String > jobGroupNames = scheduler . getJobGroupNames ();
159- assertEquals ( jobGroupNames , Lists . newArrayList ( MinionConstants . SegmentGenerationAndPushTask . TASK_TYPE ) );
158+ waitForJobGroupNames ( _controllerStarter . getTaskManager (),
159+ jgn -> jgn . size () == 1 && jgn . contains ( MinionConstants . SegmentGenerationAndPushTask . TASK_TYPE ),
160+ "JobGroupNames should have SegmentGenerationAndPushTask only" );
160161 validateJob (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE , "0 */10 * ? * * *" );
161162
162163 // Restart controller
@@ -168,7 +169,6 @@ public void testPinotTaskManagerSchedulerWithRestart()
168169 ImmutableMap .of ("SegmentGenerationAndPushTask" , ImmutableMap .of ("schedule" , "0 */10 * ? * * *" ),
169170 "MergeRollupTask" , ImmutableMap .of ("schedule" , "0 */20 * ? * * *" ))));
170171 updateTableConfig (tableConfig );
171- Thread .sleep (2000 );
172172
173173 // Task is put into table config.
174174 TableConfig tableConfigAfterRestart =
@@ -179,21 +179,31 @@ public void testPinotTaskManagerSchedulerWithRestart()
179179
180180 // The new MergeRollup task wouldn't be scheduled if not eagerly checking table configs
181181 // after setting up subscriber on ChildChanges zk event when controller gets restarted.
182- taskManager = _controllerStarter .getTaskManager ();
183- scheduler = taskManager .getScheduler ();
184- jobGroupNames = scheduler .getJobGroupNames ();
185- assertEquals (jobGroupNames .size (), 2 );
186- assertTrue (jobGroupNames .contains (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE ));
187- assertTrue (jobGroupNames .contains (MinionConstants .MergeRollupTask .TASK_TYPE ));
182+ waitForJobGroupNames (_controllerStarter .getTaskManager (),
183+ jgn -> jgn .size () == 2 && jgn .contains (MinionConstants .SegmentGenerationAndPushTask .TASK_TYPE ) && jgn
184+ .contains (MinionConstants .MergeRollupTask .TASK_TYPE ),
185+ "JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask" );
188186
189187 dropOfflineTable (RAW_TABLE_NAME );
190- jobGroupNames = scheduler .getJobGroupNames ();
191- assertTrue (jobGroupNames .isEmpty ());
188+ waitForJobGroupNames (_controllerStarter .getTaskManager (), List ::isEmpty , "JobGroupNames should be empty" );
192189
193190 stopFakeInstances ();
194191 stopController ();
195192 }
196193
194+ private void waitForJobGroupNames (PinotTaskManager taskManager , Predicate <List <String >> predicate ,
195+ String errorMessage ) {
196+ TestUtils .waitForCondition (aVoid -> {
197+ try {
198+ Scheduler scheduler = taskManager .getScheduler ();
199+ List <String > jobGroupNames = scheduler .getJobGroupNames ();
200+ return predicate .test (jobGroupNames );
201+ } catch (SchedulerException e ) {
202+ throw new RuntimeException (e );
203+ }
204+ }, TIMEOUT_IN_MS , errorMessage );
205+ }
206+
197207 private void validateJob (String taskType , String cronExpression )
198208 throws Exception {
199209 PinotTaskManager taskManager = _controllerStarter .getTaskManager ();
@@ -208,8 +218,18 @@ private void validateJob(String taskType, String cronExpression)
208218 assertEquals (jobDetail .getKey ().getGroup (), taskType );
209219 assertSame (jobDetail .getJobDataMap ().get ("PinotTaskManager" ), taskManager );
210220 assertSame (jobDetail .getJobDataMap ().get ("LeadControllerManager" ), _controllerStarter .getLeadControllerManager ());
221+ // jobDetail and jobTrigger are not added atomically by the scheduler,
222+ // the jobDetail is added to an internal map firstly, and jobTrigger
223+ // is added to another internal map afterwards, so we check for the existence
224+ // of jobTrigger with some waits to be more defensive.
225+ TestUtils .waitForCondition (aVoid -> {
226+ try {
227+ return scheduler .getTriggersOfJob (jobKey ).size () == 1 ;
228+ } catch (SchedulerException e ) {
229+ throw new RuntimeException (e );
230+ }
231+ }, TIMEOUT_IN_MS , "JobDetail exiting but missing JobTrigger" );
211232 List <? extends Trigger > triggersOfJob = scheduler .getTriggersOfJob (jobKey );
212- assertEquals (triggersOfJob .size (), 1 );
213233 Trigger trigger = triggersOfJob .iterator ().next ();
214234 assertTrue (trigger instanceof CronTrigger );
215235 assertEquals (((CronTrigger ) trigger ).getCronExpression (), cronExpression );
0 commit comments