diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 4defca4acd..d1ec21aa84 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -40,6 +40,7 @@ import akka.util.OptionVal import Behavior._ protected var behavior: Behavior[T] = _initialBehavior + final def currentBehavior: Behavior[T] = behavior private var _ctx: ActorContextAdapter[T] = _ def ctx: ActorContextAdapter[T] = @@ -211,7 +212,7 @@ import akka.util.OptionVal } protected def initializeContext(): Unit = { - _ctx = new ActorContextAdapter[T](context) + _ctx = new ActorContextAdapter[T](context, this) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index 3c6e4cad6e..4626f7f852 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -17,10 +17,12 @@ import scala.concurrent.duration._ /** * INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[TypedActorContext]]. */ -@InternalApi private[akka] final class ActorContextAdapter[T](val untypedContext: untyped.ActorContext) extends ActorContextImpl[T] { +@InternalApi private[akka] final class ActorContextAdapter[T](val untypedContext: untyped.ActorContext, adapter: ActorAdapter[T]) extends ActorContextImpl[T] { import ActorRefAdapter.toUntyped + private[akka] def currentBehavior: Behavior[T] = adapter.currentBehavior + // lazily initialized private var actorLogger: OptionVal[Logger] = OptionVal.None diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 16b93f2b3f..12f4447105 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -36,7 +36,7 @@ import akka.annotation.InternalApi */ @DoNotInherit @ApiMayChange -trait ActorContext[T] extends TypedActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ +trait ActorContext[T] extends TypedActorContext[T] { /** * Get the `javadsl` of this `ActorContext`. diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 5a965c6105..621e5a4255 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -9,12 +9,15 @@ import scala.util.control.NoStackTrace import akka.actor.typed.Behavior import akka.actor.typed.internal.PoisonPill -import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.Signal +import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors } import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.typed.internal.ReplayingEvents.FailureWhileUnstashing +import akka.persistence.typed.internal.ReplayingEvents.ReplayingState +import akka.persistence.typed.internal.Running.WithSeqNrAccessible /*** * INTERNAL API @@ -46,42 +49,44 @@ private[akka] object ReplayingEvents { setup: BehaviorSetup[C, E, S], state: ReplayingState[S] ): Behavior[InternalProtocol] = - new ReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state) + Behaviors.setup { ctx ⇒ + // protect against event recovery stalling forever because of journal overloaded and such + setup.startRecoveryTimer(snapshot = false) + new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state) + } private final case class FailureWhileUnstashing(cause: Throwable) extends Exception(cause) with NoStackTrace } @InternalApi -private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C, E, S]) - extends JournalInteractions[C, E, S] with StashManagement[C, E, S] { +private[akka] final class ReplayingEvents[C, E, S]( + override val setup: BehaviorSetup[C, E, S], + var state: ReplayingState[S]) + extends AbstractBehavior[InternalProtocol] with JournalInteractions[C, E, S] with StashManagement[C, E, S] with WithSeqNrAccessible { + import InternalProtocol._ import ReplayingEvents.ReplayingState - def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = { - Behaviors.setup { _ ⇒ - // protect against event recovery stalling forever because of journal overloaded and such - setup.startRecoveryTimer(snapshot = false) + replayEvents(state.seqNr + 1L, state.toSeqNr) - replayEvents(state.seqNr + 1L, state.toSeqNr) - - stay(state) + override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { + msg match { + case JournalResponse(r) ⇒ onJournalResponse(r) + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case RecoveryTickEvent(snap) ⇒ onRecoveryTick(snap) + case cmd: IncomingCommand[C] ⇒ onCommand(cmd) + case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit } } - private def stay(state: ReplayingState[S]): Behavior[InternalProtocol] = - Behaviors.receiveMessage[InternalProtocol] { - case JournalResponse(r) ⇒ onJournalResponse(state, r) - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case RecoveryTickEvent(snap) ⇒ onRecoveryTick(state, snap) - case cmd: IncomingCommand[C] ⇒ onCommand(cmd, state) - case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit - }.receiveSignal(returnPermitOnStop.orElse { - case (_, PoisonPill) ⇒ stay(state.copy(receivedPoisonPill = true)) - }) + override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { + case PoisonPill ⇒ + state = state.copy(receivedPoisonPill = true) + this + } private def onJournalResponse( - state: ReplayingState[S], response: JournalProtocol.Response): Behavior[InternalProtocol] = { try { response match { @@ -89,11 +94,11 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per]) try { - val newState = state.copy( + state = state.copy( seqNr = repr.sequenceNr, state = setup.eventHandler(state.state, event), eventSeenInInterval = true) - stay(newState) + this } catch { case NonFatal(ex) ⇒ onRecoveryFailure(ex, repr.sequenceNr, Some(event)) } @@ -114,7 +119,7 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C } } - private def onCommand(cmd: InternalProtocol, state: ReplayingState[S]): Behavior[InternalProtocol] = { + private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands if (state.receivedPoisonPill) { if (setup.settings.logOnStashing) setup.log.debug( @@ -126,10 +131,11 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C } } - protected def onRecoveryTick(state: ReplayingState[S], snapshot: Boolean): Behavior[InternalProtocol] = + protected def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] = if (!snapshot) { if (state.eventSeenInInterval) { - stay(state.copy(eventSeenInInterval = false)) + state = state.copy(eventSeenInInterval = false) + this } else { val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) @@ -194,5 +200,6 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C setup.cancelRecoveryTimer() } + override def currentSequenceNumber: Long = state.seqNr } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 2ae8d90f31..e53d5863e1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -20,11 +20,13 @@ import akka.annotation.InternalApi import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged + import akka.persistence.typed.Callback import akka.persistence.typed.EventRejectedException import akka.persistence.typed.SideEffect import akka.persistence.typed.Stop import akka.persistence.typed.UnstashAll +import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.scaladsl.Effect /** @@ -48,6 +50,10 @@ import akka.persistence.typed.scaladsl.Effect @InternalApi private[akka] object Running { + trait WithSeqNrAccessible { + def currentSequenceNumber: Long + } + final case class RunningState[State]( seqNr: Long, state: State, @@ -66,14 +72,16 @@ private[akka] object Running { } } - def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = - new Running(setup.setMdc(MDC.RunningCmds)).handlingCommands(state) + def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = { + val running = new Running(setup.setMdc(MDC.RunningCmds)) + new running.HandlingCommands(state) + } } // =============================================== /** INTERNAL API */ -@InternalApi private[akka] class Running[C, E, S]( +@InternalApi private[akka] final class Running[C, E, S]( override val setup: BehaviorSetup[C, E, S]) extends JournalInteractions[C, E, S] with StashManagement[C, E, S] { import InternalProtocol._ @@ -83,7 +91,21 @@ private[akka] object Running { private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents) private val storingSnapshotMdc = MDC.create(setup.persistenceId, MDC.StoringSnapshot) - def handlingCommands(state: RunningState[S]): Behavior[InternalProtocol] = { + final class HandlingCommands(state: RunningState[S]) extends AbstractBehavior[InternalProtocol] with WithSeqNrAccessible { + + def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { + case IncomingCommand(c: C @unchecked) ⇒ onCommand(state, c) + case SnapshotterResponse(r) ⇒ + setup.log.warning("Unexpected SnapshotterResponse {}", r) + Behaviors.unhandled + case _ ⇒ Behaviors.unhandled + } + + override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { + case PoisonPill ⇒ + if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped + else new HandlingCommands(state.copy(receivedPoisonPill = true)) + } def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { val effect = setup.commandHandler(state.state, cmd) @@ -170,18 +192,7 @@ private[akka] object Running { setup.setMdc(runningCmdsMdc) - Behaviors.receiveMessage[InternalProtocol] { - case IncomingCommand(c: C @unchecked) ⇒ onCommand(state, c) - case SnapshotterResponse(r) ⇒ - setup.log.warning("Unexpected SnapshotterResponse {}", r) - Behaviors.unhandled - case _ ⇒ Behaviors.unhandled - }.receiveSignal { - case (_, PoisonPill) ⇒ - if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped - else handlingCommands(state.copy(receivedPoisonPill = true)) - } - + override def currentSequenceNumber: Long = state.seqNr } // =============================================== @@ -202,7 +213,7 @@ private[akka] object Running { numberOfEvents: Int, shouldSnapshotAfterPersist: Boolean, var sideEffects: immutable.Seq[SideEffect[S]]) - extends AbstractBehavior[InternalProtocol] { + extends AbstractBehavior[InternalProtocol] with WithSeqNrAccessible { private var eventCounter = 0 @@ -285,6 +296,7 @@ private[akka] object Running { this } + override def currentSequenceNumber: Long = state.seqNr } // =============================================== @@ -343,7 +355,7 @@ private[akka] object Running { // -------------------------- def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = { - var behavior: Behavior[InternalProtocol] = handlingCommands(state) + var behavior: Behavior[InternalProtocol] = new HandlingCommands(state) val it = effects.iterator // if at least one effect results in a `stop`, we need to stop diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 2342e796da..c5f4f96ef8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -9,11 +9,11 @@ import java.util.Optional import scala.util.Failure import scala.util.Success - import akka.actor.typed import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior +import akka.actor.typed.javadsl.ActorContext import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.persistence.SnapshotMetadata @@ -196,6 +196,13 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] behavior } + /** + * The last sequence number that was persisted, can only be called from inside the handlers of an `EventSourcedBehavior` + */ + def lastSequenceNumber(ctx: ActorContext[_]): Long = { + scaladsl.EventSourcedBehavior.lastSequenceNumber(ctx.asScala) + } + } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 6416c2d91c..1186e0bfd7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -5,11 +5,16 @@ package akka.persistence.typed.scaladsl import scala.util.Try +import scala.annotation.tailrec import akka.Done import akka.actor.typed.BackoffSupervisorStrategy +import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.internal.LoggerClass +import akka.actor.typed.internal.InterceptorImpl +import akka.actor.typed.internal.adapter.ActorContextAdapter +import akka.actor.typed.scaladsl.ActorContext import akka.annotation.DoNotInherit import akka.persistence._ import akka.persistence.typed.EventAdapter @@ -86,6 +91,27 @@ object EventSourcedBehavior { } + /** + * The last sequence number that was persisted, can only be called from inside the handlers of an `EventSourcedBehavior` + */ + def lastSequenceNumber(context: ActorContext[_]): Long = { + @tailrec + def extractConcreteBehavior(beh: Behavior[_]): Behavior[_] = + beh match { + case interceptor: InterceptorImpl[_, _] ⇒ extractConcreteBehavior(interceptor.nestedBehavior) + case concrete ⇒ concrete + } + + context match { + case impl: ActorContextAdapter[_] ⇒ + extractConcreteBehavior(impl.currentBehavior) match { + case w: Running.WithSeqNrAccessible ⇒ w.currentSequenceNumber + case s ⇒ throw new IllegalStateException(s"Cannot extract the lastSequenceNumber in state ${s.getClass.getName}") + } + case c ⇒ throw new IllegalStateException(s"Cannot extract the lastSequenceNumber from context ${c.getClass.getName}") + } + } + } /** diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 7397874a1a..b56d68d64b 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -24,7 +24,6 @@ import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; -import akka.testkit.ErrorFilter; import akka.testkit.javadsl.EventFilter; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -654,4 +653,67 @@ public class PersistentActorJavaDslTest extends JUnitSuite { probe.expectTerminated(c); } + + class SequenceNumberBehavior extends EventSourcedBehavior { + private final ActorRef probe; + private final ActorContext context; + + public SequenceNumberBehavior( + PersistenceId persistenceId, ActorRef probe, ActorContext context) { + super(persistenceId); + this.probe = probe; + this.context = context; + } + + @Override + public String emptyState() { + return ""; + } + + @Override + public CommandHandler commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onAnyCommand( + (state, cmd) -> { + probe.tell(lastSequenceNumber(context) + " onCommand"); + return Effect() + .persist(cmd) + .thenRun((newState) -> probe.tell(lastSequenceNumber(context) + " thenRun")); + }); + } + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onAnyEvent( + (state, event) -> { + probe.tell(lastSequenceNumber(context) + " applyEvent"); + return state + event; + }); + } + + @Override + public void onRecoveryCompleted(String s) { + probe.tell(lastSequenceNumber(context) + " onRecoveryCompleted"); + } + } + + @Test + public void accessLastSequenceNumber() { + TestProbe probe = testKit.createTestProbe(String.class); + ActorRef ref = + testKit.spawn( + Behaviors.setup( + context -> + new SequenceNumberBehavior( + new PersistenceId("seqnr1"), probe.getRef(), context))); + + probe.expectMessage("0 onRecoveryCompleted"); + ref.tell("cmd"); + probe.expectMessage("0 onCommand"); + probe.expectMessage("0 applyEvent"); + probe.expectMessage("1 thenRun"); + } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala new file mode 100644 index 0000000000..50fb5afac3 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.{ ActorRef, Behavior } +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object EventSourcedSequenceNumberSpec { + + private val conf = ConfigFactory.parseString( + s""" + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) + +} + +class EventSourcedSequenceNumberSpec extends ScalaTestWithActorTestKit(EventSourcedSequenceNumberSpec.conf) with WordSpecLike { + + private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup(ctx ⇒ + EventSourcedBehavior[String, String, String]( + pid, + "", + { (_, command) ⇒ + probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onCommand") + Effect.persist(command).thenRun(_ ⇒ + probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun") + ) + }, + { (state, evt) ⇒ + probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " eventHandler") + state + evt + } + ).onRecoveryCompleted(_ ⇒ + probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete") + ) + ) + + "The sequence number" must { + + "be accessible in the handlers" in { + val probe = TestProbe[String]() + val ref = spawn(behavior(PersistenceId("ess-1"), probe.ref)) + probe.expectMessage("0 onRecoveryComplete") + + ref ! "cmd1" + probe.expectMessage("0 onCommand") + probe.expectMessage("0 eventHandler") + probe.expectMessage("1 thenRun") + } + } +}