From 2f7dfbfc01d954bc69a192bcbc1ec5c3e049490d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 11 Oct 2019 14:45:40 +0200 Subject: [PATCH] Last sequence number fix (#27964) * Reproducer for the bug * Separate state when waiting for snapshot write #27935 Allows for accessing seqNr while snapshot in progress --- .../persistence/typed/internal/Running.scala | 60 +++++++------ .../EventSourcedSequenceNumberSpec.scala | 90 ++++++++++++++++--- .../scaladsl/SlowInMemorySnapshotStore.scala | 53 +++++++++++ .../scaladsl/SnapshotMutableStateSpec.scala | 48 ---------- 4 files changed, 163 insertions(+), 88 deletions(-) create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SlowInMemorySnapshotStore.scala diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 2e79cf5b77..a8a2c8ec7f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -280,7 +280,7 @@ private[akka] object Running { tryUnstashOne(newState) } else { internalSaveSnapshot(state) - storingSnapshot(state, sideEffects, shouldSnapshotAfterPersist) + new StoringSnapshot(state, sideEffects, shouldSnapshotAfterPersist) } } } @@ -331,10 +331,13 @@ private[akka] object Running { // =============================================== - def storingSnapshot( + /** INTERNAL API */ + @InternalApi private[akka] class StoringSnapshot( state: RunningState[S], sideEffects: immutable.Seq[SideEffect[S]], - snapshotReason: SnapshotAfterPersist): Behavior[InternalProtocol] = { + snapshotReason: SnapshotAfterPersist) + extends AbstractBehavior[InternalProtocol](setup.context) + with WithSeqNrAccessible { setup.setMdcPhase(PersistenceMdc.StoringSnapshot) def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { @@ -387,32 +390,33 @@ private[akka] object Running { } } - Behaviors - .receiveMessage[InternalProtocol] { - case cmd: IncomingCommand[C] @unchecked => - onCommand(cmd) - case JournalResponse(r) => - onDeleteEventsJournalResponse(r, state.state) - case SnapshotterResponse(response) => - response match { - case _: SaveSnapshotSuccess | _: SaveSnapshotFailure => - onSaveSnapshotResponse(response) - tryUnstashOne(applySideEffects(sideEffects, state)) - case _ => - onDeleteSnapshotResponse(response, state.state) - } - case _ => - Behaviors.unhandled - } - .receiveSignal { - case (_, PoisonPill) => - // wait for snapshot response before stopping - storingSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason) - case (_, signal) => - setup.onSignal(state.state, signal, catchAndLog = false) - Behaviors.same - } + def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { + case cmd: IncomingCommand[C] @unchecked => + onCommand(cmd) + case JournalResponse(r) => + onDeleteEventsJournalResponse(r, state.state) + case SnapshotterResponse(response) => + response match { + case _: SaveSnapshotSuccess | _: SaveSnapshotFailure => + onSaveSnapshotResponse(response) + tryUnstashOne(applySideEffects(sideEffects, state)) + case _ => + onDeleteSnapshotResponse(response, state.state) + } + case _ => + Behaviors.unhandled + } + override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { + case PoisonPill => + // wait for snapshot response before stopping + new StoringSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason) + case signal => + setup.onSignal(state.state, signal, catchAndLog = false) + Behaviors.same + } + + override def currentSequenceNumber: Long = state.seqNr } // -------------------------- diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala index 71e4598118..98caea7faf 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala @@ -18,6 +18,8 @@ object EventSourcedSequenceNumberSpec { private val conf = ConfigFactory.parseString(s""" akka.persistence.journal.plugin = "akka.persistence.journal.inmem" akka.persistence.journal.inmem.test-serialization = on + akka.persistence.snapshot-store.plugin = "slow-snapshot-store" + slow-snapshot-store.class = "${classOf[SlowInMemorySnapshotStore].getName}" """) } @@ -28,17 +30,40 @@ class EventSourcedSequenceNumberSpec with LogCapturing { private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = - Behaviors.setup(ctx => - EventSourcedBehavior[String, String, String](pid, "", { (_, command) => - probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand" - Effect.persist(command).thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun") - }, { (state, evt) => - probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler" - state + evt - }).receiveSignal { - case (_, RecoveryCompleted) => - probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onRecoveryComplete" - }) + Behaviors.setup( + ctx => + EventSourcedBehavior[String, String, String](pid, "", { + (state, command) => + state match { + case "stashing" => + command match { + case "unstash" => + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} unstash" + Effect.persist("normal").thenUnstashAll() + case _ => + Effect.stash() + } + case _ => + command match { + case "cmd" => + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand" + Effect + .persist(command) + .thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun") + case "stash" => + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} stash" + Effect.persist("stashing") + case "snapshot" => + Effect.persist("snapshot") + } + } + }, { (_, evt) => + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler" + evt + }).snapshotWhen((_, event, _) => event == "snapshot").receiveSignal { + case (_, RecoveryCompleted) => + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onRecoveryComplete" + }) "The sequence number" must { @@ -47,10 +72,51 @@ class EventSourcedSequenceNumberSpec val ref = spawn(behavior(PersistenceId.ofUniqueId("ess-1"), probe.ref)) probe.expectMessage("0 onRecoveryComplete") - ref ! "cmd1" + ref ! "cmd" probe.expectMessage("0 onCommand") probe.expectMessage("0 eventHandler") probe.expectMessage("1 thenRun") } + + "be available while replaying stash" in { + val probe = TestProbe[String]() + val ref = spawn(behavior(PersistenceId("ess-2"), probe.ref)) + probe.expectMessage("0 onRecoveryComplete") + + ref ! "stash" + ref ! "cmd" + ref ! "cmd" + ref ! "cmd" + ref ! "unstash" + probe.expectMessage("0 stash") + probe.expectMessage("0 eventHandler") + probe.expectMessage("1 unstash") + probe.expectMessage("1 eventHandler") + probe.expectMessage("2 onCommand") + probe.expectMessage("2 eventHandler") + probe.expectMessage("3 thenRun") + probe.expectMessage("3 onCommand") + probe.expectMessage("3 eventHandler") + probe.expectMessage("4 thenRun") + } + + // reproducer for #27935 + "not fail when snapshotting" in { + val probe = TestProbe[String]() + val ref = spawn(behavior(PersistenceId("ess-3"), probe.ref)) + probe.expectMessage("0 onRecoveryComplete") + + ref ! "cmd" + ref ! "snapshot" + ref ! "cmd" + + probe.expectMessage("0 onCommand") // first command + probe.expectMessage("0 eventHandler") + probe.expectMessage("1 thenRun") + probe.expectMessage("1 eventHandler") // snapshot + probe.expectMessage("2 onCommand") // second command + probe.expectMessage("2 eventHandler") + probe.expectMessage("3 thenRun") + } } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SlowInMemorySnapshotStore.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SlowInMemorySnapshotStore.scala new file mode 100644 index 0000000000..149d4cac6f --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SlowInMemorySnapshotStore.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.persistence.SelectedSnapshot +import akka.persistence.snapshot.SnapshotStore +import akka.persistence.typed.scaladsl.SnapshotMutableStateSpec.MutableState +import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria } +import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata } + +import scala.concurrent.Future + +class SlowInMemorySnapshotStore extends SnapshotStore { + + private var state = Map.empty[String, (Any, ClassicSnapshotMetadata)] + + def loadAsync(persistenceId: String, criteria: ClassicSnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { + Future.successful(state.get(persistenceId).map { + case (snap, meta) => SelectedSnapshot(meta, snap) + }) + } + + def saveAsync(metadata: ClassicSnapshotMetadata, snapshot: Any): Future[Unit] = { + val snapshotState = snapshot.asInstanceOf[MutableState] + val value1 = snapshotState.value + Thread.sleep(50) + val value2 = snapshotState.value + // it mustn't have been modified by another command/event + if (value1 != value2) + Future.failed(new IllegalStateException(s"State changed from $value1 to $value2")) + else { + // copy to simulate serialization, and subsequent recovery shouldn't get same instance + state = state.updated(metadata.persistenceId, (new MutableState(snapshotState.value), metadata)) + Future.successful(()) + } + } + + override def deleteAsync(metadata: ClassicSnapshotMetadata): Future[Unit] = { + state = state.filterNot { + case (pid, (_, meta)) => pid == metadata.persistenceId && meta.sequenceNr == metadata.sequenceNr + } + Future.successful(()) + } + + override def deleteAsync(persistenceId: String, criteria: ClassicSnapshotSelectionCriteria): Future[Unit] = { + state = state.filterNot { + case (pid, (_, meta)) => pid == persistenceId && criteria.matches(meta) + } + Future.successful(()) + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala index 1db3a79869..7dae0799fe 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala @@ -7,18 +7,12 @@ package akka.persistence.typed.scaladsl import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import scala.concurrent.Future - import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior -import akka.persistence.SelectedSnapshot -import akka.persistence.snapshot.SnapshotStore import akka.persistence.typed.PersistenceId import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed -import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria } -import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata } import akka.serialization.jackson.CborSerializable import com.typesafe.config.Config import com.typesafe.config.ConfigFactory @@ -26,48 +20,6 @@ import org.scalatest.WordSpecLike object SnapshotMutableStateSpec { - class SlowInMemorySnapshotStore extends SnapshotStore { - - private var state = Map.empty[String, (Any, ClassicSnapshotMetadata)] - - def loadAsync( - persistenceId: String, - criteria: ClassicSnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { - Future.successful(state.get(persistenceId).map { - case (snap, meta) => SelectedSnapshot(meta, snap) - }) - } - - def saveAsync(metadata: ClassicSnapshotMetadata, snapshot: Any): Future[Unit] = { - val snapshotState = snapshot.asInstanceOf[MutableState] - val value1 = snapshotState.value - Thread.sleep(50) - val value2 = snapshotState.value - // it mustn't have been modified by another command/event - if (value1 != value2) - Future.failed(new IllegalStateException(s"State changed from $value1 to $value2")) - else { - // copy to simulate serialization, and subsequent recovery shouldn't get same instance - state = state.updated(metadata.persistenceId, (new MutableState(snapshotState.value), metadata)) - Future.successful(()) - } - } - - override def deleteAsync(metadata: ClassicSnapshotMetadata): Future[Unit] = { - state = state.filterNot { - case (pid, (_, meta)) => pid == metadata.persistenceId && meta.sequenceNr == metadata.sequenceNr - } - Future.successful(()) - } - - override def deleteAsync(persistenceId: String, criteria: ClassicSnapshotSelectionCriteria): Future[Unit] = { - state = state.filterNot { - case (pid, (_, meta)) => pid == persistenceId && criteria.matches(meta) - } - Future.successful(()) - } - } - def conf: Config = ConfigFactory.parseString(s""" akka.loglevel = INFO akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"