Skip to content

Commit 10afcef

Browse files
committed
Make TableDataWriteChannel expose Job when upload completed
1 parent 8ec721e commit 10afcef

8 files changed

Lines changed: 154 additions & 61 deletions

File tree

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,25 @@
1717
package com.google.cloud.bigquery;
1818

1919
import static com.google.cloud.RetryHelper.runWithRetries;
20-
import static java.util.concurrent.Executors.callable;
2120

2221
import com.google.cloud.BaseWriteChannel;
2322
import com.google.cloud.RestorableState;
2423
import com.google.cloud.RetryHelper;
2524
import com.google.cloud.WriteChannel;
25+
import com.google.common.base.MoreObjects;
2626

27+
import java.util.Objects;
2728
import java.util.concurrent.Callable;
2829

2930
/**
30-
* WriteChannel implementation to stream data into a BigQuery table.
31+
* {@link WriteChannel} implementation to stream data into a BigQuery table. Use {@link #getJob()}
32+
* to get the job used to insert streamed data. Please notice that {@link #getJob()} returns
33+
* {@code null} until the channel is closed.
3134
*/
32-
class TableDataWriteChannel extends BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {
35+
public class TableDataWriteChannel extends
36+
BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {
37+
38+
private Job job;
3339

3440
TableDataWriteChannel(BigQueryOptions options,
3541
WriteChannelConfiguration writeChannelConfiguration) {
@@ -44,20 +50,23 @@ class TableDataWriteChannel extends BaseWriteChannel<BigQueryOptions, WriteChann
4450
@Override
4551
protected void flushBuffer(final int length, final boolean last) {
4652
try {
47-
runWithRetries(callable(new Runnable() {
48-
@Override
49-
public void run() {
50-
getOptions().getRpc().write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
51-
}
52-
}), getOptions().getRetryParams(), BigQueryImpl.EXCEPTION_HANDLER, getOptions().getClock());
53+
com.google.api.services.bigquery.model.Job jobPb = runWithRetries(
54+
new Callable<com.google.api.services.bigquery.model.Job>() {
55+
@Override
56+
public com.google.api.services.bigquery.model.Job call() {
57+
return getOptions().getRpc().write(
58+
getUploadId(), getBuffer(), 0, getPosition(), length, last);
59+
}
60+
}, getOptions().getRetryParams(), BigQueryImpl.EXCEPTION_HANDLER, getOptions().getClock());
61+
job = jobPb != null ? Job.fromPb(getOptions().getService(), jobPb) : null;
5362
} catch (RetryHelper.RetryHelperException e) {
5463
throw BigQueryException.translateAndThrow(e);
5564
}
5665
}
5766

5867
@Override
5968
protected StateImpl.Builder stateBuilder() {
60-
return StateImpl.builder(getOptions(), getEntity(), getUploadId());
69+
return StateImpl.builder(getOptions(), getEntity(), getUploadId(), job);
6170
}
6271

6372
private static String open(final BigQueryOptions options,
@@ -74,21 +83,39 @@ public String call() {
7483
}
7584
}
7685

86+
private void setJob(Job job) {
87+
this.job = job;
88+
}
89+
90+
/**
91+
* Returns the {@code Job} created to insert the rows. The job is available only once the upload
92+
* finished and the channel was closed, returns {@code null} otherwise.
93+
*/
94+
public Job getJob() {
95+
return job;
96+
}
97+
7798
static class StateImpl
7899
extends BaseWriteChannel.BaseState<BigQueryOptions, WriteChannelConfiguration> {
79100

80-
private static final long serialVersionUID = -787362105981823738L;
101+
private static final long serialVersionUID = -2692851818766876346L;
102+
103+
private final Job job;
81104

82105
StateImpl(Builder builder) {
83106
super(builder);
107+
this.job = builder.job;
84108
}
85109

86110
static class Builder
87111
extends BaseWriteChannel.BaseState.Builder<BigQueryOptions, WriteChannelConfiguration> {
88112

113+
private final Job job;
114+
89115
private Builder(BigQueryOptions options, WriteChannelConfiguration configuration,
90-
String uploadId) {
116+
String uploadId, Job job) {
91117
super(options, configuration, uploadId);
118+
this.job = job;
92119
}
93120

94121
public RestorableState<WriteChannel> build() {
@@ -97,15 +124,33 @@ public RestorableState<WriteChannel> build() {
97124
}
98125

99126
static Builder builder(BigQueryOptions options, WriteChannelConfiguration config,
100-
String uploadId) {
101-
return new Builder(options, config, uploadId);
127+
String uploadId, Job job) {
128+
return new Builder(options, config, uploadId, job);
102129
}
103130

104131
@Override
105132
public WriteChannel restore() {
106133
TableDataWriteChannel channel = new TableDataWriteChannel(serviceOptions, entity, uploadId);
107134
channel.restore(this);
135+
channel.setJob(job);
108136
return channel;
109137
}
138+
139+
@Override
140+
public int hashCode() {
141+
return Objects.hash(super.hashCode(), job);
142+
}
143+
144+
@Override
145+
public boolean equals(Object obj) {
146+
return super.equals(obj)
147+
&& obj instanceof StateImpl
148+
&& Objects.equals(job, ((StateImpl) obj).job);
149+
}
150+
151+
@Override
152+
protected MoreObjects.ToStringHelper toStringHelper() {
153+
return super.toStringHelper().add("job", job);
154+
}
110155
}
111156
}

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/BigQueryRpc.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,19 @@ Tuple<String, Iterable<TableRow>> listTableData(String projectId, String dataset
242242
String open(JobConfiguration configuration);
243243

244244
/**
245-
* Uploads the provided data to the resumable upload session at the specified position.
245+
* Uploads the provided data to the resumable upload session at the specified position. This
246+
* method returns the job created to insert the rows, only when {@code last} is {@code true}.
246247
*
247248
* @param uploadId the resumable upload session URI
248249
* @param toWrite a byte array of data to upload
249250
* @param toWriteOffset offset in the {@code toWrite} param to start writing from
250251
* @param destOffset offset in the destination where to upload data to
251252
* @param length the number of bytes to upload
252253
* @param last {@code true} indicates that the last chunk is being uploaded
254+
* @return returns the job created to insert the rows, only when {@code last} is {@code true}.
255+
* Returns {@code null} otherwise
253256
* @throws BigQueryException upon failure
254257
*/
255-
void write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
258+
Job write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
256259
boolean last);
257260
}

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/DefaultBigQueryRpc.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -432,12 +432,13 @@ public String open(JobConfiguration configuration) {
432432
}
433433

434434
@Override
435-
public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
435+
public Job write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
436436
boolean last) {
437437
try {
438438
GenericUrl url = new GenericUrl(uploadId);
439-
HttpRequest httpRequest = bigquery.getRequestFactory().buildPutRequest(url,
440-
new ByteArrayContent(null, toWrite, toWriteOffset, length));
439+
HttpRequest httpRequest = bigquery.getRequestFactory()
440+
.buildPutRequest(url, new ByteArrayContent(null, toWrite, toWriteOffset, length));
441+
httpRequest.setParser(bigquery.getObjectParser());
441442
long limit = destOffset + length;
442443
StringBuilder range = new StringBuilder("bytes ");
443444
range.append(destOffset).append('-').append(limit - 1).append('/');
@@ -450,8 +451,9 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destO
450451
int code;
451452
String message;
452453
IOException exception = null;
454+
HttpResponse response = null;
453455
try {
454-
HttpResponse response = httpRequest.execute();
456+
response = httpRequest.execute();
455457
code = response.getStatusCode();
456458
message = response.getStatusMessage();
457459
} catch (HttpResponseException ex) {
@@ -466,6 +468,7 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destO
466468
}
467469
throw new BigQueryException(code, message);
468470
}
471+
return last && response != null ? response.parseAs(Job.class) : null;
469472
} catch (IOException ex) {
470473
throw translate(ex);
471474
}

0 commit comments

Comments
 (0)