lastSequenceNumber in DurableStateBehavior, #30833 (#30834)

* Recovering Behavior must be a class for the WithSeqNrAccessible to be able
  to find it
This commit is contained in:
Patrik Nordwall 2021-12-16 13:41:28 +01:00 committed by GitHub
parent a36329c4c2
commit 6ed3972f4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 181 additions and 45 deletions

View file

@ -0,0 +1,130 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.state.scaladsl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.RecoveryCompleted
object DurableStateRevisionSpec {
def conf: Config = PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(s"""
akka.loglevel = INFO
"""))
}
class DurableStateRevisionSpec
extends ScalaTestWithActorTestKit(DurableStateRevisionSpec.conf)
with AnyWordSpecLike
with LogCapturing {
private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup(
ctx =>
DurableStateBehavior[String, String](
pid,
"",
(state, command) =>
state match {
case "stashing" =>
command match {
case "unstash" =>
probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} unstash"
Effect.persist("normal").thenUnstashAll()
case _ =>
Effect.stash()
}
case _ =>
command match {
case "cmd" =>
probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} onCommand"
Effect
.persist("state")
.thenRun(_ => probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} thenRun")
case "stash" =>
probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} stash"
Effect.persist("stashing")
case "snapshot" =>
Effect.persist("snapshot")
}
}).receiveSignal {
case (_, RecoveryCompleted) =>
probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} onRecoveryComplete"
})
"The revision number" must {
"be accessible in the handlers" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId.ofUniqueId("pid-1"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "cmd"
probe.expectMessage("0 onCommand")
probe.expectMessage("1 thenRun")
ref ! "cmd"
probe.expectMessage("1 onCommand")
probe.expectMessage("2 thenRun")
testKit.stop(ref)
probe.expectTerminated(ref)
// and during recovery
val ref2 = spawn(behavior(PersistenceId.ofUniqueId("pid-1"), probe.ref))
probe.expectMessage("2 onRecoveryComplete")
ref2 ! "cmd"
probe.expectMessage("2 onCommand")
probe.expectMessage("3 thenRun")
}
"be available while unstashing" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId.ofUniqueId("pid-2"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "stash"
ref ! "cmd"
ref ! "cmd"
ref ! "cmd"
ref ! "unstash"
probe.expectMessage("0 stash")
probe.expectMessage("1 unstash")
probe.expectMessage("2 onCommand")
probe.expectMessage("3 thenRun")
probe.expectMessage("3 onCommand")
probe.expectMessage("4 thenRun")
probe.expectMessage("4 onCommand")
probe.expectMessage("5 thenRun")
}
"not fail when snapshotting" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId.ofUniqueId("pid-3"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "cmd"
ref ! "snapshot"
ref ! "cmd"
probe.expectMessage("0 onCommand") // first command
probe.expectMessage("1 thenRun")
probe.expectMessage("2 onCommand") // second command
probe.expectMessage("3 thenRun")
}
}
}

View file

@ -0,0 +1,4 @@
# #30833 internal changes for lastSequenceNumber
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.state.internal.Recovering.createBehavior")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.state.internal.Recovering.onGetSuccess")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.state.internal.Recovering.this")

View file

