diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index ba3c8ad05f..08812803fb 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -47,7 +47,7 @@ private[akka] object ReplayingEvents { receivedPoisonPill: Boolean) def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = - Behaviors.setup { ctx => + Behaviors.setup { _ => // protect against event recovery stalling forever because of journal overloaded and such setup.startRecoveryTimer(snapshot = false) new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state) @@ -158,12 +158,12 @@ private[akka] final class ReplayingEvents[C, E, S]( * The actor is always stopped after this method has been invoked. * * @param cause failure cause. - * @param message the message that was being processed when the exception was thrown + * @param event the event that was being processed when the exception was thrown */ protected def onRecoveryFailure( cause: Throwable, sequenceNr: Long, - message: Option[Any]): Behavior[InternalProtocol] = { + event: Option[Any]): Behavior[InternalProtocol] = { try { setup.onSignal(RecoveryFailed(cause)) } catch { @@ -172,7 +172,7 @@ private[akka] final class ReplayingEvents[C, E, S]( setup.cancelRecoveryTimer() tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) - val msg = message match { + val msg = event match { case Some(evt) => s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " + s"PersistenceId [${setup.persistenceId.id}]" diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 3699576407..d0c15cd144 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -73,18 +73,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup * The actor is always stopped after this method has been invoked. * * @param cause failure cause. - * @param event the event that was processed in `receiveRecover`, if the exception was thrown there */ - private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = { + private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = { setup.cancelRecoveryTimer() - - event match { - case Some(evt) => - setup.log.error(cause, "Exception in receiveRecover when replaying snapshot [{}]", evt.getClass.getName) - case _ => - setup.log.error(cause, "Persistence failure when replaying snapshot") - } - + setup.log.error(cause, "Persistence failure when replaying snapshot") Behaviors.stopped } @@ -93,7 +85,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup // we know we're in snapshotting mode; snapshot recovery timeout arrived val ex = new RecoveryTimedOut( s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") - onRecoveryFailure(ex, None) + onRecoveryFailure(ex) } else Behaviors.same // ignore, since we received the snapshot already def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { @@ -126,7 +118,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill) case LoadSnapshotFailed(cause) => - onRecoveryFailure(cause, event = None) + onRecoveryFailure(cause) case _ => Behaviors.unhandled diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index 9d39f26ec5..75c06d9355 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -8,6 +8,7 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Try + import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ @@ -17,12 +18,16 @@ import akka.actor.typed.PreRestart import akka.actor.typed.Signal import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ import akka.persistence.AtomicWrite import akka.persistence.journal.inmem.InmemJournal import akka.persistence.typed.EventRejectedException import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryFailed +import akka.persistence.typed.internal.JournalFailureException +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -45,6 +50,8 @@ class ChaosJournal extends InmemJournal { Try { throw TestException("I don't like it") })) + } else if (messages.head.payload.head.payload == "malicious") { + super.asyncWriteMessages(List(AtomicWrite(List(messages.head.payload.head.withPayload("wrong-event"))))) } else { super.asyncWriteMessages(messages) } @@ -65,7 +72,8 @@ class ChaosJournal extends InmemJournal { object EventSourcedBehaviorFailureSpec { val conf: Config = ConfigFactory.parseString(s""" - akka.loglevel = DEBUG + akka.loglevel = INFO + akka.loggers = [akka.testkit.TestEventListener] akka.persistence.journal.plugin = "failure-journal" failure-journal = $${akka.persistence.journal.inmem} failure-journal { @@ -80,6 +88,11 @@ class EventSourcedBehaviorFailureSpec implicit val testSettings: TestKitSettings = TestKitSettings(system) + // Needed for the untyped event filter + implicit val untyped = system.toUntyped + + untyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1))) + def failingPersistentActor( pid: PersistenceId, probe: ActorRef[String], @@ -90,55 +103,81 @@ class EventSourcedBehaviorFailureSpec "", (_, cmd) ⇒ { if (cmd == "wrong") - throw new TestException("wrong command") + throw TestException("wrong command") probe.tell("persisting") - Effect.persist(cmd) + Effect.persist(cmd).thenRun(_ => probe.tell("persisted")) }, (state, event) ⇒ { + if (event == "wrong-event") + throw TestException("wrong event") probe.tell(event) state + event - }) - .receiveSignal(additionalSignalHandler.orElse { - case RecoveryCompleted(_) ⇒ - probe.tell("starting") - case PostStop ⇒ - probe.tell("stopped") - case PreRestart ⇒ - probe.tell("restarting") - }) - .onPersistFailure( - SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1).withLoggingEnabled(enabled = false)) + }).receiveSignal(additionalSignalHandler.orElse { + case RecoveryCompleted(_) ⇒ + probe.tell("starting") + case PostStop ⇒ + probe.tell("stopped") + case PreRestart ⇒ + probe.tell("restarting") + }) "A typed persistent actor (failures)" must { - "call onRecoveryFailure when replay fails" in { - val probe = TestProbe[String]() - val excProbe = TestProbe[Throwable]() - spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, { - case RecoveryFailed(t) ⇒ - println("signal recovery failed") - excProbe.ref ! t - })) + "signal RecoveryFailure when replay fails" in { + EventFilter[JournalFailureException](occurrences = 1).intercept { + val probe = TestProbe[String]() + val excProbe = TestProbe[Throwable]() + spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, { + case RecoveryFailed(t) ⇒ + excProbe.ref ! t + })) - excProbe.expectMessageType[TestException].message shouldEqual "Nope" - probe.expectMessage("restarting") + excProbe.expectMessageType[TestException].message shouldEqual "Nope" + probe.expectMessage("stopped") + } } "handle exceptions in onRecoveryFailure" in { val probe = TestProbe[String]() val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref, { - case RecoveryFailed(t) ⇒ + case RecoveryFailed(_) ⇒ throw TestException("recovery call back failure") })) pa ! "one" probe.expectMessage("starting") probe.expectMessage("persisting") probe.expectMessage("one") + probe.expectMessage("persisted") + } + + "signal RecoveryFailure when event handler throws during replay" in { + val probe = TestProbe[String]() + val excProbe = TestProbe[Throwable]() + val pid = PersistenceId("wrong-event-1") + val ref = spawn(failingPersistentActor(pid, probe.ref)) + + ref ! "malicious" + probe.expectMessage("starting") + probe.expectMessage("persisting") + probe.expectMessage("malicious") + probe.expectMessage("persisted") + + EventFilter[JournalFailureException](occurrences = 1).intercept { + // start again and then the event handler will throw + spawn(failingPersistentActor(pid, probe.ref, { + case RecoveryFailed(t) ⇒ + excProbe.ref ! t + })) + + excProbe.expectMessageType[TestException].message shouldEqual "wrong event" + probe.expectMessage("stopped") + } } "restart with backoff" in { val probe = TestProbe[String]() - val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref) + val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref).onPersistFailure( + SupervisorStrategy.restartWithBackoff(1.milli, 10.millis, 0.1).withLoggingEnabled(enabled = false)) val c = spawn(behav) probe.expectMessage("starting") // fail @@ -157,13 +196,15 @@ class EventSourcedBehaviorFailureSpec c ! "three" probe.expectMessage("persisting") probe.expectMessage("three") + probe.expectMessage("persisted") // no restart probe.expectNoMessage() } "restart with backoff for recovery" in { val probe = TestProbe[String]() - val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref) + val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref).onPersistFailure( + SupervisorStrategy.restartWithBackoff(1.milli, 10.millis, 0.1).withLoggingEnabled(enabled = false)) spawn(behav) // First time fails, second time should work and call onRecoveryComplete probe.expectMessage("restarting") @@ -189,28 +230,47 @@ class EventSourcedBehaviorFailureSpec c ! "two" probe.expectMessage("persisting") probe.expectMessage("two") + probe.expectMessage("persisted") // no restart probe.expectNoMessage() } "stop (default supervisor strategy) if command handler throws" in { - val probe = TestProbe[String]() - val behav = failingPersistentActor(PersistenceId("wrong-command-1"), probe.ref) - val c = spawn(behav) - probe.expectMessage("starting") - c ! "wrong" - probe.expectMessage("stopped") + EventFilter[TestException](occurrences = 1).intercept { + val probe = TestProbe[String]() + val behav = failingPersistentActor(PersistenceId("wrong-command-1"), probe.ref) + val c = spawn(behav) + probe.expectMessage("starting") + c ! "wrong" + probe.expectMessage("stopped") + } } "restart supervisor strategy if command handler throws" in { - val probe = TestProbe[String]() - val behav = Behaviors - .supervise(failingPersistentActor(PersistenceId("wrong-command-2"), probe.ref)) - .onFailure[TestException](SupervisorStrategy.restart) - val c = spawn(behav) - probe.expectMessage("starting") - c ! "wrong" - probe.expectMessage("restarting") + EventFilter[TestException](occurrences = 1).intercept { + val probe = TestProbe[String]() + val behav = Behaviors + .supervise(failingPersistentActor(PersistenceId("wrong-command-2"), probe.ref)) + .onFailure[TestException](SupervisorStrategy.restart) + val c = spawn(behav) + probe.expectMessage("starting") + c ! "wrong" + probe.expectMessage("restarting") + } + } + + "not accept wrong event, before persisting it" in { + EventFilter[TestException](occurrences = 1).intercept { + val probe = TestProbe[String]() + val behav = failingPersistentActor(PersistenceId("wrong-event-2"), probe.ref) + val c = spawn(behav) + probe.expectMessage("starting") + // event handler will throw for this event + c ! "wrong-event" + probe.expectMessage("persisting") + // but not "persisted" + probe.expectMessage("stopped") + } } } }