* include cause message in log error message for recovery failures
This commit is contained in:
parent
21f765fb86
commit
3cbda93496
3 changed files with 114 additions and 4 deletions
|
|
@ -138,7 +138,7 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
this
|
||||
} else {
|
||||
val msg =
|
||||
s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]"
|
||||
s"Replay timed out, didn't get event within [${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]"
|
||||
onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None)
|
||||
}
|
||||
} else {
|
||||
|
|
@ -175,9 +175,10 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
val msg = event match {
|
||||
case Some(evt) =>
|
||||
s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " +
|
||||
s"PersistenceId [${setup.persistenceId.id}]"
|
||||
s"PersistenceId [${setup.persistenceId.id}]. ${cause.getMessage}"
|
||||
case None =>
|
||||
s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId [${setup.persistenceId.id}]"
|
||||
s"Exception during recovery. Last known sequence number [$sequenceNr]. " +
|
||||
s"PersistenceId [${setup.persistenceId.id}]. ${cause.getMessage}"
|
||||
}
|
||||
|
||||
throw new JournalFailureException(msg, cause)
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
|
|||
*/
|
||||
private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = {
|
||||
setup.cancelRecoveryTimer()
|
||||
setup.log.error(cause, "Persistence failure when replaying snapshot")
|
||||
setup.log.error(cause, s"Persistence failure when replaying snapshot. ${cause.getMessage}")
|
||||
Behaviors.stopped
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed.scaladsl
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.persistence.RecoveryTimedOut
|
||||
import akka.persistence.journal.SteppingInmemJournal
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryFailed
|
||||
import akka.persistence.typed.internal.JournalFailureException
|
||||
import akka.testkit.EventFilter
|
||||
import akka.testkit.TestEvent.Mute
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
object EventSourcedBehaviorRecoveryTimeoutSpec {
|
||||
|
||||
val journalId = "event-sourced-behavior-recovery-timeout-spec"
|
||||
|
||||
def config: Config =
|
||||
SteppingInmemJournal
|
||||
.config(journalId)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.persistence.journal.stepping-inmem.recovery-event-timeout=1s
|
||||
"""))
|
||||
.withFallback(ConfigFactory.parseString(s"""
|
||||
akka.loglevel = INFO
|
||||
akka.loggers = [akka.testkit.TestEventListener]
|
||||
"""))
|
||||
|
||||
def testBehavior(persistenceId: PersistenceId, probe: ActorRef[AnyRef]): Behavior[String] =
|
||||
Behaviors.setup { _ =>
|
||||
EventSourcedBehavior[String, String, String](
|
||||
persistenceId,
|
||||
emptyState = "",
|
||||
commandHandler = (_, command) => Effect.persist(command).thenRun(_ => probe ! command),
|
||||
eventHandler = (state, evt) => state + evt).receiveSignal {
|
||||
case RecoveryFailed(cause) =>
|
||||
probe ! cause
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class EventSourcedBehaviorRecoveryTimeoutSpec
|
||||
extends ScalaTestWithActorTestKit(EventSourcedBehaviorRecoveryTimeoutSpec.config)
|
||||
with WordSpecLike {
|
||||
|
||||
import EventSourcedBehaviorRecoveryTimeoutSpec._
|
||||
|
||||
val pidCounter = new AtomicInteger(0)
|
||||
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
|
||||
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
// needed for SteppingInmemJournal.step
|
||||
private implicit val untypedSystem: akka.actor.ActorSystem = system.toUntyped
|
||||
|
||||
untypedSystem.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1)))
|
||||
|
||||
"The recovery timeout" must {
|
||||
|
||||
"fail recovery if timeout is not met when recovering" in {
|
||||
val probe = createTestProbe[AnyRef]()
|
||||
val pid = nextPid()
|
||||
val persisting = spawn(testBehavior(pid, probe.ref))
|
||||
|
||||
probe.awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds)
|
||||
val journal = SteppingInmemJournal.getRef(journalId)
|
||||
|
||||
// initial read highest
|
||||
SteppingInmemJournal.step(journal)
|
||||
|
||||
persisting ! "A"
|
||||
SteppingInmemJournal.step(journal)
|
||||
probe.expectMessage("A")
|
||||
|
||||
testKit.stop(persisting)
|
||||
probe.expectTerminated(persisting)
|
||||
|
||||
// now replay, but don't give the journal any tokens to replay events
|
||||
// so that we cause the timeout to trigger
|
||||
EventFilter[JournalFailureException](pattern = "Exception during recovery.*Replay timed out", occurrences = 1)
|
||||
.intercept {
|
||||
val replaying = spawn(testBehavior(pid, probe.ref))
|
||||
|
||||
// initial read highest
|
||||
SteppingInmemJournal.step(journal)
|
||||
|
||||
probe.expectMessageType[RecoveryTimedOut]
|
||||
probe.expectTerminated(replaying)
|
||||
}
|
||||
|
||||
// avoid having it stuck in the next test from the
|
||||
// last read request above
|
||||
SteppingInmemJournal.step(journal)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue