WIP towards immutable style

compiles, does not work..
This commit is contained in:
Konrad Malawski 2018-03-02 14:17:38 +09:00 committed by Konrad `ktoso` Malawski
parent 40abd2b096
commit ffb4419c4e
18 changed files with 1218 additions and 821 deletions

View file

@ -481,8 +481,6 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable {
class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon {
import SBehaviors.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Command]("widenedListener")
super.behavior(monitor)._1.widen[Command] { case c inbox.ref ! c; c } inbox

View file

@ -4,7 +4,6 @@
package akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors.BehaviorDecorators
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.scaladsl._

View file

@ -4,12 +4,14 @@
package akka.actor.typed
import akka.actor.InvalidMessageException
import akka.actor.typed.internal.BehaviorImpl
import scala.annotation.tailrec
import akka.util.LineNumbers
import akka.util.{ ConstantFun, LineNumbers, OptionVal }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.typed.scaladsl.{ ActorContext SAC }
import akka.util.OptionVal
import scala.reflect.ClassTag
/**
* The behavior of an actor defines how it reacts to the messages that it
@ -33,7 +35,7 @@ import akka.util.OptionVal
*/
@InternalApi
@DoNotInherit
sealed abstract class Behavior[T] {
sealed abstract class Behavior[T] { behavior
/**
* Narrow the type of this Behavior, which is always a safe operation. This
* method is necessary to implement the contravariant nature of Behavior
@ -85,6 +87,26 @@ abstract class ExtensibleBehavior[T] extends Behavior[T] {
object Behavior {
final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* Example:
* {{{
* immutable[String] { (ctx, msg) => println(msg); same }.widen[Number] {
* case b: BigDecimal => s"BigDecimal($b)"
* case i: BigInteger => s"BigInteger($i)"
* // drop all other kinds of Number
* }
* }}}
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
BehaviorImpl.widened(behavior, matcher)
}
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to
@ -175,7 +197,7 @@ object Behavior {
}
/**
* INTERNAL API.
* INTERNAL API
* Not placed in internal.BehaviorImpl because Behavior is sealed.
*/
@InternalApi
@ -185,7 +207,7 @@ object Behavior {
/** INTERNAL API */
@InternalApi
private[akka] object DeferredBehavior {
def apply[T](factory: SAC[T] Behavior[T]) =
def apply[T](factory: SAC[T] Behavior[T]): Behavior[T] =
new DeferredBehavior[T] {
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala)
override def toString: String = s"Deferred(${LineNumbers(factory)})"
@ -193,14 +215,14 @@ object Behavior {
}
/**
* INTERNAL API.
* INTERNAL API
*/
private[akka] object SameBehavior extends Behavior[Nothing] {
override def toString = "Same"
}
/**
* INTERNAL API.
* INTERNAL API
*/
private[akka] object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None)

View file

@ -320,7 +320,7 @@ object Behaviors {
/**
* Provide a MDC ("Mapped Diagnostic Context") for logging from the actor.
*
* @param mdcForMessage Is invoked before each message to setup MDC which is then attachd to each logging statement
* @param mdcForMessage Is invoked before each message to setup MDC which is then attached to each logging statement
* done for that message through the [[ActorContext.getLog]]. After the message has been processed
* the MDC is cleared.
* @param behavior The behavior that this should be applied to.

View file

@ -19,26 +19,6 @@ object Behaviors {
private val _unitFunction = (_: ActorContext[Any], _: Any) ()
private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* Example:
* {{{
* immutable[String] { (ctx, msg) => println(msg); same }.widen[Number] {
* case b: BigDecimal => s"BigDecimal($b)"
* case i: BigInteger => s"BigInteger($i)"
* // drop all other kinds of Number
* }
* }}}
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
BehaviorImpl.widened(behavior, matcher)
}
/**
* `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
* the actor is started, as opposed to [[Behaviors.immutable]] that creates the behavior instance

View file

@ -444,12 +444,13 @@ object Logging {
final val DebugLevel = LogLevel(4)
/**
* Internal Akka use only
* INTERNAL API
*
* Don't include the OffLevel in the AllLogLevels since we should never subscribe
* to some kind of OffEvent.
*/
private final val OffLevel = LogLevel(Int.MinValue)
@InternalApi
private[akka] final val OffLevel = LogLevel(Int.MinValue)
/**
* Returns the LogLevel associated with the given string,

View file

@ -28,6 +28,7 @@ import akka.japi.{ Pair ⇒ JPair }
def scalaAnyToNone[A, B]: A Option[B] = none
def scalaAnyTwoToNone[A, B, C]: (A, B) Option[C] = two2none
def scalaAnyTwoToUnit[A, B]: (A, B) Unit = two2unit
def scalaAnyTwoToTrue[A, B]: (A, B) Boolean = two2true
def scalaAnyThreeToFalse[A, B, C]: (A, B, C) Boolean = three2false
def javaAnyToNone[A, B]: A Option[B] = none
def nullFun[T] = _nullFun.asInstanceOf[Any T]
@ -46,6 +47,8 @@ import akka.japi.{ Pair ⇒ JPair }
private val two2none = (_: Any, _: Any) None
private val two2true = (_: Any, _: Any) true
private val two2unit = (_: Any, _: Any) ()
private val three2false = (_: Any, _: Any, _: Any) false

View file

@ -1,7 +1,11 @@
akka.persistence.typed {
# default stash buffer size for incoming messages to persistent actors
stash-buffer-size = 1024
# Persistent actors stash while recovering or persisting events,
# this setting configures the default capacity of this stash.
stash-capacity = 2048
# If negative (or zero) then an unbounded stash is used (default)
# If positive then a bounded stash is used and the capacity is set using
# the property
# enables automatic logging of messages stashed automatically by an PersistentBehavior,
# this may happen while it receives commands while it is recovering events or while it is persisting events

View file

@ -24,34 +24,36 @@ private[akka] object EventsourcedBehavior {
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
private[akka] val instanceIdCounter = new AtomicInteger(1)
@InternalApi private[akka] object WriterIdentity {
object WriterIdentity {
def newIdentity(): WriterIdentity = {
val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement()
val writerUuid: String = UUID.randomUUID.toString
WriterIdentity(instanceId, writerUuid)
}
}
private[akka] final case class WriterIdentity(instanceId: Int, writerUuid: String)
final case class WriterIdentity(instanceId: Int, writerUuid: String)
/** Protocol used internally by the eventsourced behaviors, never exposed to user-land */
private[akka] sealed trait EventsourcedProtocol
private[akka] case object RecoveryPermitGranted extends EventsourcedProtocol
private[akka] final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends EventsourcedProtocol
private[akka] final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends EventsourcedProtocol
private[akka] final case class RecoveryTickEvent(snapshot: Boolean) extends EventsourcedProtocol
private[akka] final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends EventsourcedProtocol
implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] {
override def genString(b: EventsourcedBehavior[_, _, _]): String = {
val behaviorShortName = b match {
case _: EventsourcedRunning[_, _, _] "running"
case _: EventsourcedRecoveringEvents[_, _, _] "recover-events"
case _: EventsourcedRecoveringSnapshot[_, _, _] "recover-snap"
case _: EventsourcedRequestingRecoveryPermit[_, _, _] "awaiting-permit"
}
s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]"
}
sealed trait InternalProtocol
object InternalProtocol {
case object RecoveryPermitGranted extends InternalProtocol
final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol
final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol
final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol
final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol
}
// implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] {
// override def genString(b: EventsourcedBehavior[_, _, _]): String = {
// val behaviorShortName = b match {
// case _: EventsourcedRunning[_, _, _] "running"
// case _: EventsourcedRecoveringEvents[_, _, _] "recover-events"
// case _: EventsourcedRecoveringSnapshot[_, _, _] "recover-snap"
// case _: EventsourcedRequestingRecoveryPermit[_, _, _] "awaiting-permit"
// }
// s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]"
// }
// }
}
@ -61,8 +63,7 @@ private[akka] trait EventsourcedBehavior[Command, Event, State] {
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
protected def context: ActorContext[Any]
protected def timers: TimerScheduler[Any]
// protected def timers: TimerScheduler[Any]
type C = Command
type AC = ActorContext[C]
@ -72,30 +73,30 @@ private[akka] trait EventsourcedBehavior[Command, Event, State] {
// used for signaling intent in type signatures
type SeqNr = Long
def persistenceId: String = setup.persistenceId
protected def setup: EventsourcedSetup[Command, Event, State]
protected def initialState: State = setup.initialState
protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler
protected def eventHandler: (State, Event) State = setup.eventHandler
protected def snapshotWhen: (State, Event, SeqNr) Boolean = setup.snapshotWhen
protected def tagger: Event Set[String] = setup.tagger
protected final def journalPluginId: String = setup.journalPluginId
protected final def snapshotPluginId: String = setup.snapshotPluginId
// def persistenceId: String = setup.persistenceId
//
// protected def setup: EventsourcedSetup[Command, Event, State]
// protected def initialState: State = setup.initialState
// protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler
// protected def eventHandler: (State, Event) State = setup.eventHandler
// protected def snapshotWhen: (State, Event, SeqNr) Boolean = setup.snapshotWhen
// protected def tagger: Event Set[String] = setup.tagger
//
// protected final def journalPluginId: String = setup.journalPluginId
// protected final def snapshotPluginId: String = setup.snapshotPluginId
// ------ common -------
protected lazy val extension = Persistence(context.system.toUntyped)
protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId)
protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId)
protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped
protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
case res: JournalProtocol.Response JournalResponse(res)
case RecoveryPermitter.RecoveryPermitGranted RecoveryPermitGranted
case res: SnapshotProtocol.Response SnapshotterResponse(res)
case cmd: Command @unchecked cmd // if it was wrong, we'll realise when trying to onMessage the cmd
}.toUntyped
// protected lazy val extension = Persistence(context.system.toUntyped)
// protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId)
// protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId)
//
// protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped
// protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
// case res: JournalProtocol.Response JournalResponse(res)
// case RecoveryPermitter.RecoveryPermitGranted RecoveryPermitGranted
// case res: SnapshotProtocol.Response SnapshotterResponse(res)
// case cmd: Command @unchecked cmd // if it was wrong, we'll realise when trying to onMessage the cmd
// }.toUntyped
}

View file

@ -0,0 +1,112 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.InternalApi
import akka.persistence.Eventsourced.StashingHandlerInvocation
import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence._
import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import scala.collection.immutable
@InternalApi
private[akka] trait EventsourcedJournalInteractions {
import akka.actor.typed.scaladsl.adapter._
// ---------- journal interactions ---------
protected def returnRecoveryPermitOnlyOnFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable): Unit = {
import setup.context
setup.log.debug("Returning recovery permit, on failure because: " + cause.getMessage)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
val permitter = setup.persistence.recoveryPermitter
permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
}
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
protected def internalPersist[C, E, S](
setup: EventsourcedSetup[C, E, S],
state: EventsourcedRunning.EventsourcedState[S],
event: EventOrTagged,
sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[InternalProtocol] = {
// pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
val pendingInvocations = StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit]) :: Nil
val newState = state.nextSequenceNr()
val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped
)
val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted)
EventsourcedRunning.PersistingEvents[C, E, S](setup, state, pendingInvocations, sideEffects)
}
protected def internalPersistAll[C, E, S](
setup: EventsourcedSetup[C, E, S],
events: immutable.Seq[EventOrTagged],
state: EventsourcedRunning.EventsourcedState[S],
sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[InternalProtocol] = {
if (events.nonEmpty) {
val pendingInvocations = events map { event
// pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
}
val newState = state.nextSequenceNr()
val senderNotKnownBecauseAkkaTyped = null
val write = AtomicWrite(events.map(event PersistentRepr(
event,
persistenceId = setup.persistenceId,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped)
))
setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted)
EventsourcedRunning.PersistingEvents(setup, state, pendingInvocations, sideEffects)
} else Behaviors.same
}
protected def replayEvents[C, E, S](setup: EventsourcedSetup[C, E, S], fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
// reply is sent to `selfUntypedAdapted`, it is important to target that one
setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntypedAdapted)
}
protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = {
setup.log.debug("Returning recovery permit, reason: " + reason)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
}
// ---------- snapshot store interactions ---------
/**
* Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
* to the running [[PersistentActor]].
*/
protected def loadSnapshot[Command](setup: EventsourcedSetup[Command, _, _], criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntypedAdapted)
}
protected def internalSaveSnapshot[S](setup: EventsourcedSetup[_, _, S], state: EventsourcedRunning.EventsourcedState[S]): Unit = {
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntypedAdapted)
}
}

View file

@ -4,121 +4,139 @@
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import akka.persistence.typed.scaladsl.PersistentBehaviors._
import akka.util.Helpers._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse }
import akka.persistence.typed.internal.EventsourcedBehavior._
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
/**
/***
* INTERNAL API
*
* Third (of four) behavior of an PersistentBehavior.
*
* In this behavior we finally start replaying events, beginning from the last applied sequence number
* (i.e. the one up-until-which the snapshot recovery has brought us).
*
* Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed
* and control is given to the user's handlers to drive the actors behavior from there.
* See next behavior [[EventsourcedRunning]].
*
*/
@InternalApi
private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
val setup: EventsourcedSetup[Command, Event, State],
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
override val internalStash: StashBuffer[Any],
private[persistence] object EventsourcedRecoveringEvents extends EventsourcedJournalInteractions with EventsourcedStashManagement {
private var sequenceNr: Long,
val writerIdentity: WriterIdentity,
@InternalApi
private[persistence] final case class RecoveringState[State](
seqNr: Long,
state: State,
eventSeenInInterval: Boolean = false
)
private var state: State
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement {
import setup._
import Behaviors.same
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
def apply[Command, Event, State](
setup: EventsourcedSetup[Command, Event, State],
state: RecoveringState[State]
): Behavior[InternalProtocol] =
Behaviors.setup { _
startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout)
protected val log = Logging(context.system.toUntyped, this)
replayEvents(setup, state.seqNr + 1L, setup.recovery.toSequenceNr)
// -------- initialize --------
startRecoveryTimer()
replayEvents(sequenceNr + 1L, recovery.toSequenceNr)
// ---- end of initialize ----
private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
// ----------
def snapshotSequenceNr: Long = sequenceNr
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
private def setLastSequenceNr(value: Long): Unit =
sequenceNr = value
// ----------
// FIXME it's a bit of a pain to have those lazy vals, change everything to constructor parameters
lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
private def startRecoveryTimer(): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey)
private var eventSeenInInterval = false
def onCommand(cmd: Command): Behavior[Any] = {
// during recovery, stash all incoming commands
stash(context, cmd)
same
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try {
response match {
case ReplayedMessage(repr)
eventSeenInInterval = true
updateLastSequenceNr(repr)
// TODO we need some state adapters here?
val newState = eventHandler(state, repr.payload.asInstanceOf[Event])
state = newState
same
case RecoverySuccess(highestSeqNr)
log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr)
cancelRecoveryTimer()
setLastSequenceNr(highestSeqNr)
try onRecoveryCompleted(state)
catch { case NonFatal(ex) onRecoveryFailure(ex, Some(state)) }
case ReplayMessagesFailure(cause)
onRecoveryFailure(cause, event = None)
case other
stash(context, other)
Behaviors.same
withMdc(setup) {
stay(setup, state)
}
}
} catch {
case NonFatal(e)
cancelRecoveryTimer()
onRecoveryFailure(e, None)
private def stay[Command, Event, State](
setup: EventsourcedSetup[Command, Event, State],
state: RecoveringState[State]
): Behavior[InternalProtocol] =
Behaviors.immutable {
case (_, JournalResponse(r)) onJournalResponse(setup, state, r)
case (_, SnapshotterResponse(r)) onSnapshotterResponse(setup, r)
case (_, RecoveryTickEvent(snap)) onRecoveryTick(setup, state, snap)
case (_, cmd @ IncomingCommand(_)) onCommand(setup, cmd)
}
private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(wrapped: Behavior[InternalProtocol]) = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" "recover-evnts"
)
Behaviors.withMdc((_: Any) mdc, wrapped)
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = {
log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response))
Behaviors.same // ignore the response
private def onJournalResponse[Command, Event, State](
setup: EventsourcedSetup[Command, Event, State],
state: RecoveringState[State],
response: JournalProtocol.Response): Behavior[InternalProtocol] = {
import setup.context.log
try {
response match {
case ReplayedMessage(repr)
// eventSeenInInterval = true
// updateLastSequenceNr(repr)
val newState = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, repr.payload.asInstanceOf[Event])
)
stay(setup, newState)
case RecoverySuccess(highestSeqNr)
log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr)
cancelRecoveryTimer(setup.timers)
try onRecoveryCompleted(setup, state)
catch { case NonFatal(ex) onRecoveryFailure(setup, ex, highestSeqNr, Some(state)) }
case ReplayMessagesFailure(cause)
onRecoveryFailure(setup, cause, state.seqNr, None)
case other
// stash(setup, setup.internalStash, other)
// Behaviors.same
Behaviors.unhandled
}
} catch {
case NonFatal(cause)
cancelRecoveryTimer(setup.timers)
onRecoveryFailure(setup, cause, state.seqNr, None)
}
}
private def onCommand[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], cmd: InternalProtocol): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
stash(setup, setup.internalStash, cmd)
Behaviors.same
}
// FYI, have to keep carrying all [C,E,S] everywhere as otherwise ending up with:
// [error] /Users/ktoso/code/akka/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala:117:14: type mismatch;
// [error] found : akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,_$2] where type _$2, type _$1
// [error] required: akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,Any] where type _$1
// [error] Note: _$2 <: Any, but class EventsourcedSetup is invariant in type State.
// [error] You may wish to define State as +State instead. (SLS 4.5)
// [error] Error occurred in an application involving default arguments.
// [error] stay(setup, state.copy(eventSeenInInterval = false))
// [error] ^
protected def onRecoveryTick[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], state: RecoveringState[State], snapshot: Boolean): Behavior[InternalProtocol] =
if (!snapshot) {
if (state.eventSeenInInterval) {
stay(setup, state.copy(eventSeenInInterval = false))
} else {
cancelRecoveryTimer(setup.timers)
val msg = s"Recovery timed out, didn't get event within ${setup.settings.recoveryEventTimeout}, highest sequence number seen ${state.seqNr}"
onRecoveryFailure(setup, new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this?
}
} else {
// snapshot timeout, but we're already in the events recovery phase
Behavior.unhandled
}
def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
setup.log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response))
Behaviors.unhandled // ignore the response
}
/**
@ -129,87 +147,40 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = {
returnRecoveryPermit("on recovery failure: " + cause.getMessage)
cancelRecoveryTimer()
protected def onRecoveryFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, sequenceNr: Long, event: Option[Any]): Behavior[InternalProtocol] = {
returnRecoveryPermit(setup, "on recovery failure: " + cause.getMessage)
cancelRecoveryTimer(setup.timers)
event match {
case Some(evt)
log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
Behaviors.stopped
case None
log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", persistenceId, sequenceNr)
setup.log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", setup.persistenceId, sequenceNr)
Behaviors.stopped
}
}
protected def onRecoveryCompleted(state: State): Behavior[Any] = {
try {
returnRecoveryPermit("recovery completed successfully")
recoveryCompleted(commandContext, state)
protected def onRecoveryCompleted[C, E, S](setup: EventsourcedSetup[C, E, S], state: RecoveringState[S]): Behavior[InternalProtocol] = try {
returnRecoveryPermit(setup, "recovery completed successfully")
setup.recoveryCompleted(setup.commandContext, state.state)
val running = new EventsourcedRunning[Command, Event, State](
setup,
context,
timers,
internalStash,
val running = EventsourcedRunning.HandlingCommands[C, E, S](
setup,
EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state)
)
sequenceNr,
writerIdentity,
state
)
tryUnstash(context, running)
} finally {
cancelRecoveryTimer()
}
tryUnstash(setup, setup.internalStash, running)
} finally {
cancelRecoveryTimer(setup.timers)
}
protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] =
if (!snapshot) {
if (!eventSeenInInterval) {
cancelRecoveryTimer()
val msg = s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr"
onRecoveryFailure(new RecoveryTimedOut(msg), event = None) // TODO allow users to hook into this?
} else {
eventSeenInInterval = false
same
}
} else {
// snapshot timeout, but we're already in the events recovery phase
Behavior.unhandled
}
// ----------
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
// TODO explore crazy hashcode hack to make this match quicker...?
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot = snapshot)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case c: Command @unchecked onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly
}
}
// ----------
// ---------- journal interactions ---------
private def replayEvents(fromSeqNr: SeqNr, toSeqNr: SeqNr): Unit = {
log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
// reply is sent to `selfUntypedAdapted`, it is important to target that one
journal ! ReplayMessages(fromSeqNr, toSeqNr, recovery.replayMax, persistenceId, selfUntypedAdapted)
}
private def returnRecoveryPermit(reason: String): Unit = {
log.debug("Returning recovery permit, reason: " + reason)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped)
}
override def toString = s"EventsourcedRecoveringEvents($persistenceId)"
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit =
timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(RecoveryTickTimerKey)
}

View file

@ -4,14 +4,14 @@
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.SnapshotProtocol.{ LoadSnapshot, LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import akka.util.Helpers._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.{ actor a }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
@ -20,6 +20,7 @@ import scala.util.{ Failure, Success, Try }
* INTERNAL API
*
* Second (of four) behavior of an PersistentBehavior.
* See next behavior [[EventsourcedRecoveringEvents]].
*
* In this behavior the recovery process is initiated.
* We try to obtain a snapshot from the configured snapshot store,
@ -29,71 +30,89 @@ import scala.util.{ Failure, Success, Try }
* recovery of events continues in [[EventsourcedRecoveringEvents]].
*/
@InternalApi
final class EventsourcedRecoveringSnapshot[Command, Event, State](
val setup: EventsourcedSetup[Command, Event, State],
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
override val internalStash: StashBuffer[Any],
object EventsourcedRecoveringSnapshot extends EventsourcedJournalInteractions with EventsourcedStashManagement {
val writerIdentity: WriterIdentity
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement {
import setup._
def apply[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = {
startRecoveryTimer(setup)
import Behaviors.same
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
withMdc(setup) {
Behaviors.immutable {
case (_, SnapshotterResponse(r)) onSnapshotterResponse(setup, r)
case (_, JournalResponse(r)) onJournalResponse(setup, r)
case (_, RecoveryTickEvent(snapshot)) onRecoveryTick(setup, snapshot)
case (_, cmd: IncomingCommand[Command]) onCommand(setup, cmd)
}
}
}
protected val log = Logging(context.system.toUntyped, this)
def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" "recover-snap"
)
Behaviors.withMdc(_ mdc, b)
}
// -------- initialize --------
startRecoveryTimer()
/**
* Called whenever a message replay fails. By default it logs the error.
*
* The actor is always stopped after this method has been invoked.
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
private def onRecoveryFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
loadSnapshot(persistenceId, recovery.fromSnapshot, recovery.toSequenceNr)
// ---- end of initialize ----
val lastSequenceNr = 0 // FIXME not needed since snapshot == 0
event match {
case Some(evt)
setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
"persistenceId [{}].", evt.getClass.getName, lastSequenceNr, setup.persistenceId)
Behaviors.stopped
val commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
case None
setup.log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
"Last known sequence number [{}]", setup.persistenceId, lastSequenceNr)
Behaviors.stopped
}
}
// ----------
protected var awaitingSnapshot: Boolean = true
// ----------
private var lastSequenceNr: Long = 0L
def snapshotSequenceNr: Long = lastSequenceNr
// ----------
lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
private def onRecoveryTick[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], snapshot: Boolean): Behavior[InternalProtocol] =
if (snapshot) {
// we know we're in snapshotting mode; snapshot recovery timeout arrived
val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}")
onRecoveryFailure(setup, ex, event = None)
} else same // ignore, since we received the snapshot already
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
private def startRecoveryTimer(): Unit = {
timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
}
private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey)
def onCommand(cmd: Command): Behavior[Any] = {
private def startRecoveryTimer(setup: EventsourcedSetup[_, _, _]): Unit = {
setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout)
}
private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey)
def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
stash(context, cmd)
setup.internalStash.stash(cmd) // TODO move stash out as it's mutable
Behavior.same
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try {
def onJournalResponse[Command](setup: EventsourcedSetup[_, _, _], response: JournalProtocol.Response): Behavior[InternalProtocol] = try {
throw new Exception("Should not talk to journal yet! But got: " + response)
} catch {
case NonFatal(cause)
returnRecoveryPermitOnlyOnFailure(cause)
returnRecoveryPermitOnlyOnFailure(setup, cause)
throw cause
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = try {
def onSnapshotterResponse[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = try {
response match {
case LoadSnapshotResult(sso, toSnr)
var state: S = initialState
val re: Try[SeqNr] = Try {
var state: State = setup.initialState
val re: Try[Long] = Try {
sso match {
case Some(SelectedSnapshot(metadata, snapshot))
state = snapshot.asInstanceOf[State]
@ -106,106 +125,38 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State](
re match {
case Success(seqNr)
lastSequenceNr = seqNr
replayMessages(state, toSnr)
replayMessages(setup, state, seqNr, toSnr)
case Failure(cause)
// FIXME better exception type
val ex = new RuntimeException(s"Failed to recover state for [$persistenceId] from snapshot offer.", cause)
onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME
val ex = new RuntimeException(s"Failed to recover state for [${setup.persistenceId}] from snapshot offer.", cause)
onRecoveryFailure(setup, ex, event = None) // FIXME the failure logs has bad messages... FIXME
}
case LoadSnapshotFailed(cause)
cancelRecoveryTimer()
cancelRecoveryTimer(setup.timers)
onRecoveryFailure(cause, event = None)
onRecoveryFailure(setup, cause, event = None)
case other
stash(context, other)
same
case _
Behaviors.unhandled
}
} catch {
case NonFatal(cause)
returnRecoveryPermitOnlyOnFailure(cause)
returnRecoveryPermitOnlyOnFailure(setup, cause)
throw cause
}
private def replayMessages(state: State, toSnr: SeqNr): Behavior[Any] = {
cancelRecoveryTimer()
private def replayMessages[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], state: State, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
val rec = recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types
val rec = setup.recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types
new EventsourcedRecoveringEvents[Command, Event, State](
EventsourcedRecoveringEvents[Command, Event, State](
setup.copy(recovery = rec),
context,
timers,
internalStash,
lastSequenceNr,
writerIdentity,
state
// setup.internalStash, // TODO move it out of setup
EventsourcedRecoveringEvents.RecoveringState(lastSequenceNr, state)
)
}
/**
* Called whenever a message replay fails. By default it logs the error.
*
* The actor is always stopped after this method has been invoked.
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = {
cancelRecoveryTimer()
event match {
case Some(evt)
log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
"persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId)
Behaviors.stopped
case None
log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
"Last known sequence number [{}]", persistenceId, lastSequenceNr)
Behaviors.stopped
}
}
protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] =
// we know we're in snapshotting mode
if (snapshot) onRecoveryFailure(new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), event = None)
else same // ignore, since we received the snapshot already
// ----------
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
// TODO explore crazy hashcode hack to make this match quicker...?
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot = snapshot)
case c: Command @unchecked onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly
}
}
// ----------
// ---------- journal interactions ---------
/**
* Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
* to the running [[PersistentActor]].
*/
private def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
snapshotStore.tell(LoadSnapshot(persistenceId, criteria, toSequenceNr), selfUntypedAdapted)
}
private def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = {
log.debug("Returning recovery permit, on failure because: " + cause.getMessage)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped)
}
override def toString = s"EventsourcedRecoveringSnapshot($persistenceId)"
}

