|
51 | 51 | import java.util.concurrent.locks.ReentrantLock; |
52 | 52 | import java.util.logging.Level; |
53 | 53 | import java.util.logging.Logger; |
| 54 | +import java.util.regex.Matcher; |
| 55 | +import java.util.regex.Pattern; |
54 | 56 | import javax.annotation.concurrent.GuardedBy; |
55 | 57 |
|
56 | 58 | /** |
@@ -219,11 +221,29 @@ class ConnectionWorker implements AutoCloseable { |
219 | 221 | private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; |
220 | 222 | private long testOnlyAppendLoopSleepTime = 0; |
221 | 223 |
|
| 224 | + private static String projectMatching = "projects/[^/]+/"; |
| 225 | + private static Pattern streamPatternProject = Pattern.compile(projectMatching); |
| 226 | + |
222 | 227 | /** The maximum size of one request. Defined by the API. */ |
223 | 228 | public static long getApiMaxRequestBytes() { |
224 | 229 | return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) |
225 | 230 | } |
226 | 231 |
|
| 232 | + static String extractProjectName(String streamName) { |
| 233 | + Matcher streamMatcher = streamPatternProject.matcher(streamName); |
| 234 | + if (streamMatcher.find()) { |
| 235 | + return streamMatcher.group(); |
| 236 | + } else { |
| 237 | + throw new IllegalStateException( |
| 238 | + String.format("The passed in stream name does not match standard format %s", streamName)); |
| 239 | + } |
| 240 | + } |
| 241 | + |
| 242 | + static String getRoutingHeader(String streamName, String location) { |
| 243 | + String project = extractProjectName(streamName); |
| 244 | + return project + "locations/" + location; |
| 245 | + } |
| 246 | + |
227 | 247 | public ConnectionWorker( |
228 | 248 | String streamName, |
229 | 249 | String location, |
@@ -259,6 +279,10 @@ public ConnectionWorker( |
259 | 279 | newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); |
260 | 280 | if (this.location == null) { |
261 | 281 | newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName); |
| 282 | + } else { |
| 283 | + newHeaders.put( |
| 284 | + "x-goog-request-params", |
| 285 | + "write_location=" + getRoutingHeader(this.streamName, this.location)); |
262 | 286 | } |
263 | 287 | BigQueryWriteSettings stubSettings = |
264 | 288 | clientSettings |
|
0 commit comments