test when event handler throws during replay, #24687

* and not accept wrong event before persisting it
* cleanup EventSourcedBehaviorFailureSpec
This commit is contained in:
Patrik Nordwall 2019-03-20 08:53:00 +01:00
parent 9165cb6c6b
commit ed65752bb5
3 changed files with 109 additions and 57 deletions

View file

@ -47,7 +47,7 @@ private[akka] object ReplayingEvents {
receivedPoisonPill: Boolean) receivedPoisonPill: Boolean)
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
Behaviors.setup { ctx => Behaviors.setup { _ =>
// protect against event recovery stalling forever because of journal overloaded and such // protect against event recovery stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = false) setup.startRecoveryTimer(snapshot = false)
new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state) new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state)
@ -158,12 +158,12 @@ private[akka] final class ReplayingEvents[C, E, S](
* The actor is always stopped after this method has been invoked. * The actor is always stopped after this method has been invoked.
* *
* @param cause failure cause. * @param cause failure cause.
* @param message the message that was being processed when the exception was thrown * @param event the event that was being processed when the exception was thrown
*/ */
protected def onRecoveryFailure( protected def onRecoveryFailure(
cause: Throwable, cause: Throwable,
sequenceNr: Long, sequenceNr: Long,
message: Option[Any]): Behavior[InternalProtocol] = { event: Option[Any]): Behavior[InternalProtocol] = {
try { try {
setup.onSignal(RecoveryFailed(cause)) setup.onSignal(RecoveryFailed(cause))
} catch { } catch {
@ -172,7 +172,7 @@ private[akka] final class ReplayingEvents[C, E, S](
setup.cancelRecoveryTimer() setup.cancelRecoveryTimer()
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)
val msg = message match { val msg = event match {
case Some(evt) => case Some(evt) =>
s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " + s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " +
s"PersistenceId [${setup.persistenceId.id}]" s"PersistenceId [${setup.persistenceId.id}]"

View file

@ -73,18 +73,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
* The actor is always stopped after this method has been invoked. * The actor is always stopped after this method has been invoked.
* *
* @param cause failure cause. * @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/ */
private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = { private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = {
setup.cancelRecoveryTimer() setup.cancelRecoveryTimer()
setup.log.error(cause, "Persistence failure when replaying snapshot")
event match {
case Some(evt) =>
setup.log.error(cause, "Exception in receiveRecover when replaying snapshot [{}]", evt.getClass.getName)
case _ =>
setup.log.error(cause, "Persistence failure when replaying snapshot")
}
Behaviors.stopped Behaviors.stopped
} }
@ -93,7 +85,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
// we know we're in snapshotting mode; snapshot recovery timeout arrived // we know we're in snapshotting mode; snapshot recovery timeout arrived
val ex = new RecoveryTimedOut( val ex = new RecoveryTimedOut(
s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}")
onRecoveryFailure(ex, None) onRecoveryFailure(ex)
} else Behaviors.same // ignore, since we received the snapshot already } else Behaviors.same // ignore, since we received the snapshot already
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
@ -126,7 +118,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill) becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill)
case LoadSnapshotFailed(cause) => case LoadSnapshotFailed(cause) =>
onRecoveryFailure(cause, event = None) onRecoveryFailure(cause)
case _ => case _ =>
Behaviors.unhandled Behaviors.unhandled

View file

@ -8,6 +8,7 @@ import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Try import scala.util.Try
import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._ import akka.actor.testkit.typed.scaladsl._
@ -17,12 +18,16 @@ import akka.actor.typed.PreRestart
import akka.actor.typed.Signal import akka.actor.typed.Signal
import akka.actor.typed.SupervisorStrategy import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.AtomicWrite import akka.persistence.AtomicWrite
import akka.persistence.journal.inmem.InmemJournal import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.EventRejectedException import akka.persistence.typed.EventRejectedException
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.RecoveryFailed import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.internal.JournalFailureException
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike import org.scalatest.WordSpecLike
@ -45,6 +50,8 @@ class ChaosJournal extends InmemJournal {
Try { Try {
throw TestException("I don't like it") throw TestException("I don't like it")
})) }))
} else if (messages.head.payload.head.payload == "malicious") {
super.asyncWriteMessages(List(AtomicWrite(List(messages.head.payload.head.withPayload("wrong-event")))))
} else { } else {
super.asyncWriteMessages(messages) super.asyncWriteMessages(messages)
} }
@ -65,7 +72,8 @@ class ChaosJournal extends InmemJournal {
object EventSourcedBehaviorFailureSpec { object EventSourcedBehaviorFailureSpec {
val conf: Config = ConfigFactory.parseString(s""" val conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = DEBUG akka.loglevel = INFO
akka.loggers = [akka.testkit.TestEventListener]
akka.persistence.journal.plugin = "failure-journal" akka.persistence.journal.plugin = "failure-journal"
failure-journal = $${akka.persistence.journal.inmem} failure-journal = $${akka.persistence.journal.inmem}
failure-journal { failure-journal {
@ -80,6 +88,11 @@ class EventSourcedBehaviorFailureSpec
implicit val testSettings: TestKitSettings = TestKitSettings(system) implicit val testSettings: TestKitSettings = TestKitSettings(system)
// Needed for the untyped event filter
implicit val untyped = system.toUntyped
untyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1)))
def failingPersistentActor( def failingPersistentActor(
pid: PersistenceId, pid: PersistenceId,
probe: ActorRef[String], probe: ActorRef[String],
@ -90,55 +103,81 @@ class EventSourcedBehaviorFailureSpec
"", "",
(_, cmd) { (_, cmd) {
if (cmd == "wrong") if (cmd == "wrong")
throw new TestException("wrong command") throw TestException("wrong command")
probe.tell("persisting") probe.tell("persisting")
Effect.persist(cmd) Effect.persist(cmd).thenRun(_ => probe.tell("persisted"))
}, },
(state, event) { (state, event) {
if (event == "wrong-event")
throw TestException("wrong event")
probe.tell(event) probe.tell(event)
state + event state + event
}) }).receiveSignal(additionalSignalHandler.orElse {
.receiveSignal(additionalSignalHandler.orElse { case RecoveryCompleted(_)
case RecoveryCompleted(_) probe.tell("starting")
probe.tell("starting") case PostStop
case PostStop probe.tell("stopped")
probe.tell("stopped") case PreRestart
case PreRestart probe.tell("restarting")
probe.tell("restarting") })
})
.onPersistFailure(
SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1).withLoggingEnabled(enabled = false))
"A typed persistent actor (failures)" must { "A typed persistent actor (failures)" must {
"call onRecoveryFailure when replay fails" in { "signal RecoveryFailure when replay fails" in {
val probe = TestProbe[String]() EventFilter[JournalFailureException](occurrences = 1).intercept {
val excProbe = TestProbe[Throwable]() val probe = TestProbe[String]()
spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, { val excProbe = TestProbe[Throwable]()
case RecoveryFailed(t) spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, {
println("signal recovery failed") case RecoveryFailed(t)
excProbe.ref ! t excProbe.ref ! t
})) }))
excProbe.expectMessageType[TestException].message shouldEqual "Nope" excProbe.expectMessageType[TestException].message shouldEqual "Nope"
probe.expectMessage("restarting") probe.expectMessage("stopped")
}
} }
"handle exceptions in onRecoveryFailure" in { "handle exceptions in onRecoveryFailure" in {
val probe = TestProbe[String]() val probe = TestProbe[String]()
val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref, { val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref, {
case RecoveryFailed(t) case RecoveryFailed(_)
throw TestException("recovery call back failure") throw TestException("recovery call back failure")
})) }))
pa ! "one" pa ! "one"
probe.expectMessage("starting") probe.expectMessage("starting")
probe.expectMessage("persisting") probe.expectMessage("persisting")
probe.expectMessage("one") probe.expectMessage("one")
probe.expectMessage("persisted")
}
"signal RecoveryFailure when event handler throws during replay" in {
val probe = TestProbe[String]()
val excProbe = TestProbe[Throwable]()
val pid = PersistenceId("wrong-event-1")
val ref = spawn(failingPersistentActor(pid, probe.ref))
ref ! "malicious"
probe.expectMessage("starting")
probe.expectMessage("persisting")
probe.expectMessage("malicious")
probe.expectMessage("persisted")
EventFilter[JournalFailureException](occurrences = 1).intercept {
// start again and then the event handler will throw
spawn(failingPersistentActor(pid, probe.ref, {
case RecoveryFailed(t)
excProbe.ref ! t
}))
excProbe.expectMessageType[TestException].message shouldEqual "wrong event"
probe.expectMessage("stopped")
}
} }
"restart with backoff" in { "restart with backoff" in {
val probe = TestProbe[String]() val probe = TestProbe[String]()
val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref) val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref).onPersistFailure(
SupervisorStrategy.restartWithBackoff(1.milli, 10.millis, 0.1).withLoggingEnabled(enabled = false))
val c = spawn(behav) val c = spawn(behav)
probe.expectMessage("starting") probe.expectMessage("starting")
// fail // fail
@ -157,13 +196,15 @@ class EventSourcedBehaviorFailureSpec
c ! "three" c ! "three"
probe.expectMessage("persisting") probe.expectMessage("persisting")
probe.expectMessage("three") probe.expectMessage("three")
probe.expectMessage("persisted")
// no restart // no restart
probe.expectNoMessage() probe.expectNoMessage()
} }
"restart with backoff for recovery" in { "restart with backoff for recovery" in {
val probe = TestProbe[String]() val probe = TestProbe[String]()
val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref) val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref).onPersistFailure(
SupervisorStrategy.restartWithBackoff(1.milli, 10.millis, 0.1).withLoggingEnabled(enabled = false))
spawn(behav) spawn(behav)
// First time fails, second time should work and call onRecoveryComplete // First time fails, second time should work and call onRecoveryComplete
probe.expectMessage("restarting") probe.expectMessage("restarting")
@ -189,28 +230,47 @@ class EventSourcedBehaviorFailureSpec
c ! "two" c ! "two"
probe.expectMessage("persisting") probe.expectMessage("persisting")
probe.expectMessage("two") probe.expectMessage("two")
probe.expectMessage("persisted")
// no restart // no restart
probe.expectNoMessage() probe.expectNoMessage()
} }
"stop (default supervisor strategy) if command handler throws" in { "stop (default supervisor strategy) if command handler throws" in {
val probe = TestProbe[String]() EventFilter[TestException](occurrences = 1).intercept {
val behav = failingPersistentActor(PersistenceId("wrong-command-1"), probe.ref) val probe = TestProbe[String]()
val c = spawn(behav) val behav = failingPersistentActor(PersistenceId("wrong-command-1"), probe.ref)
probe.expectMessage("starting") val c = spawn(behav)
c ! "wrong" probe.expectMessage("starting")
probe.expectMessage("stopped") c ! "wrong"
probe.expectMessage("stopped")
}
} }
"restart supervisor strategy if command handler throws" in { "restart supervisor strategy if command handler throws" in {
val probe = TestProbe[String]() EventFilter[TestException](occurrences = 1).intercept {
val behav = Behaviors val probe = TestProbe[String]()
.supervise(failingPersistentActor(PersistenceId("wrong-command-2"), probe.ref)) val behav = Behaviors
.onFailure[TestException](SupervisorStrategy.restart) .supervise(failingPersistentActor(PersistenceId("wrong-command-2"), probe.ref))
val c = spawn(behav) .onFailure[TestException](SupervisorStrategy.restart)
probe.expectMessage("starting") val c = spawn(behav)
c ! "wrong" probe.expectMessage("starting")
probe.expectMessage("restarting") c ! "wrong"
probe.expectMessage("restarting")
}
}
"not accept wrong event, before persisting it" in {
EventFilter[TestException](occurrences = 1).intercept {
val probe = TestProbe[String]()
val behav = failingPersistentActor(PersistenceId("wrong-event-2"), probe.ref)
val c = spawn(behav)
probe.expectMessage("starting")
// event handler will throw for this event
c ! "wrong-event"
probe.expectMessage("persisting")
// but not "persisted"
probe.expectMessage("stopped")
}
} }
} }
} }