4747import org .apache .beam .sdk .transforms .DoFn ;
4848import org .apache .beam .sdk .transforms .Flatten ;
4949import org .apache .beam .sdk .transforms .GroupByKey ;
50+ import org .apache .beam .sdk .transforms .GroupIntoBatches ;
51+ import org .apache .beam .sdk .transforms .MapElements ;
5052import org .apache .beam .sdk .transforms .PTransform ;
5153import org .apache .beam .sdk .transforms .ParDo ;
5254import org .apache .beam .sdk .transforms .Reshuffle ;
55+ import org .apache .beam .sdk .transforms .SimpleFunction ;
5356import org .apache .beam .sdk .transforms .Values ;
5457import org .apache .beam .sdk .transforms .View ;
5558import org .apache .beam .sdk .transforms .WithKeys ;
@@ -265,31 +268,44 @@ public void validate(PipelineOptions options) {
265268
266269 // Expand the pipeline when the user has requested periodically-triggered file writes.
267270 private WriteResult expandTriggered (PCollection <KV <DestinationT , ElementT >> input ) {
268- checkArgument (numFileShards > 0 );
269271 Pipeline p = input .getPipeline ();
270272 final PCollectionView <String > loadJobIdPrefixView = createJobIdPrefixView (p , JobType .LOAD );
271273 final PCollectionView <String > copyJobIdPrefixView = createJobIdPrefixView (p , JobType .COPY );
272274 final PCollectionView <String > tempFilePrefixView =
273275 createTempFilePrefixView (p , loadJobIdPrefixView );
274- // The user-supplied triggeringDuration is often chosen to control how many BigQuery load
275- // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
276- // is set to a large value, currently we have to buffer all the data until the trigger fires.
277- // Instead we ensure that the files are written if a threshold number of records are ready.
278- // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
279- // offload the data to the filesystem.
280- PCollection <KV <DestinationT , ElementT >> inputInGlobalWindow =
281- input .apply (
282- "rewindowIntoGlobal" ,
283- Window .<KV <DestinationT , ElementT >>into (new GlobalWindows ())
284- .triggering (
285- Repeatedly .forever (
286- AfterFirst .of (
287- AfterProcessingTime .pastFirstElementInPane ()
288- .plusDelayOf (triggeringFrequency ),
289- AfterPane .elementCountAtLeast (FILE_TRIGGERING_RECORD_COUNT ))))
290- .discardingFiredPanes ());
291- PCollection <WriteBundlesToFiles .Result <DestinationT >> results =
292- writeShardedFiles (inputInGlobalWindow , tempFilePrefixView );
276+ PCollection <WriteBundlesToFiles .Result <DestinationT >> results ;
277+ if (numFileShards > 0 ) {
278+ // The user-supplied triggeringFrequency is often chosen to control how many BigQuery load
279+ // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
280+ // is set to a large value, currently we have to buffer all the data until the trigger fires.
281+ // Instead we ensure that the files are written if a threshold number of records are ready.
282+ // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
283+ // offload the data to the filesystem.
284+ PCollection <KV <DestinationT , ElementT >> inputInGlobalWindow =
285+ input .apply (
286+ "rewindowIntoGlobal" ,
287+ Window .<KV <DestinationT , ElementT >>into (new GlobalWindows ())
288+ .triggering (
289+ Repeatedly .forever (
290+ AfterFirst .of (
291+ AfterProcessingTime .pastFirstElementInPane ()
292+ .plusDelayOf (triggeringFrequency ),
293+ AfterPane .elementCountAtLeast (FILE_TRIGGERING_RECORD_COUNT ))))
294+ .discardingFiredPanes ());
295+ results = writeStaticallyShardedFiles (inputInGlobalWindow , tempFilePrefixView );
296+ } else {
297+ // In the case of dynamic sharding, however, we use a default triggering and instead apply the
298+ // user supplied triggeringFrequency to the sharding transform. See
299+ // writeDynamicallyShardedFilesTriggered.
300+ PCollection <KV <DestinationT , ElementT >> inputInGlobalWindow =
301+ input .apply (
302+ "rewindowIntoGlobal" ,
303+ Window .<KV <DestinationT , ElementT >>into (new GlobalWindows ())
304+ .triggering (DefaultTrigger .of ())
305+ .discardingFiredPanes ());
306+ results = writeDynamicallyShardedFilesTriggered (inputInGlobalWindow , tempFilePrefixView );
307+ }
308+
293309 // Apply the user's trigger before we start generating BigQuery load jobs.
294310 results =
295311 results .apply (
@@ -307,7 +323,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
307323 new TupleTag <>("singlePartitionTag" );
308324
309325 // If we have non-default triggered output, we can't use the side-input technique used in
310- // expandUntriggered . Instead make the result list a main input. Apply a GroupByKey first for
326+ // expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
311327 // determinism.
312328 PCollectionTuple partitions =
313329 results
@@ -371,8 +387,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
371387 .discardingFiredPanes ());
372388 PCollection <WriteBundlesToFiles .Result <DestinationT >> results =
373389 (numFileShards == 0 )
374- ? writeDynamicallyShardedFiles (inputInGlobalWindow , tempFilePrefixView )
375- : writeShardedFiles (inputInGlobalWindow , tempFilePrefixView );
390+ ? writeDynamicallyShardedFilesUntriggered (inputInGlobalWindow , tempFilePrefixView )
391+ : writeStaticallyShardedFiles (inputInGlobalWindow , tempFilePrefixView );
376392
377393 TupleTag <KV <ShardedKey <DestinationT >, List <String >>> multiPartitionsTag =
378394 new TupleTag <KV <ShardedKey <DestinationT >, List <String >>>("multiPartitionsTag" ) {};
@@ -470,9 +486,10 @@ public void getTempFilePrefix(ProcessContext c) {
470486 .apply ("TempFilePrefixView" , View .asSingleton ());
471487 }
472488
473- // Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
474- // file byte size, and table destination.
475- PCollection <WriteBundlesToFiles .Result <DestinationT >> writeDynamicallyShardedFiles (
489+ // Writes input data to dynamically-sharded per-bundle files without triggering. Input records are
490+ // spilt to new files if memory is constrained. Returns a PCollection of filename, file byte size,
491+ // and table destination.
492+ PCollection <WriteBundlesToFiles .Result <DestinationT >> writeDynamicallyShardedFilesUntriggered (
476493 PCollection <KV <DestinationT , ElementT >> input , PCollectionView <String > tempFilePrefix ) {
477494 TupleTag <WriteBundlesToFiles .Result <DestinationT >> writtenFilesTag =
478495 new TupleTag <WriteBundlesToFiles .Result <DestinationT >>("writtenFiles" ) {};
@@ -513,9 +530,9 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
513530 .setCoder (WriteBundlesToFiles .ResultCoder .of (destinationCoder ));
514531 }
515532
516- // Writes input data to statically-sharded files. Returns a PCollection of filename,
517- // file byte size, and table destination.
518- PCollection <WriteBundlesToFiles .Result <DestinationT >> writeShardedFiles (
533+ // Writes input data to statically-sharded files. Returns a PCollection of filename, file byte
534+ // size, and table destination.
535+ PCollection <WriteBundlesToFiles .Result <DestinationT >> writeStaticallyShardedFiles (
519536 PCollection <KV <DestinationT , ElementT >> input , PCollectionView <String > tempFilePrefix ) {
520537 checkState (numFileShards > 0 );
521538 PCollection <KV <ShardedKey <DestinationT >, ElementT >> shardedRecords =
@@ -547,11 +564,66 @@ public void processElement(
547564 return writeShardedRecords (shardedRecords , tempFilePrefix );
548565 }
549566
567+ // Writes input data to dynamically-sharded files with triggering. The input data is sharded by
568+ // table destinations and each destination may be sub-sharded dynamically. Returns a PCollection
569+ // of filename, file byte size, and table destination.
570+ PCollection <WriteBundlesToFiles .Result <DestinationT >> writeDynamicallyShardedFilesTriggered (
571+ PCollection <KV <DestinationT , ElementT >> input , PCollectionView <String > tempFilePrefix ) {
572+ // In contrast to fixed sharding with triggering, here we use a global window with default
573+ // trigger and apply the user supplied triggeringFrequency in the subsequent GroupIntoBatches
574+ // transform. We also ensure that the files are written if a threshold number of records are
575+ // ready. Dynamic sharding is achieved via the withShardedKey() option provided by
576+ // GroupIntoBatches.
577+ return input
578+ .apply (
579+ GroupIntoBatches .<DestinationT , ElementT >ofSize (FILE_TRIGGERING_RECORD_COUNT )
580+ .withMaxBufferingDuration (triggeringFrequency )
581+ .withShardedKey ())
582+ .setCoder (
583+ KvCoder .of (
584+ org .apache .beam .sdk .util .ShardedKey .Coder .of (destinationCoder ),
585+ IterableCoder .of (elementCoder )))
586+ .apply (
587+ "StripShardId" ,
588+ MapElements .via (
589+ new SimpleFunction <
590+ KV <org .apache .beam .sdk .util .ShardedKey <DestinationT >, Iterable <ElementT >>,
591+ KV <DestinationT , Iterable <ElementT >>>() {
592+ @ Override
593+ public KV <DestinationT , Iterable <ElementT >> apply (
594+ KV <org .apache .beam .sdk .util .ShardedKey <DestinationT >, Iterable <ElementT >>
595+ input ) {
596+ return KV .of (input .getKey ().getKey (), input .getValue ());
597+ }
598+ }))
599+ .setCoder (KvCoder .of (destinationCoder , IterableCoder .of (elementCoder )))
600+ .apply (
601+ "WriteGroupedRecords" ,
602+ ParDo .of (
603+ new WriteGroupedRecordsToFiles <DestinationT , ElementT >(
604+ tempFilePrefix , maxFileSize , rowWriterFactory ))
605+ .withSideInputs (tempFilePrefix ))
606+ .setCoder (WriteBundlesToFiles .ResultCoder .of (destinationCoder ));
607+ }
608+
550609 private PCollection <Result <DestinationT >> writeShardedRecords (
551610 PCollection <KV <ShardedKey <DestinationT >, ElementT >> shardedRecords ,
552611 PCollectionView <String > tempFilePrefix ) {
553612 return shardedRecords
554613 .apply ("GroupByDestination" , GroupByKey .create ())
614+ .apply (
615+ "StripShardId" ,
616+ MapElements .via (
617+ new SimpleFunction <
618+ KV <ShardedKey <DestinationT >, Iterable <ElementT >>,
619+ KV <DestinationT , Iterable <ElementT >>>() {
620+ @ Override
621+ public KV <DestinationT , Iterable <ElementT >> apply (
622+ KV <ShardedKey <DestinationT >, Iterable <ElementT >> input ) {
623+ return KV .of (input .getKey ().getKey (), input .getValue ());
624+ }
625+ }))
626+ .setCoder (KvCoder .of (destinationCoder , IterableCoder .of (elementCoder )))
555627 .apply (
556628 "WriteGroupedRecords" ,
557629 ParDo .of (
0 commit comments