Skip to content

Commit dc29b8c

Browse files
authored
Merge pull request #13859 from [BEAM-11772] Integrate BigQuery sink file loads with GroupIntoBatches
[BEAM-11772] Integrate BigQuery sink file loads with GroupIntoBatches
2 parents 8f219a2 + 04eff7d commit dc29b8c

5 files changed

Lines changed: 200 additions & 47 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java

Lines changed: 101 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,12 @@
4747
import org.apache.beam.sdk.transforms.DoFn;
4848
import org.apache.beam.sdk.transforms.Flatten;
4949
import org.apache.beam.sdk.transforms.GroupByKey;
50+
import org.apache.beam.sdk.transforms.GroupIntoBatches;
51+
import org.apache.beam.sdk.transforms.MapElements;
5052
import org.apache.beam.sdk.transforms.PTransform;
5153
import org.apache.beam.sdk.transforms.ParDo;
5254
import org.apache.beam.sdk.transforms.Reshuffle;
55+
import org.apache.beam.sdk.transforms.SimpleFunction;
5356
import org.apache.beam.sdk.transforms.Values;
5457
import org.apache.beam.sdk.transforms.View;
5558
import org.apache.beam.sdk.transforms.WithKeys;
@@ -265,31 +268,44 @@ public void validate(PipelineOptions options) {
265268

266269
// Expand the pipeline when the user has requested periodically-triggered file writes.
267270
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
268-
checkArgument(numFileShards > 0);
269271
Pipeline p = input.getPipeline();
270272
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
271273
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
272274
final PCollectionView<String> tempFilePrefixView =
273275
createTempFilePrefixView(p, loadJobIdPrefixView);
274-
// The user-supplied triggeringDuration is often chosen to control how many BigQuery load
275-
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
276-
// is set to a large value, currently we have to buffer all the data until the trigger fires.
277-
// Instead we ensure that the files are written if a threshold number of records are ready.
278-
// We use only the user-supplied trigger on the actual BigQuery load. This allows us to
279-
// offload the data to the filesystem.
280-
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
281-
input.apply(
282-
"rewindowIntoGlobal",
283-
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
284-
.triggering(
285-
Repeatedly.forever(
286-
AfterFirst.of(
287-
AfterProcessingTime.pastFirstElementInPane()
288-
.plusDelayOf(triggeringFrequency),
289-
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
290-
.discardingFiredPanes());
291-
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
292-
writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
276+
PCollection<WriteBundlesToFiles.Result<DestinationT>> results;
277+
if (numFileShards > 0) {
278+
// The user-supplied triggeringFrequency is often chosen to control how many BigQuery load
279+
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
280+
// is set to a large value, currently we have to buffer all the data until the trigger fires.
281+
// Instead we ensure that the files are written if a threshold number of records are ready.
282+
// We use only the user-supplied trigger on the actual BigQuery load. This allows us to
283+
// offload the data to the filesystem.
284+
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
285+
input.apply(
286+
"rewindowIntoGlobal",
287+
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
288+
.triggering(
289+
Repeatedly.forever(
290+
AfterFirst.of(
291+
AfterProcessingTime.pastFirstElementInPane()
292+
.plusDelayOf(triggeringFrequency),
293+
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
294+
.discardingFiredPanes());
295+
results = writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
296+
} else {
297+
// In the case of dynamic sharding, however, we use a default trigger and instead apply the
298+
// user supplied triggeringFrequency to the sharding transform. See
299+
// writeDynamicallyShardedFilesTriggered.
300+
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
301+
input.apply(
302+
"rewindowIntoGlobal",
303+
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
304+
.triggering(DefaultTrigger.of())
305+
.discardingFiredPanes());
306+
results = writeDynamicallyShardedFilesTriggered(inputInGlobalWindow, tempFilePrefixView);
307+
}
308+
293309
// Apply the user's trigger before we start generating BigQuery load jobs.
294310
results =
295311
results.apply(
@@ -307,7 +323,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
307323
new TupleTag<>("singlePartitionTag");
308324

309325
// If we have non-default triggered output, we can't use the side-input technique used in
310-
// expandUntriggered . Instead make the result list a main input. Apply a GroupByKey first for
326+
// expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
311327
// determinism.
312328
PCollectionTuple partitions =
313329
results
@@ -371,8 +387,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
371387
.discardingFiredPanes());
372388
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
373389
(numFileShards == 0)
374-
? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
375-
: writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
390+
? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
391+
: writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
376392

377393
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
378394
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
@@ -470,9 +486,10 @@ public void getTempFilePrefix(ProcessContext c) {
470486
.apply("TempFilePrefixView", View.asSingleton());
471487
}
472488

473-
// Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
474-
// file byte size, and table destination.
475-
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
489+
// Writes input data to dynamically-sharded per-bundle files without triggering. Input records are
490+
// split to new files if memory is constrained. Returns a PCollection of filename, file byte size,
491+
// and table destination.
492+
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesUntriggered(
476493
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
477494
TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
478495
new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
@@ -513,9 +530,9 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
513530
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
514531
}
515532

516-
// Writes input data to statically-sharded files. Returns a PCollection of filename,
517-
// file byte size, and table destination.
518-
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
533+
// Writes input data to statically-sharded files. Returns a PCollection of filename, file byte
534+
// size, and table destination.
535+
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeStaticallyShardedFiles(
519536
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
520537
checkState(numFileShards > 0);
521538
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
@@ -547,11 +564,66 @@ public void processElement(
547564
return writeShardedRecords(shardedRecords, tempFilePrefix);
548565
}
549566

567+
// Writes input data to dynamically-sharded files with triggering. The input data is sharded by
568+
// table destinations and each destination may be sub-sharded dynamically. Returns a PCollection
569+
// of filename, file byte size, and table destination.
570+
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
571+
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
572+
// In contrast to fixed sharding with triggering, here we use a global window with default
573+
// trigger and apply the user supplied triggeringFrequency in the subsequent GroupIntoBatches
574+
// transform. We also ensure that the files are written if a threshold number of records are
575+
// ready. Dynamic sharding is achieved via the withShardedKey() option provided by
576+
// GroupIntoBatches.
577+
return input
578+
.apply(
579+
GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
580+
.withMaxBufferingDuration(triggeringFrequency)
581+
.withShardedKey())
582+
.setCoder(
583+
KvCoder.of(
584+
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
585+
IterableCoder.of(elementCoder)))
586+
.apply(
587+
"StripShardId",
588+
MapElements.via(
589+
new SimpleFunction<
590+
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
591+
KV<DestinationT, Iterable<ElementT>>>() {
592+
@Override
593+
public KV<DestinationT, Iterable<ElementT>> apply(
594+
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
595+
input) {
596+
return KV.of(input.getKey().getKey(), input.getValue());
597+
}
598+
}))
599+
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
600+
.apply(
601+
"WriteGroupedRecords",
602+
ParDo.of(
603+
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
604+
tempFilePrefix, maxFileSize, rowWriterFactory))
605+
.withSideInputs(tempFilePrefix))
606+
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
607+
}
608+
550609
private PCollection<Result<DestinationT>> writeShardedRecords(
551610
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
552611
PCollectionView<String> tempFilePrefix) {
553612
return shardedRecords
554613
.apply("GroupByDestination", GroupByKey.create())
614+
.apply(
615+
"StripShardId",
616+
MapElements.via(
617+
new SimpleFunction<
618+
KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
619+
KV<DestinationT, Iterable<ElementT>>>() {
620+
@Override
621+
public KV<DestinationT, Iterable<ElementT>> apply(
622+
KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
623+
return KV.of(input.getKey().getKey(), input.getValue());
624+
}
625+
}))
626+
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
555627
.apply(
556628
"WriteGroupedRecords",
557629
ParDo.of(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2267,7 +2267,8 @@ public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {
22672267

22682268
/**
22692269
* Control how many file shards are written when using BigQuery load jobs. Applicable only when
2270-
* also setting {@link #withTriggeringFrequency}.
2270+
* also setting {@link #withTriggeringFrequency}. To let runner determine the sharding at
2271+
* runtime, set {@link #withAutoSharding()} instead.
22712272
*/
22722273
public Write<T> withNumFileShards(int numFileShards) {
22732274
checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);
@@ -2350,10 +2351,10 @@ public Write<T> useBeamSchema() {
23502351
}
23512352

23522353
/**
2353-
* If true, enables dynamically determined number of shards to write to BigQuery. Only
2354-
* applicable to unbounded data with STREAMING_INSERTS.
2355-
*
2356-
* <p>TODO(BEAM-11408): Also integrate this option to FILE_LOADS.
2354+
* If true, enables using a dynamically determined number of shards to write to BigQuery. This
2355+
* can be used for both {@link Method#FILE_LOADS} and {@link Method#STREAMING_INSERTS}. Only
2356+
* applicable to unbounded data. If using {@link Method#FILE_LOADS}, numFileShards set via
2357+
* {@link #withNumFileShards} will be ignored.
23572358
*/
23582359
@Experimental
23592360
public Write<T> withAutoSharding() {
@@ -2751,7 +2752,11 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
27512752
batchLoads.setMaxRetryJobs(1000);
27522753
}
27532754
batchLoads.setTriggeringFrequency(getTriggeringFrequency());
2754-
batchLoads.setNumFileShards(getNumFileShards());
2755+
if (getAutoSharding()) {
2756+
batchLoads.setNumFileShards(0);
2757+
} else {
2758+
batchLoads.setNumFileShards(getNumFileShards());
2759+
}
27552760
return input.apply(batchLoads);
27562761
}
27572762
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,14 @@
2020
import org.apache.beam.sdk.transforms.DoFn;
2121
import org.apache.beam.sdk.values.KV;
2222
import org.apache.beam.sdk.values.PCollectionView;
23-
import org.apache.beam.sdk.values.ShardedKey;
2423

2524
/**
26-
* Receives elements grouped by their (sharded) destination, and writes them out to a file. Since
27-
* all the elements in the {@link Iterable} are destined to the same table, they are all written to
28-
* the same file. Ensures that only one {@link TableRowWriter} is active per bundle.
25+
* Receives elements grouped by their destination, and writes them out to a file. Since all the
26+
* elements in the {@link Iterable} are destined to the same table, they are all written to the same
27+
* file. Ensures that only one {@link TableRowWriter} is active per bundle.
2928
*/
3029
class WriteGroupedRecordsToFiles<DestinationT, ElementT>
31-
extends DoFn<
32-
KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
33-
WriteBundlesToFiles.Result<DestinationT>> {
30+
extends DoFn<KV<DestinationT, Iterable<ElementT>>, WriteBundlesToFiles.Result<DestinationT>> {
3431

3532
private final PCollectionView<String> tempFilePrefix;
3633
private final long maxFileSize;
@@ -48,24 +45,24 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
4845
@ProcessElement
4946
public void processElement(
5047
ProcessContext c,
51-
@Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
48+
@Element KV<DestinationT, Iterable<ElementT>> element,
5249
OutputReceiver<WriteBundlesToFiles.Result<DestinationT>> o)
5350
throws Exception {
5451

5552
String tempFilePrefix = c.sideInput(this.tempFilePrefix);
5653

5754
BigQueryRowWriter<ElementT> writer =
58-
rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey());
55+
rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
5956

6057
try {
6158
for (ElementT tableRow : element.getValue()) {
6259
if (writer.getByteSize() > maxFileSize) {
6360
writer.close();
64-
writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey());
61+
writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
6562
BigQueryRowWriter.Result result = writer.getResult();
6663
o.output(
6764
new WriteBundlesToFiles.Result<>(
68-
result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
65+
result.resourceId.toString(), result.byteSize, c.element().getKey()));
6966
}
7067
writer.write(tableRow);
7168
}
@@ -76,6 +73,6 @@ public void processElement(
7673
BigQueryRowWriter.Result result = writer.getResult();
7774
o.output(
7875
new WriteBundlesToFiles.Result<>(
79-
result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
76+
result.resourceId.toString(), result.byteSize, c.element().getKey()));
8077
}
8178
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.HashMap;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.concurrent.atomic.AtomicInteger;
3536
import java.util.regex.Pattern;
3637
import org.apache.beam.sdk.annotations.Internal;
3738
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -62,8 +63,12 @@ public class FakeDatasetService implements DatasetService, Serializable {
6263

6364
Map<String, List<String>> insertErrors = Maps.newHashMap();
6465

66+
// The counter for the number of insertions performed.
67+
static AtomicInteger insertCount;
68+
6569
public static void setUp() {
6670
tables = HashBasedTable.create();
71+
insertCount = new AtomicInteger(0);
6772
FakeJobService.setUp();
6873
}
6974

@@ -217,6 +222,10 @@ public void deleteDataset(String projectId, String datasetId)
217222
}
218223
}
219224

225+
public int getInsertCount() {
226+
return insertCount.get();
227+
}
228+
220229
public long insertAll(
221230
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
222231
throws IOException, InterruptedException {
@@ -292,6 +301,7 @@ public <T> long insertAll(
292301
failedInserts, allErrors.get(allErrors.size() - 1), ref, rowList.get(i));
293302
}
294303
}
304+
insertCount.addAndGet(1);
295305
return dataSize;
296306
}
297307
}

0 commit comments

Comments
 (0)