1818
1919import static com .google .cloud .spanner .SpannerMatchers .isSpannerException ;
2020import static com .google .common .truth .Truth .assertThat ;
21+ import static org .mockito .Mockito .mock ;
22+ import static org .mockito .Mockito .verify ;
2123
2224import com .google .common .collect .AbstractIterator ;
2325import com .google .common .collect .Lists ;
2426import com .google .protobuf .ByteString ;
2527import com .google .protobuf .Value ;
2628import com .google .spanner .v1 .PartialResultSet ;
29+ import io .opencensus .trace .Span ;
2730import java .util .ArrayList ;
2831import java .util .Iterator ;
2932import java .util .LinkedList ;
3639import org .junit .runner .RunWith ;
3740import org .junit .runners .JUnit4 ;
3841import org .mockito .Mockito ;
42+ import org .mockito .internal .util .reflection .Whitebox ;
3943
4044/** Unit tests for {@link SpannerImpl.ResumableStreamIterator}. */
4145@ RunWith (JUnit4 .class )
@@ -90,15 +94,15 @@ public void close(@Nullable String message) {
9094 @ Rule public ExpectedException expectedException = ExpectedException .none ();
9195
9296 Starter starter = Mockito .mock (Starter .class );
93- AbstractResultSet .ResumableStreamIterator iterator ;
97+ AbstractResultSet .ResumableStreamIterator resumableStreamIterator ;
9498
9599 @ Before
96100 public void setUp () {
97101 initWithLimit (Integer .MAX_VALUE );
98102 }
99103
100104 private void initWithLimit (int maxBufferSize ) {
101- iterator =
105+ resumableStreamIterator =
102106 new AbstractResultSet .ResumableStreamIterator (maxBufferSize , "" , null ) {
103107 @ Override
104108 AbstractResultSet .CloseableIterator <PartialResultSet > startStream (
@@ -116,18 +120,24 @@ public void simple() {
116120 .thenReturn (resultSet (null , "a" ))
117121 .thenReturn (resultSet (null , "b" ))
118122 .thenReturn (null );
119- assertThat (consume (iterator )).containsExactly ("a" , "b" ).inOrder ();
123+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" ).inOrder ();
120124 }
121125
122126 @ Test
123- public void simpleWithRestartTokens () {
127+ public void closedSpan () {
128+ Span span = mock (Span .class );
129+ Whitebox .setInternalState (this .resumableStreamIterator , "span" , span );
130+
124131 ResultSetStream s1 = Mockito .mock (ResultSetStream .class );
125132 Mockito .when (starter .startStream (null )).thenReturn (new ResultSetIterator (s1 ));
126133 Mockito .when (s1 .next ())
127134 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r1" ), "a" ))
128135 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r2" ), "b" ))
129136 .thenReturn (null );
130- assertThat (consume (iterator )).containsExactly ("a" , "b" ).inOrder ();
137+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" ).inOrder ();
138+
139+ resumableStreamIterator .close ("closed" );
140+ verify (span ).end ();
131141 }
132142
133143 @ Test
@@ -146,7 +156,7 @@ public void restart() {
146156 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r3" ), "c" ))
147157 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r4" ), "d" ))
148158 .thenReturn (null );
149- assertThat (consume (iterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
159+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
150160 }
151161
152162 @ Test
@@ -167,7 +177,7 @@ public void restartWithHoldBack() {
167177 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r3" ), "c" ))
168178 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r4" ), "d" ))
169179 .thenReturn (null );
170- assertThat (consume (iterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
180+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
171181 }
172182
173183 @ Test
@@ -188,7 +198,9 @@ public void restartWithHoldBackMidStream() {
188198 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r3" ), "e" ))
189199 .thenReturn (resultSet (null , "f" ))
190200 .thenReturn (null );
191- assertThat (consume (iterator )).containsExactly ("a" , "b" , "c" , "d" , "e" , "f" ).inOrder ();
201+ assertThat (consume (resumableStreamIterator ))
202+ .containsExactly ("a" , "b" , "c" , "d" , "e" , "f" )
203+ .inOrder ();
192204 }
193205
194206 @ Test
@@ -201,7 +213,7 @@ public void nonRetryableError() {
201213 .thenReturn (resultSet (null , "X" ))
202214 .thenReturn (resultSet (null , "X" ))
203215 .thenThrow (new NonRetryableException (ErrorCode .FAILED_PRECONDITION , "failed by test" ));
204- Iterator <String > strings = stringIterator (iterator );
216+ Iterator <String > strings = stringIterator (resumableStreamIterator );
205217 assertThat (strings .next ()).isEqualTo ("a" );
206218 assertThat (strings .next ()).isEqualTo ("b" );
207219 expectedException .expect (isSpannerException (ErrorCode .FAILED_PRECONDITION ));
@@ -218,7 +230,7 @@ public void bufferLimitSimple() {
218230 .thenReturn (resultSet (null , "a" ))
219231 .thenReturn (resultSet (null , "b" ))
220232 .thenReturn (null );
221- assertThat (consume (iterator )).containsExactly ("a" , "b" ).inOrder ();
233+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" ).inOrder ();
222234 }
223235
224236 @ Test
@@ -231,7 +243,7 @@ public void bufferLimitSimpleWithRestartTokens() {
231243 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r1" ), "a" ))
232244 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r2" ), "b" ))
233245 .thenReturn (null );
234- assertThat (consume (iterator )).containsExactly ("a" , "b" ).inOrder ();
246+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" ).inOrder ();
235247 }
236248
237249 @ Test
@@ -252,7 +264,7 @@ public void bufferLimitRestart() {
252264 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r3" ), "c" ))
253265 .thenReturn (resultSet (ByteString .copyFromUtf8 ("r4" ), "d" ))
254266 .thenReturn (null );
255- assertThat (consume (iterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
267+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
256268 }
257269
258270 @ Test
@@ -271,7 +283,7 @@ public void bufferLimitRestartWithinLimitAtStartOfResults() {
271283 .thenReturn (resultSet (null , "a" ))
272284 .thenReturn (resultSet (null , "b" ))
273285 .thenReturn (null );
274- assertThat (consume (iterator )).containsExactly ("a" , "b" ).inOrder ();
286+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" ).inOrder ();
275287 }
276288
277289 @ Test
@@ -292,7 +304,7 @@ public void bufferLimitRestartWithinLimitMidResults() {
292304 .thenReturn (resultSet (null , "b" ))
293305 .thenReturn (resultSet (null , "c" ))
294306 .thenReturn (null );
295- assertThat (consume (iterator )).containsExactly ("a" , "b" , "c" ).inOrder ();
307+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" , "c" ).inOrder ();
296308 }
297309
298310 @ Test
@@ -307,9 +319,9 @@ public void bufferLimitMissingTokensUnsafeToRetry() {
307319 .thenReturn (resultSet (null , "c" ))
308320 .thenThrow (new RetryableException (ErrorCode .UNAVAILABLE , "failed by test" ));
309321
310- assertThat (consumeAtMost (3 , iterator )).containsExactly ("a" , "b" , "c" ).inOrder ();
322+ assertThat (consumeAtMost (3 , resumableStreamIterator )).containsExactly ("a" , "b" , "c" ).inOrder ();
311323 expectedException .expect (isSpannerException (ErrorCode .UNAVAILABLE ));
312- iterator .next ();
324+ resumableStreamIterator .next ();
313325 }
314326
315327 @ Test
@@ -329,7 +341,7 @@ public void bufferLimitMissingTokensSafeToRetry() {
329341 .thenReturn (new ResultSetIterator (s2 ));
330342 Mockito .when (s2 .next ()).thenReturn (resultSet (null , "d" )).thenReturn (null );
331343
332- assertThat (consume (iterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
344+ assertThat (consume (resumableStreamIterator )).containsExactly ("a" , "b" , "c" , "d" ).inOrder ();
333345 }
334346
335347 static PartialResultSet resultSet (@ Nullable ByteString resumeToken , String ... data ) {
0 commit comments