@ -7,18 +7,20 @@ package akka.persistence.typed.state.internal
import scala.concurrent.duration._
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.persistence._
import akka.persistence.typed.state.internal.DurableStateBehaviorImpl.GetState
import akka.persistence.typed.state.internal.Running.WithRevisionAccessible
import akka.persistence.state.scaladsl.GetObjectResult
import akka.persistence.typed.state.RecoveryCompleted
import akka.persistence.typed.state.RecoveryFailed
import akka.persistence.typed.state.internal.DurableStateBehaviorImpl.GetState
import akka.persistence.typed.state.internal.Running.WithRevisionAccessible
import akka.util.PrettyDuration._
import akka.util.unused
@ -37,8 +39,14 @@ import akka.util.unused
@InternalApi
private[akka] object Recovering {
def apply[C, S](setup: BehaviorSetup[C, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] =
new Recovering(setup.setMdcPhase(PersistenceMdc.RecoveringState)).createBehavior(receivedPoisonPill)
def apply[C, S](setup: BehaviorSetup[C, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
Behaviors.setup { _ =>
// protect against store stalling forever because of store overloaded and such
setup.startRecoveryTimer()
val recoveryState = RecoveryState[S](0L, null.asInstanceOf[S], receivedPoisonPill, System.nanoTime())
new Recovering(setup.setMdcPhase(PersistenceMdc.RecoveringState), recoveryState)
}
}
@InternalApi
private[akka] final case class RecoveryState[State](
@ -50,52 +58,46 @@ private[akka] object Recovering {
}
@InternalApi
private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S])
extends DurableStateStoreInteractions[C, S]
private[akka] class Recovering[C, S](
override val setup: BehaviorSetup[C, S],
var recoveryState: Recovering.RecoveryState[S])
extends AbstractBehavior[InternalProtocol](setup.context) // must be class for WithSeqNrAccessible
with DurableStateStoreInteractions[C, S]
with StashManagement[C, S]
with WithRevisionAccessible {
import InternalProtocol._
import Recovering.RecoveryState
// Needed for WithSeqNrAccessible
private var _currentRevision = 0L
onRecoveryStart(setup.context)
internalGet(setup.context)
def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = {
// protect against store stalling forever because of store overloaded and such
setup.startRecoveryTimer()
internalGet(setup.context)
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
Behaviors
.receiveMessage[InternalProtocol] {
case success: GetSuccess[S @unchecked] => onGetSuccess(success.result, receivedPoisonPill)
case GetFailure(exc) => onGetFailure(exc)
case RecoveryTimeout => onRecoveryTimeout()
case cmd: IncomingCommand[C @unchecked] =>
if (receivedPoisonPill) {
if (setup.settings.logOnStashing)
setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else
onCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case UpsertSuccess => Behaviors.unhandled
case _: UpsertFailure => Behaviors.unhandled
}
.receiveSignal(returnPermitOnStop.orElse {
case (_, PoisonPill) =>
stay(receivedPoisonPill = true)
case (_, signal) =>
if (setup.onSignal(setup.emptyState, signal, catchAndLog = true)) Behaviors.same
else Behaviors.unhandled
})
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
msg match {
case success: GetSuccess[S @unchecked] => onGetSuccess(success.result)
case GetFailure(exc) => onGetFailure(exc)
case RecoveryTimeout => onRecoveryTimeout()
case cmd: IncomingCommand[C @unchecked] =>
if (recoveryState.receivedPoisonPill) {
if (setup.settings.logOnStashing)
setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else
onCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case UpsertSuccess => Behaviors.unhandled
case _: UpsertFailure => Behaviors.unhandled
}
stay(receivedPoisonPillInPreviousPhase)
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
case PoisonPill =>
recoveryState = recoveryState.copy(receivedPoisonPill = true)
this
case signal =>
if (setup.onSignal(recoveryState.state, signal, catchAndLog = true)) this
else Behaviors.unhandled
}
/**
@ -138,7 +140,7 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S])
stashInternal(cmd)
}
def onGetSuccess(result: GetObjectResult[S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
def onGetSuccess(result: GetObjectResult[S]): Behavior[InternalProtocol] = {
val state = result.value match {
case Some(s) => setup.snapshotAdapter.fromJournal(s)
case None => setup.emptyState
@ -148,13 +150,13 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S])
setup.cancelRecoveryTimer()
onRecoveryCompleted(RecoveryState(result.revision, state, receivedPoisonPill, System.nanoTime()))
onRecoveryCompleted(RecoveryState(result.revision, state, recoveryState.receivedPoisonPill, System.nanoTime()))
}
private def onRecoveryCompleted(state: RecoveryState[S]): Behavior[InternalProtocol] =
try {
_currentRevision = state.revision
recoveryState = state
onRecoveryComplete(setup.context)
tryReturnRecoveryPermit("recovery completed successfully")
if (setup.internalLogger.isDebugEnabled) {
@ -185,6 +187,6 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S])
}
override def currentRevision: Long =
_currentRevision
recoveryState.revision
}