diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala new file mode 100644 index 0000000000..0ccae61329 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateRevisionSpec.scala @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.persistence.typed.state.scaladsl + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.testkit.PersistenceTestKitDurableStateStorePlugin +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.state.RecoveryCompleted + +object DurableStateRevisionSpec { + + def conf: Config = PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(s""" + akka.loglevel = INFO + """)) + +} + +class DurableStateRevisionSpec + extends ScalaTestWithActorTestKit(DurableStateRevisionSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup( + ctx => + DurableStateBehavior[String, String]( + pid, + "", + (state, command) => + state match { + case "stashing" => + command match { + case "unstash" => + probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} unstash" + Effect.persist("normal").thenUnstashAll() + case _ => + Effect.stash() + } + case _ => + command match { + case "cmd" => + probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} onCommand" + Effect + .persist("state") + .thenRun(_ => probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} thenRun") + case "stash" => + probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} stash" + Effect.persist("stashing") + case "snapshot" => + Effect.persist("snapshot") + } + }).receiveSignal { + case (_, RecoveryCompleted) => + probe ! s"${DurableStateBehavior.lastSequenceNumber(ctx)} onRecoveryComplete" + }) + + "The revision number" must { + + "be accessible in the handlers" in { + val probe = TestProbe[String]() + val ref = spawn(behavior(PersistenceId.ofUniqueId("pid-1"), probe.ref)) + probe.expectMessage("0 onRecoveryComplete") + + ref ! "cmd" + probe.expectMessage("0 onCommand") + probe.expectMessage("1 thenRun") + + ref ! "cmd" + probe.expectMessage("1 onCommand") + probe.expectMessage("2 thenRun") + + testKit.stop(ref) + probe.expectTerminated(ref) + + // and during recovery + val ref2 = spawn(behavior(PersistenceId.ofUniqueId("pid-1"), probe.ref)) + probe.expectMessage("2 onRecoveryComplete") + + ref2 ! "cmd" + probe.expectMessage("2 onCommand") + probe.expectMessage("3 thenRun") + } + + "be available while unstashing" in { + val probe = TestProbe[String]() + val ref = spawn(behavior(PersistenceId.ofUniqueId("pid-2"), probe.ref)) + probe.expectMessage("0 onRecoveryComplete") + + ref ! "stash" + ref ! "cmd" + ref ! "cmd" + ref ! "cmd" + ref ! "unstash" + probe.expectMessage("0 stash") + probe.expectMessage("1 unstash") + probe.expectMessage("2 onCommand") + probe.expectMessage("3 thenRun") + probe.expectMessage("3 onCommand") + probe.expectMessage("4 thenRun") + probe.expectMessage("4 onCommand") + probe.expectMessage("5 thenRun") + } + + "not fail when snapshotting" in { + val probe = TestProbe[String]() + val ref = spawn(behavior(PersistenceId.ofUniqueId("pid-3"), probe.ref)) + probe.expectMessage("0 onRecoveryComplete") + + ref ! "cmd" + ref ! "snapshot" + ref ! "cmd" + + probe.expectMessage("0 onCommand") // first command + probe.expectMessage("1 thenRun") + probe.expectMessage("2 onCommand") // second command + probe.expectMessage("3 thenRun") + } + } +} diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.17.backwards.excludes/issue-30833-lastSequenceNumber.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.17.backwards.excludes/issue-30833-lastSequenceNumber.excludes new file mode 100644 index 0000000000..a617615ee0 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.17.backwards.excludes/issue-30833-lastSequenceNumber.excludes @@ -0,0 +1,4 @@ +# #30833 internal changes for lastSequenceNumber +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.state.internal.Recovering.createBehavior") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.state.internal.Recovering.onGetSuccess") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.state.internal.Recovering.this") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala index e2f5e42298..2df3095e1d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala @@ -7,18 +7,20 @@ package akka.persistence.typed.state.internal import scala.concurrent.duration._ import akka.actor.typed.Behavior +import akka.actor.typed.Signal import akka.actor.typed.internal.PoisonPill +import akka.actor.typed.scaladsl.AbstractBehavior import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.annotation.InternalStableApi import akka.persistence._ -import akka.persistence.typed.state.internal.DurableStateBehaviorImpl.GetState -import akka.persistence.typed.state.internal.Running.WithRevisionAccessible import akka.persistence.state.scaladsl.GetObjectResult import akka.persistence.typed.state.RecoveryCompleted import akka.persistence.typed.state.RecoveryFailed +import akka.persistence.typed.state.internal.DurableStateBehaviorImpl.GetState +import akka.persistence.typed.state.internal.Running.WithRevisionAccessible import akka.util.PrettyDuration._ import akka.util.unused @@ -37,8 +39,14 @@ import akka.util.unused @InternalApi private[akka] object Recovering { - def apply[C, S](setup: BehaviorSetup[C, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = - new Recovering(setup.setMdcPhase(PersistenceMdc.RecoveringState)).createBehavior(receivedPoisonPill) + def apply[C, S](setup: BehaviorSetup[C, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { + Behaviors.setup { _ => + // protect against store stalling forever because of store overloaded and such + setup.startRecoveryTimer() + val recoveryState = RecoveryState[S](0L, null.asInstanceOf[S], receivedPoisonPill, System.nanoTime()) + new Recovering(setup.setMdcPhase(PersistenceMdc.RecoveringState), recoveryState) + } + } @InternalApi private[akka] final case class RecoveryState[State]( @@ -50,52 +58,46 @@ private[akka] object Recovering { } @InternalApi -private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) - extends DurableStateStoreInteractions[C, S] +private[akka] class Recovering[C, S]( + override val setup: BehaviorSetup[C, S], + var recoveryState: Recovering.RecoveryState[S]) + extends AbstractBehavior[InternalProtocol](setup.context) // must be class for WithSeqNrAccessible + with DurableStateStoreInteractions[C, S] with StashManagement[C, S] with WithRevisionAccessible { import InternalProtocol._ import Recovering.RecoveryState - // Needed for WithSeqNrAccessible - private var _currentRevision = 0L - onRecoveryStart(setup.context) + internalGet(setup.context) - def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = { - // protect against store stalling forever because of store overloaded and such - setup.startRecoveryTimer() - - internalGet(setup.context) - - def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { - Behaviors - .receiveMessage[InternalProtocol] { - case success: GetSuccess[S @unchecked] => onGetSuccess(success.result, receivedPoisonPill) - case GetFailure(exc) => onGetFailure(exc) - case RecoveryTimeout => onRecoveryTimeout() - case cmd: IncomingCommand[C @unchecked] => - if (receivedPoisonPill) { - if (setup.settings.logOnStashing) - setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd) - Behaviors.unhandled - } else - onCommand(cmd) - case get: GetState[S @unchecked] => stashInternal(get) - case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit - case UpsertSuccess => Behaviors.unhandled - case _: UpsertFailure => Behaviors.unhandled - } - .receiveSignal(returnPermitOnStop.orElse { - case (_, PoisonPill) => - stay(receivedPoisonPill = true) - case (_, signal) => - if (setup.onSignal(setup.emptyState, signal, catchAndLog = true)) Behaviors.same - else Behaviors.unhandled - }) + override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { + msg match { + case success: GetSuccess[S @unchecked] => onGetSuccess(success.result) + case GetFailure(exc) => onGetFailure(exc) + case RecoveryTimeout => onRecoveryTimeout() + case cmd: IncomingCommand[C @unchecked] => + if (recoveryState.receivedPoisonPill) { + if (setup.settings.logOnStashing) + setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd) + Behaviors.unhandled + } else + onCommand(cmd) + case get: GetState[S @unchecked] => stashInternal(get) + case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit + case UpsertSuccess => Behaviors.unhandled + case _: UpsertFailure => Behaviors.unhandled } - stay(receivedPoisonPillInPreviousPhase) + } + + override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { + case PoisonPill => + recoveryState = recoveryState.copy(receivedPoisonPill = true) + this + case signal => + if (setup.onSignal(recoveryState.state, signal, catchAndLog = true)) this + else Behaviors.unhandled } /** @@ -138,7 +140,7 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) stashInternal(cmd) } - def onGetSuccess(result: GetObjectResult[S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { + def onGetSuccess(result: GetObjectResult[S]): Behavior[InternalProtocol] = { val state = result.value match { case Some(s) => setup.snapshotAdapter.fromJournal(s) case None => setup.emptyState @@ -148,13 +150,13 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) setup.cancelRecoveryTimer() - onRecoveryCompleted(RecoveryState(result.revision, state, receivedPoisonPill, System.nanoTime())) + onRecoveryCompleted(RecoveryState(result.revision, state, recoveryState.receivedPoisonPill, System.nanoTime())) } private def onRecoveryCompleted(state: RecoveryState[S]): Behavior[InternalProtocol] = try { - _currentRevision = state.revision + recoveryState = state onRecoveryComplete(setup.context) tryReturnRecoveryPermit("recovery completed successfully") if (setup.internalLogger.isDebugEnabled) { @@ -185,6 +187,6 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) } override def currentRevision: Long = - _currentRevision + recoveryState.revision }