@@ -62,6 +62,15 @@ public void before() {
6262 env .setParallelism (1 );
6363 }
6464
65+ private void cancelJobIfRunning (TableResult result )
66+ throws InterruptedException , ExecutionException {
67+ try {
68+ result .getJobClient ().get ().cancel ().get ();
69+ } catch (IllegalStateException ignored ) {
70+ // job isn't running, ignore it
71+ }
72+ }
73+
6574 @ Test
6675 public void testConsumingAllEvents ()
6776 throws SQLException , InterruptedException , ExecutionException {
@@ -163,7 +172,7 @@ public void testConsumingAllEvents()
163172 List <String > actual = TestValuesTableFactory .getResults ("sink" );
164173 assertThat (actual , containsInAnyOrder (expected ));
165174
166- result . getJobClient (). get (). cancel (). get ( );
175+ cancelJobIfRunning ( result );
167176 }
168177
169178 @ Test
@@ -254,7 +263,7 @@ public void testAllTypes() throws Exception {
254263 List <String > actual = TestValuesTableFactory .getRawResults ("sink" );
255264 assertEquals (expected , actual );
256265
257- result . getJobClient (). get (). cancel (). get ( );
266+ cancelJobIfRunning ( result );
258267 }
259268
260269 @ Test
@@ -322,7 +331,7 @@ public void testStartupFromLatestOffset() throws Exception {
322331 List <String > actual = TestValuesTableFactory .getResults ("sink" );
323332 assertThat (actual , containsInAnyOrder (expected ));
324333
325- result . getJobClient (). get (). cancel (). get ( );
334+ cancelJobIfRunning ( result );
326335 }
327336
328337 @ Test
@@ -419,7 +428,7 @@ public void testMetadataColumns() throws Throwable {
419428 Collections .sort (expected );
420429 Collections .sort (actual );
421430 assertEquals (expected , actual );
422- result . getJobClient (). get (). cancel (). get ( );
431+ cancelJobIfRunning ( result );
423432 }
424433
425434 private static void waitForSnapshotStarted (String sinkName ) throws InterruptedException {
0 commit comments