No journal failure for command failure stashed while replaying (#26258)

* No journal failure for command failure stashed while replaying #26256

* Import missed in the flood
This commit is contained in:
Johan Andrén 2019-01-29 09:16:17 +01:00 committed by Patrik Nordwall
parent 4acfdf6f7b
commit f97aa4bbe4
2 changed files with 57 additions and 12 deletions

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.internal
import scala.util.control.NonFatal
import scala.util.control.NoStackTrace
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
@ -13,6 +14,7 @@ import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.ReplayingEvents.FailureWhileUnstashing
/***
* INTERNAL API
@ -46,6 +48,8 @@ private[akka] object ReplayingEvents {
): Behavior[InternalProtocol] =
new ReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state)
private final case class FailureWhileUnstashing(cause: Throwable) extends Exception(cause) with NoStackTrace
}
@InternalApi
@ -104,6 +108,7 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C
Behaviors.unhandled
}
} catch {
case FailureWhileUnstashing(ex) throw ex
case NonFatal(cause)
onRecoveryFailure(cause, state.seqNr, None)
}
@ -179,7 +184,11 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C
Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill)
)
tryUnstashOne(running)
try {
tryUnstashOne(running)
} catch {
case NonFatal(t) throw FailureWhileUnstashing(t)
}
}
} finally {
setup.cancelRecoveryTimer()

View file

@ -12,9 +12,9 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.actor.testkit.typed.TestKitSettings
import akka.testkit.EventFilter
import akka.actor.testkit.typed.{ TestException, TestKitSettings }
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
@ -72,6 +72,7 @@ object EventSourcedBehaviorSpec {
def conf: Config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
akka.loggers = [akka.testkit.TestEventListener]
# akka.persistence.typed.log-stashing = on
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
@ -100,6 +101,8 @@ object EventSourcedBehaviorSpec {
final case object DelayFinished extends Command
private case object Timeout extends Command
final case object LogThenStop extends Command
final case object Fail extends Command
final case object StopIt extends Command
sealed trait Event
final case class Incremented(delta: Int) extends Event
@ -164,7 +167,7 @@ object EventSourcedBehaviorSpec {
case cmd: IncrementWithConfirmation
Effect.persist(Incremented(1))
.thenReply(cmd)(newState Done)
.thenReply(cmd)(_ Done)
case GetValue(replyTo)
replyTo ! state
@ -222,6 +225,13 @@ object EventSourcedBehaviorSpec {
loggingActor ! firstLogging
}
.thenStop
case Fail
throw new TestException("boom!")
case StopIt
Effect.none.thenStop()
},
eventHandler = (state, evt) evt match {
case Incremented(delta)
@ -248,6 +258,9 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
val queries: LeveldbReadJournal = PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
// needed for the untyped event filter
implicit val actorSystem = system.toUntyped
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
@ -650,15 +663,38 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
}
"fail after recovery timeout" in {
val c = spawn(Behaviors.setup[Command](ctx
counter(ctx, nextPid)
.withSnapshotPluginId("slow-snapshot-store")
.withJournalPluginId("short-recovery-timeout"))
)
EventFilter.error(start = "Persistence failure when replaying snapshot", occurrences = 1).intercept {
val c = spawn(Behaviors.setup[Command](ctx
counter(ctx, nextPid)
.withSnapshotPluginId("slow-snapshot-store")
.withJournalPluginId("short-recovery-timeout"))
)
val probe = TestProbe[State]
val probe = TestProbe[State]
probe.expectTerminated(c, probe.remainingOrDefault)
}
}
"not wrap a failure caused by command stashed while recovering in a journal failure" in {
val pid = nextPid()
val probe = TestProbe[AnyRef]
// put some events in there, so that recovering takes a little time
val c = spawn(Behaviors.setup[Command](counter(_, pid)))
(0 to 50).foreach { _
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
}
c ! StopIt
probe.expectTerminated(c)
EventFilter[TestException](occurrences = 1).intercept {
val c2 = spawn(Behaviors.setup[Command](counter(_, pid)))
c2 ! Fail
probe.expectTerminated(c2) // should fail
}
probe.expectTerminated(c, probe.remainingOrDefault)
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
@ -667,7 +703,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
ctx.watch(toWatch)
Behaviors.receive[Any] { (_, _) Behaviors.same }
.receiveSignal {
case (_, s: Terminated)
case (_, _: Terminated)
probe.ref ! "Terminated"
Behaviors.stopped
}