Last sequence number fix (#27964)

* Reproducer for the bug

* Separate state when waiting for snapshot write #27935

Allows for accessing seqNr while snapshot in progress
This commit is contained in:
Johan Andrén 2019-10-11 14:45:40 +02:00 committed by Christopher Batey
parent ca7cad3a6d
commit 2f7dfbfc01
4 changed files with 163 additions and 88 deletions

View file

@ -280,7 +280,7 @@ private[akka] object Running {
tryUnstashOne(newState)
} else {
internalSaveSnapshot(state)
storingSnapshot(state, sideEffects, shouldSnapshotAfterPersist)
new StoringSnapshot(state, sideEffects, shouldSnapshotAfterPersist)
}
}
}
@ -331,10 +331,13 @@ private[akka] object Running {
// ===============================================
def storingSnapshot(
/** INTERNAL API */
@InternalApi private[akka] class StoringSnapshot(
state: RunningState[S],
sideEffects: immutable.Seq[SideEffect[S]],
snapshotReason: SnapshotAfterPersist): Behavior[InternalProtocol] = {
snapshotReason: SnapshotAfterPersist)
extends AbstractBehavior[InternalProtocol](setup.context)
with WithSeqNrAccessible {
setup.setMdcPhase(PersistenceMdc.StoringSnapshot)
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
@ -387,32 +390,33 @@ private[akka] object Running {
}
}
Behaviors
.receiveMessage[InternalProtocol] {
case cmd: IncomingCommand[C] @unchecked =>
onCommand(cmd)
case JournalResponse(r) =>
onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(response) =>
response match {
case _: SaveSnapshotSuccess | _: SaveSnapshotFailure =>
onSaveSnapshotResponse(response)
tryUnstashOne(applySideEffects(sideEffects, state))
case _ =>
onDeleteSnapshotResponse(response, state.state)
}
case _ =>
Behaviors.unhandled
}
.receiveSignal {
case (_, PoisonPill) =>
// wait for snapshot response before stopping
storingSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason)
case (_, signal) =>
setup.onSignal(state.state, signal, catchAndLog = false)
Behaviors.same
}
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case cmd: IncomingCommand[C] @unchecked =>
onCommand(cmd)
case JournalResponse(r) =>
onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(response) =>
response match {
case _: SaveSnapshotSuccess | _: SaveSnapshotFailure =>
onSaveSnapshotResponse(response)
tryUnstashOne(applySideEffects(sideEffects, state))
case _ =>
onDeleteSnapshotResponse(response, state.state)
}
case _ =>
Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
case PoisonPill =>
// wait for snapshot response before stopping
new StoringSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason)
case signal =>
setup.onSignal(state.state, signal, catchAndLog = false)
Behaviors.same
}
override def currentSequenceNumber: Long = state.seqNr
}
// --------------------------

View file

