Access latestSeqNr in typed persistence (#26261)

This commit is contained in:
Johan Andrén 2019-03-06 11:27:35 +01:00 committed by Patrik Nordwall
parent c186c1bde9
commit 7900240eb3
9 changed files with 225 additions and 50 deletions

View file

@ -40,6 +40,7 @@ import akka.util.OptionVal
import Behavior._
protected var behavior: Behavior[T] = _initialBehavior
final def currentBehavior: Behavior[T] = behavior
private var _ctx: ActorContextAdapter[T] = _
def ctx: ActorContextAdapter[T] =
@ -211,7 +212,7 @@ import akka.util.OptionVal
}
protected def initializeContext(): Unit = {
_ctx = new ActorContextAdapter[T](context)
_ctx = new ActorContextAdapter[T](context, this)
}
}

View file

@ -17,10 +17,12 @@ import scala.concurrent.duration._
/**
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[TypedActorContext]].
*/
@InternalApi private[akka] final class ActorContextAdapter[T](val untypedContext: untyped.ActorContext) extends ActorContextImpl[T] {
@InternalApi private[akka] final class ActorContextAdapter[T](val untypedContext: untyped.ActorContext, adapter: ActorAdapter[T]) extends ActorContextImpl[T] {
import ActorRefAdapter.toUntyped
private[akka] def currentBehavior: Behavior[T] = adapter.currentBehavior
// lazily initialized
private var actorLogger: OptionVal[Logger] = OptionVal.None

View file

@ -36,7 +36,7 @@ import akka.annotation.InternalApi
*/
@DoNotInherit
@ApiMayChange
trait ActorContext[T] extends TypedActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T]
trait ActorContext[T] extends TypedActorContext[T] {
/**
* Get the `javadsl` of this `ActorContext`.

View file

@ -9,12 +9,15 @@ import scala.util.control.NoStackTrace
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.ReplayingEvents.FailureWhileUnstashing
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
/***
* INTERNAL API
@ -46,42 +49,44 @@ private[akka] object ReplayingEvents {
setup: BehaviorSetup[C, E, S],
state: ReplayingState[S]
): Behavior[InternalProtocol] =
new ReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state)
Behaviors.setup { ctx
// protect against event recovery stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = false)
new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state)
}
private final case class FailureWhileUnstashing(cause: Throwable) extends Exception(cause) with NoStackTrace
}
@InternalApi
private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C, E, S])
extends JournalInteractions[C, E, S] with StashManagement[C, E, S] {
private[akka] final class ReplayingEvents[C, E, S](
override val setup: BehaviorSetup[C, E, S],
var state: ReplayingState[S])
extends AbstractBehavior[InternalProtocol] with JournalInteractions[C, E, S] with StashManagement[C, E, S] with WithSeqNrAccessible {
import InternalProtocol._
import ReplayingEvents.ReplayingState
def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = {
Behaviors.setup { _
// protect against event recovery stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = false)
replayEvents(state.seqNr + 1L, state.toSeqNr)
stay(state)
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
msg match {
case JournalResponse(r) onJournalResponse(r)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case RecoveryTickEvent(snap) onRecoveryTick(snap)
case cmd: IncomingCommand[C] onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}
}
private def stay(state: ReplayingState[S]): Behavior[InternalProtocol] =
Behaviors.receiveMessage[InternalProtocol] {
case JournalResponse(r) onJournalResponse(state, r)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case RecoveryTickEvent(snap) onRecoveryTick(state, snap)
case cmd: IncomingCommand[C] onCommand(cmd, state)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop.orElse {
case (_, PoisonPill) stay(state.copy(receivedPoisonPill = true))
})
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
case PoisonPill
state = state.copy(receivedPoisonPill = true)
this
}
private def onJournalResponse(
state: ReplayingState[S],
response: JournalProtocol.Response): Behavior[InternalProtocol] = {
try {
response match {
@ -89,11 +94,11 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C
val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per])
try {
val newState = state.copy(
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
stay(newState)
this
} catch {
case NonFatal(ex) onRecoveryFailure(ex, repr.sequenceNr, Some(event))
}
@ -114,7 +119,7 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C
}
}
private def onCommand(cmd: InternalProtocol, state: ReplayingState[S]): Behavior[InternalProtocol] = {
private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing) setup.log.debug(
@ -126,10 +131,11 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C
}
}
protected def onRecoveryTick(state: ReplayingState[S], snapshot: Boolean): Behavior[InternalProtocol] =
protected def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] =
if (!snapshot) {
if (state.eventSeenInInterval) {
stay(state.copy(eventSeenInInterval = false))
state = state.copy(eventSeenInInterval = false)
this
} else {
val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]"
onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None)
@ -194,5 +200,6 @@ private[akka] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C
setup.cancelRecoveryTimer()
}
override def currentSequenceNumber: Long = state.seqNr
}

View file

@ -20,11 +20,13 @@ import akka.annotation.InternalApi
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
import akka.persistence.typed.Callback
import akka.persistence.typed.EventRejectedException
import akka.persistence.typed.SideEffect
import akka.persistence.typed.Stop
import akka.persistence.typed.UnstashAll
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
/**
@ -48,6 +50,10 @@ import akka.persistence.typed.scaladsl.Effect
@InternalApi
private[akka] object Running {
trait WithSeqNrAccessible {
def currentSequenceNumber: Long
}
final case class RunningState[State](
seqNr: Long,
state: State,
@ -66,14 +72,16 @@ private[akka] object Running {
}
}
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] =
new Running(setup.setMdc(MDC.RunningCmds)).handlingCommands(state)
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = {
val running = new Running(setup.setMdc(MDC.RunningCmds))
new running.HandlingCommands(state)
}
}
// ===============================================
/** INTERNAL API */
@InternalApi private[akka] class Running[C, E, S](
@InternalApi private[akka] final class Running[C, E, S](
override val setup: BehaviorSetup[C, E, S])
extends JournalInteractions[C, E, S] with StashManagement[C, E, S] {
import InternalProtocol._
@ -83,7 +91,21 @@ private[akka] object Running {
private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents)
private val storingSnapshotMdc = MDC.create(setup.persistenceId, MDC.StoringSnapshot)
def handlingCommands(state: RunningState[S]): Behavior[InternalProtocol] = {
final class HandlingCommands(state: RunningState[S]) extends AbstractBehavior[InternalProtocol] with WithSeqNrAccessible {
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) onCommand(state, c)
case SnapshotterResponse(r)
setup.log.warning("Unexpected SnapshotterResponse {}", r)
Behaviors.unhandled
case _ Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
case PoisonPill
if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped
else new HandlingCommands(state.copy(receivedPoisonPill = true))
}
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(state.state, cmd)
@ -170,18 +192,7 @@ private[akka] object Running {
setup.setMdc(runningCmdsMdc)
Behaviors.receiveMessage[InternalProtocol] {
case IncomingCommand(c: C @unchecked) onCommand(state, c)
case SnapshotterResponse(r)
setup.log.warning("Unexpected SnapshotterResponse {}", r)
Behaviors.unhandled
case _ Behaviors.unhandled
}.receiveSignal {
case (_, PoisonPill)
if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped
else handlingCommands(state.copy(receivedPoisonPill = true))
}
override def currentSequenceNumber: Long = state.seqNr
}
// ===============================================
@ -202,7 +213,7 @@ private[akka] object Running {
numberOfEvents: Int,
shouldSnapshotAfterPersist: Boolean,
var sideEffects: immutable.Seq[SideEffect[S]])
extends AbstractBehavior[InternalProtocol] {
extends AbstractBehavior[InternalProtocol] with WithSeqNrAccessible {
private var eventCounter = 0
@ -285,6 +296,7 @@ private[akka] object Running {
this
}
override def currentSequenceNumber: Long = state.seqNr
}
// ===============================================
@ -343,7 +355,7 @@ private[akka] object Running {
// --------------------------
def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = {
var behavior: Behavior[InternalProtocol] = handlingCommands(state)
var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
val it = effects.iterator
// if at least one effect results in a `stop`, we need to stop

View file

@ -9,11 +9,11 @@ import java.util.Optional
import scala.util.Failure
import scala.util.Success
import akka.actor.typed
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.SnapshotMetadata
@ -196,6 +196,13 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
behavior
}
/**
* The last sequence number that was persisted, can only be called from inside the handlers of an `EventSourcedBehavior`
*/
def lastSequenceNumber(ctx: ActorContext[_]): Long = {
scaladsl.EventSourcedBehavior.lastSequenceNumber(ctx.asScala)
}
}
/**

View file

@ -5,11 +5,16 @@
package akka.persistence.typed.scaladsl
import scala.util.Try
import scala.annotation.tailrec
import akka.Done
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.internal.LoggerClass
import akka.actor.typed.internal.InterceptorImpl
import akka.actor.typed.internal.adapter.ActorContextAdapter
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.DoNotInherit
import akka.persistence._
import akka.persistence.typed.EventAdapter
@ -86,6 +91,27 @@ object EventSourcedBehavior {
}
/**
* The last sequence number that was persisted, can only be called from inside the handlers of an `EventSourcedBehavior`
*/
def lastSequenceNumber(context: ActorContext[_]): Long = {
@tailrec
def extractConcreteBehavior(beh: Behavior[_]): Behavior[_] =
beh match {
case interceptor: InterceptorImpl[_, _] extractConcreteBehavior(interceptor.nestedBehavior)
case concrete concrete
}
context match {
case impl: ActorContextAdapter[_]
extractConcreteBehavior(impl.currentBehavior) match {
case w: Running.WithSeqNrAccessible w.currentSequenceNumber
case s throw new IllegalStateException(s"Cannot extract the lastSequenceNumber in state ${s.getClass.getName}")
}
case c throw new IllegalStateException(s"Cannot extract the lastSequenceNumber from context ${c.getClass.getName}")
}
}
}
/**

View file

@ -24,7 +24,6 @@ import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.testkit.ErrorFilter;
import akka.testkit.javadsl.EventFilter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -654,4 +653,67 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
probe.expectTerminated(c);
}
class SequenceNumberBehavior extends EventSourcedBehavior<String, String, String> {
private final ActorRef<String> probe;
private final ActorContext<String> context;
public SequenceNumberBehavior(
PersistenceId persistenceId, ActorRef<String> probe, ActorContext<String> context) {
super(persistenceId);
this.probe = probe;
this.context = context;
}
@Override
public String emptyState() {
return "";
}
@Override
public CommandHandler<String, String, String> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onAnyCommand(
(state, cmd) -> {
probe.tell(lastSequenceNumber(context) + " onCommand");
return Effect()
.persist(cmd)
.thenRun((newState) -> probe.tell(lastSequenceNumber(context) + " thenRun"));
});
}
@Override
public EventHandler<String, String> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onAnyEvent(
(state, event) -> {
probe.tell(lastSequenceNumber(context) + " applyEvent");
return state + event;
});
}
@Override
public void onRecoveryCompleted(String s) {
probe.tell(lastSequenceNumber(context) + " onRecoveryCompleted");
}
}
@Test
public void accessLastSequenceNumber() {
TestProbe<String> probe = testKit.createTestProbe(String.class);
ActorRef<String> ref =
testKit.spawn(
Behaviors.<String>setup(
context ->
new SequenceNumberBehavior(
new PersistenceId("seqnr1"), probe.getRef(), context)));
probe.expectMessage("0 onRecoveryCompleted");
ref.tell("cmd");
probe.expectMessage("0 onCommand");
probe.expectMessage("0 applyEvent");
probe.expectMessage("1 thenRun");
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object EventSourcedSequenceNumberSpec {
private val conf = ConfigFactory.parseString(
s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
}
class EventSourcedSequenceNumberSpec extends ScalaTestWithActorTestKit(EventSourcedSequenceNumberSpec.conf) with WordSpecLike {
private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup(ctx
EventSourcedBehavior[String, String, String](
pid,
"",
{ (_, command)
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onCommand")
Effect.persist(command).thenRun(_
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun")
)
},
{ (state, evt)
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " eventHandler")
state + evt
}
).onRecoveryCompleted(_
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete")
)
)
"The sequence number" must {
"be accessible in the handlers" in {
val probe = TestProbe[String]()
val ref = spawn(behavior(PersistenceId("ess-1"), probe.ref))
probe.expectMessage("0 onRecoveryComplete")
ref ! "cmd1"
probe.expectMessage("0 onCommand")
probe.expectMessage("0 eventHandler")
probe.expectMessage("1 thenRun")
}
}
}