Skip to content

Commit 6675586

Browse files
author
Kyle Weaver
authored
[BEAM-11637] Use accumulators properly in BitAnd. (#13745)
* [BEAM-11637] Use accumulators properly in BitAnd.
1 parent c57d71f commit 6675586

1 file changed

Lines changed: 38 additions & 18 deletions

File tree

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -397,40 +397,60 @@ public Long extractOutput(Long accum) {
397397
* <p>Note: null values are ignored when mixed with non-null values.
398398
* (https://issues.apache.org/jira/browse/BEAM-10379)
399399
*/
400-
static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> {
401-
// Indicate if input only contains null value.
402-
private boolean isEmpty = true;
400+
static class BitAnd<T extends Number> extends CombineFn<T, BitAnd.Accum, Long> {
401+
static class Accum {
402+
/** True if no inputs have been seen yet. */
403+
boolean isEmpty = true;
404+
/**
405+
* True if any null inputs have been seen. If we see a single null value, the end result is
406+
* null, so if isNull is true, isEmpty and bitAnd are ignored.
407+
*/
408+
boolean isNull = false;
409+
/** The bitwise-and of the inputs seen so far. */
410+
long bitAnd = -1L;
411+
}
403412

404413
@Override
405-
public Long createAccumulator() {
406-
return -1L;
414+
public Accum createAccumulator() {
415+
return new Accum();
407416
}
408417

409418
@Override
410-
public Long addInput(Long accum, T input) {
411-
if (input != null) {
412-
this.isEmpty = false;
413-
return accum & input.longValue();
414-
} else {
415-
return null;
419+
public Accum addInput(Accum accum, T input) {
420+
if (accum.isNull) {
421+
return accum;
422+
}
423+
if (input == null) {
424+
accum.isNull = true;
425+
return accum;
416426
}
427+
accum.isEmpty = false;
428+
accum.bitAnd &= input.longValue();
429+
return accum;
417430
}
418431

419432
@Override
420-
public Long mergeAccumulators(Iterable<Long> accums) {
421-
Long merged = createAccumulator();
422-
for (Long accum : accums) {
423-
merged = merged & accum;
433+
public Accum mergeAccumulators(Iterable<Accum> accums) {
434+
Accum merged = createAccumulator();
435+
for (Accum accum : accums) {
436+
if (accum.isNull) {
437+
return accum;
438+
}
439+
if (accum.isEmpty) {
440+
continue;
441+
}
442+
merged.isEmpty = false;
443+
merged.bitAnd &= accum.bitAnd;
424444
}
425445
return merged;
426446
}
427447

428448
@Override
429-
public Long extractOutput(Long accum) {
430-
if (this.isEmpty) {
449+
public Long extractOutput(Accum accum) {
450+
if (accum.isEmpty || accum.isNull) {
431451
return null;
432452
}
433-
return accum;
453+
return accum.bitAnd;
434454
}
435455
}
436456
}

0 commit comments

Comments
 (0)