@@ -243,16 +243,6 @@ public void testBatchWriteWithCommittedStreamEU()
243243 streamWriter .append (CreateProtoRows (new String [] {"ddd" }), 3 );
244244 assertEquals (1 , response1 .get ().getAppendResult ().getOffset ().getValue ());
245245 assertEquals (3 , response2 .get ().getAppendResult ().getOffset ().getValue ());
246-
247- TableResult result =
248- bigquery .listTableData (
249- tableInfoEU .getTableId (), BigQuery .TableDataListOption .startIndex (0L ));
250- Iterator <FieldValueList > iter = result .getValues ().iterator ();
251- assertEquals ("aaa" , iter .next ().get (0 ).getStringValue ());
252- assertEquals ("bbb" , iter .next ().get (0 ).getStringValue ());
253- assertEquals ("ccc" , iter .next ().get (0 ).getStringValue ());
254- assertEquals ("ddd" , iter .next ().get (0 ).getStringValue ());
255- assertEquals (false , iter .hasNext ());
256246 }
257247
258248 @ Test
@@ -1256,4 +1246,54 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec
12561246 assertEquals (1L , response .get ().getAppendResult ().getOffset ().getValue ());
12571247 }
12581248 }
1249+
1250+ @ Test
1251+ public void testMultiplexingMixedLocation ()
1252+ throws IOException , InterruptedException , ExecutionException {
1253+ ConnectionWorkerPool .setOptions (
1254+ ConnectionWorkerPool .Settings .builder ()
1255+ .setMinConnectionsPerRegion (1 )
1256+ .setMaxConnectionsPerRegion (2 )
1257+ .build ());
1258+ String defaultStream1 =
1259+ String .format (
1260+ "projects/%s/datasets/%s/tables/%s/streams/_default" ,
1261+ ServiceOptions .getDefaultProjectId (), DATASET , TABLE );
1262+ String defaultStream2 =
1263+ String .format (
1264+ "projects/%s/datasets/%s/tables/%s/streams/_default" ,
1265+ ServiceOptions .getDefaultProjectId (), DATASET , TABLE2 );
1266+ String defaultStream3 =
1267+ String .format (
1268+ "projects/%s/datasets/%s/tables/%s/streams/_default" ,
1269+ ServiceOptions .getDefaultProjectId (), DATASET_EU , TABLE );
1270+
1271+ StreamWriter streamWriter1 =
1272+ StreamWriter .newBuilder (defaultStream1 )
1273+ .setWriterSchema (ProtoSchemaConverter .convert (FooType .getDescriptor ()))
1274+ .setEnableConnectionPool (true )
1275+ .build ();
1276+ StreamWriter streamWriter2 =
1277+ StreamWriter .newBuilder (defaultStream2 )
1278+ .setWriterSchema (ProtoSchemaConverter .convert (ComplicateType .getDescriptor ()))
1279+ .setEnableConnectionPool (true )
1280+ .build ();
1281+ StreamWriter streamWriter3 =
1282+ StreamWriter .newBuilder (defaultStream3 )
1283+ .setWriterSchema (ProtoSchemaConverter .convert (FooType .getDescriptor ()))
1284+ .setEnableConnectionPool (true )
1285+ .build ();
1286+ ApiFuture <AppendRowsResponse > response1 =
1287+ streamWriter1 .append (CreateProtoRows (new String [] {"aaa" }));
1288+ ApiFuture <AppendRowsResponse > response2 =
1289+ streamWriter2 .append (CreateProtoRowsComplex (new String [] {"aaa" }));
1290+ ApiFuture <AppendRowsResponse > response3 =
1291+ streamWriter3 .append (CreateProtoRows (new String [] {"bbb" }));
1292+ assertEquals (0L , response1 .get ().getAppendResult ().getOffset ().getValue ());
1293+ assertEquals (0L , response2 .get ().getAppendResult ().getOffset ().getValue ());
1294+ assertEquals (0L , response3 .get ().getAppendResult ().getOffset ().getValue ());
1295+ assertEquals ("us" , streamWriter1 .getLocation ());
1296+ assertEquals ("us" , streamWriter2 .getLocation ());
1297+ assertEquals ("eu" , streamWriter3 .getLocation ());
1298+ }
12591299}
0 commit comments