EventSourcedBehavior not failing with DeathPactException for unhandled Terminated() signals (#28358)

This commit is contained in:
Helena Edelson 2019-12-17 05:22:09 -08:00 committed by GitHub
parent 00fc33d0a5
commit 64aa08c481
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 241 additions and 35 deletions

View file

@ -0,0 +1,3 @@
# #28297 EventSourcedBehavior not failing with DeathPactException for unhandled Terminated() signals
# akka.persistence.typed.internal.BehaviorSetup.onSignal now returns Boolean rather than Unit
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.BehaviorSetup.onSignal")

View file

@ -4,22 +4,18 @@
package akka.persistence.typed.internal
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import akka.actor.Cancellable
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.ActorRef
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.{ ActorRef, Cancellable }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria }
import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter }
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.util.ConstantFun
import akka.util.OptionVal
import org.slf4j.Logger
import org.slf4j.MDC
import org.slf4j.{ Logger, MDC }
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -53,9 +49,9 @@ private[akka] final class BehaviorSetup[C, E, S](
val settings: EventSourcedSettings,
val stashState: StashState) {
import BehaviorSetup._
import InternalProtocol.RecoveryTickEvent
import akka.actor.typed.scaladsl.adapter._
import BehaviorSetup._
val persistence: Persistence = Persistence(context.system.toClassic)
@ -102,17 +98,23 @@ private[akka] final class BehaviorSetup[C, E, S](
}
/**
* Applies the `signalHandler` if defined and returns true, otherwise returns false.
* If an exception is thrown and `catchAndLog=true` it is logged and returns true, otherwise it is thrown.
*
* `catchAndLog=true` should be used for "unknown" signals in the phases before Running
* to avoid restart loops if restart supervision is used.
*/
def onSignal(state: S, signal: Signal, catchAndLog: Boolean): Unit = {
def onSignal[T](state: S, signal: Signal, catchAndLog: Boolean): Boolean = {
try {
signalHandler.applyOrElse((state, signal), ConstantFun.scalaAnyToUnit)
var handled = true
signalHandler.applyOrElse((state, signal), (_: (S, Signal)) => handled = false)
handled
} catch {
case NonFatal(ex) =>
if (catchAndLog)
if (catchAndLog) {
log.error(s"Error while processing signal [$signal]: $ex", ex)
else {
true
} else {
if (log.isDebugEnabled)
log.debug(s"Error while processing signal [$signal]: $ex", ex)
throw ex

View file

@ -100,8 +100,8 @@ private[akka] final class ReplayingEvents[C, E, S](
state = state.copy(receivedPoisonPill = true)
this
case signal =>
setup.onSignal(state.state, signal, catchAndLog = true)
this
if (setup.onSignal(state.state, signal, catchAndLog = true)) this
else Behaviors.unhandled
}
private def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
@ -231,6 +231,7 @@ private[akka] final class ReplayingEvents[C, E, S](
setup.persistenceId,
(System.nanoTime() - state.recoveryStartTime).nanos.pretty)
}
setup.onSignal(state.state, RecoveryCompleted, catchAndLog = false)
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)

View file

@ -72,8 +72,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
case (_, PoisonPill) =>
stay(receivedPoisonPill = true)
case (_, signal) =>
setup.onSignal(setup.emptyState, signal, catchAndLog = true)
Behaviors.same
if (setup.onSignal(setup.emptyState, signal, catchAndLog = true)) Behaviors.same
else Behaviors.unhandled
})
}
stay(receivedPoisonPillInPreviousPhase)

View file

@ -61,8 +61,8 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi
case (_, PoisonPill) =>
stay(receivedPoisonPill = true)
case (_, signal) =>
setup.onSignal(setup.emptyState, signal, catchAndLog = true)
Behaviors.same
if (setup.onSignal(setup.emptyState, signal, catchAndLog = true)) Behaviors.same
else Behaviors.unhandled
}
}
stay(receivedPoisonPill = false)

View file