View file

@ -9,81 +9,59 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerSc
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
/**
* INTERNAL API
*
* First (of four) behaviour of an PersistentBehaviour.
* See next behavior [[EventsourcedRecoveringSnapshot]].
*
* Requests a permit to start recovering this actor; this is tone to avoid
* hammering the journal with too many concurrently recovering actors.
*
*/
@InternalApi
private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, State](
val setup: EventsourcedSetup[Command, Event, State],
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any]
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement {
import setup._
private[akka] object EventsourcedRequestingRecoveryPermit extends EventsourcedStashManagement {
import akka.actor.typed.scaladsl.adapter._
// has to be lazy, since we want to obtain the persistenceId
protected lazy val log = Logging(context.system.toUntyped, this)
def apply[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = {
// request a permit, as only once we obtain one we can start recovering
requestRecoveryPermit(setup.context, setup.persistence)
override protected val internalStash: StashBuffer[Any] = {
val stashSize = context.system.settings.config
.getInt("akka.persistence.typed.stash-buffer-size")
StashBuffer[Any](stashSize)
}
withMdc(setup) {
Behaviors.immutable[InternalProtocol] {
case (_, InternalProtocol.RecoveryPermitGranted) // FIXME types
becomeRecovering(setup)
// --- initialization ---
// only once we have a permit, we can become active:
requestRecoveryPermit()
val writerIdentity: WriterIdentity = WriterIdentity.newIdentity()
// --- end of initialization ---
// ----------
def becomeRecovering(): Behavior[Any] = {
log.debug(s"Initializing snapshot recovery: {}", recovery)
new EventsourcedRecoveringSnapshot(
setup,
context,
timers,
internalStash,
writerIdentity
)
}
// ----------
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
case RecoveryPermitter.RecoveryPermitGranted
log.debug("Awaiting permit, received: RecoveryPermitGranted")
becomeRecovering()
case other
stash(context, other)
Behaviors.same
case (_, other)
stash(setup, setup.internalStash, other)
Behaviors.same
}
}
}
private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(wrapped: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" "awaiting-permit"
)
Behaviors.withMdc(_ mdc, wrapped)
}
private def becomeRecovering[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = {
setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)
EventsourcedRecoveringSnapshot(setup)
}
// ---------- journal interactions ---------
private def requestRecoveryPermit(): Unit = {
private def requestRecoveryPermit[Command](context: ActorContext[Command], persistence: Persistence): Unit = {
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped)
val selfUntyped = context.self.toUntyped
persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped)
}
override def toString = s"EventsourcedRequestingRecoveryPermit($persistenceId)"
}

View file

@ -13,7 +13,8 @@ import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandler
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse }
import scala.annotation.tailrec
import scala.collection.immutable
@ -34,224 +35,206 @@ import scala.collection.immutable
* which perform the Persistence extension lookup on creation and similar things (config lookup)
*
*/
@InternalApi
class EventsourcedRunning[Command, Event, State](
val setup: EventsourcedSetup[Command, Event, State],
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
override val internalStash: StashBuffer[Any],
@InternalApi object EventsourcedRunning extends EventsourcedJournalInteractions with EventsourcedStashManagement {
private var sequenceNr: Long,
val writerIdentity: WriterIdentity,
final case class EventsourcedState[State](
seqNr: Long,
state: State,
pendingInvocations: immutable.Seq[PendingHandlerInvocation] = Nil
) {
private var state: State
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement { same
import setup._
def nextSequenceNr(): EventsourcedState[State] =
copy(seqNr = seqNr + 1)
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] =
if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this
protected val log = Logging(context.system.toUntyped, this)
def popApplyPendingInvocation(repr: PersistentRepr): EventsourcedState[State] = {
val (headSeq, remainingInvocations) = pendingInvocations.splitAt(1)
headSeq.head.handler(repr.payload)
private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
copy(
pendingInvocations = remainingInvocations,
seqNr = repr.sequenceNr
)
}
// ----------
// Holds callbacks for persist calls (note that we do not implement persistAsync currently)
private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty
private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
// ----------
private def snapshotSequenceNr: Long = sequenceNr
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
private def nextSequenceNr(): Long = {
sequenceNr += 1L
sequenceNr
}
// ----------
private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = {
response match {
case SaveSnapshotSuccess(meta)
log.debug("Save snapshot successful: " + meta)
same
case SaveSnapshotFailure(meta, ex)
log.error(ex, "Save snapshot failed! " + meta)
same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = {
val updated = setup.eventHandler(state, event)
copy(state = updated)
}
}
// ----------
// ===============================================
trait EventsourcedRunningPhase {
def name: String
def onCommand(c: Command): Behavior[Any]
def onJournalResponse(response: JournalProtocol.Response): Behavior[Any]
}
object HandlingCommands extends EventsourcedRunningPhase {
def name = "HandlingCommands"
final override def onCommand(command: Command): Behavior[Any] = {
val effect = commandHandler(commandContext, state, command)
applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
object HandlingCommands {
def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
withMdc(setup, "run-cmnds") {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
case (_, SnapshotterResponse(r)) onSnapshotterResponse(setup, r)
case (_, JournalResponse(r)) onJournalResponse(setup, r)
case (_, IncomingCommand(c: C @unchecked)) onCommand(setup, state, c)
}
}
}
final override def onJournalResponse(response: Response): Behavior[Any] = {
// should not happen, what would it reply?
throw new RuntimeException("Received message which should not happen in Running state!")
private def onJournalResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: Response): Behavior[InternalProtocol] = {
// TODO ignore, could happen if actor was restarted?
Behaviors.unhandled
}
private def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(setup.commandContext, state.state, cmd)
applyEffects(setup, cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
}
object PersistingEventsNoSideEffects extends PersistingEvents(Nil)
// ===============================================
sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase {
def name = "PersistingEvents"
object PersistingEvents {
final override def onCommand(c: Command): Behavior[Any] = {
stash(context, c)
same
def apply[C, E, S](
setup: EventsourcedSetup[C, E, S],
state: EventsourcedState[S],
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]]
): Behavior[InternalProtocol] = {
withMdc(setup, "run-persist-evnts") {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
case (_, SnapshotterResponse(r)) onSnapshotterResponse(setup, r)
case (_, JournalResponse(r)) onJournalResponse(setup, state, pendingInvocations, sideEffects, r)
case (_, in: IncomingCommand[C @unchecked]) onCommand(setup, state, in)
}
}
}
final override def onJournalResponse(response: Response): Behavior[Any] = {
log.debug("Received Journal response: {}", response)
def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
stash(setup, setup.internalStash, cmd)
Behaviors.same
}
final def onJournalResponse[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S],
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]],
response: Response): Behavior[InternalProtocol] = {
setup.log.debug("Received Journal response: {}", response)
response match {
case WriteMessageSuccess(p, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == writerIdentity.instanceId) {
updateLastSequenceNr(p)
popApplyHandler(p.payload)
onWriteMessageComplete()
tryUnstash(context, applySideEffects(sideEffects))
} else same
if (id == setup.writerIdentity.instanceId) {
val newState = state.popApplyPendingInvocation(p)
// only once all things are applied we can revert back
if (newState.pendingInvocations.nonEmpty) Behaviors.same
else tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, newState))
} else Behaviors.same
case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == writerIdentity.instanceId) {
updateLastSequenceNr(p)
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop
tryUnstash(context, applySideEffects(sideEffects))
} else same
if (id == setup.writerIdentity.instanceId) {
val newState = state.updateLastSequenceNr(p)
onPersistRejected(setup, cause, p.payload, p.sequenceNr) // does not stop (by design)
tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, newState))
} else Behaviors.same
case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == writerIdentity.instanceId) {
onWriteMessageComplete()
onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
} else same
if (id == setup.writerIdentity.instanceId) {
// onWriteMessageComplete() -> tryBecomeHandlingCommands
onPersistFailureThenStop(setup, cause, p.payload, p.sequenceNr)
} else Behaviors.same
case WriteMessagesSuccessful
// ignore
same
Behaviors.same
case WriteMessagesFailed(_)
// ignore
same // it will be stopped by the first WriteMessageFailure message; not applying side effects
Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _: LoopMessageSuccess
// ignore, should never happen as there is no persistAsync in typed
same
Behaviors.same
}
}
private def onWriteMessageComplete(): Unit =
tryBecomeHandlingCommands()
// private def onWriteMessageComplete(): Unit =
// tryBecomeHandlingCommands()
private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
log.error(
private def onPersistRejected[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Any, seqNr: Long): Unit = {
setup.log.error(
cause,
"Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
event.getClass.getName, seqNr, persistenceId, cause.getMessage)
event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage)
}
private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = {
log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, persistenceId)
private def onPersistFailureThenStop[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = {
setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, setup.persistenceId)
// FIXME see #24479 for reconsidering the stopping behaviour
Behaviors.stopped
}
}
// --------------------------
// the active phase switches between PersistingEvents and HandlingCommands;
// we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect
private[this] var phase: EventsourcedRunningPhase = HandlingCommands
private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S], phase: String)(wrapped: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" phase
)
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
// TODO explore crazy hashcode hack to make this match quicker...?
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) phase.onJournalResponse(r)
case command: Command @unchecked
// the above type-check does nothing, since Command is tun
// we cast explicitly to fail early in case of type mismatch
val c = command.asInstanceOf[Command]
phase.onCommand(c)
// FIXME remove need for class tag!!!
Behaviors.withMdc[Any]((_: Any) mdc, wrapped.asInstanceOf[Behavior[Any]]).asInstanceOf[Behavior[InternalProtocol]]
}
// --------------------------
private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
response match {
case SaveSnapshotSuccess(meta)
setup.context.log.debug("Save snapshot successful: " + meta)
Behaviors.same
case SaveSnapshotFailure(meta, ex)
setup.context.log.error(ex, "Save snapshot failed! " + meta)
Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
}
}
// ----------
private def applyEvent[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], event: E): S =
setup.eventHandler(state.state, event)
def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
var res: Behavior[Any] = same
val it = effects.iterator
@tailrec private def applyEffects[C, E, S](
setup: EventsourcedSetup[C, E, S],
msg: Any,
state: EventsourcedState[S],
effect: EffectImpl[E, S],
sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil
): Behavior[InternalProtocol] = {
import setup.log
// if at least one effect results in a `stop`, we need to stop
// manual loop implementation to avoid allocations and multiple scans
while (it.hasNext) {
val effect = it.next()
applySideEffect(effect) match {
case _: StoppedBehavior[_] res = Behaviors.stopped
case _ // nothing to do
}
}
res
}
def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match {
case _: Stop.type @unchecked
Behaviors.stopped
case SideEffect(sideEffects)
sideEffects(state)
same
case _
throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
}
def applyEvent(s: S, event: E): S =
eventHandler(s, event)
@tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = {
if (log.isDebugEnabled)
log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
effect match {
case CompositeEffect(e, currentSideEffects)
case CompositeEffect(eff, currentSideEffects)
// unwrap and accumulate effects
applyEffects(msg, e, currentSideEffects ++ sideEffects)
applyEffects(setup, msg, state, eff, currentSideEffects ++ sideEffects)
case Persist(event)
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
state = applyEvent(state, event)
val tags = tagger(event)
val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
val newState = state.applyEvent(setup, event)
val eventToPersist = tagEvent(setup, event)
internalPersist(eventToPersist, sideEffects) { _
if (snapshotWhen(state, event, sequenceNr))
internalSaveSnapshot(state)
internalPersist(setup, state, eventToPersist, sideEffects) { _
if (setup.snapshotWhen(newState.state, event, newState.seqNr))
internalSaveSnapshot(setup, state)
}
case PersistAll(events)
@ -260,104 +243,364 @@ class EventsourcedRunning[Command, Event, State](
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var count = events.size
var seqNr = sequenceNr
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
case ((currentState, snapshot), event)
seqNr += 1
val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr)
(applyEvent(currentState, event), shouldSnapshot)
}
state = newState
val eventsToPersist = events.map { event
val tags = tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
}
// var seqNr = state.seqNr
val (newState, shouldSnapshotAfterPersist) =
events.foldLeft((state, false)) {
case ((currentState, snapshot), event)
val value = currentState
.nextSequenceNr()
.applyEvent(setup, event)
internalPersistAll(eventsToPersist, sideEffects) { _
val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr)
(value, shouldSnapshot)
}
// state = newState
val eventsToPersist = events.map { tagEvent(setup, _) }
internalPersistAll(setup, eventsToPersist, newState, sideEffects) { _
count -= 1
if (count == 0) {
sideEffects.foreach(applySideEffect)
// FIXME the result of applying side effects is ignored
val b = applySideEffects(sideEffects, newState)
if (shouldSnapshotAfterPersist)
internalSaveSnapshot(state)
internalSaveSnapshot(setup, newState)
}
}
} else {
// run side-effects even when no events are emitted
tryUnstash(context, applySideEffects(sideEffects))
tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, state))
}
case e: PersistNothing.type @unchecked
tryUnstash(context, applySideEffects(sideEffects))
case _: PersistNothing.type @unchecked
tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, state))
case _: Unhandled.type @unchecked
applySideEffects(sideEffects)
applySideEffects(sideEffects, state)
Behavior.unhandled
case c: ChainableEffect[_, S]
applySideEffect(c)
applySideEffect(c, state)
}
}
private def popApplyHandler(payload: Any): Unit =
pendingInvocations.pop().handler(payload)
private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException(
"Attempted to become PersistingEvents while already in this phase! Logic error?")
phase =
if (sideEffects.isEmpty) PersistingEventsNoSideEffects
else new PersistingEvents(sideEffects)
same
/***/
private def tagEvent[S, E, C](setup: EventsourcedSetup[C, E, S], event: E): Any = {
val tags = setup.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
}
private def tryBecomeHandlingCommands(): Behavior[Any] = {
if (phase == HandlingCommands) throw new IllegalArgumentException(
"Attempted to become HandlingCommands while already in this phase! Logic error?")
def applySideEffects[S](effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
var res: Behavior[InternalProtocol] = Behaviors.same
val it = effects.iterator
if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN?
phase = HandlingCommands
}
same
}
// ---------- journal interactions ---------
// Any since can be `E` or `Tagged`
private def internalPersist(event: Any, sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[Any] = {
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped)
val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
journal.tell(JournalProtocol.WriteMessages(eventBatch, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted)
becomePersistingEvents(sideEffects)
}
private def internalPersistAll(events: immutable.Seq[Any], sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[Any] = {
if (events.nonEmpty) {
val senderNotKnownBecauseAkkaTyped = null
events.foreach { event
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
// if at least one effect results in a `stop`, we need to stop
// manual loop implementation to avoid allocations and multiple scans
while (it.hasNext) {
val effect = it.next()
applySideEffect(effect, state) match {
case _: StoppedBehavior[_] res = Behaviors.stopped
case _ // nothing to do
}
}
val write = AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped)))
journal.tell(JournalProtocol.WriteMessages(write :: Nil, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted)
becomePersistingEvents(sideEffects)
} else same
res
}
private def internalSaveSnapshot(snapshot: State): Unit = {
snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(persistenceId, snapshotSequenceNr), snapshot), selfUntypedAdapted)
def applySideEffect[S](effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match {
case _: Stop.type @unchecked
Behaviors.stopped
case SideEffect(sideEffects)
sideEffects(state.state)
Behaviors.same
case _
throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
}
override def toString = s"EventsourcedRunning($persistenceId,${phase.name})"
}
//@InternalApi
//class EventsourcedRunning[Command, Event, State](
// val setup: EventsourcedSetup[Command, Event, State],
// // internalStash: StashBuffer[Any], // FIXME separate or in settings?
//
// private var sequenceNr: Long,
// val writerIdentity: WriterIdentity,
//
// private var state: State
//) extends MutableBehavior[Any]
// with EventsourcedBehavior[Command, Event, State]
// with EventsourcedStashManagement { same
// import setup._
//
// import EventsourcedBehavior._
// import akka.actor.typed.scaladsl.adapter._
//
// // Holds callbacks for persist calls (note that we do not implement persistAsync currently)
// private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty
// private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
//
// // ----------
////
//// private def snapshotSequenceNr: Long = sequenceNr
////
//// private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
//// if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
//// private def nextSequenceNr(): Long = {
//// sequenceNr += 1L
//// sequenceNr
//// }
// // ----------
//
// private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[C] = {
// response match {
// case SaveSnapshotSuccess(meta)
// setup.context.log.debug("Save snapshot successful: " + meta)
// Behaviors.same
// case SaveSnapshotFailure(meta, ex)
// setup.context.log.error(ex, "Save snapshot failed! " + meta)
// Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
// }
// }
//
// // ----------
//
// trait EventsourcedRunningPhase {
// def name: String
// def onCommand(c: Command): Behavior[Any]
// def onJournalResponse(response: JournalProtocol.Response): Behavior[Any]
// }
//
//// object HandlingCommands extends EventsourcedRunningPhase {
//// def name = "HandlingCommands"
////
//// final override def onCommand(command: Command): Behavior[Any] = {
//// val effect = commandHandler(commandContext, state, command)
//// applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
//// }
//// final override def onJournalResponse(response: Response): Behavior[Any] = {
//// // ignore, could happen if actor was restarted?
//// }
//// }
//
// object PersistingEventsNoSideEffects extends PersistingEvents(Nil)
//
// sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase {
// def name = "PersistingEvents"
//
// final override def onCommand(c: Command): Behavior[Any] = {
// stash(setup, context, c)
// same
// }
//
// final override def onJournalResponse(response: Response): Behavior[Any] = {
// log.debug("Received Journal response: {}", response)
// response match {
// case WriteMessageSuccess(p, id)
// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// // while message is in flight, in that case we ignore the call to the handler
// if (id == writerIdentity.instanceId) {
// updateLastSequenceNr(p)
// popApplyHandler(p.payload)
// onWriteMessageComplete()
// tryUnstash(setup, internalStash, applySideEffects(sideEffects))
// } else same
//
// case WriteMessageRejected(p, cause, id)
// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// // while message is in flight, in that case the handler has already been discarded
// if (id == writerIdentity.instanceId) {
// updateLastSequenceNr(p)
// onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop
// tryUnstash(setup, applySideEffects(sideEffects))
// } else same
//
// case WriteMessageFailure(p, cause, id)
// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// // while message is in flight, in that case the handler has already been discarded
// if (id == writerIdentity.instanceId) {
// onWriteMessageComplete()
// onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
// } else same
//
// case WriteMessagesSuccessful
// // ignore
// same
//
// case WriteMessagesFailed(_)
// // ignore
// same // it will be stopped by the first WriteMessageFailure message; not applying side effects
//
// case _: LoopMessageSuccess
// // ignore, should never happen as there is no persistAsync in typed
// same
// }
// }
//
// private def onWriteMessageComplete(): Unit =
// tryBecomeHandlingCommands()
//
// private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
// log.error(
// cause,
// "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
// event.getClass.getName, seqNr, persistenceId, cause.getMessage)
// }
//
// private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = {
// log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
// event.getClass.getName, seqNr, persistenceId)
//
// // FIXME see #24479 for reconsidering the stopping behaviour
// Behaviors.stopped
// }
//
// }
//
// // the active phase switches between PersistingEvents and HandlingCommands;
// // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect
// private[this] var phase: EventsourcedRunningPhase = HandlingCommands
//
// override def onMessage(msg: Any): Behavior[Any] = {
// msg match {
// // TODO explore crazy hashcode hack to make this match quicker...?
// case SnapshotterResponse(r) onSnapshotterResponse(r)
// case JournalResponse(r) phase.onJournalResponse(r)
// case command: Command @unchecked
// // the above type-check does nothing, since Command is tun
// // we cast explicitly to fail early in case of type mismatch
// val c = command.asInstanceOf[Command]
// phase.onCommand(c)
// }
// }
//
// // ----------
//
// def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
// var res: Behavior[Any] = same
// val it = effects.iterator
//
// // if at least one effect results in a `stop`, we need to stop
// // manual loop implementation to avoid allocations and multiple scans
// while (it.hasNext) {
// val effect = it.next()
// applySideEffect(effect) match {
// case _: StoppedBehavior[_] res = Behaviors.stopped
// case _ // nothing to do
// }
// }
//
// res
// }
//
// def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match {
// case _: Stop.type @unchecked
// Behaviors.stopped
//
// case SideEffect(sideEffects)
// sideEffects(state)
// same
//
// case _
// throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
// }
//
// def applyEvent(s: S, event: E): S =
// eventHandler(s, event)
//
// @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = {
// if (log.isDebugEnabled)
// log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
//
// effect match {
// case CompositeEffect(e, currentSideEffects)
// // unwrap and accumulate effects
// applyEffects(msg, e, currentSideEffects ++ sideEffects)
//
// case Persist(event)
// // apply the event before persist so that validation exception is handled before persisting
// // the invalid event, in case such validation is implemented in the event handler.
// // also, ensure that there is an event handler for each single event
// state = applyEvent(state, event)
// val tags = tagger(event)
// val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
//
// internalPersist(eventToPersist, sideEffects) { _
// if (snapshotWhen(state, event, sequenceNr))
// internalSaveSnapshot(state)
// }
//
// case PersistAll(events)
// if (events.nonEmpty) {
// // apply the event before persist so that validation exception is handled before persisting
// // the invalid event, in case such validation is implemented in the event handler.
// // also, ensure that there is an event handler for each single event
// var count = events.size
// var seqNr = sequenceNr
// val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
// case ((currentState, snapshot), event)
// seqNr += 1
// val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr)
// (applyEvent(currentState, event), shouldSnapshot)
// }
// state = newState
// val eventsToPersist = events.map { event
// val tags = tagger(event)
// if (tags.isEmpty) event else Tagged(event, tags)
// }
//
// internalPersistAll(eventsToPersist, sideEffects) { _
// count -= 1
// if (count == 0) {
// sideEffects.foreach(applySideEffect)
// if (shouldSnapshotAfterPersist)
// internalSaveSnapshot(state)
// }
// }
// } else {
// // run side-effects even when no events are emitted
// tryUnstash(context, applySideEffects(sideEffects))
// }
//
// case e: PersistNothing.type @unchecked
// tryUnstash(context, applySideEffects(sideEffects))
//
// case _: Unhandled.type @unchecked
// applySideEffects(sideEffects)
// Behavior.unhandled
//
// case c: ChainableEffect[_, S]
// applySideEffect(c)
// }
// }
//
// private def popApplyHandler(payload: Any): Unit =
// pendingInvocations.pop().handler(payload)
//
// private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
// if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException(
// "Attempted to become PersistingEvents while already in this phase! Logic error?")
//
// phase =
// if (sideEffects.isEmpty) PersistingEventsNoSideEffects
// else new PersistingEvents(sideEffects)
//
// same
// }
//
// private def tryBecomeHandlingCommands(): Behavior[Any] = {
// if (phase == HandlingCommands) throw new IllegalArgumentException(
// "Attempted to become HandlingCommands while already in this phase! Logic error?")
//
// if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN?
// phase = HandlingCommands
// }
//
// same
// }
//
// override def toString = s"EventsourcedRunning($persistenceId,${phase.name})"
//}

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import java.util.concurrent.TimeUnit
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.event.Logging
import akka.event.Logging.LogLevel
import com.typesafe.config.Config
import scala.concurrent.duration._
trait EventsourcedSettings {
def stashCapacity: Int
// def stashOverflowStrategyName: String // TODO not supported, the stash just throws for now
def stashingLogLevel: LogLevel
def journalPluginId: String
def snapshotPluginId: String
def recoveryEventTimeout: FiniteDuration
def withJournalPluginId(id: Option[String]): EventsourcedSettings
def withSnapshotPluginId(id: Option[String]): EventsourcedSettings
}
object EventsourcedSettings {
def apply(system: ActorSystem[_]): EventsourcedSettings =
apply(system.settings.config)
def apply(config: Config): EventsourcedSettings = {
val typedConfig = config.getConfig("akka.persistence.typed")
val untypedConfig = config.getConfig("akka.persistence")
// StashOverflowStrategy
val internalStashOverflowStrategy =
untypedConfig.getString("internal-stash-overflow-strategy") // FIXME or copy it to typed?
val stashCapacity = typedConfig.getInt("stash-capacity")
val stashingLogLevel = typedConfig.getString("log-stashing") match {
case "off" Logging.OffLevel
case "on" | "true" Logging.DebugLevel
case l Logging.levelFor(l).getOrElse(Logging.OffLevel)
}
// FIXME this is wrong I think
val recoveryEventTimeout = 10.seconds // untypedConfig.getDuration("plugin-journal-fallback.recovery-event-timeout", TimeUnit.MILLISECONDS).millis
EventsourcedSettingsImpl(
stashCapacity = stashCapacity,
internalStashOverflowStrategy,
stashingLogLevel = stashingLogLevel,
journalPluginId = "",
snapshotPluginId = "",
recoveryEventTimeout = recoveryEventTimeout
)
}
}
@InternalApi
private[persistence] final case class EventsourcedSettingsImpl(
stashCapacity: Int,
stashOverflowStrategyName: String,
stashingLogLevel: LogLevel,
journalPluginId: String,
snapshotPluginId: String,
recoveryEventTimeout: FiniteDuration
) extends EventsourcedSettings {
def withJournalPluginId(id: Option[String]): EventsourcedSettings = id match {
case Some(identifier) copy(journalPluginId = identifier)
case _ this
}
def withSnapshotPluginId(id: Option[String]): EventsourcedSettings = id match {
case Some(identifier) copy(snapshotPluginId = identifier)
case _ this
}
}

