1717 */
1818package org .apache .beam .runners .samza .runtime ;
1919
20+ import java .util .Collections ;
2021import java .util .List ;
22+ import java .util .Locale ;
2123import java .util .Map ;
2224import java .util .concurrent .LinkedBlockingQueue ;
2325import org .apache .beam .model .pipeline .v1 .RunnerApi ;
3032import org .apache .beam .runners .core .StatefulDoFnRunner ;
3133import org .apache .beam .runners .core .StepContext ;
3234import org .apache .beam .runners .core .TimerInternals ;
35+ import org .apache .beam .runners .core .construction .Timer ;
3336import org .apache .beam .runners .core .construction .graph .ExecutableStage ;
3437import org .apache .beam .runners .fnexecution .control .BundleProgressHandler ;
3538import org .apache .beam .runners .fnexecution .control .OutputReceiverFactory ;
3639import org .apache .beam .runners .fnexecution .control .RemoteBundle ;
3740import org .apache .beam .runners .fnexecution .control .StageBundleFactory ;
41+ import org .apache .beam .runners .fnexecution .control .TimerReceiverFactory ;
3842import org .apache .beam .runners .fnexecution .state .StateRequestHandler ;
3943import org .apache .beam .runners .samza .SamzaExecutionContext ;
4044import org .apache .beam .runners .samza .SamzaPipelineOptions ;
4145import org .apache .beam .runners .samza .metrics .DoFnRunnerWithMetrics ;
4246import org .apache .beam .runners .samza .util .StateUtils ;
47+ import org .apache .beam .runners .samza .util .WindowUtils ;
4348import org .apache .beam .sdk .coders .Coder ;
4449import org .apache .beam .sdk .fn .data .FnDataReceiver ;
4550import org .apache .beam .sdk .state .BagState ;
4954import org .apache .beam .sdk .transforms .reflect .DoFnSignature ;
5055import org .apache .beam .sdk .transforms .reflect .DoFnSignatures ;
5156import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
57+ import org .apache .beam .sdk .transforms .windowing .PaneInfo ;
5258import org .apache .beam .sdk .util .WindowedValue ;
5359import org .apache .beam .sdk .values .KV ;
5460import org .apache .beam .sdk .values .PCollectionView ;
@@ -184,6 +190,7 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
184190 Map <?, PCollectionView <?>> sideInputMapping ,
185191 SideInputHandler sideInputHandler ,
186192 SamzaStoreStateInternals .Factory <?> nonKeyedStateInternalsFactory ,
193+ SamzaTimerInternalsFactory <?> timerInternalsFactory ,
187194 SamzaPipelineOptions pipelineOptions ,
188195 DoFnRunners .OutputManager outputManager ,
189196 StageBundleFactory stageBundleFactory ,
@@ -212,6 +219,9 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
212219 (SamzaExecutionContext ) context .getApplicationContainerContext ();
213220 final DoFnRunner <InT , FnOutT > underlyingRunner =
214221 new SdkHarnessDoFnRunner <>(
222+ timerInternalsFactory ,
223+ WindowUtils .getWindowStrategy (
224+ executableStage .getInputPCollection ().getId (), executableStage .getComponents ()),
215225 outputManager ,
216226 stageBundleFactory ,
217227 mainOutputTag ,
@@ -225,6 +235,8 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
225235 }
226236
227237 private static class SdkHarnessDoFnRunner <InT , FnOutT > implements DoFnRunner <InT , FnOutT > {
238+ private final SamzaTimerInternalsFactory timerInternalsFactory ;
239+ private final WindowingStrategy windowingStrategy ;
228240 private final DoFnRunners .OutputManager outputManager ;
229241 private final StageBundleFactory stageBundleFactory ;
230242 private final TupleTag <FnOutT > mainOutputTag ;
@@ -236,12 +248,16 @@ private static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT
236248 private StateRequestHandler stateRequestHandler ;
237249
238250 private SdkHarnessDoFnRunner (
251+ SamzaTimerInternalsFactory <?> timerInternalsFactory ,
252+ WindowingStrategy windowingStrategy ,
239253 DoFnRunners .OutputManager outputManager ,
240254 StageBundleFactory stageBundleFactory ,
241255 TupleTag <FnOutT > mainOutputTag ,
242256 Map <String , TupleTag <?>> idToTupleTagMap ,
243257 BagState <WindowedValue <InT >> bundledEventsBag ,
244258 StateRequestHandler stateRequestHandler ) {
259+ this .timerInternalsFactory = timerInternalsFactory ;
260+ this .windowingStrategy = windowingStrategy ;
245261 this .outputManager = outputManager ;
246262 this .stageBundleFactory = stageBundleFactory ;
247263 this .mainOutputTag = mainOutputTag ;
@@ -250,6 +266,17 @@ private SdkHarnessDoFnRunner(
250266 this .stateRequestHandler = stateRequestHandler ;
251267 }
252268
269+ @ SuppressWarnings ("unchecked" )
270+ private void timerDataConsumer (Timer <?> timerElement , TimerInternals .TimerData timerData ) {
271+ TimerInternals timerInternals =
272+ timerInternalsFactory .timerInternalsForKey (timerElement .getUserKey ());
273+ if (timerElement .getClearBit ()) {
274+ timerInternals .deleteTimer (timerData );
275+ } else {
276+ timerInternals .setTimer (timerData );
277+ }
278+ }
279+
253280 @ Override
254281 public void startBundle () {
255282 try {
@@ -264,11 +291,17 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
264291 }
265292 };
266293
294+ final Coder <BoundedWindow > windowCoder = windowingStrategy .getWindowFn ().windowCoder ();
295+ final TimerReceiverFactory timerReceiverFactory =
296+ new TimerReceiverFactory (stageBundleFactory , this ::timerDataConsumer , windowCoder );
297+
267298 remoteBundle =
268299 stageBundleFactory .getBundle (
269- receiverFactory , stateRequestHandler , BundleProgressHandler .ignored ());
300+ receiverFactory ,
301+ timerReceiverFactory ,
302+ stateRequestHandler ,
303+ BundleProgressHandler .ignored ());
270304
271- // TODO: side input support needs to implement to handle this properly
272305 inputReceiver = Iterables .getOnlyElement (remoteBundle .getInputReceivers ().values ());
273306 bundledEventsBag
274307 .read ()
@@ -312,7 +345,27 @@ public <KeyT> void onTimer(
312345 BoundedWindow window ,
313346 Instant timestamp ,
314347 Instant outputTimestamp ,
315- TimeDomain timeDomain ) {}
348+ TimeDomain timeDomain ) {
349+ final KV <String , String > timerReceiverKey =
350+ TimerReceiverFactory .decodeTimerDataTimerId (timerFamilyId );
351+ final FnDataReceiver <Timer > timerReceiver =
352+ remoteBundle .getTimerReceivers ().get (timerReceiverKey );
353+ final Timer timerValue =
354+ Timer .of (
355+ key ,
356+ timerId ,
357+ Collections .singletonList (window ),
358+ timestamp ,
359+ outputTimestamp ,
360+ // TODO: Support propagating the PaneInfo through.
361+ PaneInfo .NO_FIRING );
362+ try {
363+ timerReceiver .accept (timerValue );
364+ } catch (Exception e ) {
365+ throw new RuntimeException (
366+ String .format (Locale .ENGLISH , "Failed to process timer %s" , timerReceiver ), e );
367+ }
368+ }
316369
317370 @ Override
318371 public void finishBundle () {
0 commit comments