1919import static org .easymock .EasyMock .anyObject ;
2020import static org .easymock .EasyMock .createMock ;
2121import static org .easymock .EasyMock .expect ;
22- import static org .easymock .EasyMock .expectLastCall ;
2322import static org .easymock .EasyMock .replay ;
2423import static org .easymock .EasyMock .verify ;
2524import static org .junit .Assert .assertArrayEquals ;
@@ -46,7 +45,7 @@ public class BlobReadChannelImplTest {
4645
4746 private static final String BUCKET_NAME = "b" ;
4847 private static final String BLOB_NAME = "n" ;
49- private static final BlobId BLOB_ID = BlobId .of (BUCKET_NAME , BLOB_NAME );
48+ private static final BlobId BLOB_ID = BlobId .of (BUCKET_NAME , BLOB_NAME , - 1L );
5049 private static final Map <StorageRpc .Option , ?> EMPTY_RPC_OPTIONS = ImmutableMap .of ();
5150 private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 ;
5251 private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024 ;
@@ -88,7 +87,7 @@ public void testReadBuffered() throws IOException {
8887 ByteBuffer firstReadBuffer = ByteBuffer .allocate (42 );
8988 ByteBuffer secondReadBuffer = ByteBuffer .allocate (42 );
9089 expect (storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 0 , DEFAULT_CHUNK_SIZE ))
91- .andReturn (result );
90+ .andReturn (StorageRpc . Tuple . of ( "etag" , result ) );
9291 replay (storageRpcMock );
9392 reader .read (firstReadBuffer );
9493 reader .read (secondReadBuffer );
@@ -107,10 +106,11 @@ public void testReadBig() throws IOException {
107106 byte [] secondResult = randomByteArray (DEFAULT_CHUNK_SIZE );
108107 ByteBuffer firstReadBuffer = ByteBuffer .allocate (DEFAULT_CHUNK_SIZE );
109108 ByteBuffer secondReadBuffer = ByteBuffer .allocate (42 );
110- storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 0 , DEFAULT_CHUNK_SIZE );
111- expectLastCall ().andReturn (firstResult );
112- storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , DEFAULT_CHUNK_SIZE , CUSTOM_CHUNK_SIZE );
113- expectLastCall ().andReturn (secondResult );
109+ expect (storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 0 , DEFAULT_CHUNK_SIZE ))
110+ .andReturn (StorageRpc .Tuple .of ("etag" , firstResult ));
111+ expect (storageRpcMock .read (
112+ BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , DEFAULT_CHUNK_SIZE , CUSTOM_CHUNK_SIZE ))
113+ .andReturn (StorageRpc .Tuple .of ("etag" , secondResult ));
114114 replay (storageRpcMock );
115115 reader .read (firstReadBuffer );
116116 reader .read (secondReadBuffer );
@@ -125,7 +125,7 @@ public void testReadFinish() throws IOException {
125125 byte [] result = {};
126126 ByteBuffer readBuffer = ByteBuffer .allocate (DEFAULT_CHUNK_SIZE );
127127 expect (storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 0 , DEFAULT_CHUNK_SIZE ))
128- .andReturn (result );
128+ .andReturn (StorageRpc . Tuple . of ( "etag" , result ) );
129129 replay (storageRpcMock );
130130 assertEquals (-1 , reader .read (readBuffer ));
131131 }
@@ -137,7 +137,7 @@ public void testSeek() throws IOException {
137137 byte [] result = randomByteArray (DEFAULT_CHUNK_SIZE );
138138 ByteBuffer readBuffer = ByteBuffer .allocate (DEFAULT_CHUNK_SIZE );
139139 expect (storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 42 , DEFAULT_CHUNK_SIZE ))
140- .andReturn (result );
140+ .andReturn (StorageRpc . Tuple . of ( "etag" , result ) );
141141 replay (storageRpcMock );
142142 reader .read (readBuffer );
143143 assertArrayEquals (result , readBuffer .array ());
@@ -166,16 +166,41 @@ public void testReadClosed() {
166166 }
167167 }
168168
169+ @ Test
170+ public void testReadGenerationChanged () throws IOException {
171+ BlobId blobId = BlobId .of (BUCKET_NAME , BLOB_NAME );
172+ reader = new BlobReadChannelImpl (options , blobId , EMPTY_RPC_OPTIONS );
173+ byte [] firstResult = randomByteArray (DEFAULT_CHUNK_SIZE );
174+ byte [] secondResult = randomByteArray (DEFAULT_CHUNK_SIZE );
175+ ByteBuffer firstReadBuffer = ByteBuffer .allocate (DEFAULT_CHUNK_SIZE );
176+ ByteBuffer secondReadBuffer = ByteBuffer .allocate (DEFAULT_CHUNK_SIZE );
177+ expect (storageRpcMock .read (blobId .toPb (), EMPTY_RPC_OPTIONS , 0 , DEFAULT_CHUNK_SIZE ))
178+ .andReturn (StorageRpc .Tuple .of ("etag1" , firstResult ));
179+ expect (storageRpcMock .read (
180+ blobId .toPb (), EMPTY_RPC_OPTIONS , DEFAULT_CHUNK_SIZE , DEFAULT_CHUNK_SIZE ))
181+ .andReturn (StorageRpc .Tuple .of ("etag2" , firstResult ));
182+ replay (storageRpcMock );
183+ reader .read (firstReadBuffer );
184+ try {
185+ reader .read (secondReadBuffer );
186+ fail ("Expected BlobReadChannel read to throw StorageException" );
187+ } catch (StorageException ex ) {
188+ StringBuilder messageBuilder = new StringBuilder ();
189+ messageBuilder .append ("Blob " ).append (blobId ).append (" was updated while reading" );
190+ assertEquals (messageBuilder .toString (), ex .getMessage ());
191+ }
192+ }
193+
169194 @ Test
170195 public void testSaveAndRestore () throws IOException , ClassNotFoundException {
171196 byte [] firstResult = randomByteArray (DEFAULT_CHUNK_SIZE );
172197 byte [] secondResult = randomByteArray (DEFAULT_CHUNK_SIZE );
173198 ByteBuffer firstReadBuffer = ByteBuffer .allocate (42 );
174199 ByteBuffer secondReadBuffer = ByteBuffer .allocate (DEFAULT_CHUNK_SIZE );
175200 expect (storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 0 , DEFAULT_CHUNK_SIZE ))
176- .andReturn (firstResult );
201+ .andReturn (StorageRpc . Tuple . of ( "etag" , firstResult ) );
177202 expect (storageRpcMock .read (BLOB_ID .toPb (), EMPTY_RPC_OPTIONS , 42 , DEFAULT_CHUNK_SIZE ))
178- .andReturn (secondResult );
203+ .andReturn (StorageRpc . Tuple . of ( "etag" , secondResult ) );
179204 replay (storageRpcMock );
180205 reader = new BlobReadChannelImpl (options , BLOB_ID , EMPTY_RPC_OPTIONS );
181206 reader .read (firstReadBuffer );
0 commit comments