Skip to content

Commit f97bdbd

Browse files
Add JournalPersistFailed and JournalPersistRejected signals (#1961) (#2032)
* Add `JournalPersistFailed` and `JournalPersistRejected` signals with debug logging when emitted * Add tests for `JournalPersistFailed` and `JournalPersistRejected` signals * fix comments, remove pekko version * applyCodeStyle * change license Co-authored-by: Ilya Kachalsky <[email protected]>
1 parent e162781 commit f97bdbd

File tree

3 files changed

+239
-0
lines changed

3 files changed

+239
-0
lines changed
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.persistence.typed.scaladsl
19+
20+
import org.apache.pekko
21+
import pekko.actor.testkit.typed.TestException
22+
import pekko.actor.testkit.typed.scaladsl._
23+
import pekko.actor.typed.ActorRef
24+
import pekko.actor.typed.Behavior
25+
import pekko.persistence.AtomicWrite
26+
import pekko.persistence.journal.inmem.InmemJournal
27+
import pekko.persistence.typed.JournalPersistFailed
28+
import pekko.persistence.typed.JournalPersistRejected
29+
import pekko.persistence.typed.PersistenceId
30+
import pekko.persistence.typed.RecoveryCompleted
31+
import pekko.serialization.jackson.CborSerializable
32+
import com.typesafe.config.Config
33+
import com.typesafe.config.ConfigFactory
34+
import org.scalatest.wordspec.AnyWordSpecLike
35+
36+
import scala.collection.immutable
37+
import scala.concurrent.Future
38+
import scala.concurrent.duration._
39+
import scala.util.Try
40+
41+
// Custom journal that checks event flags to determine whether to reject or fail writes
42+
class SignalTestJournal extends InmemJournal {
43+
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
44+
// Check if any of the events have the shouldReject or shouldFail flag set
45+
val shouldReject = messages.exists { atomicWrite =>
46+
atomicWrite.payload.exists { persistentRepr =>
47+
persistentRepr.payload match {
48+
case event: EventSourcedBehaviorSignalSpec.Incremented => event.shouldReject
49+
case _ => false
50+
}
51+
}
52+
}
53+
54+
val shouldFail = messages.exists { atomicWrite =>
55+
atomicWrite.payload.exists { persistentRepr =>
56+
persistentRepr.payload match {
57+
case event: EventSourcedBehaviorSignalSpec.Incremented => event.shouldFail
58+
case _ => false
59+
}
60+
}
61+
}
62+
63+
if (shouldReject) {
64+
// Return a successful future with a failed Try to simulate rejection
65+
Future.successful(messages.map(_ =>
66+
Try { throw TestException("Journal rejected the event") }
67+
))
68+
} else if (shouldFail) {
69+
// Return a failed future to simulate journal failure
70+
Future.failed(TestException("Journal failed to persist the event"))
71+
} else {
72+
super.asyncWriteMessages(messages)
73+
}
74+
}
75+
}
76+
77+
object EventSourcedBehaviorSignalSpec {
78+
// Commands
79+
sealed trait Command extends CborSerializable
80+
case object Increment extends Command
81+
case object IncrementWithReject extends Command
82+
case object IncrementWithFailure extends Command
83+
84+
// Events
85+
sealed trait Event extends CborSerializable
86+
final case class Incremented(delta: Int, shouldReject: Boolean = false, shouldFail: Boolean = false) extends Event
87+
88+
// State
89+
final case class State(value: Int) extends CborSerializable
90+
91+
// Configuration for tests
92+
val config: Config = ConfigFactory.parseString(s"""
93+
pekko.loglevel = INFO
94+
pekko.persistence.journal.plugin = "signal-test-journal"
95+
signal-test-journal = $${pekko.persistence.journal.inmem}
96+
signal-test-journal {
97+
class = "${classOf[SignalTestJournal].getName}"
98+
}
99+
""").withFallback(ConfigFactory.defaultReference()).resolve()
100+
101+
// Create a behavior that tracks signals
102+
def signalTrackingBehavior(
103+
persistenceId: PersistenceId,
104+
signalProbe: ActorRef[String]): Behavior[Command] = {
105+
106+
// Create the base EventSourcedBehavior
107+
val eventSourcedBehavior = EventSourcedBehavior[Command, Event, State](
108+
persistenceId,
109+
emptyState = State(0),
110+
commandHandler = (_, cmd) =>
111+
cmd match {
112+
case Increment =>
113+
Effect.persist(Incremented(1))
114+
case IncrementWithReject =>
115+
// Create an event that signals it should be rejected
116+
Effect.persist(Incremented(1, shouldReject = true))
117+
case IncrementWithFailure =>
118+
// Create an event that signals it should fail
119+
Effect.persist(Incremented(1, shouldFail = true))
120+
},
121+
eventHandler = (state, evt) =>
122+
evt match {
123+
case Incremented(delta, _, _) =>
124+
State(state.value + delta)
125+
})
126+
.receiveSignal {
127+
case (_, RecoveryCompleted) =>
128+
signalProbe ! "RecoveryCompleted"
129+
case (_, signal: JournalPersistRejected) =>
130+
val message = s"JournalPersistRejected: ${signal.failure.getMessage}"
131+
signalProbe ! message
132+
case (_, signal: JournalPersistFailed) =>
133+
val message = s"JournalPersistFailed: ${signal.failure.getMessage}"
134+
signalProbe ! message
135+
}
136+
137+
// We don't need to handle failures with supervision since we're only testing the signals
138+
eventSourcedBehavior
139+
}
140+
}
141+
142+
class EventSourcedBehaviorSignalSpec
143+
extends ScalaTestWithActorTestKit(EventSourcedBehaviorSignalSpec.config)
144+
with AnyWordSpecLike
145+
with LogCapturing {
146+
147+
import EventSourcedBehaviorSignalSpec._
148+
149+
private val pidCounter = new java.util.concurrent.atomic.AtomicInteger(0)
150+
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"signal-test-${pidCounter.incrementAndGet()}")
151+
152+
"An EventSourcedBehavior" must {
153+
"receive JournalPersistRejected signal when journal rejects events" in {
154+
// Create a probe to track signals
155+
val signalProbe = createTestProbe[String]()
156+
157+
// Create a behavior that will track signals
158+
val behavior = signalTrackingBehavior(nextPid(), signalProbe.ref)
159+
160+
// Spawn the actor
161+
val actor = spawn(behavior)
162+
163+
// Wait for recovery to complete
164+
signalProbe.expectMessage("RecoveryCompleted")
165+
166+
// Send a command that will trigger a rejection
167+
actor ! IncrementWithReject
168+
169+
// Verify that the JournalPersistRejected signal was received
170+
signalProbe.expectMessage(5.seconds, "JournalPersistRejected: Journal rejected the event")
171+
}
172+
173+
"receive JournalPersistFailed signal when journal fails to persist events" in {
174+
// Create a probe to track signals
175+
val signalProbe = createTestProbe[String]()
176+
177+
// Create a behavior that will track signals
178+
val behavior = signalTrackingBehavior(nextPid(), signalProbe.ref)
179+
180+
// Spawn the actor
181+
val actor = spawn(behavior)
182+
183+
// Wait for recovery to complete
184+
signalProbe.expectMessage("RecoveryCompleted")
185+
186+
// Send a command that will trigger a failure
187+
actor ! IncrementWithFailure
188+
189+
// Verify that the JournalPersistFailed signal was received
190+
signalProbe.expectMessage(5.seconds, "JournalPersistFailed: Journal failed to persist the event")
191+
}
192+
193+
"receive no signal when journal doesn't fail" in {
194+
// Create a probe to track signals
195+
val signalProbe = createTestProbe[String]()
196+
197+
// Create a behavior that will track signals
198+
val behavior = signalTrackingBehavior(nextPid(), signalProbe.ref)
199+
200+
// Spawn the actor
201+
val actor = spawn(behavior)
202+
203+
// Wait for recovery to complete
204+
signalProbe.expectMessage("RecoveryCompleted")
205+
206+
// Send a command that won't trigger a failure
207+
actor ! Increment
208+
209+
// Verify that no signal was emitted
210+
signalProbe.expectNoMessage(5.seconds)
211+
}
212+
}
213+
}

persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/EventSourcedSignal.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,22 @@ final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
3939
def getFailure(): Throwable = failure
4040
}
4141

42+
final case class JournalPersistFailed(failure: Throwable) extends EventSourcedSignal {
43+
44+
/**
45+
* Java API
46+
*/
47+
def getFailure(): Throwable = failure
48+
}
49+
50+
final case class JournalPersistRejected(failure: Throwable) extends EventSourcedSignal {
51+
52+
/**
53+
* Java API
54+
*/
55+
def getFailure(): Throwable = failure
56+
}
57+
4258
final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
4359

4460
/**

persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ import pekko.persistence.typed.{
5252
DeleteSnapshotsFailed,
5353
DeletionTarget,
5454
EventRejectedException,
55+
JournalPersistFailed,
56+
JournalPersistRejected,
5557
PersistenceId,
5658
SnapshotCompleted,
5759
SnapshotFailed,
@@ -816,12 +818,20 @@ private[pekko] object Running {
816818
case WriteMessageRejected(p, cause, id) =>
817819
if (id == setup.writerIdentity.instanceId) {
818820
onWriteRejected(setup.context, cause, p)
821+
val signal = JournalPersistRejected(cause)
822+
if (setup.onSignal(state.state, signal, catchAndLog = false)) {
823+
setup.internalLogger.debug("Emitted signal [{}].", signal)
824+
}
819825
throw new EventRejectedException(setup.persistenceId, p.sequenceNr, cause)
820826
} else this
821827

822828
case WriteMessageFailure(p, cause, id) =>
823829
if (id == setup.writerIdentity.instanceId) {
824830
onWriteFailed(setup.context, cause, p)
831+
val signal = JournalPersistFailed(cause)
832+
if (setup.onSignal(state.state, signal, catchAndLog = false)) {
833+
setup.internalLogger.debug("Emitted signal [{}].", signal)
834+
}
825835
throw new JournalFailureException(setup.persistenceId, p.sequenceNr, p.payload.getClass.getName, cause)
826836
} else this
827837

0 commit comments

Comments
 (0)