1818
1919import static com.google.cloud.spanner.SpannerMatchers.isSpannerException;
2020import static com.google.common.truth.Truth.assertThat;
21+ import static org.mockito.Mockito.mock;
22+ import static org.mockito.Mockito.verify;
2123
2224import com.google.common.collect.AbstractIterator;
2325import com.google.common.collect.Lists;
2426import com.google.protobuf.ByteString;
2527import com.google.protobuf.Value;
2628import com.google.spanner.v1.PartialResultSet;
29+ import io.opencensus.trace.Span;
2730import java.util.ArrayList;
2831import java.util.Iterator;
2932import java.util.LinkedList;
3639import org.junit.runner.RunWith;
3740import org.junit.runners.JUnit4;
3841import org.mockito.Mockito;
42+ import org.mockito.internal.util.reflection.Whitebox;
3943
4044/** Unit tests for {@link SpannerImpl.ResumableStreamIterator}. */
4145@RunWith(JUnit4.class)
@@ -90,15 +94,15 @@ public void close(@Nullable String message) {
9094 @Rule public ExpectedException expectedException = ExpectedException.none();
9195
9296 Starter starter = Mockito.mock(Starter.class);
93- AbstractResultSet.ResumableStreamIterator iterator ;
97+ AbstractResultSet.ResumableStreamIterator resumableStreamIterator ;
9498
9599 @Before
96100 public void setUp() {
97101 initWithLimit(Integer.MAX_VALUE);
98102 }
99103
100104 private void initWithLimit(int maxBufferSize) {
101- iterator =
105+ resumableStreamIterator =
102106 new AbstractResultSet.ResumableStreamIterator(maxBufferSize, "", null) {
103107 @Override
104108 AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
@@ -116,18 +120,24 @@ public void simple() {
116120 .thenReturn(resultSet(null, "a"))
117121 .thenReturn(resultSet(null, "b"))
118122 .thenReturn(null);
119- assertThat(consume(iterator )).containsExactly("a", "b").inOrder();
123+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b").inOrder();
120124 }
121125
122126 @Test
123- public void simpleWithRestartTokens() {
127+ public void closedSpan() {
128+ Span span = mock(Span.class);
129+ Whitebox.setInternalState(this.resumableStreamIterator, "span", span);
130+
124131 ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
125132 Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
126133 Mockito.when(s1.next())
127134 .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
128135 .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
129136 .thenReturn(null);
130- assertThat(consume(iterator)).containsExactly("a", "b").inOrder();
137+ assertThat(consume(resumableStreamIterator)).containsExactly("a", "b").inOrder();
138+
139+ resumableStreamIterator.close("closed");
140+ verify(span).end();
131141 }
132142
133143 @Test
@@ -146,7 +156,7 @@ public void restart() {
146156 .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
147157 .thenReturn(resultSet(ByteString.copyFromUtf8("r4"), "d"))
148158 .thenReturn(null);
149- assertThat(consume(iterator )).containsExactly("a", "b", "c", "d").inOrder();
159+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b", "c", "d").inOrder();
150160 }
151161
152162 @Test
@@ -167,7 +177,7 @@ public void restartWithHoldBack() {
167177 .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
168178 .thenReturn(resultSet(ByteString.copyFromUtf8("r4"), "d"))
169179 .thenReturn(null);
170- assertThat(consume(iterator )).containsExactly("a", "b", "c", "d").inOrder();
180+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b", "c", "d").inOrder();
171181 }
172182
173183 @Test
@@ -188,7 +198,9 @@ public void restartWithHoldBackMidStream() {
188198 .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "e"))
189199 .thenReturn(resultSet(null, "f"))
190200 .thenReturn(null);
191- assertThat(consume(iterator)).containsExactly("a", "b", "c", "d", "e", "f").inOrder();
201+ assertThat(consume(resumableStreamIterator))
202+ .containsExactly("a", "b", "c", "d", "e", "f")
203+ .inOrder();
192204 }
193205
194206 @Test
@@ -201,7 +213,7 @@ public void nonRetryableError() {
201213 .thenReturn(resultSet(null, "X"))
202214 .thenReturn(resultSet(null, "X"))
203215 .thenThrow(new NonRetryableException(ErrorCode.FAILED_PRECONDITION, "failed by test"));
204- Iterator<String> strings = stringIterator(iterator );
216+ Iterator<String> strings = stringIterator(resumableStreamIterator );
205217 assertThat(strings.next()).isEqualTo("a");
206218 assertThat(strings.next()).isEqualTo("b");
207219 expectedException.expect(isSpannerException(ErrorCode.FAILED_PRECONDITION));
@@ -218,7 +230,7 @@ public void bufferLimitSimple() {
218230 .thenReturn(resultSet(null, "a"))
219231 .thenReturn(resultSet(null, "b"))
220232 .thenReturn(null);
221- assertThat(consume(iterator )).containsExactly("a", "b").inOrder();
233+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b").inOrder();
222234 }
223235
224236 @Test
@@ -231,7 +243,7 @@ public void bufferLimitSimpleWithRestartTokens() {
231243 .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
232244 .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
233245 .thenReturn(null);
234- assertThat(consume(iterator )).containsExactly("a", "b").inOrder();
246+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b").inOrder();
235247 }
236248
237249 @Test
@@ -252,7 +264,7 @@ public void bufferLimitRestart() {
252264 .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
253265 .thenReturn(resultSet(ByteString.copyFromUtf8("r4"), "d"))
254266 .thenReturn(null);
255- assertThat(consume(iterator )).containsExactly("a", "b", "c", "d").inOrder();
267+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b", "c", "d").inOrder();
256268 }
257269
258270 @Test
@@ -271,7 +283,7 @@ public void bufferLimitRestartWithinLimitAtStartOfResults() {
271283 .thenReturn(resultSet(null, "a"))
272284 .thenReturn(resultSet(null, "b"))
273285 .thenReturn(null);
274- assertThat(consume(iterator )).containsExactly("a", "b").inOrder();
286+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b").inOrder();
275287 }
276288
277289 @Test
@@ -292,7 +304,7 @@ public void bufferLimitRestartWithinLimitMidResults() {
292304 .thenReturn(resultSet(null, "b"))
293305 .thenReturn(resultSet(null, "c"))
294306 .thenReturn(null);
295- assertThat(consume(iterator )).containsExactly("a", "b", "c").inOrder();
307+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b", "c").inOrder();
296308 }
297309
298310 @Test
@@ -307,9 +319,9 @@ public void bufferLimitMissingTokensUnsafeToRetry() {
307319 .thenReturn(resultSet(null, "c"))
308320 .thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
309321
310- assertThat(consumeAtMost(3, iterator )).containsExactly("a", "b", "c").inOrder();
322+ assertThat(consumeAtMost(3, resumableStreamIterator )).containsExactly("a", "b", "c").inOrder();
311323 expectedException.expect(isSpannerException(ErrorCode.UNAVAILABLE));
312- iterator .next();
324+ resumableStreamIterator .next();
313325 }
314326
315327 @Test
@@ -329,7 +341,7 @@ public void bufferLimitMissingTokensSafeToRetry() {
329341 .thenReturn(new ResultSetIterator(s2));
330342 Mockito.when(s2.next()).thenReturn(resultSet(null, "d")).thenReturn(null);
331343
332- assertThat(consume(iterator )).containsExactly("a", "b", "c", "d").inOrder();
344+ assertThat(consume(resumableStreamIterator )).containsExactly("a", "b", "c", "d").inOrder();
333345 }
334346
335347 static PartialResultSet resultSet(@Nullable ByteString resumeToken, String... data) {
0 commit comments