Skip to content

Commit c02cd21

Browse files
committed
address test
1 parent fc472bd commit c02cd21

File tree

1 file changed

+4
-0
lines changed

1 file changed

+4
-0
lines changed

R/pkg/inst/tests/testthat/test_streaming.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ test_that("read.stream, write.stream, awaitTermination, stopQuery", {
5555
q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
5656

5757
expect_false(awaitTermination(q, 5 * 1000))
58+
callJMethod(q@ssq, "processAllAvailable")
5859
expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
5960

6061
writeLines(mockLinesNa, jsonPathNa)
6162
awaitTermination(q, 5 * 1000)
63+
callJMethod(q@ssq, "processAllAvailable")
6264
expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
6365

6466
stopQuery(q)
@@ -75,6 +77,7 @@ test_that("print from explain, lastProgress, status, isActive", {
7577
q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
7678

7779
awaitTermination(q, 5 * 1000)
80+
callJMethod(q@ssq, "processAllAvailable")
7881

7982
expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
8083
expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
@@ -99,6 +102,7 @@ test_that("Stream other format", {
99102
q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
100103

101104
expect_false(awaitTermination(q, 5 * 1000))
105+
callJMethod(q@ssq, "processAllAvailable")
102106
expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
103107

104108
expect_equal(queryName(q), "people3")

0 commit comments

Comments
 (0)