From 90aa5be45e758c5dc3669df63fcab6b02b490dd3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 May 2020 17:17:30 +0200 Subject: [PATCH] fix wrong lastSequenceNumber, #28976 (#29128) * off by one when accessed from event handler * off by one from event handler during replay * wrong when unstashing * added more tests, also for persist of several events --- .../typed/internal/ExternalInteractions.scala | 2 +- .../typed/internal/ReplayingEvents.scala | 9 ++- .../persistence/typed/internal/Running.scala | 23 +++++-- .../javadsl/PersistentActorJavaDslTest.java | 2 +- .../EventSourcedSequenceNumberSpec.scala | 63 +++++++++++++++---- 5 files changed, 74 insertions(+), 25 deletions(-) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index abe744434d..4b8298f366 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -99,7 +99,7 @@ private[akka] trait JournalInteractions[C, E, S] { @unused repr: immutable.Seq[PersistentRepr]): Unit = () protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { - setup.log.debug2("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) + setup.log.debug2("Replaying events: from: {}, to: {}", fromSeqNr, toSeqNr) setup.journal.tell( ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId.id, setup.selfClassic), setup.selfClassic) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index da06fb1294..c61da7ce27 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -116,10 +116,8 @@ private[akka] final class ReplayingEvents[C, E, S]( def handleEvent(event: E): Unit = { eventForErrorReporting = OptionVal.Some(event) - state = state.copy( - seqNr = repr.sequenceNr, - state = setup.eventHandler(state.state, event), - eventSeenInInterval = true) + state = state.copy(seqNr = repr.sequenceNr) + state = state.copy(state = setup.eventHandler(state.state, event), eventSeenInInterval = true) } eventSeq match { @@ -247,5 +245,6 @@ private[akka] final class ReplayingEvents[C, E, S]( setup.cancelRecoveryTimer() } - override def currentSequenceNumber: Long = state.seqNr + override def currentSequenceNumber: Long = + state.seqNr } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index a9174e5440..c1f4173801 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -97,10 +97,15 @@ private[akka] object Running { import InternalProtocol._ import Running.RunningState + // Needed for WithSeqNrAccessible, when unstashing + private var _currentSequenceNumber = 0L + final class HandlingCommands(state: RunningState[S]) extends AbstractBehavior[InternalProtocol](setup.context) with WithSeqNrAccessible { + _currentSequenceNumber = state.seqNr + def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { case IncomingCommand(c: C @unchecked) => onCommand(state, c) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) @@ -150,6 +155,7 @@ private[akka] object Running { // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event + _currentSequenceNumber = state.seqNr + 1 val newState = state.applyEvent(setup, event) val eventToPersist = adaptEvent(event) @@ -166,12 +172,13 @@ private[akka] object Running { // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event - var seqNr = state.seqNr + _currentSequenceNumber = state.seqNr val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) { case ((currentState, snapshot), event) => - seqNr += 1 + _currentSequenceNumber += 1 val shouldSnapshot = - if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot + if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber) + else snapshot (currentState.applyEvent(setup, event), shouldSnapshot) } @@ -212,7 +219,8 @@ private[akka] object Running { setup.setMdcPhase(PersistenceMdc.RunningCmds) - override def currentSequenceNumber: Long = state.seqNr + override def currentSequenceNumber: Long = + _currentSequenceNumber } // =============================================== @@ -335,7 +343,9 @@ private[akka] object Running { else Behaviors.unhandled } - override def currentSequenceNumber: Long = visibleState.seqNr + override def currentSequenceNumber: Long = { + _currentSequenceNumber + } } // =============================================== @@ -430,7 +440,8 @@ private[akka] object Running { Behaviors.unhandled } - override def currentSequenceNumber: Long = state.seqNr + override def currentSequenceNumber: Long = + _currentSequenceNumber } // -------------------------- diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 45a57a3df4..57e28b85ff 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -773,7 +773,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { probe.expectMessage("0 onRecoveryCompleted"); ref.tell("cmd"); probe.expectMessage("0 onCommand"); - probe.expectMessage("0 applyEvent"); + probe.expectMessage("1 applyEvent"); probe.expectMessage("1 thenRun"); } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala index 3c3947ab30..222f25fe97 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala @@ -49,7 +49,12 @@ class EventSourcedSequenceNumberSpec case "cmd" => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand" Effect - .persist(command) + .persist("evt") + .thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun") + case "cmd3" => + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} onCommand" + Effect + .persist("evt1", "evt2", "evt3") .thenRun(_ => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} thenRun") case "stash" => probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} stash" @@ -59,7 +64,7 @@ class EventSourcedSequenceNumberSpec } } }, { (_, evt) => - probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler" + probe ! s"${EventSourcedBehavior.lastSequenceNumber(ctx)} eventHandler $evt" evt }).snapshotWhen((_, event, _) => event == "snapshot").receiveSignal { case (_, RecoveryCompleted) => @@ -75,11 +80,40 @@ class EventSourcedSequenceNumberSpec ref ! "cmd" probe.expectMessage("0 onCommand") - probe.expectMessage("0 eventHandler") + probe.expectMessage("1 eventHandler evt") probe.expectMessage("1 thenRun") + + ref ! "cmd" + probe.expectMessage("1 onCommand") + probe.expectMessage("2 eventHandler evt") + probe.expectMessage("2 thenRun") + + ref ! "cmd3" + probe.expectMessage("2 onCommand") + probe.expectMessage("3 eventHandler evt1") + probe.expectMessage("4 eventHandler evt2") + probe.expectMessage("5 eventHandler evt3") + probe.expectMessage("5 thenRun") + + testKit.stop(ref) + probe.expectTerminated(ref) + + // and during replay + val ref2 = spawn(behavior(PersistenceId.ofUniqueId("ess-1"), probe.ref)) + probe.expectMessage("1 eventHandler evt") + probe.expectMessage("2 eventHandler evt") + probe.expectMessage("3 eventHandler evt1") + probe.expectMessage("4 eventHandler evt2") + probe.expectMessage("5 eventHandler evt3") + probe.expectMessage("5 onRecoveryComplete") + + ref2 ! "cmd" + probe.expectMessage("5 onCommand") + probe.expectMessage("6 eventHandler evt") + probe.expectMessage("6 thenRun") } - "be available while replaying stash" in { + "be available while unstashing" in { val probe = TestProbe[String]() val ref = spawn(behavior(PersistenceId.ofUniqueId("ess-2"), probe.ref)) probe.expectMessage("0 onRecoveryComplete") @@ -87,18 +121,23 @@ class EventSourcedSequenceNumberSpec ref ! "stash" ref ! "cmd" ref ! "cmd" - ref ! "cmd" + ref ! "cmd3" ref ! "unstash" probe.expectMessage("0 stash") - probe.expectMessage("0 eventHandler") + probe.expectMessage("1 eventHandler stashing") probe.expectMessage("1 unstash") - probe.expectMessage("1 eventHandler") + probe.expectMessage("2 eventHandler normal") probe.expectMessage("2 onCommand") - probe.expectMessage("2 eventHandler") + probe.expectMessage("3 eventHandler evt") probe.expectMessage("3 thenRun") probe.expectMessage("3 onCommand") - probe.expectMessage("3 eventHandler") + probe.expectMessage("4 eventHandler evt") probe.expectMessage("4 thenRun") + probe.expectMessage("4 onCommand") // cmd3 + probe.expectMessage("5 eventHandler evt1") + probe.expectMessage("6 eventHandler evt2") + probe.expectMessage("7 eventHandler evt3") + probe.expectMessage("7 thenRun") } // reproducer for #27935 @@ -112,11 +151,11 @@ class EventSourcedSequenceNumberSpec ref ! "cmd" probe.expectMessage("0 onCommand") // first command - probe.expectMessage("0 eventHandler") + probe.expectMessage("1 eventHandler evt") probe.expectMessage("1 thenRun") - probe.expectMessage("1 eventHandler") // snapshot + probe.expectMessage("2 eventHandler snapshot") probe.expectMessage("2 onCommand") // second command - probe.expectMessage("2 eventHandler") + probe.expectMessage("3 eventHandler evt") probe.expectMessage("3 thenRun") } }