View file

@ -1,129 +1,119 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.internal.TimerSchedulerImpl
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.TimerScheduler
import akka.actor.ActorRef
import akka.{ actor a }
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.persistence.Recovery
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryPermitGranted
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
import akka.persistence._
import akka.util.ConstantFun
/** INTERNAL API */
@InternalApi
private[persistence] case class EventsourcedSetup[Command, Event, State](
private[persistence] object EventsourcedSetup {
def apply[Command, Event, State](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State): EventsourcedSetup[Command, Event, State] = {
apply(
context,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler,
// values dependent on context
EventsourcedSettings(context.system))
}
def apply[Command, Event, State](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
settings: EventsourcedSettings): EventsourcedSetup[Command, Event, State] = {
new EventsourcedSetup[Command, Event, State](
context,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler,
writerIdentity = WriterIdentity.newIdentity(),
recoveryCompleted = ConstantFun.scalaAnyTwoToUnit,
tagger = (_: Event) Set.empty[String],
snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
recovery = Recovery(),
settings,
StashBuffer(settings.stashCapacity)
)
}
}
/** INTERNAL API: Carry state for the Persistent behavior implementation behaviors */
@InternalApi
private[persistence] final case class EventsourcedSetup[Command, Event, State](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
journalPluginId: String = "",
snapshotPluginId: String = "",
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery()
) extends PersistentBehavior[Command, Event, State] {
eventHandler: (State, Event) State,
writerIdentity: WriterIdentity,
recoveryCompleted: (ActorContext[Command], State) Unit,
tagger: Event Set[String],
snapshotWhen: (State, Event, Long) Boolean,
recovery: Recovery,
override def apply(ctx: typed.ActorContext[Command]): Behavior[Command] = {
DeferredBehavior[Command](ctx
TimerSchedulerImpl.wrapWithTimers[Command] { timers
new EventsourcedRequestingRecoveryPermit(
this,
ctx.asInstanceOf[ActorContext[Any]], // sorry
timers.asInstanceOf[TimerScheduler[Any]] // sorry
).narrow[Command]
settings: EventsourcedSettings,
}(ctx))
internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\
) {
import akka.actor.typed.scaladsl.adapter._
def withJournalPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(settings = settings.withJournalPluginId(id))
}
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
* Snapshot every N events
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
/**
* Change the journal plugin id that this actor should use.
*/
def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "persistence plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = id)
}
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
def withSnapshotPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = id)
copy(settings = settings.withSnapshotPluginId(id))
}
/**
* Changes the snapshot selection criteria used by this behavior.
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
copy(tagger = tagger)
def log = context.log
def copy(
initialState: State = initialState,
commandHandler: CommandHandler[Command, Event, State] = commandHandler,
eventHandler: (State, Event) State = eventHandler,
recoveryCompleted: (ActorContext[Command], State) Unit = recoveryCompleted,
tagger: Event Set[String] = tagger,
snapshotWhen: (State, Event, Long) Boolean = snapshotWhen,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
recovery: Recovery = recovery): EventsourcedSetup[Command, Event, State] =
new EventsourcedSetup[Command, Event, State](
persistenceId = persistenceId,
initialState = initialState,
commandHandler = commandHandler,
eventHandler = eventHandler,
recoveryCompleted = recoveryCompleted,
tagger = tagger,
journalPluginId = journalPluginId,
snapshotPluginId = snapshotPluginId,
snapshotWhen = snapshotWhen,
recovery = recovery)
val persistence: Persistence = Persistence(context.system.toUntyped)
val journal: ActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
def selfUntyped = context.self.toUntyped
import EventsourcedBehavior.InternalProtocol
val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.toUntyped
}

View file

@ -1,58 +1,36 @@
package akka.persistence.typed.internal
import java.util.Locale
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.{ DeadLetter, StashOverflowException }
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer }
import akka.actor.{ DeadLetter, StashOverflowException }
import akka.annotation.InternalApi
import akka.event.Logging.LogLevel
import akka.event.{ Logging, LoggingAdapter }
import akka.persistence._
import akka.event.Logging
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.{ StashOverflowStrategy, _ }
import akka.util.ConstantFun
import akka.{ actor a }
/** INTERNAL API: Stash management for persistent behaviors */
@InternalApi
private[akka] trait EventsourcedStashManagement {
import EventsourcedStashManagement._
import akka.actor.typed.scaladsl.adapter._
protected def log: LoggingAdapter
protected def stash(setup: EventsourcedSetup[_, _, _], stash: StashBuffer[InternalProtocol], msg: InternalProtocol): Unit = {
import setup.context
protected def extension: Persistence
val logLevel = setup.settings.stashingLogLevel
if (logLevel != Logging.OffLevel) context.log.debug("Stashing message: {}", msg) // FIXME can be log(logLevel once missing method added
protected val internalStash: StashBuffer[Any]
val internalStashOverflowStrategy: StashOverflowStrategy = setup.persistence.defaultInternalStashOverflowStrategy
private lazy val logLevel = {
val configuredLevel = extension.system.settings.config
.getString("akka.persistence.typed.log-stashing")
Logging.levelFor(configuredLevel).getOrElse(OffLevel) // this is OffLevel
}
/**
* The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash
* when the internal Stash capacity exceeded.
*/
protected val internalStashOverflowStrategy: StashOverflowStrategy =
extension.defaultInternalStashOverflowStrategy match {
case ReplyToStrategy(_)
throw new RuntimeException("ReplyToStrategy is not supported in Akka Typed, since there is no sender()!")
case other
other // the other strategies are supported
}
protected def stash(ctx: ActorContext[Any], msg: Any): Unit = {
if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg)
try internalStash.stash(msg) catch {
try stash.stash(msg) catch {
case e: StashOverflowException
internalStashOverflowStrategy match {
case DiscardToDeadLetterStrategy
val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow?
ctx.system.deadLetters.tell(DeadLetter(msg, snd, ctx.self.toUntyped))
context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped))
case ReplyToStrategy(response)
case ReplyToStrategy(_)
throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!")
case ThrowOverflowExceptionStrategy
@ -61,15 +39,16 @@ private[akka] trait EventsourcedStashManagement {
}
}
protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = {
// FIXME, yet we need to also stash not-commands, due to journal responses ...
protected def tryUnstash[C, E, S](
setup: EventsourcedSetup[C, E, S],
internalStash: StashBuffer[InternalProtocol], // TODO since may want to not have it inside setup
behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
if (internalStash.nonEmpty) {
log.debug("Unstashing message: {}", internalStash.head.getClass)
internalStash.unstash(ctx, behavior, 1, ConstantFun.scalaIdentityFunction)
setup.log.debug("Unstashing message: {}", internalStash.head.getClass)
internalStash.asInstanceOf[StashBuffer[InternalProtocol]].unstash(setup.context, behavior.asInstanceOf[Behavior[InternalProtocol]], 1, ConstantFun.scalaIdentityFunction)
} else behavior
}
}
object EventsourcedStashManagement {
private val OffLevel = LogLevel(Int.MinValue)
}

View file

@ -3,11 +3,15 @@
*/
package akka.persistence.typed.scaladsl
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.annotation.InternalApi
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal._
import akka.persistence._
import akka.util.ConstantFun
import scala.language.implicitConversions
@ -22,29 +26,9 @@ object PersistentBehaviors {
def immutable[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: (ActorContext[Command], State, Command) Effect[Event, State],
commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
new EventsourcedSetup(
persistenceId = persistenceId,
initialState = initialState,
commandHandler = commandHandler,
eventHandler = eventHandler
)
/**
* Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known
* until the actor is started and typically based on the entityId, which
* is the actor name.
*
* TODO This will not be needed when it can be wrapped in `Actor.deferred`.
*/
@Deprecated // FIXME remove this
def persistentEntity[Command, Event, State](
persistenceIdFromActorName: String String,
initialState: State,
commandHandler: (ActorContext[Command], State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
???
PersistentBehaviorImpl(persistenceId, initialState, commandHandler, eventHandler)
/**
* The `CommandHandler` defines how to act on commands.
@ -108,7 +92,7 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
/**
* Change the journal plugin id that this actor should use.
*/
def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State]
def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State]
/**
* Change the snapshot store plugin id that this actor should use.
@ -130,3 +114,101 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State]
}
@InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
// settings: Option[EventsourcedSettings], // FIXME can't because no context available yet
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery()
) extends PersistentBehavior[Command, Event, State] {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
Behaviors.withTimers[EventsourcedBehavior.InternalProtocol] { timers
val setup = EventsourcedSetup(
ctx,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler)
.withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId)
EventsourcedRequestingRecoveryPermit(setup)
}
}.widen[Command] { case c InternalProtocol.IncomingCommand(c) } // TODO this is nice, same way applicable to mutable style
}
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
* Snapshot every N events
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
/**
* Change the journal plugin id that this actor should use.
*/
def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = if (id != "") Some(id) else None)
}
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = if (id != "") Some(id) else None)
}
/**
* Changes the snapshot selection criteria used by this behavior.
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
copy(tagger = tagger)
}