From 3cbda93496f7e31696512fa559b99d9df875bd86 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 27 Mar 2019 16:23:01 +0100 Subject: [PATCH] add EventSourcedBehaviorRecoveryTimeoutSpec, #24687 (#26525) * include cause message in log error message for recovery failures --- .../typed/internal/ReplayingEvents.scala | 7 +- .../typed/internal/ReplayingSnapshot.scala | 2 +- ...ntSourcedBehaviorRecoveryTimeoutSpec.scala | 109 ++++++++++++++++++ 3 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRecoveryTimeoutSpec.scala diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 08812803fb..ef3e124674 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -138,7 +138,7 @@ private[akka] final class ReplayingEvents[C, E, S]( this } else { val msg = - s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" + s"Replay timed out, didn't get event within [${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) } } else { @@ -175,9 +175,10 @@ private[akka] final class ReplayingEvents[C, E, S]( val msg = event match { case Some(evt) => s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " + - s"PersistenceId [${setup.persistenceId.id}]" + s"PersistenceId [${setup.persistenceId.id}]. ${cause.getMessage}" case None => - s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId [${setup.persistenceId.id}]" + s"Exception during recovery. Last known sequence number [$sequenceNr]. " + + s"PersistenceId [${setup.persistenceId.id}]. ${cause.getMessage}" } throw new JournalFailureException(msg, cause) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index d0c15cd144..7a55b964a9 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -76,7 +76,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup */ private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = { setup.cancelRecoveryTimer() - setup.log.error(cause, "Persistence failure when replaying snapshot") + setup.log.error(cause, s"Persistence failure when replaying snapshot. ${cause.getMessage}") Behaviors.stopped } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRecoveryTimeoutSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRecoveryTimeoutSpec.scala new file mode 100644 index 0000000000..3b9cdb91b8 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRecoveryTimeoutSpec.scala @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.RecoveryTimedOut +import akka.persistence.journal.SteppingInmemJournal +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryFailed +import akka.persistence.typed.internal.JournalFailureException +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object EventSourcedBehaviorRecoveryTimeoutSpec { + + val journalId = "event-sourced-behavior-recovery-timeout-spec" + + def config: Config = + SteppingInmemJournal + .config(journalId) + .withFallback(ConfigFactory.parseString(""" + akka.persistence.journal.stepping-inmem.recovery-event-timeout=1s + """)) + .withFallback(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.loggers = [akka.testkit.TestEventListener] + """)) + + def testBehavior(persistenceId: PersistenceId, probe: ActorRef[AnyRef]): Behavior[String] = + Behaviors.setup { _ => + EventSourcedBehavior[String, String, String]( + persistenceId, + emptyState = "", + commandHandler = (_, command) => Effect.persist(command).thenRun(_ => probe ! command), + eventHandler = (state, evt) => state + evt).receiveSignal { + case RecoveryFailed(cause) => + probe ! cause + } + } + +} + +class EventSourcedBehaviorRecoveryTimeoutSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorRecoveryTimeoutSpec.config) + with WordSpecLike { + + import EventSourcedBehaviorRecoveryTimeoutSpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + + import akka.actor.typed.scaladsl.adapter._ + // needed for SteppingInmemJournal.step + private implicit val untypedSystem: akka.actor.ActorSystem = system.toUntyped + + untypedSystem.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1))) + + "The recovery timeout" must { + + "fail recovery if timeout is not met when recovering" in { + val probe = createTestProbe[AnyRef]() + val pid = nextPid() + val persisting = spawn(testBehavior(pid, probe.ref)) + + probe.awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds) + val journal = SteppingInmemJournal.getRef(journalId) + + // initial read highest + SteppingInmemJournal.step(journal) + + persisting ! "A" + SteppingInmemJournal.step(journal) + probe.expectMessage("A") + + testKit.stop(persisting) + probe.expectTerminated(persisting) + + // now replay, but don't give the journal any tokens to replay events + // so that we cause the timeout to trigger + EventFilter[JournalFailureException](pattern = "Exception during recovery.*Replay timed out", occurrences = 1) + .intercept { + val replaying = spawn(testBehavior(pid, probe.ref)) + + // initial read highest + SteppingInmemJournal.step(journal) + + probe.expectMessageType[RecoveryTimedOut] + probe.expectTerminated(replaying) + } + + // avoid having it stuck in the next test from the + // last read request above + SteppingInmemJournal.step(journal) + } + + } +}