From 64aa08c481e534bfe9d99008610904d9b89ca91e Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Tue, 17 Dec 2019 05:22:09 -0800 Subject: [PATCH] EventSourcedBehavior not failing with DeathPactException for unhandled Terminated() signals (#28358) --- .../issue-28297-deathpactexception.excludes | 3 + .../typed/internal/BehaviorSetup.scala | 34 +-- .../typed/internal/ReplayingEvents.scala | 5 +- .../typed/internal/ReplayingSnapshot.scala | 4 +- .../internal/RequestingRecoveryPermit.scala | 4 +- .../persistence/typed/internal/Running.scala | 26 +-- .../scaladsl/EventSourcedBehaviorSpec.scala | 2 +- .../EventSourcedBehaviorWatchSpec.scala | 198 ++++++++++++++++++ 8 files changed, 241 insertions(+), 35 deletions(-) create mode 100644 akka-persistence-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28297-deathpactexception.excludes create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28297-deathpactexception.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28297-deathpactexception.excludes new file mode 100644 index 0000000000..8ddfa23a36 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28297-deathpactexception.excludes @@ -0,0 +1,3 @@ +# #28297 EventSourcedBehavior not failing with DeathPactException for unhandled Terminated() signals +# akka.persistence.typed.internal.BehaviorSetup.onSignal now returns Boolean rather than Unit +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.BehaviorSetup.onSignal") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 3c240c3892..d931a279b2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -4,22 +4,18 @@ package akka.persistence.typed.internal -import scala.concurrent.ExecutionContext -import scala.util.control.NonFatal - -import akka.actor.Cancellable -import akka.actor.typed.scaladsl.ActorContext -import akka.actor.ActorRef import akka.actor.typed.Signal +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.{ ActorRef, Cancellable } import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria } import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter } -import akka.persistence.typed.scaladsl.EventSourcedBehavior -import akka.persistence.typed.scaladsl.RetentionCriteria -import akka.util.ConstantFun import akka.util.OptionVal -import org.slf4j.Logger -import org.slf4j.MDC +import org.slf4j.{ Logger, MDC } + +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal /** * INTERNAL API @@ -53,9 +49,9 @@ private[akka] final class BehaviorSetup[C, E, S]( val settings: EventSourcedSettings, val stashState: StashState) { + import BehaviorSetup._ import InternalProtocol.RecoveryTickEvent import akka.actor.typed.scaladsl.adapter._ - import BehaviorSetup._ val persistence: Persistence = Persistence(context.system.toClassic) @@ -102,17 +98,23 @@ private[akka] final class BehaviorSetup[C, E, S]( } /** + * Applies the `signalHandler` if defined and returns true, otherwise returns false. + * If an exception is thrown and `catchAndLog=true` it is logged and returns true, otherwise it is thrown. + * * `catchAndLog=true` should be used for "unknown" signals in the phases before Running * to avoid restart loops if restart supervision is used. */ - def onSignal(state: S, signal: Signal, catchAndLog: Boolean): Unit = { + def onSignal[T](state: S, signal: Signal, catchAndLog: Boolean): Boolean = { try { - signalHandler.applyOrElse((state, signal), ConstantFun.scalaAnyToUnit) + var handled = true + signalHandler.applyOrElse((state, signal), (_: (S, Signal)) => handled = false) + handled } catch { case NonFatal(ex) => - if (catchAndLog) + if (catchAndLog) { log.error(s"Error while processing signal [$signal]: $ex", ex) - else { + true + } else { if (log.isDebugEnabled) log.debug(s"Error while processing signal [$signal]: $ex", ex) throw ex diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 87fc5bba41..675f999a13 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -100,8 +100,8 @@ private[akka] final class ReplayingEvents[C, E, S]( state = state.copy(receivedPoisonPill = true) this case signal => - setup.onSignal(state.state, signal, catchAndLog = true) - this + if (setup.onSignal(state.state, signal, catchAndLog = true)) this + else Behaviors.unhandled } private def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = { @@ -231,6 +231,7 @@ private[akka] final class ReplayingEvents[C, E, S]( setup.persistenceId, (System.nanoTime() - state.recoveryStartTime).nanos.pretty) } + setup.onSignal(state.state, RecoveryCompleted, catchAndLog = false) if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index e5a6112887..29d34dabd4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -72,8 +72,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup case (_, PoisonPill) => stay(receivedPoisonPill = true) case (_, signal) => - setup.onSignal(setup.emptyState, signal, catchAndLog = true) - Behaviors.same + if (setup.onSignal(setup.emptyState, signal, catchAndLog = true)) Behaviors.same + else Behaviors.unhandled }) } stay(receivedPoisonPillInPreviousPhase) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala index 9dc28bb6cd..eb8e6c17a6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala @@ -61,8 +61,8 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi case (_, PoisonPill) => stay(receivedPoisonPill = true) case (_, signal) => - setup.onSignal(setup.emptyState, signal, catchAndLog = true) - Behaviors.same + if (setup.onSignal(setup.emptyState, signal, catchAndLog = true)) Behaviors.same + else Behaviors.unhandled } } stay(receivedPoisonPill = false) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 1a408d07fa..35d8d01610 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -111,8 +111,8 @@ private[akka] object Running { if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else new HandlingCommands(state.copy(receivedPoisonPill = true)) case signal => - setup.onSignal(state.state, signal, catchAndLog = false) - this + if (setup.onSignal(state.state, signal, catchAndLog = false)) this + else Behaviors.unhandled } def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { @@ -322,8 +322,8 @@ private[akka] object Running { state = state.copy(receivedPoisonPill = true) this case signal => - setup.onSignal(visibleState.state, signal, catchAndLog = false) - this + if (setup.onSignal(visibleState.state, signal, catchAndLog = false)) this + else Behaviors.unhandled } override def currentSequenceNumber: Long = visibleState.seqNr @@ -383,8 +383,10 @@ private[akka] object Running { signal match { case Some(signal) => - setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal) - setup.onSignal(state.state, signal, catchAndLog = false) + setup.log.debug("Received snapshot response [{}].", response) + if (setup.onSignal(state.state, signal, catchAndLog = false)) { + setup.log.debug("Emitted signal [{}].", signal) + } case None => setup.log.debug("Received snapshot response [{}], no signal emitted.", response) } @@ -412,8 +414,10 @@ private[akka] object Running { // wait for snapshot response before stopping new StoringSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason) case signal => - setup.onSignal(state.state, signal, catchAndLog = false) - Behaviors.same + if (setup.onSignal(state.state, signal, catchAndLog = false)) + Behaviors.same + else + Behaviors.unhandled } override def currentSequenceNumber: Long = state.seqNr @@ -485,8 +489,7 @@ private[akka] object Running { signal match { case Some(sig) => - setup.onSignal(state, sig, catchAndLog = false) - Behaviors.same + if (setup.onSignal(state, sig, catchAndLog = false)) Behaviors.same else Behaviors.unhandled case None => Behaviors.unhandled // unexpected journal response } @@ -512,8 +515,7 @@ private[akka] object Running { signal match { case Some(sig) => - setup.onSignal(state, sig, catchAndLog = false) - Behaviors.same + if (setup.onSignal(state, sig, catchAndLog = false)) Behaviors.same else Behaviors.unhandled case None => Behaviors.unhandled // unexpected snapshot response } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index de757ce07c..7a6d4059b8 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -74,7 +74,7 @@ object EventSourcedBehaviorSpec { } } - // also used from PersistentActorTest + // also used from PersistentActorTest, EventSourcedBehaviorWatchSpec def conf: Config = ConfigFactory.parseString(s""" akka.loglevel = INFO # akka.persistence.typed.log-stashing = on diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala new file mode 100644 index 0000000000..8bde6132db --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala @@ -0,0 +1,198 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.testkit.typed.TestException +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed._ +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.persistence.Recovery +import akka.persistence.typed.internal.{ + BehaviorSetup, + EventSourcedSettings, + InternalProtocol, + NoOpSnapshotAdapter, + StashState +} +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.WriterIdentity +import akka.persistence.typed.{ NoOpEventAdapter, PersistenceId, RecoveryCompleted } +import akka.serialization.jackson.CborSerializable +import akka.util.ConstantFun +import org.scalatest.WordSpecLike + +object EventSourcedBehaviorWatchSpec { + sealed trait Command extends CborSerializable + case object Fail extends Command + case object Stop extends Command + final case class ChildHasFailed(t: akka.actor.typed.ChildFailed) + final case class HasTerminated(ref: ActorRef[_]) +} + +class EventSourcedBehaviorWatchSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) + with WordSpecLike + with LogCapturing { + + import EventSourcedBehaviorWatchSpec._ + + private val cause = TestException("Dodge this.") + + private val pidCounter = new AtomicInteger(0) + + private def nextPid: PersistenceId = PersistenceId.ofUniqueId(s"${pidCounter.incrementAndGet()}") + + private def setup( + pf: PartialFunction[(String, Signal), Unit], + settings: EventSourcedSettings, + context: ActorContext[_]): BehaviorSetup[Command, String, String] = + new BehaviorSetup[Command, String, String]( + context.asInstanceOf[ActorContext[InternalProtocol]], + nextPid, + emptyState = "", + commandHandler = (_, _) => Effect.none, + eventHandler = (state, evt) => state + evt, + WriterIdentity.newIdentity(), + pf, + _ => Set.empty[String], + NoOpEventAdapter.instance[String], + NoOpSnapshotAdapter.instance[String], + snapshotWhen = ConstantFun.scalaAnyThreeToFalse, + Recovery(), + RetentionCriteria.disabled, + holdingRecoveryPermit = false, + settings = settings, + stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings)) + + "A typed persistent parent actor watching a child" must { + + "throw a DeathPactException from parent when not handling the child Terminated signal" in { + + val parent = + spawn(Behaviors.setup[Command] { context => + val child = context.spawnAnonymous(Behaviors.receive[Command] { (_, _) => + throw cause + }) + + context.watch(child) + + EventSourcedBehavior[Command, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => { + child ! cmd + Effect.none + }, eventHandler = (state, evt) => state + evt) + }) + + LoggingTestKit.error[TestException].expect { + LoggingTestKit.error[DeathPactException].expect { + parent ! Fail + } + } + createTestProbe().expectTerminated(parent) + } + + "behave as expected if a user's signal handler is side effecting" in { + val signalHandler: PartialFunction[(String, Signal), Unit] = { + case (_, RecoveryCompleted) => + java.time.Instant.now.getNano + Behaviors.same + } + + Behaviors.setup[Command] { context => + val settings = EventSourcedSettings(context.system, "", "") + + setup(signalHandler, settings, context).onSignal("", RecoveryCompleted, false) shouldEqual true + setup(PartialFunction.empty, settings, context).onSignal("", RecoveryCompleted, false) shouldEqual false + + Behaviors.empty + } + + val parent = + spawn(Behaviors.setup[Command] { context => + val child = context.spawnAnonymous(Behaviors.receive[Command] { (_, _) => + throw cause + }) + + context.watch(child) + + EventSourcedBehavior[Command, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => { + child ! cmd + Effect.none + }, eventHandler = (state, evt) => state + evt).receiveSignal(signalHandler) + }) + + LoggingTestKit.error[TestException].expect { + LoggingTestKit.error[DeathPactException].expect { + parent ! Fail + } + } + createTestProbe().expectTerminated(parent) + } + + "receive a Terminated when handling the signal" in { + val probe = TestProbe[AnyRef]() + + val parent = + spawn(Behaviors.setup[Stop.type] { context => + val child = context.spawnAnonymous(Behaviors.setup[Stop.type] { c => + Behaviors.receive[Stop.type] { (_, _) => + context.stop(c.self) + Behaviors.stopped + } + }) + + probe.ref ! child + context.watch(child) + + EventSourcedBehavior[Stop.type, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => { + child ! cmd + Effect.none + }, eventHandler = (state, evt) => state + evt).receiveSignal { + case (_, t: Terminated) => + probe.ref ! HasTerminated(t.ref) + Behaviors.stopped + } + }) + + val child = probe.expectMessageType[ActorRef[Stop.type]] + + parent ! Stop + probe.expectMessageType[HasTerminated].ref shouldEqual child + } + + "receive a ChildFailed when handling the signal" in { + val probe = TestProbe[AnyRef]() + + val parent = + spawn(Behaviors.setup[Fail.type] { context => + val child = context.spawnAnonymous(Behaviors.receive[Fail.type] { (_, _) => + throw cause + }) + + probe.ref ! child + context.watch(child) + + EventSourcedBehavior[Fail.type, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => { + child ! cmd + Effect.none + }, eventHandler = (state, evt) => state + evt).receiveSignal { + case (_, t: ChildFailed) => + probe.ref ! ChildHasFailed(t) + Behaviors.same + } + }) + + val child = probe.expectMessageType[ActorRef[Fail.type]] + + LoggingTestKit.error[TestException].expect { + parent ! Fail + } + val failed = probe.expectMessageType[ChildHasFailed].t + failed.ref shouldEqual child + failed.cause shouldEqual cause + } + + } +}