@ -111,8 +111,8 @@ private[akka] object Running {
if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped
else new HandlingCommands(state.copy(receivedPoisonPill = true))
case signal =>
setup.onSignal(state.state, signal, catchAndLog = false)
this
if (setup.onSignal(state.state, signal, catchAndLog = false)) this
else Behaviors.unhandled
}
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
@ -322,8 +322,8 @@ private[akka] object Running {
state = state.copy(receivedPoisonPill = true)
this
case signal =>
setup.onSignal(visibleState.state, signal, catchAndLog = false)
this
if (setup.onSignal(visibleState.state, signal, catchAndLog = false)) this
else Behaviors.unhandled
}
override def currentSequenceNumber: Long = visibleState.seqNr
@ -383,8 +383,10 @@ private[akka] object Running {
signal match {
case Some(signal) =>
setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal)
setup.onSignal(state.state, signal, catchAndLog = false)
setup.log.debug("Received snapshot response [{}].", response)
if (setup.onSignal(state.state, signal, catchAndLog = false)) {
setup.log.debug("Emitted signal [{}].", signal)
}
case None =>
setup.log.debug("Received snapshot response [{}], no signal emitted.", response)
}
@ -412,8 +414,10 @@ private[akka] object Running {
// wait for snapshot response before stopping
new StoringSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason)
case signal =>
setup.onSignal(state.state, signal, catchAndLog = false)
Behaviors.same
if (setup.onSignal(state.state, signal, catchAndLog = false))
Behaviors.same
else
Behaviors.unhandled
}
override def currentSequenceNumber: Long = state.seqNr
@ -485,8 +489,7 @@ private[akka] object Running {
signal match {
case Some(sig) =>
setup.onSignal(state, sig, catchAndLog = false)
Behaviors.same
if (setup.onSignal(state, sig, catchAndLog = false)) Behaviors.same else Behaviors.unhandled
case None =>
Behaviors.unhandled // unexpected journal response
}
@ -512,8 +515,7 @@ private[akka] object Running {
signal match {
case Some(sig) =>
setup.onSignal(state, sig, catchAndLog = false)
Behaviors.same
if (setup.onSignal(state, sig, catchAndLog = false)) Behaviors.same else Behaviors.unhandled
case None =>
Behaviors.unhandled // unexpected snapshot response
}

View file

@ -74,7 +74,7 @@ object EventSourcedBehaviorSpec {
}
}
// also used from PersistentActorTest
// also used from PersistentActorTest, EventSourcedBehaviorWatchSpec
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = on

View file

@ -0,0 +1,198 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed._
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.Recovery
import akka.persistence.typed.internal.{
BehaviorSetup,
EventSourcedSettings,
InternalProtocol,
NoOpSnapshotAdapter,
StashState
}
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.WriterIdentity
import akka.persistence.typed.{ NoOpEventAdapter, PersistenceId, RecoveryCompleted }
import akka.serialization.jackson.CborSerializable
import akka.util.ConstantFun
import org.scalatest.WordSpecLike
object EventSourcedBehaviorWatchSpec {
sealed trait Command extends CborSerializable
case object Fail extends Command
case object Stop extends Command
final case class ChildHasFailed(t: akka.actor.typed.ChildFailed)
final case class HasTerminated(ref: ActorRef[_])
}
class EventSourcedBehaviorWatchSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf)
with WordSpecLike
with LogCapturing {
import EventSourcedBehaviorWatchSpec._
private val cause = TestException("Dodge this.")
private val pidCounter = new AtomicInteger(0)
private def nextPid: PersistenceId = PersistenceId.ofUniqueId(s"${pidCounter.incrementAndGet()}")
private def setup(
pf: PartialFunction[(String, Signal), Unit],
settings: EventSourcedSettings,
context: ActorContext[_]): BehaviorSetup[Command, String, String] =
new BehaviorSetup[Command, String, String](
context.asInstanceOf[ActorContext[InternalProtocol]],
nextPid,
emptyState = "",
commandHandler = (_, _) => Effect.none,
eventHandler = (state, evt) => state + evt,
WriterIdentity.newIdentity(),
pf,
_ => Set.empty[String],
NoOpEventAdapter.instance[String],
NoOpSnapshotAdapter.instance[String],
snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
Recovery(),
RetentionCriteria.disabled,
holdingRecoveryPermit = false,
settings = settings,
stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings))
"A typed persistent parent actor watching a child" must {
"throw a DeathPactException from parent when not handling the child Terminated signal" in {
val parent =
spawn(Behaviors.setup[Command] { context =>
val child = context.spawnAnonymous(Behaviors.receive[Command] { (_, _) =>
throw cause
})
context.watch(child)
EventSourcedBehavior[Command, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => {
child ! cmd
Effect.none
}, eventHandler = (state, evt) => state + evt)
})
LoggingTestKit.error[TestException].expect {
LoggingTestKit.error[DeathPactException].expect {
parent ! Fail
}
}
createTestProbe().expectTerminated(parent)
}
"behave as expected if a user's signal handler is side effecting" in {
val signalHandler: PartialFunction[(String, Signal), Unit] = {
case (_, RecoveryCompleted) =>
java.time.Instant.now.getNano
Behaviors.same
}
Behaviors.setup[Command] { context =>
val settings = EventSourcedSettings(context.system, "", "")
setup(signalHandler, settings, context).onSignal("", RecoveryCompleted, false) shouldEqual true
setup(PartialFunction.empty, settings, context).onSignal("", RecoveryCompleted, false) shouldEqual false
Behaviors.empty
}
val parent =
spawn(Behaviors.setup[Command] { context =>
val child = context.spawnAnonymous(Behaviors.receive[Command] { (_, _) =>
throw cause
})
context.watch(child)
EventSourcedBehavior[Command, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => {
child ! cmd
Effect.none
}, eventHandler = (state, evt) => state + evt).receiveSignal(signalHandler)
})
LoggingTestKit.error[TestException].expect {
LoggingTestKit.error[DeathPactException].expect {
parent ! Fail
}
}
createTestProbe().expectTerminated(parent)
}
"receive a Terminated when handling the signal" in {
val probe = TestProbe[AnyRef]()
val parent =
spawn(Behaviors.setup[Stop.type] { context =>
val child = context.spawnAnonymous(Behaviors.setup[Stop.type] { c =>
Behaviors.receive[Stop.type] { (_, _) =>
context.stop(c.self)
Behaviors.stopped
}
})
probe.ref ! child
context.watch(child)
EventSourcedBehavior[Stop.type, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => {
child ! cmd
Effect.none
}, eventHandler = (state, evt) => state + evt).receiveSignal {
case (_, t: Terminated) =>
probe.ref ! HasTerminated(t.ref)
Behaviors.stopped
}
})
val child = probe.expectMessageType[ActorRef[Stop.type]]
parent ! Stop
probe.expectMessageType[HasTerminated].ref shouldEqual child
}
"receive a ChildFailed when handling the signal" in {
val probe = TestProbe[AnyRef]()
val parent =
spawn(Behaviors.setup[Fail.type] { context =>
val child = context.spawnAnonymous(Behaviors.receive[Fail.type] { (_, _) =>
throw cause
})
probe.ref ! child
context.watch(child)
EventSourcedBehavior[Fail.type, String, String](nextPid, emptyState = "", commandHandler = (_, cmd) => {
child ! cmd
Effect.none
}, eventHandler = (state, evt) => state + evt).receiveSignal {
case (_, t: ChildFailed) =>
probe.ref ! ChildHasFailed(t)
Behaviors.same
}
})
val child = probe.expectMessageType[ActorRef[Fail.type]]
LoggingTestKit.error[TestException].expect {
parent ! Fail
}
val failed = probe.expectMessageType[ChildHasFailed].t
failed.ref shouldEqual child
failed.cause shouldEqual cause
}
}
}