@@ -55,10 +55,12 @@ test_that("read.stream, write.stream, awaitTermination, stopQuery", {
5555 q <- write.stream(counts , " memory" , queryName = " people" , outputMode = " complete" )
5656
5757 expect_false(awaitTermination(q , 5 * 1000 ))
58+ callJMethod(q @ ssq , " processAllAvailable" )
5859 expect_equal(head(sql(" SELECT count(*) FROM people" ))[[1 ]], 3 )
5960
6061 writeLines(mockLinesNa , jsonPathNa )
6162 awaitTermination(q , 5 * 1000 )
63+ callJMethod(q @ ssq , " processAllAvailable" )
6264 expect_equal(head(sql(" SELECT count(*) FROM people" ))[[1 ]], 6 )
6365
6466 stopQuery(q )
@@ -75,6 +77,7 @@ test_that("print from explain, lastProgress, status, isActive", {
7577 q <- write.stream(counts , " memory" , queryName = " people2" , outputMode = " complete" )
7678
7779 awaitTermination(q , 5 * 1000 )
80+ callJMethod(q @ ssq , " processAllAvailable" )
7881
7982 expect_equal(capture.output(explain(q ))[[1 ]], " == Physical Plan ==" )
8083 expect_true(any(grepl(" \" description\" : \" MemorySink\" " , capture.output(lastProgress(q )))))
@@ -99,6 +102,7 @@ test_that("Stream other format", {
99102 q <- write.stream(counts , " memory" , queryName = " people3" , outputMode = " complete" )
100103
101104 expect_false(awaitTermination(q , 5 * 1000 ))
105+ callJMethod(q @ ssq , " processAllAvailable" )
102106 expect_equal(head(sql(" SELECT count(*) FROM people3" ))[[1 ]], 3 )
103107
104108 expect_equal(queryName(q ), " people3" )
0 commit comments