diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index f3a90b84a1..5a965c6105 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -5,6 +5,7 @@ package akka.persistence.typed.internal import scala.util.control.NonFatal +import scala.util.control.NoStackTrace import akka.actor.typed.Behavior import akka.actor.typed.internal.PoisonPill @@ -13,6 +14,7 @@ import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ +import akka.persistence.typed.internal.ReplayingEvents.FailureWhileUnstashing /*** * INTERNAL API @@ -46,6 +48,8 @@ private[akka] object ReplayingEvents { ): Behavior[InternalProtocol] = new ReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state) + private final case class FailureWhileUnstashing(cause: Throwable) extends Exception(cause) with NoStackTrace + } @InternalApi @@ -104,6 +108,7 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C Behaviors.unhandled } } catch { + case FailureWhileUnstashing(ex) ⇒ throw ex case NonFatal(cause) ⇒ onRecoveryFailure(cause, state.seqNr, None) } @@ -179,7 +184,11 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill) ) - tryUnstashOne(running) + try { + tryUnstashOne(running) + } catch { + case NonFatal(t) ⇒ throw FailureWhileUnstashing(t) + } } } finally { setup.cancelRecoveryTimer() diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 753cc68222..a39df7f9d3 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -12,9 +12,9 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Success import scala.util.Try - import akka.Done -import akka.actor.testkit.typed.TestKitSettings +import akka.testkit.EventFilter +import akka.actor.testkit.typed.{ TestException, TestKitSettings } import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem @@ -72,6 +72,7 @@ object EventSourcedBehaviorSpec { def conf: Config = ConfigFactory.parseString( s""" akka.loglevel = INFO + akka.loggers = [akka.testkit.TestEventListener] # akka.persistence.typed.log-stashing = on akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" @@ -100,6 +101,8 @@ object EventSourcedBehaviorSpec { final case object DelayFinished extends Command private case object Timeout extends Command final case object LogThenStop extends Command + final case object Fail extends Command + final case object StopIt extends Command sealed trait Event final case class Incremented(delta: Int) extends Event @@ -164,7 +167,7 @@ object EventSourcedBehaviorSpec { case cmd: IncrementWithConfirmation ⇒ Effect.persist(Incremented(1)) - .thenReply(cmd)(newState ⇒ Done) + .thenReply(cmd)(_ ⇒ Done) case GetValue(replyTo) ⇒ replyTo ! state @@ -222,6 +225,13 @@ object EventSourcedBehaviorSpec { loggingActor ! firstLogging } .thenStop + + case Fail ⇒ + throw new TestException("boom!") + + case StopIt ⇒ + Effect.none.thenStop() + }, eventHandler = (state, evt) ⇒ evt match { case Incremented(delta) ⇒ @@ -248,6 +258,9 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh val queries: LeveldbReadJournal = PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal]( LeveldbReadJournal.Identifier) + // needed for the untyped event filter + implicit val actorSystem = system.toUntyped + val pidCounter = new AtomicInteger(0) private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") @@ -650,15 +663,38 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh } "fail after recovery timeout" in { - val c = spawn(Behaviors.setup[Command](ctx ⇒ - counter(ctx, nextPid) - .withSnapshotPluginId("slow-snapshot-store") - .withJournalPluginId("short-recovery-timeout")) - ) + EventFilter.error(start = "Persistence failure when replaying snapshot", occurrences = 1).intercept { + val c = spawn(Behaviors.setup[Command](ctx ⇒ + counter(ctx, nextPid) + .withSnapshotPluginId("slow-snapshot-store") + .withJournalPluginId("short-recovery-timeout")) + ) - val probe = TestProbe[State] + val probe = TestProbe[State] + + probe.expectTerminated(c, probe.remainingOrDefault) + } + } + + "not wrap a failure caused by command stashed while recovering in a journal failure" in { + val pid = nextPid() + val probe = TestProbe[AnyRef] + + // put some events in there, so that recovering takes a little time + val c = spawn(Behaviors.setup[Command](counter(_, pid))) + (0 to 50).foreach { _ ⇒ + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + } + c ! StopIt + probe.expectTerminated(c) + + EventFilter[TestException](occurrences = 1).intercept { + val c2 = spawn(Behaviors.setup[Command](counter(_, pid))) + c2 ! Fail + probe.expectTerminated(c2) // should fail + } - probe.expectTerminated(c, probe.remainingOrDefault) } def watcher(toWatch: ActorRef[_]): TestProbe[String] = { @@ -667,7 +703,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh ctx.watch(toWatch) Behaviors.receive[Any] { (_, _) ⇒ Behaviors.same } .receiveSignal { - case (_, s: Terminated) ⇒ + case (_, _: Terminated) ⇒ probe.ref ! "Terminated" Behaviors.stopped }