fix wrong lastSequenceNumber, #28976 (#29128)

* off by one when accessed from event handler
* off by one from event handler during replay
* wrong when unstashing
* added more tests, also for persist of several events
This commit is contained in:
Patrik Nordwall 2020-05-26 17:17:30 +02:00 committed by GitHub
parent 2a1f54c8ce
commit 90aa5be45e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 25 deletions

View file

@ -99,7 +99,7 @@ private[akka] trait JournalInteractions[C, E, S] {
@unused repr: immutable.Seq[PersistentRepr]): Unit = ()
protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.log.debug2("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
setup.log.debug2("Replaying events: from: {}, to: {}", fromSeqNr, toSeqNr)
setup.journal.tell(
ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId.id, setup.selfClassic),
setup.selfClassic)

View file

@ -116,10 +116,8 @@ private[akka] final class ReplayingEvents[C, E, S](
def handleEvent(event: E): Unit = {
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
state = state.copy(seqNr = repr.sequenceNr)
state = state.copy(state = setup.eventHandler(state.state, event), eventSeenInInterval = true)
}
eventSeq match {
@ -247,5 +245,6 @@ private[akka] final class ReplayingEvents[C, E, S](
setup.cancelRecoveryTimer()
}
override def currentSequenceNumber: Long = state.seqNr
override def currentSequenceNumber: Long =
state.seqNr
}

View file

@ -97,10 +97,15 @@ private[akka] object Running {
import InternalProtocol._
import Running.RunningState
// Needed for WithSeqNrAccessible, when unstashing
private var _currentSequenceNumber = 0L
final class HandlingCommands(state: RunningState[S])
extends AbstractBehavior[InternalProtocol](setup.context)
with WithSeqNrAccessible {
_currentSequenceNumber = state.seqNr
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
@ -150,6 +155,7 @@ private[akka] object Running {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr + 1
val newState = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
@ -166,12 +172,13 @@ private[akka] object Running {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var seqNr = state.seqNr
_currentSequenceNumber = state.seqNr
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) {
case ((currentState, snapshot), event) =>
seqNr += 1
_currentSequenceNumber += 1
val shouldSnapshot =
if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot
if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber)
else snapshot
(currentState.applyEvent(setup, event), shouldSnapshot)
}
@ -212,7 +219,8 @@ private[akka] object Running {
setup.setMdcPhase(PersistenceMdc.RunningCmds)
override def currentSequenceNumber: Long = state.seqNr
override def currentSequenceNumber: Long =
_currentSequenceNumber
}
// ===============================================
@ -335,7 +343,9 @@ private[akka] object Running {
else Behaviors.unhandled
}
override def currentSequenceNumber: Long = visibleState.seqNr
override def currentSequenceNumber: Long = {
_currentSequenceNumber
}
}
// ===============================================
@ -430,7 +440,8 @@ private[akka] object Running {
Behaviors.unhandled
}
override def currentSequenceNumber: Long = state.seqNr
override def currentSequenceNumber: Long =
_currentSequenceNumber
}
// --------------------------

View file

@ -773,7 +773,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
probe.expectMessage("0 onRecoveryCompleted");
ref.tell("cmd");
probe.expectMessage("0 onCommand");
probe.expectMessage("0 applyEvent");
probe.expectMessage("1 applyEvent");
probe.expectMessage("1 thenRun");
}
}

View file

@ -49,7 +49,12 @@ class EventSourcedSequenceNumberSpec
case "cmd" =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand"
Effect
.persist(command)
.persist("evt")
.thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun")
case "cmd3" =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand"
Effect
.persist("evt1", "evt2", "evt3")
.thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun")
case "stash" =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} stash"
@ -59,7 +64,7 @@ class EventSourcedSequenceNumberSpec
}
}
}, { (_, evt) =>
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler"
probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler $evt"
evt
}).snapshotWhen((_, event, _) => event == "snapshot").receiveSignal {
case (_, RecoveryCompleted) =>
@ -75,11 +80,40 @@ class EventSourcedSequenceNumberSpec
ref ! "cmd"
probe.expectMessage("0 onCommand")
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 eventHandler evt")
probe.expectMessage("1 thenRun")
ref ! "cmd"
probe.expectMessage("1 onCommand")
probe.expectMessage("2 eventHandler evt")
probe.expectMessage("2 thenRun")
ref ! "cmd3"
probe.expectMessage("2 onCommand")
probe.expectMessage("3 eventHandler evt1")
probe.expectMessage("4 eventHandler evt2")
probe.expectMessage("5 eventHandler evt3")
probe.expectMessage("5 thenRun")
testKit.stop(ref)
probe.expectTerminated(ref)
// and during replay
val ref2 = spawn(behavior(PersistenceId.ofUniqueId("ess-1"), probe.ref))
probe.expectMessage("1 eventHandler evt")
probe.expectMessage("2 eventHandler evt")
probe.expectMessage("3 eventHandler evt1")
probe.expectMessage("4 eventHandler evt2")
probe.expectMessage("5 eventHandler evt3")
probe.expectMessage("5 onRecoveryComplete")
ref2 ! "cmd"
probe.expectMessage("5 onCommand")
probe.expectMessage("6 eventHandler evt")
probe.expectMessage("6 thenRun")
}
"be available while replaying stash" in {
"be available while unstashing" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId.ofUniqueId("ess-2"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
@ -87,18 +121,23 @@ class EventSourcedSequenceNumberSpec
ref ! "stash"
ref ! "cmd"
ref ! "cmd"
ref ! "cmd"
ref ! "cmd3"
ref ! "unstash"
probe.expectMessage("0 stash")
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 eventHandler stashing")
probe.expectMessage("1 unstash")
probe.expectMessage("1 eventHandler")
probe.expectMessage("2 eventHandler normal")
probe.expectMessage("2 onCommand")
probe.expectMessage("2 eventHandler")
probe.expectMessage("3 eventHandler evt")
probe.expectMessage("3 thenRun")
probe.expectMessage("3 onCommand")
probe.expectMessage("3 eventHandler")
probe.expectMessage("4 eventHandler evt")
probe.expectMessage("4 thenRun")
probe.expectMessage("4 onCommand") // cmd3
probe.expectMessage("5 eventHandler evt1")
probe.expectMessage("6 eventHandler evt2")
probe.expectMessage("7 eventHandler evt3")
probe.expectMessage("7 thenRun")
}
// reproducer for #27935
@ -112,11 +151,11 @@ class EventSourcedSequenceNumberSpec
ref ! "cmd"
probe.expectMessage("0 onCommand") // first command
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 eventHandler evt")
probe.expectMessage("1 thenRun")
probe.expectMessage("1 eventHandler") // snapshot
probe.expectMessage("2 eventHandler snapshot")
probe.expectMessage("2 onCommand") // second command
probe.expectMessage("2 eventHandler")
probe.expectMessage("3 eventHandler evt")
probe.expectMessage("3 thenRun")
}
}