Skip to content

Commit ed4c03e

Browse files
authored
Do not create new Executor everytime createRunner (#32272)
* Do not create new Executor everytime createRunner * reset executorService after shutdown * Switch to use newScheduledThreadPool; guard ses with AtomicReference * Partially revert changes on flink and samza runner
1 parent 512b52a commit ed4c03e

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.beam.sdk.values.PCollectionView;
5050
import org.apache.beam.sdk.values.TupleTag;
5151
import org.apache.beam.sdk.values.WindowingStrategy;
52+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
5253
import org.apache.flink.api.java.functions.KeySelector;
5354
import org.apache.flink.runtime.state.StateInitializationContext;
5455
import org.joda.time.Duration;
@@ -126,7 +127,11 @@ public void initializeState(StateInitializationContext context) throws Exception
126127
// this will implicitly be keyed like the StateInternalsFactory
127128
TimerInternalsFactory<byte[]> timerInternalsFactory = key -> timerInternals;
128129

129-
executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
130+
if (this.executorService == null) {
131+
this.executorService =
132+
Executors.newSingleThreadScheduledExecutor(
133+
new ThreadFactoryBuilder().setNameFormat("flink-sdf-executor-%d").build());
134+
}
130135

131136
((ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
132137
((ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
@@ -191,10 +196,12 @@ public void close() throws Exception {
191196
"The scheduled executor service did not properly terminate. Shutting "
192197
+ "it down now.");
193198
executorService.shutdownNow();
199+
executorService = null;
194200
}
195201
} catch (InterruptedException e) {
196202
LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
197203
executorService.shutdownNow();
204+
executorService = null;
198205
}
199206
}
200207
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.atomic.AtomicReference;
3133
import org.apache.beam.runners.core.DoFnRunner;
3234
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
3335
import org.apache.beam.runners.core.KeyedWorkItem;
@@ -56,6 +58,7 @@
5658
import org.apache.beam.sdk.values.PCollectionView;
5759
import org.apache.beam.sdk.values.TupleTag;
5860
import org.apache.beam.sdk.values.WindowingStrategy;
61+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
5962
import org.joda.time.Duration;
6063
import org.joda.time.Instant;
6164

@@ -115,7 +118,10 @@ private static class ProcessFnExtractor implements UserParDoFnFactory.DoFnExtrac
115118
private static class SplittableDoFnRunnerFactory<
116119
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
117120
implements DoFnRunnerFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
121+
private final AtomicReference<ScheduledExecutorService> ses = new AtomicReference<>();
122+
118123
@Override
124+
@SuppressWarnings("nullness") // nullable atomic reference guaranteed nonnull when get
119125
public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> createRunner(
120126
DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fn,
121127
PipelineOptions options,
@@ -131,6 +137,13 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> crea
131137
OutputManager outputManager,
132138
DoFnSchemaInformation doFnSchemaInformation,
133139
Map<String, PCollectionView<?>> sideInputMapping) {
140+
if (this.ses.get() == null) {
141+
this.ses.compareAndSet(
142+
null,
143+
Executors.newScheduledThreadPool(
144+
Runtime.getRuntime().availableProcessors(),
145+
new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build()));
146+
}
134147
ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processFn =
135148
(ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>) fn;
136149
processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals());
@@ -162,7 +175,7 @@ public <T> void outputWindowedValue(
162175
}
163176
},
164177
sideInputReader,
165-
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
178+
ses.get(),
166179
// Commit at least once every 10 seconds or 10k records. This keeps the watermark
167180
// advancing smoothly, and ensures that not too much work will have to be reprocessed
168181
// in the event of a crash.

runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collection;
2121
import java.util.Collections;
2222
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
2324
import org.apache.beam.runners.core.DoFnRunner;
2425
import org.apache.beam.runners.core.DoFnRunners;
2526
import org.apache.beam.runners.core.KeyedWorkItem;
@@ -49,9 +50,11 @@
4950
import org.apache.beam.sdk.values.PCollection.IsBounded;
5051
import org.apache.beam.sdk.values.TupleTag;
5152
import org.apache.beam.sdk.values.WindowingStrategy;
53+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
5254
import org.apache.samza.config.Config;
5355
import org.apache.samza.context.Context;
5456
import org.apache.samza.operators.Scheduler;
57+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
5558
import org.joda.time.Duration;
5659
import org.joda.time.Instant;
5760
import org.slf4j.Logger;
@@ -81,6 +84,7 @@ public class SplittableParDoProcessKeyedElementsOp<
8184
private transient SamzaTimerInternalsFactory<byte[]> timerInternalsFactory;
8285
private transient DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fnRunner;
8386
private transient SamzaPipelineOptions pipelineOptions;
87+
private transient @MonotonicNonNull ScheduledExecutorService ses = null;
8488

8589
public SplittableParDoProcessKeyedElementsOp(
8690
TupleTag<OutputT> mainOutputTag,
@@ -137,6 +141,12 @@ public void open(
137141
isBounded,
138142
pipelineOptions);
139143

144+
if (this.ses == null) {
145+
this.ses =
146+
Executors.newSingleThreadScheduledExecutor(
147+
new ThreadFactoryBuilder().setNameFormat("samza-sdf-executor-%d").build());
148+
}
149+
140150
final KeyedInternals<byte[]> keyedInternals =
141151
new KeyedInternals<>(stateInternalsFactory, timerInternalsFactory);
142152

@@ -172,7 +182,7 @@ public <AdditionalOutputT> void outputWindowedValue(
172182
}
173183
},
174184
NullSideInputReader.empty(),
175-
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
185+
ses,
176186
10000,
177187
Duration.standardSeconds(10),
178188
() -> {

0 commit comments

Comments
 (0)