@ -18,6 +18,8 @@ object EventSourcedSequenceNumberSpec {
private val conf = ConfigFactory.parseString(s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
akka.persistence.snapshot-store.plugin = "slow-snapshot-store"
slow-snapshot-store.class = "${classOf[SlowInMemorySnapshotStore].getName}"
""")
}
@ -28,17 +30,40 @@ class EventSourcedSequenceNumberSpec
with LogCapturing {
private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup(ctx =>
EventSourcedBehavior[String, String, String](pid, "", { (_, command) =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand"
Effect.persist(command).thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun")
}, { (state, evt) =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler"
state + evt
}).receiveSignal {
case (_, RecoveryCompleted) =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onRecoveryComplete"
})
Behaviors.setup(
ctx =>
EventSourcedBehavior[String, String, String](pid, "", {
(state, command) =>
state match {
case "stashing" =>
command match {
case "unstash" =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} unstash"
Effect.persist("normal").thenUnstashAll()
case _ =>
Effect.stash()
}
case _ =>
command match {
case "cmd" =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand"
Effect
.persist(command)
.thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun")
case "stash" =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} stash"
Effect.persist("stashing")
case "snapshot" =>
Effect.persist("snapshot")
}
}
}, { (_, evt) =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler"
evt
}).snapshotWhen((_, event, _) => event == "snapshot").receiveSignal {
case (_, RecoveryCompleted) =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onRecoveryComplete"
})
"The sequence number" must {
@ -47,10 +72,51 @@ class EventSourcedSequenceNumberSpec
val ref = spawn(behavior(PersistenceId.ofUniqueId("ess-1"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "cmd1"
ref ! "cmd"
probe.expectMessage("0 onCommand")
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 thenRun")
}
"be available while replaying stash" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId("ess-2"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "stash"
ref ! "cmd"
ref ! "cmd"
ref ! "cmd"
ref ! "unstash"
probe.expectMessage("0 stash")
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 unstash")
probe.expectMessage("1 eventHandler")
probe.expectMessage("2 onCommand")
probe.expectMessage("2 eventHandler")
probe.expectMessage("3 thenRun")
probe.expectMessage("3 onCommand")
probe.expectMessage("3 eventHandler")
probe.expectMessage("4 thenRun")
}
// reproducer for #27935
"not fail when snapshotting" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId("ess-3"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "cmd"
ref ! "snapshot"
ref ! "cmd"
probe.expectMessage("0 onCommand") // first command
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 thenRun")
probe.expectMessage("1 eventHandler") // snapshot
probe.expectMessage("2 onCommand") // second command
probe.expectMessage("2 eventHandler")
probe.expectMessage("3 thenRun")
}
}
}

View file

@ -0,0 +1,53 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.persistence.SelectedSnapshot
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.scaladsl.SnapshotMutableStateSpec.MutableState
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import scala.concurrent.Future
class SlowInMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, ClassicSnapshotMetadata)]
def loadAsync(persistenceId: String, criteria: ClassicSnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
Future.successful(state.get(persistenceId).map {
case (snap, meta) => SelectedSnapshot(meta, snap)
})
}
def saveAsync(metadata: ClassicSnapshotMetadata, snapshot: Any): Future[Unit] = {
val snapshotState = snapshot.asInstanceOf[MutableState]
val value1 = snapshotState.value
Thread.sleep(50)
val value2 = snapshotState.value
// it mustn't have been modified by another command/event
if (value1 != value2)
Future.failed(new IllegalStateException(s"State changed from $value1 to $value2"))
else {
// copy to simulate serialization, and subsequent recovery shouldn't get same instance
state = state.updated(metadata.persistenceId, (new MutableState(snapshotState.value), metadata))
Future.successful(())
}
}
override def deleteAsync(metadata: ClassicSnapshotMetadata): Future[Unit] = {
state = state.filterNot {
case (pid, (_, meta)) => pid == metadata.persistenceId && meta.sequenceNr == metadata.sequenceNr
}
Future.successful(())
}
override def deleteAsync(persistenceId: String, criteria: ClassicSnapshotSelectionCriteria): Future[Unit] = {
state = state.filterNot {
case (pid, (_, meta)) => pid == persistenceId && criteria.matches(meta)
}
Future.successful(())
}
}

View file

@ -7,18 +7,12 @@ package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.SelectedSnapshot
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -26,48 +20,6 @@ import org.scalatest.WordSpecLike
object SnapshotMutableStateSpec {
class SlowInMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, ClassicSnapshotMetadata)]
def loadAsync(
persistenceId: String,
criteria: ClassicSnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
Future.successful(state.get(persistenceId).map {
case (snap, meta) => SelectedSnapshot(meta, snap)
})
}
def saveAsync(metadata: ClassicSnapshotMetadata, snapshot: Any): Future[Unit] = {
val snapshotState = snapshot.asInstanceOf[MutableState]
val value1 = snapshotState.value
Thread.sleep(50)
val value2 = snapshotState.value
// it mustn't have been modified by another command/event
if (value1 != value2)
Future.failed(new IllegalStateException(s"State changed from $value1 to $value2"))
else {
// copy to simulate serialization, and subsequent recovery shouldn't get same instance
state = state.updated(metadata.persistenceId, (new MutableState(snapshotState.value), metadata))
Future.successful(())
}
}
override def deleteAsync(metadata: ClassicSnapshotMetadata): Future[Unit] = {
state = state.filterNot {
case (pid, (_, meta)) => pid == metadata.persistenceId && meta.sequenceNr == metadata.sequenceNr
}
Future.successful(())
}
override def deleteAsync(persistenceId: String, criteria: ClassicSnapshotSelectionCriteria): Future[Unit] = {
state = state.filterNot {
case (pid, (_, meta)) => pid == persistenceId && criteria.matches(meta)
}
Future.successful(())
}
}
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"