1717package com .google .cloud .bigquery ;
1818
1919import static com .google .cloud .RetryHelper .runWithRetries ;
20- import static java .util .concurrent .Executors .callable ;
2120
2221import com .google .cloud .BaseWriteChannel ;
2322import com .google .cloud .RestorableState ;
2423import com .google .cloud .RetryHelper ;
2524import com .google .cloud .WriteChannel ;
25+ import com .google .common .base .MoreObjects ;
2626
27+ import java .util .Objects ;
2728import java .util .concurrent .Callable ;
2829
2930/**
30- * WriteChannel implementation to stream data into a BigQuery table.
31+ * {@link WriteChannel} implementation to stream data into a BigQuery table. Use {@link #getJob()}
32+ * to get the job used to insert streamed data. Please notice that {@link #getJob()} returns
33+ * {@code null} until the channel is closed.
3134 */
32- class TableDataWriteChannel extends BaseWriteChannel <BigQueryOptions , WriteChannelConfiguration > {
35+ public class TableDataWriteChannel extends
36+ BaseWriteChannel <BigQueryOptions , WriteChannelConfiguration > {
37+
38+ private Job job ;
3339
3440 TableDataWriteChannel (BigQueryOptions options ,
3541 WriteChannelConfiguration writeChannelConfiguration ) {
@@ -44,20 +50,23 @@ class TableDataWriteChannel extends BaseWriteChannel<BigQueryOptions, WriteChann
4450 @ Override
4551 protected void flushBuffer (final int length , final boolean last ) {
4652 try {
47- runWithRetries (callable (new Runnable () {
48- @ Override
49- public void run () {
50- getOptions ().getRpc ().write (getUploadId (), getBuffer (), 0 , getPosition (), length , last );
51- }
52- }), getOptions ().getRetryParams (), BigQueryImpl .EXCEPTION_HANDLER , getOptions ().getClock ());
53+ com .google .api .services .bigquery .model .Job jobPb = runWithRetries (
54+ new Callable <com .google .api .services .bigquery .model .Job >() {
55+ @ Override
56+ public com .google .api .services .bigquery .model .Job call () {
57+ return getOptions ().getRpc ().write (
58+ getUploadId (), getBuffer (), 0 , getPosition (), length , last );
59+ }
60+ }, getOptions ().getRetryParams (), BigQueryImpl .EXCEPTION_HANDLER , getOptions ().getClock ());
61+ job = jobPb != null ? Job .fromPb (getOptions ().getService (), jobPb ) : null ;
5362 } catch (RetryHelper .RetryHelperException e ) {
5463 throw BigQueryException .translateAndThrow (e );
5564 }
5665 }
5766
5867 @ Override
5968 protected StateImpl .Builder stateBuilder () {
60- return StateImpl .builder (getOptions (), getEntity (), getUploadId ());
69+ return StateImpl .builder (getOptions (), getEntity (), getUploadId (), job );
6170 }
6271
6372 private static String open (final BigQueryOptions options ,
@@ -74,21 +83,39 @@ public String call() {
7483 }
7584 }
7685
86+ private void setJob (Job job ) {
87+ this .job = job ;
88+ }
89+
90+ /**
91+ * Returns the {@code Job} created to insert the rows. The job is available only once the upload
92+ * finished and the channel was closed, returns {@code null} otherwise.
93+ */
94+ public Job getJob () {
95+ return job ;
96+ }
97+
7798 static class StateImpl
7899 extends BaseWriteChannel .BaseState <BigQueryOptions , WriteChannelConfiguration > {
79100
80- private static final long serialVersionUID = -787362105981823738L ;
101+ private static final long serialVersionUID = -2692851818766876346L ;
102+
103+ private final Job job ;
81104
82105 StateImpl (Builder builder ) {
83106 super (builder );
107+ this .job = builder .job ;
84108 }
85109
86110 static class Builder
87111 extends BaseWriteChannel .BaseState .Builder <BigQueryOptions , WriteChannelConfiguration > {
88112
113+ private final Job job ;
114+
89115 private Builder (BigQueryOptions options , WriteChannelConfiguration configuration ,
90- String uploadId ) {
116+ String uploadId , Job job ) {
91117 super (options , configuration , uploadId );
118+ this .job = job ;
92119 }
93120
94121 public RestorableState <WriteChannel > build () {
@@ -97,15 +124,33 @@ public RestorableState<WriteChannel> build() {
97124 }
98125
99126 static Builder builder (BigQueryOptions options , WriteChannelConfiguration config ,
100- String uploadId ) {
101- return new Builder (options , config , uploadId );
127+ String uploadId , Job job ) {
128+ return new Builder (options , config , uploadId , job );
102129 }
103130
104131 @ Override
105132 public WriteChannel restore () {
106133 TableDataWriteChannel channel = new TableDataWriteChannel (serviceOptions , entity , uploadId );
107134 channel .restore (this );
135+ channel .setJob (job );
108136 return channel ;
109137 }
138+
139+ @ Override
140+ public int hashCode () {
141+ return Objects .hash (super .hashCode (), job );
142+ }
143+
144+ @ Override
145+ public boolean equals (Object obj ) {
146+ return super .equals (obj )
147+ && obj instanceof StateImpl
148+ && Objects .equals (job , ((StateImpl ) obj ).job );
149+ }
150+
151+ @ Override
152+ protected MoreObjects .ToStringHelper toStringHelper () {
153+ return super .toStringHelper ().add ("job" , job );
154+ }
110155 }
111156}
0 commit comments