Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 1733d5a

Browse files
authored
fix: remove applying header for multiplexing client and add a unit test for multiplexing with different location (#1850)
* . * . * . * . * . * .
1 parent 66853c2 commit 1733d5a

2 files changed

Lines changed: 51 additions & 26 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.gax.batching.FlowController;
20-
import com.google.api.gax.rpc.FixedHeaderProvider;
2120
import com.google.auto.value.AutoValue;
2221
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
2322
import com.google.common.base.Stopwatch;
2423
import com.google.common.collect.ImmutableList;
2524
import java.io.IOException;
2625
import java.util.Collections;
2726
import java.util.Comparator;
28-
import java.util.HashMap;
2927
import java.util.HashSet;
3028
import java.util.List;
3129
import java.util.Map;
@@ -327,19 +325,6 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
327325
}
328326
// currently we use different header for the client in each connection worker to be different
329327
// as the backend require the header to have the same write_stream field as request body.
330-
BigQueryWriteClient clientAfterModification = client;
331-
if (ownsBigQueryWriteClient) {
332-
BigQueryWriteSettings settings = client.getSettings();
333-
334-
// Every header to write api is required to set write_stream in the header to help routing
335-
// the request to correct region.
336-
HashMap<String, String> newHeaders = new HashMap<>();
337-
newHeaders.putAll(settings.toBuilder().getHeaderProvider().getHeaders());
338-
newHeaders.put("x-goog-request-params", "write_stream=" + streamName);
339-
BigQueryWriteSettings stubSettings =
340-
settings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build();
341-
clientAfterModification = BigQueryWriteClient.create(stubSettings);
342-
}
343328
ConnectionWorker connectionWorker =
344329
new ConnectionWorker(
345330
streamName,
@@ -348,7 +333,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
348333
maxInflightBytes,
349334
limitExceededBehavior,
350335
traceId,
351-
clientAfterModification,
336+
client,
352337
ownsBigQueryWriteClient);
353338
connectionWorkerPool.add(connectionWorker);
354339
log.info(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -243,16 +243,6 @@ public void testBatchWriteWithCommittedStreamEU()
243243
streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3);
244244
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
245245
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());
246-
247-
TableResult result =
248-
bigquery.listTableData(
249-
tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
250-
Iterator<FieldValueList> iter = result.getValues().iterator();
251-
assertEquals("aaa", iter.next().get(0).getStringValue());
252-
assertEquals("bbb", iter.next().get(0).getStringValue());
253-
assertEquals("ccc", iter.next().get(0).getStringValue());
254-
assertEquals("ddd", iter.next().get(0).getStringValue());
255-
assertEquals(false, iter.hasNext());
256246
}
257247

258248
@Test
@@ -1256,4 +1246,54 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec
12561246
assertEquals(1L, response.get().getAppendResult().getOffset().getValue());
12571247
}
12581248
}
1249+
1250+
@Test
1251+
public void testMultiplexingMixedLocation()
1252+
throws IOException, InterruptedException, ExecutionException {
1253+
ConnectionWorkerPool.setOptions(
1254+
ConnectionWorkerPool.Settings.builder()
1255+
.setMinConnectionsPerRegion(1)
1256+
.setMaxConnectionsPerRegion(2)
1257+
.build());
1258+
String defaultStream1 =
1259+
String.format(
1260+
"projects/%s/datasets/%s/tables/%s/streams/_default",
1261+
ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
1262+
String defaultStream2 =
1263+
String.format(
1264+
"projects/%s/datasets/%s/tables/%s/streams/_default",
1265+
ServiceOptions.getDefaultProjectId(), DATASET, TABLE2);
1266+
String defaultStream3 =
1267+
String.format(
1268+
"projects/%s/datasets/%s/tables/%s/streams/_default",
1269+
ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE);
1270+
1271+
StreamWriter streamWriter1 =
1272+
StreamWriter.newBuilder(defaultStream1)
1273+
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
1274+
.setEnableConnectionPool(true)
1275+
.build();
1276+
StreamWriter streamWriter2 =
1277+
StreamWriter.newBuilder(defaultStream2)
1278+
.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()))
1279+
.setEnableConnectionPool(true)
1280+
.build();
1281+
StreamWriter streamWriter3 =
1282+
StreamWriter.newBuilder(defaultStream3)
1283+
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
1284+
.setEnableConnectionPool(true)
1285+
.build();
1286+
ApiFuture<AppendRowsResponse> response1 =
1287+
streamWriter1.append(CreateProtoRows(new String[] {"aaa"}));
1288+
ApiFuture<AppendRowsResponse> response2 =
1289+
streamWriter2.append(CreateProtoRowsComplex(new String[] {"aaa"}));
1290+
ApiFuture<AppendRowsResponse> response3 =
1291+
streamWriter3.append(CreateProtoRows(new String[] {"bbb"}));
1292+
assertEquals(0L, response1.get().getAppendResult().getOffset().getValue());
1293+
assertEquals(0L, response2.get().getAppendResult().getOffset().getValue());
1294+
assertEquals(0L, response3.get().getAppendResult().getOffset().getValue());
1295+
assertEquals("us", streamWriter1.getLocation());
1296+
assertEquals("us", streamWriter2.getLocation());
1297+
assertEquals("eu", streamWriter3.getLocation());
1298+
}
12591299
}

0 commit comments

Comments
 (0)