diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala index e13744aee3..79a8ed7d16 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala @@ -16,7 +16,7 @@ package akka.persistence.query */ final class DurableStateChange[A]( val persistenceId: String, - val seqNr: Long, + val revision: Long, val value: A, val offset: Offset, val timestamp: Long) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala index 0da3b59805..00e6edb6d5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala @@ -42,12 +42,13 @@ private[akka] trait DurableStateStoreInteractions[C, S] { state: Running.RunningState[S], value: Any): Running.RunningState[S] = { - val newRunningState = state.nextSequenceNr() + val newRunningState = state.nextRevision() val persistenceId = setup.persistenceId.id onWriteInitiated(ctx, cmd) - ctx.pipeToSelf[Done](setup.durableStateStore.upsertObject(persistenceId, newRunningState.seqNr, value, setup.tag)) { + ctx.pipeToSelf[Done]( + setup.durableStateStore.upsertObject(persistenceId, newRunningState.revision, value, setup.tag)) { case Success(_) => InternalProtocol.UpsertSuccess case Failure(cause) => InternalProtocol.UpsertFailure(cause) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala index 2e935c46c3..d830b494d2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala @@ -15,7 +15,7 @@ import akka.annotation.InternalApi import akka.annotation.InternalStableApi import akka.persistence._ import akka.persistence.typed.state.internal.DurableStateBehaviorImpl.GetState -import akka.persistence.typed.state.internal.Running.WithSeqNrAccessible +import akka.persistence.typed.state.internal.Running.WithRevisionAccessible import akka.persistence.state.scaladsl.GetObjectResult import akka.persistence.typed.state.RecoveryCompleted import akka.persistence.typed.state.RecoveryFailed @@ -42,7 +42,7 @@ private[akka] object Recovering { @InternalApi private[akka] final case class RecoveryState[State]( - seqNr: Long, + revision: Long, state: State, receivedPoisonPill: Boolean, recoveryStartTime: Long) @@ -53,13 +53,13 @@ private[akka] object Recovering { private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) extends DurableStateStoreInteractions[C, S] with StashManagement[C, S] - with WithSeqNrAccessible { + with WithRevisionAccessible { import InternalProtocol._ import Recovering.RecoveryState // Needed for WithSeqNrAccessible - private var _currentSequenceNumber = 0L + private var _currentRevision = 0L onRecoveryStart(setup.context) @@ -144,17 +144,17 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) case None => setup.emptyState } - setup.context.log.debug("Recovered from seqNr [{}]", result.seqNr) + setup.context.log.debug("Recovered from revision [{}]", result.revision) setup.cancelRecoveryTimer() - onRecoveryCompleted(RecoveryState(result.seqNr, state, receivedPoisonPill, System.nanoTime())) + onRecoveryCompleted(RecoveryState(result.revision, state, receivedPoisonPill, System.nanoTime())) } private def onRecoveryCompleted(state: RecoveryState[S]): Behavior[InternalProtocol] = try { - _currentSequenceNumber = state.seqNr + _currentRevision = state.revision onRecoveryComplete(setup.context) tryReturnRecoveryPermit("recovery completed successfully") if (setup.internalLogger.isDebugEnabled) { @@ -169,8 +169,10 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else { - val runningState = Running - .RunningState[S](seqNr = state.seqNr, state = state.state, receivedPoisonPill = state.receivedPoisonPill) + val runningState = Running.RunningState[S]( + revision = state.revision, + state = state.state, + receivedPoisonPill = state.receivedPoisonPill) val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) tryUnstashOne(new running.HandlingCommands(runningState)) } @@ -182,7 +184,7 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) onRecoveryFailure(cause) } - override def currentSequenceNumber: Long = - _currentSequenceNumber + override def currentRevision: Long = + _currentRevision } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala index b4e6142898..60f314caef 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala @@ -41,14 +41,14 @@ import akka.util.unused @InternalApi private[akka] object Running { - trait WithSeqNrAccessible { - def currentSequenceNumber: Long + trait WithRevisionAccessible { + def currentRevision: Long } - final case class RunningState[State](seqNr: Long, state: State, receivedPoisonPill: Boolean) { + final case class RunningState[State](revision: Long, state: State, receivedPoisonPill: Boolean) { - def nextSequenceNr(): RunningState[State] = - copy(seqNr = seqNr + 1) + def nextRevision(): RunningState[State] = + copy(revision = revision + 1) def applyState[C, E](@unused setup: BehaviorSetup[C, State], updated: State): RunningState[State] = { copy(state = updated) @@ -64,16 +64,16 @@ private[akka] object Running { with StashManagement[C, S] { import InternalProtocol._ import Running.RunningState - import Running.WithSeqNrAccessible + import Running.WithRevisionAccessible // Needed for WithSeqNrAccessible, when unstashing - private var _currentSequenceNumber = 0L + private var _currentRevision = 0L final class HandlingCommands(state: RunningState[S]) extends AbstractBehavior[InternalProtocol](setup.context) - with WithSeqNrAccessible { + with WithRevisionAccessible { - _currentSequenceNumber = state.seqNr + _currentRevision = state.revision def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { case IncomingCommand(c: C @unchecked) => onCommand(state, c) @@ -107,7 +107,7 @@ private[akka] object Running { newState: S, cmd: Any, sideEffects: immutable.Seq[SideEffect[S]]): (Behavior[InternalProtocol], Boolean) = { - _currentSequenceNumber = state.seqNr + 1 + _currentRevision = state.revision + 1 val stateAfterApply = state.applyState(setup, newState) val stateToPersist = adaptState(newState) @@ -161,8 +161,8 @@ private[akka] object Running { setup.setMdcPhase(PersistenceMdc.RunningCmds) - override def currentSequenceNumber: Long = - _currentSequenceNumber + override def currentRevision: Long = + _currentRevision } // =============================================== @@ -182,7 +182,7 @@ private[akka] object Running { var sideEffects: immutable.Seq[SideEffect[S]], persistStartTime: Long = System.nanoTime()) extends AbstractBehavior[InternalProtocol](setup.context) - with WithSeqNrAccessible { + with WithRevisionAccessible { override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { msg match { @@ -222,7 +222,7 @@ private[akka] object Running { final def onUpsertFailed(cause: Throwable): Behavior[InternalProtocol] = { onWriteFailed(setup.context, cause) - throw new DurableStateStoreException(setup.persistenceId, currentSequenceNumber, cause) + throw new DurableStateStoreException(setup.persistenceId, currentRevision, cause) } override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { @@ -235,8 +235,8 @@ private[akka] object Running { else Behaviors.unhandled } - override def currentSequenceNumber: Long = { - _currentSequenceNumber + override def currentRevision: Long = { + _currentRevision } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala index 16e3d058ac..80e79682c3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/DurableStateBehavior.scala @@ -94,7 +94,7 @@ object DurableStateBehavior { } extractConcreteBehavior(context.currentBehavior) match { - case w: Running.WithSeqNrAccessible => w.currentSequenceNumber + case w: Running.WithRevisionAccessible => w.currentRevision case s => throw new IllegalStateException(s"Cannot extract the lastSequenceNumber in state ${s.getClass.getName}") } diff --git a/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateStore.scala index bf889a94b7..102c9855ea 100644 --- a/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateStore.scala @@ -20,4 +20,4 @@ trait DurableStateStore[A] { } -final case class GetObjectResult[A](value: Optional[A], seqNr: Long) +final case class GetObjectResult[A](value: Optional[A], revision: Long) diff --git a/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala index 91726213d1..d581b998bc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala @@ -18,7 +18,7 @@ trait DurableStateUpdateStore[A] extends DurableStateStore[A] { /** * @param seqNr sequence number for optimistic locking. starts at 1. */ - def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] + def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] def deleteObject(persistenceId: String): CompletionStage[Done] } diff --git a/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateStore.scala index 24d959260d..d2493a47bd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateStore.scala @@ -19,4 +19,4 @@ trait DurableStateStore[A] { } -final case class GetObjectResult[A](value: Option[A], seqNr: Long) +final case class GetObjectResult[A](value: Option[A], revision: Long) diff --git a/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala index cd847b4579..0b73613c07 100644 --- a/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala @@ -18,7 +18,7 @@ trait DurableStateUpdateStore[A] extends DurableStateStore[A] { /** * @param seqNr sequence number for optimistic locking. starts at 1. */ - def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): Future[Done] + def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] def deleteObject(persistenceId: String): Future[Done]