diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala
index 4530ef9359..ed35320814 100644
--- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala
+++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala
@@ -481,8 +481,6 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable {
class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon {
- import SBehaviors.BehaviorDecorators
-
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Command]("widenedListener")
super.behavior(monitor)._1.widen[Command] { case c ⇒ inbox.ref ! c; c } → inbox
diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala
index f8fad66dc0..85e9aceb1e 100644
--- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala
+++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala
@@ -4,7 +4,6 @@
package akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.scaladsl.Behaviors.BehaviorDecorators
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.scaladsl._
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala
index 3788ca63f3..b6f8bb3b6f 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala
@@ -4,12 +4,14 @@
package akka.actor.typed
import akka.actor.InvalidMessageException
+import akka.actor.typed.internal.BehaviorImpl
import scala.annotation.tailrec
-import akka.util.LineNumbers
+import akka.util.{ ConstantFun, LineNumbers, OptionVal }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC }
-import akka.util.OptionVal
+
+import scala.reflect.ClassTag
/**
* The behavior of an actor defines how it reacts to the messages that it
@@ -33,7 +35,7 @@ import akka.util.OptionVal
*/
@InternalApi
@DoNotInherit
-sealed abstract class Behavior[T] {
+sealed abstract class Behavior[T] { behavior ⇒
/**
* Narrow the type of this Behavior, which is always a safe operation. This
* method is necessary to implement the contravariant nature of Behavior
@@ -85,6 +87,26 @@ abstract class ExtensibleBehavior[T] extends Behavior[T] {
object Behavior {
+ final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
+ /**
+ * Widen the wrapped Behavior by placing a funnel in front of it: the supplied
+ * PartialFunction decides which message to pull in (those that it is defined
+ * at) and may transform the incoming message to place them into the wrapped
+ * Behavior’s type hierarchy. Signals are not transformed.
+ *
+ * Example:
+ * {{{
+ * immutable[String] { (ctx, msg) => println(msg); same }.widen[Number] {
+ * case b: BigDecimal => s"BigDecimal($b)"
+ * case i: BigInteger => s"BigInteger($i)"
+ * // drop all other kinds of Number
+ * }
+ * }}}
+ */
+ def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
+ BehaviorImpl.widened(behavior, matcher)
+ }
+
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to
@@ -175,7 +197,7 @@ object Behavior {
}
/**
- * INTERNAL API.
+ * INTERNAL API
* Not placed in internal.BehaviorImpl because Behavior is sealed.
*/
@InternalApi
@@ -185,7 +207,7 @@ object Behavior {
/** INTERNAL API */
@InternalApi
private[akka] object DeferredBehavior {
- def apply[T](factory: SAC[T] ⇒ Behavior[T]) =
+ def apply[T](factory: SAC[T] ⇒ Behavior[T]): Behavior[T] =
new DeferredBehavior[T] {
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala)
override def toString: String = s"Deferred(${LineNumbers(factory)})"
@@ -193,14 +215,14 @@ object Behavior {
}
/**
- * INTERNAL API.
+ * INTERNAL API
*/
private[akka] object SameBehavior extends Behavior[Nothing] {
override def toString = "Same"
}
/**
- * INTERNAL API.
+ * INTERNAL API
*/
private[akka] object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None)
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala
index 6f3095e1c0..6b5e4b296a 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala
@@ -320,7 +320,7 @@ object Behaviors {
/**
* Provide a MDC ("Mapped Diagnostic Context") for logging from the actor.
*
- * @param mdcForMessage Is invoked before each message to setup MDC which is then attachd to each logging statement
+ * @param mdcForMessage Is invoked before each message to setup MDC which is then attached to each logging statement
* done for that message through the [[ActorContext.getLog]]. After the message has been processed
* the MDC is cleared.
* @param behavior The behavior that this should be applied to.
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala
index f6641f21ea..5bc86e2941 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala
@@ -19,26 +19,6 @@ object Behaviors {
private val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ ()
private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)]
- final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
- /**
- * Widen the wrapped Behavior by placing a funnel in front of it: the supplied
- * PartialFunction decides which message to pull in (those that it is defined
- * at) and may transform the incoming message to place them into the wrapped
- * Behavior’s type hierarchy. Signals are not transformed.
- *
- * Example:
- * {{{
- * immutable[String] { (ctx, msg) => println(msg); same }.widen[Number] {
- * case b: BigDecimal => s"BigDecimal($b)"
- * case i: BigInteger => s"BigInteger($i)"
- * // drop all other kinds of Number
- * }
- * }}}
- */
- def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
- BehaviorImpl.widened(behavior, matcher)
- }
-
/**
* `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
* the actor is started, as opposed to [[Behaviors.immutable]] that creates the behavior instance
diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala
index 46545f9de3..101cde9204 100644
--- a/akka-actor/src/main/scala/akka/event/Logging.scala
+++ b/akka-actor/src/main/scala/akka/event/Logging.scala
@@ -444,12 +444,13 @@ object Logging {
final val DebugLevel = LogLevel(4)
/**
- * Internal Akka use only
+ * INTERNAL API
*
* Don't include the OffLevel in the AllLogLevels since we should never subscribe
* to some kind of OffEvent.
*/
- private final val OffLevel = LogLevel(Int.MinValue)
+ @InternalApi
+ private[akka] final val OffLevel = LogLevel(Int.MinValue)
/**
* Returns the LogLevel associated with the given string,
diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala
index 2a97f764c9..7931b5e35c 100644
--- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala
+++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala
@@ -28,6 +28,7 @@ import akka.japi.{ Pair ⇒ JPair }
def scalaAnyToNone[A, B]: A ⇒ Option[B] = none
def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none
def scalaAnyTwoToUnit[A, B]: (A, B) ⇒ Unit = two2unit
+ def scalaAnyTwoToTrue[A, B]: (A, B) ⇒ Boolean = two2true
def scalaAnyThreeToFalse[A, B, C]: (A, B, C) ⇒ Boolean = three2false
def javaAnyToNone[A, B]: A ⇒ Option[B] = none
def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T]
@@ -46,6 +47,8 @@ import akka.japi.{ Pair ⇒ JPair }
private val two2none = (_: Any, _: Any) ⇒ None
+ private val two2true = (_: Any, _: Any) ⇒ true
+
private val two2unit = (_: Any, _: Any) ⇒ ()
private val three2false = (_: Any, _: Any, _: Any) ⇒ false
diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf
index 15febc5cb0..465038ae15 100644
--- a/akka-persistence-typed/src/main/resources/reference.conf
+++ b/akka-persistence-typed/src/main/resources/reference.conf
@@ -1,7 +1,11 @@
akka.persistence.typed {
- # default stash buffer size for incoming messages to persistent actors
- stash-buffer-size = 1024
+ # Persistent actors stash while recovering or persisting events,
+ # this setting configures the default capacity of this stash.
+ stash-capacity = 2048
+ # If negative (or zero) then an unbounded stash is used (default)
+ # If positive then a bounded stash is used and the capacity is set using
+ # the property
# enables automatic logging of messages stashed automatically by an PersistentBehavior,
# this may happen while it receives commands while it is recovering events or while it is persisting events
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala
index 1fc7b3cbf0..0dd64fe32c 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala
@@ -24,34 +24,36 @@ private[akka] object EventsourcedBehavior {
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
private[akka] val instanceIdCounter = new AtomicInteger(1)
- @InternalApi private[akka] object WriterIdentity {
+ object WriterIdentity {
def newIdentity(): WriterIdentity = {
val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement()
val writerUuid: String = UUID.randomUUID.toString
WriterIdentity(instanceId, writerUuid)
}
}
- private[akka] final case class WriterIdentity(instanceId: Int, writerUuid: String)
+ final case class WriterIdentity(instanceId: Int, writerUuid: String)
/** Protocol used internally by the eventsourced behaviors, never exposed to user-land */
- private[akka] sealed trait EventsourcedProtocol
- private[akka] case object RecoveryPermitGranted extends EventsourcedProtocol
- private[akka] final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends EventsourcedProtocol
- private[akka] final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends EventsourcedProtocol
- private[akka] final case class RecoveryTickEvent(snapshot: Boolean) extends EventsourcedProtocol
- private[akka] final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends EventsourcedProtocol
-
- implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] {
- override def genString(b: EventsourcedBehavior[_, _, _]): String = {
- val behaviorShortName = b match {
- case _: EventsourcedRunning[_, _, _] ⇒ "running"
- case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events"
- case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap"
- case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit"
- }
- s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]"
- }
+ sealed trait InternalProtocol
+ object InternalProtocol {
+ case object RecoveryPermitGranted extends InternalProtocol
+ final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol
+ final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol
+ final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol
+ final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends InternalProtocol
+ final case class IncomingCommand[C](c: C) extends InternalProtocol
}
+ // implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] {
+ // override def genString(b: EventsourcedBehavior[_, _, _]): String = {
+ // val behaviorShortName = b match {
+ // case _: EventsourcedRunning[_, _, _] ⇒ "running"
+ // case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events"
+ // case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap"
+ // case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit"
+ // }
+ // s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]"
+ // }
+ // }
}
@@ -61,8 +63,7 @@ private[akka] trait EventsourcedBehavior[Command, Event, State] {
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
- protected def context: ActorContext[Any]
- protected def timers: TimerScheduler[Any]
+ // protected def timers: TimerScheduler[Any]
type C = Command
type AC = ActorContext[C]
@@ -72,30 +73,30 @@ private[akka] trait EventsourcedBehavior[Command, Event, State] {
// used for signaling intent in type signatures
type SeqNr = Long
- def persistenceId: String = setup.persistenceId
-
- protected def setup: EventsourcedSetup[Command, Event, State]
- protected def initialState: State = setup.initialState
- protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler
- protected def eventHandler: (State, Event) ⇒ State = setup.eventHandler
- protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = setup.snapshotWhen
- protected def tagger: Event ⇒ Set[String] = setup.tagger
-
- protected final def journalPluginId: String = setup.journalPluginId
- protected final def snapshotPluginId: String = setup.snapshotPluginId
+ // def persistenceId: String = setup.persistenceId
+ //
+ // protected def setup: EventsourcedSetup[Command, Event, State]
+ // protected def initialState: State = setup.initialState
+ // protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler
+ // protected def eventHandler: (State, Event) ⇒ State = setup.eventHandler
+ // protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = setup.snapshotWhen
+ // protected def tagger: Event ⇒ Set[String] = setup.tagger
+ //
+ // protected final def journalPluginId: String = setup.journalPluginId
+ // protected final def snapshotPluginId: String = setup.snapshotPluginId
// ------ common -------
- protected lazy val extension = Persistence(context.system.toUntyped)
- protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId)
- protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId)
-
- protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped
- protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
- case res: JournalProtocol.Response ⇒ JournalResponse(res)
- case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted
- case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res)
- case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd
- }.toUntyped
+ // protected lazy val extension = Persistence(context.system.toUntyped)
+ // protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId)
+ // protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId)
+ //
+ // protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped
+ // protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
+ // case res: JournalProtocol.Response ⇒ JournalResponse(res)
+ // case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted
+ // case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res)
+ // case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd
+ // }.toUntyped
}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala
new file mode 100644
index 0000000000..e8faad0a66
--- /dev/null
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2016-2018 Lightbend Inc.
+ */
+package akka.persistence.typed.internal
+
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import akka.annotation.InternalApi
+import akka.persistence.Eventsourced.StashingHandlerInvocation
+import akka.persistence.JournalProtocol.ReplayMessages
+import akka.persistence._
+import akka.persistence.SnapshotProtocol.LoadSnapshot
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
+
+import scala.collection.immutable
+
+@InternalApi
+private[akka] trait EventsourcedJournalInteractions {
+ import akka.actor.typed.scaladsl.adapter._
+
+ // ---------- journal interactions ---------
+
+ protected def returnRecoveryPermitOnlyOnFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable): Unit = {
+ import setup.context
+ setup.log.debug("Returning recovery permit, on failure because: " + cause.getMessage)
+ // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
+ val permitter = setup.persistence.recoveryPermitter
+ permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
+ }
+
+ type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
+ protected def internalPersist[C, E, S](
+ setup: EventsourcedSetup[C, E, S],
+ state: EventsourcedRunning.EventsourcedState[S],
+ event: EventOrTagged,
+ sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[InternalProtocol] = {
+ // pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
+ val pendingInvocations = StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) :: Nil
+
+ val newState = state.nextSequenceNr()
+
+ val senderNotKnownBecauseAkkaTyped = null
+ val repr = PersistentRepr(
+ event,
+ persistenceId = setup.persistenceId,
+ sequenceNr = newState.seqNr,
+ writerUuid = setup.writerIdentity.writerUuid,
+ sender = senderNotKnownBecauseAkkaTyped
+ )
+
+ val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
+ setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted)
+
+ EventsourcedRunning.PersistingEvents[C, E, S](setup, state, pendingInvocations, sideEffects)
+ }
+
+ protected def internalPersistAll[C, E, S](
+ setup: EventsourcedSetup[C, E, S],
+ events: immutable.Seq[EventOrTagged],
+ state: EventsourcedRunning.EventsourcedState[S],
+ sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[InternalProtocol] = {
+ if (events.nonEmpty) {
+
+ val pendingInvocations = events map { event ⇒
+ // pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
+ StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
+ }
+
+ val newState = state.nextSequenceNr()
+
+ val senderNotKnownBecauseAkkaTyped = null
+ val write = AtomicWrite(events.map(event ⇒ PersistentRepr(
+ event,
+ persistenceId = setup.persistenceId,
+ sequenceNr = newState.seqNr,
+ writerUuid = setup.writerIdentity.writerUuid,
+ sender = senderNotKnownBecauseAkkaTyped)
+ ))
+
+ setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted)
+
+ EventsourcedRunning.PersistingEvents(setup, state, pendingInvocations, sideEffects)
+ } else Behaviors.same
+ }
+
+ protected def replayEvents[C, E, S](setup: EventsourcedSetup[C, E, S], fromSeqNr: Long, toSeqNr: Long): Unit = {
+ setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
+ // reply is sent to `selfUntypedAdapted`, it is important to target that one
+ setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntypedAdapted)
+ }
+
+ protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = {
+ setup.log.debug("Returning recovery permit, reason: " + reason)
+ // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
+ setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
+ }
+
+ // ---------- snapshot store interactions ---------
+
+ /**
+ * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
+ * to the running [[PersistentActor]].
+ */
+ protected def loadSnapshot[Command](setup: EventsourcedSetup[Command, _, _], criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
+ setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntypedAdapted)
+ }
+
+ protected def internalSaveSnapshot[S](setup: EventsourcedSetup[_, _, S], state: EventsourcedRunning.EventsourcedState[S]): Unit = {
+ setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntypedAdapted)
+ }
+
+}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala
index 3fc92c8a23..8a339d4e8e 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala
@@ -4,121 +4,139 @@
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
-import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
+import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
-import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
-import akka.persistence.typed.scaladsl.PersistentBehaviors._
-import akka.util.Helpers._
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse }
+import akka.persistence.typed.internal.EventsourcedBehavior._
+import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
-/**
+/***
* INTERNAL API
*
- * Third (of four) behavior of an PersistentBehavior.
- *
- * In this behavior we finally start replaying events, beginning from the last applied sequence number
- * (i.e. the one up-until-which the snapshot recovery has brought us).
- *
- * Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed
- * and control is given to the user's handlers to drive the actors behavior from there.
+ * See next behavior [[EventsourcedRunning]].
*
*/
@InternalApi
-private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
- val setup: EventsourcedSetup[Command, Event, State],
- override val context: ActorContext[Any],
- override val timers: TimerScheduler[Any],
- override val internalStash: StashBuffer[Any],
+private[persistence] object EventsourcedRecoveringEvents extends EventsourcedJournalInteractions with EventsourcedStashManagement {
- private var sequenceNr: Long,
- val writerIdentity: WriterIdentity,
+ @InternalApi
+ private[persistence] final case class RecoveringState[State](
+ seqNr: Long,
+ state: State,
+ eventSeenInInterval: Boolean = false
+ )
- private var state: State
-) extends MutableBehavior[Any]
- with EventsourcedBehavior[Command, Event, State]
- with EventsourcedStashManagement {
- import setup._
- import Behaviors.same
- import EventsourcedBehavior._
- import akka.actor.typed.scaladsl.adapter._
+ def apply[Command, Event, State](
+ setup: EventsourcedSetup[Command, Event, State],
+ state: RecoveringState[State]
+ ): Behavior[InternalProtocol] =
+ Behaviors.setup { _ ⇒
+ startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout)
- protected val log = Logging(context.system.toUntyped, this)
+ replayEvents(setup, state.seqNr + 1L, setup.recovery.toSequenceNr)
- // -------- initialize --------
- startRecoveryTimer()
-
- replayEvents(sequenceNr + 1L, recovery.toSequenceNr)
- // ---- end of initialize ----
-
- private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
-
- // ----------
-
- def snapshotSequenceNr: Long = sequenceNr
-
- private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
- if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
-
- private def setLastSequenceNr(value: Long): Unit =
- sequenceNr = value
-
- // ----------
-
- // FIXME it's a bit of a pain to have those lazy vals, change everything to constructor parameters
- lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
-
- // protect against snapshot stalling forever because of journal overloaded and such
- private val RecoveryTickTimerKey = "recovery-tick"
- private def startRecoveryTimer(): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
- private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey)
-
- private var eventSeenInInterval = false
-
- def onCommand(cmd: Command): Behavior[Any] = {
- // during recovery, stash all incoming commands
- stash(context, cmd)
- same
- }
-
- def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try {
- response match {
- case ReplayedMessage(repr) ⇒
- eventSeenInInterval = true
- updateLastSequenceNr(repr)
- // TODO we need some state adapters here?
- val newState = eventHandler(state, repr.payload.asInstanceOf[Event])
- state = newState
- same
-
- case RecoverySuccess(highestSeqNr) ⇒
- log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr)
- cancelRecoveryTimer()
- setLastSequenceNr(highestSeqNr)
-
- try onRecoveryCompleted(state)
- catch { case NonFatal(ex) ⇒ onRecoveryFailure(ex, Some(state)) }
-
- case ReplayMessagesFailure(cause) ⇒
- onRecoveryFailure(cause, event = None)
-
- case other ⇒
- stash(context, other)
- Behaviors.same
+ withMdc(setup) {
+ stay(setup, state)
+ }
}
- } catch {
- case NonFatal(e) ⇒
- cancelRecoveryTimer()
- onRecoveryFailure(e, None)
+
+ private def stay[Command, Event, State](
+ setup: EventsourcedSetup[Command, Event, State],
+ state: RecoveringState[State]
+ ): Behavior[InternalProtocol] =
+ Behaviors.immutable {
+ case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, state, r)
+ case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r)
+ case (_, RecoveryTickEvent(snap)) ⇒ onRecoveryTick(setup, state, snap)
+ case (_, cmd @ IncomingCommand(_)) ⇒ onCommand(setup, cmd)
+ }
+
+ private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(wrapped: Behavior[InternalProtocol]) = {
+ val mdc = Map(
+ "persistenceId" → setup.persistenceId,
+ "phase" → "recover-evnts"
+ )
+
+ Behaviors.withMdc((_: Any) ⇒ mdc, wrapped)
}
- def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = {
- log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response))
- Behaviors.same // ignore the response
+ private def onJournalResponse[Command, Event, State](
+ setup: EventsourcedSetup[Command, Event, State],
+ state: RecoveringState[State],
+ response: JournalProtocol.Response): Behavior[InternalProtocol] = {
+ import setup.context.log
+ try {
+ response match {
+ case ReplayedMessage(repr) ⇒
+ // eventSeenInInterval = true
+ // updateLastSequenceNr(repr)
+
+ val newState = state.copy(
+ seqNr = repr.sequenceNr,
+ state = setup.eventHandler(state.state, repr.payload.asInstanceOf[Event])
+ )
+
+ stay(setup, newState)
+
+ case RecoverySuccess(highestSeqNr) ⇒
+ log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr)
+ cancelRecoveryTimer(setup.timers)
+
+ try onRecoveryCompleted(setup, state)
+ catch { case NonFatal(ex) ⇒ onRecoveryFailure(setup, ex, highestSeqNr, Some(state)) }
+
+ case ReplayMessagesFailure(cause) ⇒
+ onRecoveryFailure(setup, cause, state.seqNr, None)
+
+ case other ⇒
+ // stash(setup, setup.internalStash, other)
+ // Behaviors.same
+ Behaviors.unhandled
+ }
+ } catch {
+ case NonFatal(cause) ⇒
+ cancelRecoveryTimer(setup.timers)
+ onRecoveryFailure(setup, cause, state.seqNr, None)
+ }
+ }
+
+ private def onCommand[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], cmd: InternalProtocol): Behavior[InternalProtocol] = {
+ // during recovery, stash all incoming commands
+ stash(setup, setup.internalStash, cmd)
+ Behaviors.same
+ }
+
+ // FYI, have to keep carrying all [C,E,S] everywhere as otherwise ending up with:
+ // [error] /Users/ktoso/code/akka/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala:117:14: type mismatch;
+ // [error] found : akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,_$2] where type _$2, type _$1
+ // [error] required: akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,Any] where type _$1
+ // [error] Note: _$2 <: Any, but class EventsourcedSetup is invariant in type State.
+ // [error] You may wish to define State as +State instead. (SLS 4.5)
+ // [error] Error occurred in an application involving default arguments.
+ // [error] stay(setup, state.copy(eventSeenInInterval = false))
+ // [error] ^
+ protected def onRecoveryTick[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], state: RecoveringState[State], snapshot: Boolean): Behavior[InternalProtocol] =
+ if (!snapshot) {
+ if (state.eventSeenInInterval) {
+ stay(setup, state.copy(eventSeenInInterval = false))
+ } else {
+ cancelRecoveryTimer(setup.timers)
+ val msg = s"Recovery timed out, didn't get event within ${setup.settings.recoveryEventTimeout}, highest sequence number seen ${state.seqNr}"
+ onRecoveryFailure(setup, new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this?
+ }
+ } else {
+ // snapshot timeout, but we're already in the events recovery phase
+ Behavior.unhandled
+ }
+
+ def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
+ setup.log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response))
+ Behaviors.unhandled // ignore the response
}
/**
@@ -129,87 +147,40 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
- protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = {
- returnRecoveryPermit("on recovery failure: " + cause.getMessage)
- cancelRecoveryTimer()
+ protected def onRecoveryFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, sequenceNr: Long, event: Option[Any]): Behavior[InternalProtocol] = {
+ returnRecoveryPermit(setup, "on recovery failure: " + cause.getMessage)
+ cancelRecoveryTimer(setup.timers)
event match {
case Some(evt) ⇒
- log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
+ setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
Behaviors.stopped
case None ⇒
- log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", persistenceId, sequenceNr)
+ setup.log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", setup.persistenceId, sequenceNr)
Behaviors.stopped
}
}
- protected def onRecoveryCompleted(state: State): Behavior[Any] = {
- try {
- returnRecoveryPermit("recovery completed successfully")
- recoveryCompleted(commandContext, state)
+ protected def onRecoveryCompleted[C, E, S](setup: EventsourcedSetup[C, E, S], state: RecoveringState[S]): Behavior[InternalProtocol] = try {
+ returnRecoveryPermit(setup, "recovery completed successfully")
+ setup.recoveryCompleted(setup.commandContext, state.state)
- val running = new EventsourcedRunning[Command, Event, State](
- setup,
- context,
- timers,
- internalStash,
+ val running = EventsourcedRunning.HandlingCommands[C, E, S](
+ setup,
+ EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state)
+ )
- sequenceNr,
- writerIdentity,
-
- state
- )
-
- tryUnstash(context, running)
- } finally {
- cancelRecoveryTimer()
- }
+ tryUnstash(setup, setup.internalStash, running)
+ } finally {
+ cancelRecoveryTimer(setup.timers)
}
- protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] =
- if (!snapshot) {
- if (!eventSeenInInterval) {
- cancelRecoveryTimer()
- val msg = s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr"
- onRecoveryFailure(new RecoveryTimedOut(msg), event = None) // TODO allow users to hook into this?
- } else {
- eventSeenInInterval = false
- same
- }
- } else {
- // snapshot timeout, but we're already in the events recovery phase
- Behavior.unhandled
- }
-
- // ----------
-
- override def onMessage(msg: Any): Behavior[Any] = {
- msg match {
- // TODO explore crazy hashcode hack to make this match quicker...?
- case JournalResponse(r) ⇒ onJournalResponse(r)
- case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot)
- case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r)
- case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly
- }
- }
-
- // ----------
-
- // ---------- journal interactions ---------
-
- private def replayEvents(fromSeqNr: SeqNr, toSeqNr: SeqNr): Unit = {
- log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
- // reply is sent to `selfUntypedAdapted`, it is important to target that one
- journal ! ReplayMessages(fromSeqNr, toSeqNr, recovery.replayMax, persistenceId, selfUntypedAdapted)
- }
-
- private def returnRecoveryPermit(reason: String): Unit = {
- log.debug("Returning recovery permit, reason: " + reason)
- // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
- extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped)
- }
-
- override def toString = s"EventsourcedRecoveringEvents($persistenceId)"
+ // protect against snapshot stalling forever because of journal overloaded and such
+ private val RecoveryTickTimerKey = "recovery-tick"
+ private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit =
+ timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
+ private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(RecoveryTickTimerKey)
}
+
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala
index 65a3c1d452..6bcf3401fe 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala
@@ -4,14 +4,14 @@
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
-import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
+import akka.actor.typed.scaladsl.Behaviors.same
+import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.annotation.InternalApi
-import akka.event.Logging
-import akka.persistence.SnapshotProtocol.{ LoadSnapshot, LoadSnapshotFailed, LoadSnapshotResult }
+import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence._
-import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
-import akka.util.Helpers._
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
+import akka.{ actor ⇒ a }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
@@ -20,6 +20,7 @@ import scala.util.{ Failure, Success, Try }
* INTERNAL API
*
* Second (of four) behavior of an PersistentBehavior.
+ * See next behavior [[EventsourcedRecoveringEvents]].
*
* In this behavior the recovery process is initiated.
* We try to obtain a snapshot from the configured snapshot store,
@@ -29,71 +30,89 @@ import scala.util.{ Failure, Success, Try }
* recovery of events continues in [[EventsourcedRecoveringEvents]].
*/
@InternalApi
-final class EventsourcedRecoveringSnapshot[Command, Event, State](
- val setup: EventsourcedSetup[Command, Event, State],
- override val context: ActorContext[Any],
- override val timers: TimerScheduler[Any],
- override val internalStash: StashBuffer[Any],
+object EventsourcedRecoveringSnapshot extends EventsourcedJournalInteractions with EventsourcedStashManagement {
- val writerIdentity: WriterIdentity
-) extends MutableBehavior[Any]
- with EventsourcedBehavior[Command, Event, State]
- with EventsourcedStashManagement {
- import setup._
+ def apply[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = {
+ startRecoveryTimer(setup)
- import Behaviors.same
- import EventsourcedBehavior._
- import akka.actor.typed.scaladsl.adapter._
+ withMdc(setup) {
+ Behaviors.immutable {
+ case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r)
+ case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, r)
+ case (_, RecoveryTickEvent(snapshot)) ⇒ onRecoveryTick(setup, snapshot)
+ case (_, cmd: IncomingCommand[Command]) ⇒ onCommand(setup, cmd)
+ }
+ }
+ }
- protected val log = Logging(context.system.toUntyped, this)
+ def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
+ val mdc = Map(
+ "persistenceId" → setup.persistenceId,
+ "phase" → "recover-snap"
+ )
+ Behaviors.withMdc(_ ⇒ mdc, b)
+ }
- // -------- initialize --------
- startRecoveryTimer()
+ /**
+ * Called whenever a message replay fails. By default it logs the error.
+ *
+ * The actor is always stopped after this method has been invoked.
+ *
+ * @param cause failure cause.
+ * @param event the event that was processed in `receiveRecover`, if the exception was thrown there
+ */
+ private def onRecoveryFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = {
+ cancelRecoveryTimer(setup.timers)
- loadSnapshot(persistenceId, recovery.fromSnapshot, recovery.toSequenceNr)
- // ---- end of initialize ----
+ val lastSequenceNr = 0 // FIXME not needed since snapshot == 0
+ event match {
+ case Some(evt) ⇒
+ setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
+ "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, setup.persistenceId)
+ Behaviors.stopped
- val commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
+ case None ⇒
+ setup.log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
+ "Last known sequence number [{}]", setup.persistenceId, lastSequenceNr)
+ Behaviors.stopped
+ }
+ }
- // ----------
-
- protected var awaitingSnapshot: Boolean = true
-
- // ----------
-
- private var lastSequenceNr: Long = 0L
- def snapshotSequenceNr: Long = lastSequenceNr
-
- // ----------
-
- lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
+ private def onRecoveryTick[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], snapshot: Boolean): Behavior[InternalProtocol] =
+ if (snapshot) {
+ // we know we're in snapshotting mode; snapshot recovery timeout arrived
+ val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}")
+ onRecoveryFailure(setup, ex, event = None)
+ } else same // ignore, since we received the snapshot already
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
- private def startRecoveryTimer(): Unit = {
- timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
- }
- private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey)
- def onCommand(cmd: Command): Behavior[Any] = {
+ private def startRecoveryTimer(setup: EventsourcedSetup[_, _, _]): Unit = {
+ setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout)
+ }
+
+ private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey)
+
+ def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
- stash(context, cmd)
+ setup.internalStash.stash(cmd) // TODO move stash out as it's mutable
Behavior.same
}
- def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try {
+ def onJournalResponse[Command](setup: EventsourcedSetup[_, _, _], response: JournalProtocol.Response): Behavior[InternalProtocol] = try {
throw new Exception("Should not talk to journal yet! But got: " + response)
} catch {
case NonFatal(cause) ⇒
- returnRecoveryPermitOnlyOnFailure(cause)
+ returnRecoveryPermitOnlyOnFailure(setup, cause)
throw cause
}
- def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = try {
+ def onSnapshotterResponse[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = try {
response match {
case LoadSnapshotResult(sso, toSnr) ⇒
- var state: S = initialState
- val re: Try[SeqNr] = Try {
+ var state: State = setup.initialState
+ val re: Try[Long] = Try {
sso match {
case Some(SelectedSnapshot(metadata, snapshot)) ⇒
state = snapshot.asInstanceOf[State]
@@ -106,106 +125,38 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State](
re match {
case Success(seqNr) ⇒
- lastSequenceNr = seqNr
- replayMessages(state, toSnr)
+ replayMessages(setup, state, seqNr, toSnr)
case Failure(cause) ⇒
// FIXME better exception type
- val ex = new RuntimeException(s"Failed to recover state for [$persistenceId] from snapshot offer.", cause)
- onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME
+ val ex = new RuntimeException(s"Failed to recover state for [${setup.persistenceId}] from snapshot offer.", cause)
+ onRecoveryFailure(setup, ex, event = None) // FIXME the failure logs has bad messages... FIXME
}
case LoadSnapshotFailed(cause) ⇒
- cancelRecoveryTimer()
+ cancelRecoveryTimer(setup.timers)
- onRecoveryFailure(cause, event = None)
+ onRecoveryFailure(setup, cause, event = None)
- case other ⇒
- stash(context, other)
- same
+ case _ ⇒
+ Behaviors.unhandled
}
} catch {
case NonFatal(cause) ⇒
- returnRecoveryPermitOnlyOnFailure(cause)
+ returnRecoveryPermitOnlyOnFailure(setup, cause)
throw cause
}
- private def replayMessages(state: State, toSnr: SeqNr): Behavior[Any] = {
- cancelRecoveryTimer()
+ private def replayMessages[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], state: State, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
+ cancelRecoveryTimer(setup.timers)
- val rec = recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types
+ val rec = setup.recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types
- new EventsourcedRecoveringEvents[Command, Event, State](
+ EventsourcedRecoveringEvents[Command, Event, State](
setup.copy(recovery = rec),
- context,
- timers,
- internalStash,
-
- lastSequenceNr,
- writerIdentity,
-
- state
+ // setup.internalStash, // TODO move it out of setup
+ EventsourcedRecoveringEvents.RecoveringState(lastSequenceNr, state)
)
}
- /**
- * Called whenever a message replay fails. By default it logs the error.
- *
- * The actor is always stopped after this method has been invoked.
- *
- * @param cause failure cause.
- * @param event the event that was processed in `receiveRecover`, if the exception was thrown there
- */
- protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = {
- cancelRecoveryTimer()
- event match {
- case Some(evt) ⇒
- log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
- "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId)
- Behaviors.stopped
-
- case None ⇒
- log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
- "Last known sequence number [{}]", persistenceId, lastSequenceNr)
- Behaviors.stopped
- }
- }
-
- protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] =
- // we know we're in snapshotting mode
- if (snapshot) onRecoveryFailure(new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), event = None)
- else same // ignore, since we received the snapshot already
-
- // ----------
-
- override def onMessage(msg: Any): Behavior[Any] = {
- msg match {
- // TODO explore crazy hashcode hack to make this match quicker...?
- case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r)
- case JournalResponse(r) ⇒ onJournalResponse(r)
- case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot)
- case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly
- }
- }
-
- // ----------
-
- // ---------- journal interactions ---------
-
- /**
- * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
- * to the running [[PersistentActor]].
- */
- private def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
- snapshotStore.tell(LoadSnapshot(persistenceId, criteria, toSequenceNr), selfUntypedAdapted)
- }
-
- private def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = {
- log.debug("Returning recovery permit, on failure because: " + cause.getMessage)
- // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
- extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped)
- }
-
- override def toString = s"EventsourcedRecoveringSnapshot($persistenceId)"
-
}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala
index 80d7705a85..aaccc403a6 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala
@@ -9,81 +9,59 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerSc
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence._
-import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
+import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
/**
* INTERNAL API
*
* First (of four) behaviour of an PersistentBehaviour.
+ * See next behavior [[EventsourcedRecoveringSnapshot]].
*
* Requests a permit to start recovering this actor; this is tone to avoid
* hammering the journal with too many concurrently recovering actors.
+ *
*/
@InternalApi
-private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, State](
- val setup: EventsourcedSetup[Command, Event, State],
- override val context: ActorContext[Any],
- override val timers: TimerScheduler[Any]
-
-) extends MutableBehavior[Any]
- with EventsourcedBehavior[Command, Event, State]
- with EventsourcedStashManagement {
- import setup._
-
+private[akka] object EventsourcedRequestingRecoveryPermit extends EventsourcedStashManagement {
import akka.actor.typed.scaladsl.adapter._
- // has to be lazy, since we want to obtain the persistenceId
- protected lazy val log = Logging(context.system.toUntyped, this)
+ def apply[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = {
+ // request a permit, as only once we obtain one we can start recovering
+ requestRecoveryPermit(setup.context, setup.persistence)
- override protected val internalStash: StashBuffer[Any] = {
- val stashSize = context.system.settings.config
- .getInt("akka.persistence.typed.stash-buffer-size")
- StashBuffer[Any](stashSize)
- }
+ withMdc(setup) {
+ Behaviors.immutable[InternalProtocol] {
+ case (_, InternalProtocol.RecoveryPermitGranted) ⇒ // FIXME types
+ becomeRecovering(setup)
- // --- initialization ---
- // only once we have a permit, we can become active:
- requestRecoveryPermit()
-
- val writerIdentity: WriterIdentity = WriterIdentity.newIdentity()
-
- // --- end of initialization ---
-
- // ----------
-
- def becomeRecovering(): Behavior[Any] = {
- log.debug(s"Initializing snapshot recovery: {}", recovery)
-
- new EventsourcedRecoveringSnapshot(
- setup,
- context,
- timers,
- internalStash,
-
- writerIdentity
- )
- }
-
- // ----------
-
- override def onMessage(msg: Any): Behavior[Any] = {
- msg match {
- case RecoveryPermitter.RecoveryPermitGranted ⇒
- log.debug("Awaiting permit, received: RecoveryPermitGranted")
- becomeRecovering()
-
- case other ⇒
- stash(context, other)
- Behaviors.same
+ case (_, other) ⇒
+ stash(setup, setup.internalStash, other)
+ Behaviors.same
+ }
}
}
+ private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(wrapped: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
+ val mdc = Map(
+ "persistenceId" → setup.persistenceId,
+ "phase" → "awaiting-permit"
+ )
+
+ Behaviors.withMdc(_ ⇒ mdc, wrapped)
+ }
+
+ private def becomeRecovering[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = {
+ setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)
+
+ EventsourcedRecoveringSnapshot(setup)
+ }
+
// ---------- journal interactions ---------
- private def requestRecoveryPermit(): Unit = {
+ private def requestRecoveryPermit[Command](context: ActorContext[Command], persistence: Persistence): Unit = {
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
- extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped)
+ val selfUntyped = context.self.toUntyped
+ persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped)
}
- override def toString = s"EventsourcedRequestingRecoveryPermit($persistenceId)"
}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala
index 586159f0ac..690365c62e 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala
@@ -13,7 +13,8 @@ import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandler
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
-import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse }
import scala.annotation.tailrec
import scala.collection.immutable
@@ -34,224 +35,206 @@ import scala.collection.immutable
* which perform the Persistence extension lookup on creation and similar things (config lookup)
*
*/
-@InternalApi
-class EventsourcedRunning[Command, Event, State](
- val setup: EventsourcedSetup[Command, Event, State],
- override val context: ActorContext[Any],
- override val timers: TimerScheduler[Any],
- override val internalStash: StashBuffer[Any],
+@InternalApi object EventsourcedRunning extends EventsourcedJournalInteractions with EventsourcedStashManagement {
- private var sequenceNr: Long,
- val writerIdentity: WriterIdentity,
+ final case class EventsourcedState[State](
+ seqNr: Long,
+ state: State,
+ pendingInvocations: immutable.Seq[PendingHandlerInvocation] = Nil
+ ) {
- private var state: State
-) extends MutableBehavior[Any]
- with EventsourcedBehavior[Command, Event, State]
- with EventsourcedStashManagement { same ⇒
- import setup._
+ def nextSequenceNr(): EventsourcedState[State] =
+ copy(seqNr = seqNr + 1)
- import EventsourcedBehavior._
- import akka.actor.typed.scaladsl.adapter._
+ def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] =
+ if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this
- protected val log = Logging(context.system.toUntyped, this)
+ def popApplyPendingInvocation(repr: PersistentRepr): EventsourcedState[State] = {
+ val (headSeq, remainingInvocations) = pendingInvocations.splitAt(1)
+ headSeq.head.handler(repr.payload)
- private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
+ copy(
+ pendingInvocations = remainingInvocations,
+ seqNr = repr.sequenceNr
+ )
+ }
- // ----------
-
- // Holds callbacks for persist calls (note that we do not implement persistAsync currently)
- private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty
- private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
-
- // ----------
-
- private def snapshotSequenceNr: Long = sequenceNr
-
- private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
- if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
- private def nextSequenceNr(): Long = {
- sequenceNr += 1L
- sequenceNr
- }
- // ----------
-
- private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = {
- response match {
- case SaveSnapshotSuccess(meta) ⇒
- log.debug("Save snapshot successful: " + meta)
- same
- case SaveSnapshotFailure(meta, ex) ⇒
- log.error(ex, "Save snapshot failed! " + meta)
- same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
+ def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = {
+ val updated = setup.eventHandler(state, event)
+ copy(state = updated)
}
}
- // ----------
+ // ===============================================
- trait EventsourcedRunningPhase {
- def name: String
- def onCommand(c: Command): Behavior[Any]
- def onJournalResponse(response: JournalProtocol.Response): Behavior[Any]
- }
-
- object HandlingCommands extends EventsourcedRunningPhase {
- def name = "HandlingCommands"
-
- final override def onCommand(command: Command): Behavior[Any] = {
- val effect = commandHandler(commandContext, state, command)
- applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
+ object HandlingCommands {
+ def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
+ withMdc(setup, "run-cmnds") {
+ Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
+ case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r)
+ case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, r)
+ case (_, IncomingCommand(c: C @unchecked)) ⇒ onCommand(setup, state, c)
+ }
+ }
}
- final override def onJournalResponse(response: Response): Behavior[Any] = {
- // should not happen, what would it reply?
- throw new RuntimeException("Received message which should not happen in Running state!")
+
+ private def onJournalResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: Response): Behavior[InternalProtocol] = {
+ // TODO ignore, could happen if actor was restarted?
+ Behaviors.unhandled
+ }
+
+ private def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
+ val effect = setup.commandHandler(setup.commandContext, state.state, cmd)
+ applyEffects(setup, cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
}
- object PersistingEventsNoSideEffects extends PersistingEvents(Nil)
+ // ===============================================
- sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase {
- def name = "PersistingEvents"
+ object PersistingEvents {
- final override def onCommand(c: Command): Behavior[Any] = {
- stash(context, c)
- same
+ def apply[C, E, S](
+ setup: EventsourcedSetup[C, E, S],
+ state: EventsourcedState[S],
+ pendingInvocations: immutable.Seq[PendingHandlerInvocation],
+ sideEffects: immutable.Seq[ChainableEffect[_, S]]
+ ): Behavior[InternalProtocol] = {
+ withMdc(setup, "run-persist-evnts") {
+ Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
+ case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r)
+ case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, state, pendingInvocations, sideEffects, r)
+ case (_, in: IncomingCommand[C @unchecked]) ⇒ onCommand(setup, state, in)
+ }
+ }
}
- final override def onJournalResponse(response: Response): Behavior[Any] = {
- log.debug("Received Journal response: {}", response)
+ def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
+ stash(setup, setup.internalStash, cmd)
+ Behaviors.same
+ }
+
+ final def onJournalResponse[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S],
+ pendingInvocations: immutable.Seq[PendingHandlerInvocation],
+ sideEffects: immutable.Seq[ChainableEffect[_, S]],
+ response: Response): Behavior[InternalProtocol] = {
+ setup.log.debug("Received Journal response: {}", response)
response match {
case WriteMessageSuccess(p, id) ⇒
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
- if (id == writerIdentity.instanceId) {
- updateLastSequenceNr(p)
- popApplyHandler(p.payload)
- onWriteMessageComplete()
- tryUnstash(context, applySideEffects(sideEffects))
- } else same
+ if (id == setup.writerIdentity.instanceId) {
+ val newState = state.popApplyPendingInvocation(p)
+
+ // only once all things are applied we can revert back
+ if (newState.pendingInvocations.nonEmpty) Behaviors.same
+ else tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, newState))
+ } else Behaviors.same
case WriteMessageRejected(p, cause, id) ⇒
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
- if (id == writerIdentity.instanceId) {
- updateLastSequenceNr(p)
- onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop
- tryUnstash(context, applySideEffects(sideEffects))
- } else same
+ if (id == setup.writerIdentity.instanceId) {
+ val newState = state.updateLastSequenceNr(p)
+ onPersistRejected(setup, cause, p.payload, p.sequenceNr) // does not stop (by design)
+ tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, newState))
+ } else Behaviors.same
case WriteMessageFailure(p, cause, id) ⇒
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
- if (id == writerIdentity.instanceId) {
- onWriteMessageComplete()
- onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
- } else same
+ if (id == setup.writerIdentity.instanceId) {
+ // onWriteMessageComplete() -> tryBecomeHandlingCommands
+ onPersistFailureThenStop(setup, cause, p.payload, p.sequenceNr)
+ } else Behaviors.same
case WriteMessagesSuccessful ⇒
// ignore
- same
+ Behaviors.same
case WriteMessagesFailed(_) ⇒
// ignore
- same // it will be stopped by the first WriteMessageFailure message; not applying side effects
+ Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _: LoopMessageSuccess ⇒
// ignore, should never happen as there is no persistAsync in typed
- same
+ Behaviors.same
}
}
- private def onWriteMessageComplete(): Unit =
- tryBecomeHandlingCommands()
+ // private def onWriteMessageComplete(): Unit =
+ // tryBecomeHandlingCommands()
- private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
- log.error(
+ private def onPersistRejected[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Any, seqNr: Long): Unit = {
+ setup.log.error(
cause,
"Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
- event.getClass.getName, seqNr, persistenceId, cause.getMessage)
+ event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage)
}
- private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = {
- log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
- event.getClass.getName, seqNr, persistenceId)
+ private def onPersistFailureThenStop[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = {
+ setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
+ event.getClass.getName, seqNr, setup.persistenceId)
// FIXME see #24479 for reconsidering the stopping behaviour
Behaviors.stopped
}
}
+ // --------------------------
- // the active phase switches between PersistingEvents and HandlingCommands;
- // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect
- private[this] var phase: EventsourcedRunningPhase = HandlingCommands
+ private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S], phase: String)(wrapped: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
+ val mdc = Map(
+ "persistenceId" → setup.persistenceId,
+ "phase" → phase
+ )
- override def onMessage(msg: Any): Behavior[Any] = {
- msg match {
- // TODO explore crazy hashcode hack to make this match quicker...?
- case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r)
- case JournalResponse(r) ⇒ phase.onJournalResponse(r)
- case command: Command @unchecked ⇒
- // the above type-check does nothing, since Command is tun
- // we cast explicitly to fail early in case of type mismatch
- val c = command.asInstanceOf[Command]
- phase.onCommand(c)
+ // FIXME remove need for class tag!!!
+ Behaviors.withMdc[Any]((_: Any) ⇒ mdc, wrapped.asInstanceOf[Behavior[Any]]).asInstanceOf[Behavior[InternalProtocol]]
+ }
+
+ // --------------------------
+
+ private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
+ response match {
+ case SaveSnapshotSuccess(meta) ⇒
+ setup.context.log.debug("Save snapshot successful: " + meta)
+ Behaviors.same
+ case SaveSnapshotFailure(meta, ex) ⇒
+ setup.context.log.error(ex, "Save snapshot failed! " + meta)
+ Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
}
}
- // ----------
+ private def applyEvent[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], event: E): S =
+ setup.eventHandler(state.state, event)
- def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
- var res: Behavior[Any] = same
- val it = effects.iterator
+ @tailrec private def applyEffects[C, E, S](
+ setup: EventsourcedSetup[C, E, S],
+ msg: Any,
+ state: EventsourcedState[S],
+ effect: EffectImpl[E, S],
+ sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil
+ ): Behavior[InternalProtocol] = {
+ import setup.log
- // if at least one effect results in a `stop`, we need to stop
- // manual loop implementation to avoid allocations and multiple scans
- while (it.hasNext) {
- val effect = it.next()
- applySideEffect(effect) match {
- case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped
- case _ ⇒ // nothing to do
- }
- }
-
- res
- }
-
- def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match {
- case _: Stop.type @unchecked ⇒
- Behaviors.stopped
-
- case SideEffect(sideEffects) ⇒
- sideEffects(state)
- same
-
- case _ ⇒
- throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
- }
-
- def applyEvent(s: S, event: E): S =
- eventHandler(s, event)
-
- @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = {
if (log.isDebugEnabled)
log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
effect match {
- case CompositeEffect(e, currentSideEffects) ⇒
+ case CompositeEffect(eff, currentSideEffects) ⇒
// unwrap and accumulate effects
- applyEffects(msg, e, currentSideEffects ++ sideEffects)
+ applyEffects(setup, msg, state, eff, currentSideEffects ++ sideEffects)
case Persist(event) ⇒
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
- state = applyEvent(state, event)
- val tags = tagger(event)
- val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
+ val newState = state.applyEvent(setup, event)
+ val eventToPersist = tagEvent(setup, event)
- internalPersist(eventToPersist, sideEffects) { _ ⇒
- if (snapshotWhen(state, event, sequenceNr))
- internalSaveSnapshot(state)
+ internalPersist(setup, state, eventToPersist, sideEffects) { _ ⇒
+ if (setup.snapshotWhen(newState.state, event, newState.seqNr))
+ internalSaveSnapshot(setup, state)
}
case PersistAll(events) ⇒
@@ -260,104 +243,364 @@ class EventsourcedRunning[Command, Event, State](
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var count = events.size
- var seqNr = sequenceNr
- val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
- case ((currentState, snapshot), event) ⇒
- seqNr += 1
- val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr)
- (applyEvent(currentState, event), shouldSnapshot)
- }
- state = newState
- val eventsToPersist = events.map { event ⇒
- val tags = tagger(event)
- if (tags.isEmpty) event else Tagged(event, tags)
- }
+ // var seqNr = state.seqNr
+ val (newState, shouldSnapshotAfterPersist) =
+ events.foldLeft((state, false)) {
+ case ((currentState, snapshot), event) ⇒
+ val value = currentState
+ .nextSequenceNr()
+ .applyEvent(setup, event)
- internalPersistAll(eventsToPersist, sideEffects) { _ ⇒
+ val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr)
+ (value, shouldSnapshot)
+ }
+ // state = newState
+
+ val eventsToPersist = events.map { tagEvent(setup, _) }
+
+ internalPersistAll(setup, eventsToPersist, newState, sideEffects) { _ ⇒
count -= 1
if (count == 0) {
- sideEffects.foreach(applySideEffect)
+ // FIXME the result of applying side effects is ignored
+ val b = applySideEffects(sideEffects, newState)
+
if (shouldSnapshotAfterPersist)
- internalSaveSnapshot(state)
+ internalSaveSnapshot(setup, newState)
}
}
} else {
// run side-effects even when no events are emitted
- tryUnstash(context, applySideEffects(sideEffects))
+ tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, state))
}
- case e: PersistNothing.type @unchecked ⇒
- tryUnstash(context, applySideEffects(sideEffects))
+ case _: PersistNothing.type @unchecked ⇒
+ tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, state))
case _: Unhandled.type @unchecked ⇒
- applySideEffects(sideEffects)
+ applySideEffects(sideEffects, state)
Behavior.unhandled
case c: ChainableEffect[_, S] ⇒
- applySideEffect(c)
+ applySideEffect(c, state)
}
}
- private def popApplyHandler(payload: Any): Unit =
- pendingInvocations.pop().handler(payload)
-
- private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
- if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException(
- "Attempted to become PersistingEvents while already in this phase! Logic error?")
-
- phase =
- if (sideEffects.isEmpty) PersistingEventsNoSideEffects
- else new PersistingEvents(sideEffects)
-
- same
+/***/
+ private def tagEvent[S, E, C](setup: EventsourcedSetup[C, E, S], event: E): Any = {
+ val tags = setup.tagger(event)
+ if (tags.isEmpty) event else Tagged(event, tags)
}
- private def tryBecomeHandlingCommands(): Behavior[Any] = {
- if (phase == HandlingCommands) throw new IllegalArgumentException(
- "Attempted to become HandlingCommands while already in this phase! Logic error?")
+ def applySideEffects[S](effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
+ var res: Behavior[InternalProtocol] = Behaviors.same
+ val it = effects.iterator
- if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN?
- phase = HandlingCommands
- }
-
- same
- }
-
- // ---------- journal interactions ---------
-
- // Any since can be `E` or `Tagged`
- private def internalPersist(event: Any, sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = {
- pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
-
- val senderNotKnownBecauseAkkaTyped = null
- val repr = PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped)
-
- val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
- journal.tell(JournalProtocol.WriteMessages(eventBatch, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted)
-
- becomePersistingEvents(sideEffects)
- }
-
- private def internalPersistAll(events: immutable.Seq[Any], sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = {
- if (events.nonEmpty) {
- val senderNotKnownBecauseAkkaTyped = null
-
- events.foreach { event ⇒
- pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
+ // if at least one effect results in a `stop`, we need to stop
+ // manual loop implementation to avoid allocations and multiple scans
+ while (it.hasNext) {
+ val effect = it.next()
+ applySideEffect(effect, state) match {
+ case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped
+ case _ ⇒ // nothing to do
}
+ }
- val write = AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId,
- sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped)))
-
- journal.tell(JournalProtocol.WriteMessages(write :: Nil, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted)
-
- becomePersistingEvents(sideEffects)
- } else same
+ res
}
- private def internalSaveSnapshot(snapshot: State): Unit = {
- snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(persistenceId, snapshotSequenceNr), snapshot), selfUntypedAdapted)
+ def applySideEffect[S](effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match {
+ case _: Stop.type @unchecked ⇒
+ Behaviors.stopped
+
+ case SideEffect(sideEffects) ⇒
+ sideEffects(state.state)
+ Behaviors.same
+
+ case _ ⇒
+ throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
}
- override def toString = s"EventsourcedRunning($persistenceId,${phase.name})"
}
+
+//@InternalApi
+//class EventsourcedRunning[Command, Event, State](
+// val setup: EventsourcedSetup[Command, Event, State],
+// // internalStash: StashBuffer[Any], // FIXME separate or in settings?
+//
+// private var sequenceNr: Long,
+// val writerIdentity: WriterIdentity,
+//
+// private var state: State
+//) extends MutableBehavior[Any]
+// with EventsourcedBehavior[Command, Event, State]
+// with EventsourcedStashManagement { same ⇒
+// import setup._
+//
+// import EventsourcedBehavior._
+// import akka.actor.typed.scaladsl.adapter._
+//
+// // Holds callbacks for persist calls (note that we do not implement persistAsync currently)
+// private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty
+// private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
+//
+// // ----------
+////
+//// private def snapshotSequenceNr: Long = sequenceNr
+////
+//// private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
+//// if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
+//// private def nextSequenceNr(): Long = {
+//// sequenceNr += 1L
+//// sequenceNr
+//// }
+// // ----------
+//
+// private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[C] = {
+// response match {
+// case SaveSnapshotSuccess(meta) ⇒
+// setup.context.log.debug("Save snapshot successful: " + meta)
+// Behaviors.same
+// case SaveSnapshotFailure(meta, ex) ⇒
+// setup.context.log.error(ex, "Save snapshot failed! " + meta)
+// Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
+// }
+// }
+//
+// // ----------
+//
+// trait EventsourcedRunningPhase {
+// def name: String
+// def onCommand(c: Command): Behavior[Any]
+// def onJournalResponse(response: JournalProtocol.Response): Behavior[Any]
+// }
+//
+//// object HandlingCommands extends EventsourcedRunningPhase {
+//// def name = "HandlingCommands"
+////
+//// final override def onCommand(command: Command): Behavior[Any] = {
+//// val effect = commandHandler(commandContext, state, command)
+//// applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
+//// }
+//// final override def onJournalResponse(response: Response): Behavior[Any] = {
+//// // ignore, could happen if actor was restarted?
+//// }
+//// }
+//
+// object PersistingEventsNoSideEffects extends PersistingEvents(Nil)
+//
+// sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase {
+// def name = "PersistingEvents"
+//
+// final override def onCommand(c: Command): Behavior[Any] = {
+// stash(setup, context, c)
+// same
+// }
+//
+// final override def onJournalResponse(response: Response): Behavior[Any] = {
+// log.debug("Received Journal response: {}", response)
+// response match {
+// case WriteMessageSuccess(p, id) ⇒
+// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
+// // while message is in flight, in that case we ignore the call to the handler
+// if (id == writerIdentity.instanceId) {
+// updateLastSequenceNr(p)
+// popApplyHandler(p.payload)
+// onWriteMessageComplete()
+// tryUnstash(setup, internalStash, applySideEffects(sideEffects))
+// } else same
+//
+// case WriteMessageRejected(p, cause, id) ⇒
+// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
+// // while message is in flight, in that case the handler has already been discarded
+// if (id == writerIdentity.instanceId) {
+// updateLastSequenceNr(p)
+// onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop
+// tryUnstash(setup, applySideEffects(sideEffects))
+// } else same
+//
+// case WriteMessageFailure(p, cause, id) ⇒
+// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
+// // while message is in flight, in that case the handler has already been discarded
+// if (id == writerIdentity.instanceId) {
+// onWriteMessageComplete()
+// onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
+// } else same
+//
+// case WriteMessagesSuccessful ⇒
+// // ignore
+// same
+//
+// case WriteMessagesFailed(_) ⇒
+// // ignore
+// same // it will be stopped by the first WriteMessageFailure message; not applying side effects
+//
+// case _: LoopMessageSuccess ⇒
+// // ignore, should never happen as there is no persistAsync in typed
+// same
+// }
+// }
+//
+// private def onWriteMessageComplete(): Unit =
+// tryBecomeHandlingCommands()
+//
+// private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
+// log.error(
+// cause,
+// "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
+// event.getClass.getName, seqNr, persistenceId, cause.getMessage)
+// }
+//
+// private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = {
+// log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
+// event.getClass.getName, seqNr, persistenceId)
+//
+// // FIXME see #24479 for reconsidering the stopping behaviour
+// Behaviors.stopped
+// }
+//
+// }
+//
+// // the active phase switches between PersistingEvents and HandlingCommands;
+// // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect
+// private[this] var phase: EventsourcedRunningPhase = HandlingCommands
+//
+// override def onMessage(msg: Any): Behavior[Any] = {
+// msg match {
+// // TODO explore crazy hashcode hack to make this match quicker...?
+// case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r)
+// case JournalResponse(r) ⇒ phase.onJournalResponse(r)
+// case command: Command @unchecked ⇒
+// // the above type-check does nothing, since Command is tun
+// // we cast explicitly to fail early in case of type mismatch
+// val c = command.asInstanceOf[Command]
+// phase.onCommand(c)
+// }
+// }
+//
+// // ----------
+//
+// def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
+// var res: Behavior[Any] = same
+// val it = effects.iterator
+//
+// // if at least one effect results in a `stop`, we need to stop
+// // manual loop implementation to avoid allocations and multiple scans
+// while (it.hasNext) {
+// val effect = it.next()
+// applySideEffect(effect) match {
+// case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped
+// case _ ⇒ // nothing to do
+// }
+// }
+//
+// res
+// }
+//
+// def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match {
+// case _: Stop.type @unchecked ⇒
+// Behaviors.stopped
+//
+// case SideEffect(sideEffects) ⇒
+// sideEffects(state)
+// same
+//
+// case _ ⇒
+// throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
+// }
+//
+// def applyEvent(s: S, event: E): S =
+// eventHandler(s, event)
+//
+// @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = {
+// if (log.isDebugEnabled)
+// log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
+//
+// effect match {
+// case CompositeEffect(e, currentSideEffects) ⇒
+// // unwrap and accumulate effects
+// applyEffects(msg, e, currentSideEffects ++ sideEffects)
+//
+// case Persist(event) ⇒
+// // apply the event before persist so that validation exception is handled before persisting
+// // the invalid event, in case such validation is implemented in the event handler.
+// // also, ensure that there is an event handler for each single event
+// state = applyEvent(state, event)
+// val tags = tagger(event)
+// val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
+//
+// internalPersist(eventToPersist, sideEffects) { _ ⇒
+// if (snapshotWhen(state, event, sequenceNr))
+// internalSaveSnapshot(state)
+// }
+//
+// case PersistAll(events) ⇒
+// if (events.nonEmpty) {
+// // apply the event before persist so that validation exception is handled before persisting
+// // the invalid event, in case such validation is implemented in the event handler.
+// // also, ensure that there is an event handler for each single event
+// var count = events.size
+// var seqNr = sequenceNr
+// val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
+// case ((currentState, snapshot), event) ⇒
+// seqNr += 1
+// val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr)
+// (applyEvent(currentState, event), shouldSnapshot)
+// }
+// state = newState
+// val eventsToPersist = events.map { event ⇒
+// val tags = tagger(event)
+// if (tags.isEmpty) event else Tagged(event, tags)
+// }
+//
+// internalPersistAll(eventsToPersist, sideEffects) { _ ⇒
+// count -= 1
+// if (count == 0) {
+// sideEffects.foreach(applySideEffect)
+// if (shouldSnapshotAfterPersist)
+// internalSaveSnapshot(state)
+// }
+// }
+// } else {
+// // run side-effects even when no events are emitted
+// tryUnstash(context, applySideEffects(sideEffects))
+// }
+//
+// case e: PersistNothing.type @unchecked ⇒
+// tryUnstash(context, applySideEffects(sideEffects))
+//
+// case _: Unhandled.type @unchecked ⇒
+// applySideEffects(sideEffects)
+// Behavior.unhandled
+//
+// case c: ChainableEffect[_, S] ⇒
+// applySideEffect(c)
+// }
+// }
+//
+// private def popApplyHandler(payload: Any): Unit =
+// pendingInvocations.pop().handler(payload)
+//
+// private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
+// if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException(
+// "Attempted to become PersistingEvents while already in this phase! Logic error?")
+//
+// phase =
+// if (sideEffects.isEmpty) PersistingEventsNoSideEffects
+// else new PersistingEvents(sideEffects)
+//
+// same
+// }
+//
+// private def tryBecomeHandlingCommands(): Behavior[Any] = {
+// if (phase == HandlingCommands) throw new IllegalArgumentException(
+// "Attempted to become HandlingCommands while already in this phase! Logic error?")
+//
+// if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN?
+// phase = HandlingCommands
+// }
+//
+// same
+// }
+//
+// override def toString = s"EventsourcedRunning($persistenceId,${phase.name})"
+//}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala
new file mode 100644
index 0000000000..8f8b8191f5
--- /dev/null
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2009-2018 Lightbend Inc.
+ */
+package akka.persistence.typed.internal
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.typed.ActorSystem
+import akka.annotation.InternalApi
+import akka.event.Logging
+import akka.event.Logging.LogLevel
+import com.typesafe.config.Config
+
+import scala.concurrent.duration._
+
+trait EventsourcedSettings {
+
+ def stashCapacity: Int
+ // def stashOverflowStrategyName: String // TODO not supported, the stash just throws for now
+ def stashingLogLevel: LogLevel
+ def journalPluginId: String
+ def snapshotPluginId: String
+ def recoveryEventTimeout: FiniteDuration
+
+ def withJournalPluginId(id: Option[String]): EventsourcedSettings
+ def withSnapshotPluginId(id: Option[String]): EventsourcedSettings
+}
+
+object EventsourcedSettings {
+
+ def apply(system: ActorSystem[_]): EventsourcedSettings =
+ apply(system.settings.config)
+
+ def apply(config: Config): EventsourcedSettings = {
+ val typedConfig = config.getConfig("akka.persistence.typed")
+ val untypedConfig = config.getConfig("akka.persistence")
+
+ // StashOverflowStrategy
+ val internalStashOverflowStrategy =
+ untypedConfig.getString("internal-stash-overflow-strategy") // FIXME or copy it to typed?
+
+ val stashCapacity = typedConfig.getInt("stash-capacity")
+
+ val stashingLogLevel = typedConfig.getString("log-stashing") match {
+ case "off" ⇒ Logging.OffLevel
+ case "on" | "true" ⇒ Logging.DebugLevel
+ case l ⇒ Logging.levelFor(l).getOrElse(Logging.OffLevel)
+ }
+
+ // FIXME this is wrong I think
+ val recoveryEventTimeout = 10.seconds // untypedConfig.getDuration("plugin-journal-fallback.recovery-event-timeout", TimeUnit.MILLISECONDS).millis
+
+ EventsourcedSettingsImpl(
+ stashCapacity = stashCapacity,
+ internalStashOverflowStrategy,
+ stashingLogLevel = stashingLogLevel,
+ journalPluginId = "",
+ snapshotPluginId = "",
+ recoveryEventTimeout = recoveryEventTimeout
+ )
+ }
+}
+
+@InternalApi
+private[persistence] final case class EventsourcedSettingsImpl(
+ stashCapacity: Int,
+ stashOverflowStrategyName: String,
+ stashingLogLevel: LogLevel,
+ journalPluginId: String,
+ snapshotPluginId: String,
+ recoveryEventTimeout: FiniteDuration
+) extends EventsourcedSettings {
+
+ def withJournalPluginId(id: Option[String]): EventsourcedSettings = id match {
+ case Some(identifier) ⇒ copy(journalPluginId = identifier)
+ case _ ⇒ this
+ }
+ def withSnapshotPluginId(id: Option[String]): EventsourcedSettings = id match {
+ case Some(identifier) ⇒ copy(snapshotPluginId = identifier)
+ case _ ⇒ this
+ }
+}
+
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala
index 2191caa8cc..d63cf468c5 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala
@@ -1,129 +1,119 @@
+/**
+ * Copyright (C) 2009-2018 Lightbend Inc.
+ */
package akka.persistence.typed.internal
-import akka.actor.typed
-import akka.actor.typed.Behavior
-import akka.actor.typed.Behavior.DeferredBehavior
-import akka.actor.typed.internal.TimerSchedulerImpl
-import akka.actor.typed.scaladsl.ActorContext
-import akka.actor.typed.scaladsl.TimerScheduler
+import akka.actor.ActorRef
+import akka.{ actor ⇒ a }
+import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
-import akka.persistence.Recovery
-import akka.persistence.SnapshotSelectionCriteria
-import akka.persistence.typed.scaladsl.PersistentBehavior
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryPermitGranted
+import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.PersistentBehaviors
-import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
+import akka.persistence._
import akka.util.ConstantFun
-/** INTERNAL API */
@InternalApi
-private[persistence] case class EventsourcedSetup[Command, Event, State](
+private[persistence] object EventsourcedSetup {
+
+ def apply[Command, Event, State](
+ context: ActorContext[InternalProtocol],
+ timers: TimerScheduler[InternalProtocol],
+
+ persistenceId: String,
+ initialState: State,
+ commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
+ eventHandler: (State, Event) ⇒ State): EventsourcedSetup[Command, Event, State] = {
+ apply(
+ context,
+ timers,
+ persistenceId,
+ initialState,
+ commandHandler,
+ eventHandler,
+ // values dependent on context
+ EventsourcedSettings(context.system))
+ }
+
+ def apply[Command, Event, State](
+ context: ActorContext[InternalProtocol],
+ timers: TimerScheduler[InternalProtocol],
+
+ persistenceId: String,
+ initialState: State,
+ commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
+ eventHandler: (State, Event) ⇒ State,
+ settings: EventsourcedSettings): EventsourcedSetup[Command, Event, State] = {
+ new EventsourcedSetup[Command, Event, State](
+ context,
+ timers,
+
+ persistenceId,
+ initialState,
+ commandHandler,
+ eventHandler,
+ writerIdentity = WriterIdentity.newIdentity(),
+ recoveryCompleted = ConstantFun.scalaAnyTwoToUnit,
+ tagger = (_: Event) ⇒ Set.empty[String],
+ snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
+ recovery = Recovery(),
+ settings,
+ StashBuffer(settings.stashCapacity)
+ )
+ }
+}
+
+/** INTERNAL API: Carry state for the Persistent behavior implementation behaviors */
+@InternalApi
+private[persistence] final case class EventsourcedSetup[Command, Event, State](
+ context: ActorContext[InternalProtocol],
+ timers: TimerScheduler[InternalProtocol],
+
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
- eventHandler: (State, Event) ⇒ State,
- recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit,
- tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String],
- journalPluginId: String = "",
- snapshotPluginId: String = "",
- snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse,
- recovery: Recovery = Recovery()
-) extends PersistentBehavior[Command, Event, State] {
+ eventHandler: (State, Event) ⇒ State,
+ writerIdentity: WriterIdentity,
+ recoveryCompleted: (ActorContext[Command], State) ⇒ Unit,
+ tagger: Event ⇒ Set[String],
+ snapshotWhen: (State, Event, Long) ⇒ Boolean,
+ recovery: Recovery,
- override def apply(ctx: typed.ActorContext[Command]): Behavior[Command] = {
- DeferredBehavior[Command](ctx ⇒
- TimerSchedulerImpl.wrapWithTimers[Command] { timers ⇒
- new EventsourcedRequestingRecoveryPermit(
- this,
- ctx.asInstanceOf[ActorContext[Any]], // sorry
- timers.asInstanceOf[TimerScheduler[Any]] // sorry
- ).narrow[Command]
+ settings: EventsourcedSettings,
- }(ctx))
+ internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\
+) {
+ import akka.actor.typed.scaladsl.adapter._
+
+ def withJournalPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = {
+ require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
+ copy(settings = settings.withJournalPluginId(id))
}
- /**
- * The `callback` function is called to notify the actor that the recovery process
- * is finished.
- */
- def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] =
- copy(recoveryCompleted = callback)
-
- /**
- * Initiates a snapshot if the given function returns true.
- * When persisting multiple events at once the snapshot is triggered after all the events have
- * been persisted.
- *
- * `predicate` receives the State, Event and the sequenceNr used for the Event
- */
- def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] =
- copy(snapshotWhen = predicate)
-
- /**
- * Snapshot every N events
- *
- * `numberOfEvents` should be greater than 0
- */
- def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
- require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
- copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0)
- }
-
- /**
- * Change the journal plugin id that this actor should use.
- */
- def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] = {
- require(id != null, "persistence plugin id must not be null; use empty string for 'default' journal")
- copy(journalPluginId = id)
- }
-
- /**
- * Change the snapshot store plugin id that this actor should use.
- */
- def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
+ def withSnapshotPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
- copy(snapshotPluginId = id)
+ copy(settings = settings.withSnapshotPluginId(id))
}
- /**
- * Changes the snapshot selection criteria used by this behavior.
- * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
- * from the sequence number up until which the snapshot reached.
- *
- * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
- * performed by replaying all events -- which may take a long time.
- */
- def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
- copy(recovery = Recovery(selection))
- }
+ def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
- /**
- * The `tagger` function should give event tags, which will be used in persistence query
- */
- def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] =
- copy(tagger = tagger)
+ def log = context.log
- def copy(
- initialState: State = initialState,
- commandHandler: CommandHandler[Command, Event, State] = commandHandler,
- eventHandler: (State, Event) ⇒ State = eventHandler,
- recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted,
- tagger: Event ⇒ Set[String] = tagger,
- snapshotWhen: (State, Event, Long) ⇒ Boolean = snapshotWhen,
- journalPluginId: String = journalPluginId,
- snapshotPluginId: String = snapshotPluginId,
- recovery: Recovery = recovery): EventsourcedSetup[Command, Event, State] =
- new EventsourcedSetup[Command, Event, State](
- persistenceId = persistenceId,
- initialState = initialState,
- commandHandler = commandHandler,
- eventHandler = eventHandler,
- recoveryCompleted = recoveryCompleted,
- tagger = tagger,
- journalPluginId = journalPluginId,
- snapshotPluginId = snapshotPluginId,
- snapshotWhen = snapshotWhen,
- recovery = recovery)
+ val persistence: Persistence = Persistence(context.system.toUntyped)
+
+ val journal: ActorRef = persistence.journalFor(settings.journalPluginId)
+ val snapshotStore: ActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
+
+ def selfUntyped = context.self.toUntyped
+
+ import EventsourcedBehavior.InternalProtocol
+ val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
+ case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res)
+ case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted
+ case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res)
+ case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd)
+ }.toUntyped
}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala
index 9c2781d0a1..7579384d07 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala
@@ -1,58 +1,36 @@
package akka.persistence.typed.internal
-import java.util.Locale
-
-import akka.actor.typed.{ ActorSystem, Behavior }
-import akka.actor.{ DeadLetter, StashOverflowException }
+import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer }
+import akka.actor.{ DeadLetter, StashOverflowException }
import akka.annotation.InternalApi
-import akka.event.Logging.LogLevel
-import akka.event.{ Logging, LoggingAdapter }
-import akka.persistence._
+import akka.event.Logging
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
+import akka.persistence.{ StashOverflowStrategy, _ }
import akka.util.ConstantFun
import akka.{ actor ⇒ a }
/** INTERNAL API: Stash management for persistent behaviors */
@InternalApi
private[akka] trait EventsourcedStashManagement {
- import EventsourcedStashManagement._
import akka.actor.typed.scaladsl.adapter._
- protected def log: LoggingAdapter
+ protected def stash(setup: EventsourcedSetup[_, _, _], stash: StashBuffer[InternalProtocol], msg: InternalProtocol): Unit = {
+ import setup.context
- protected def extension: Persistence
+ val logLevel = setup.settings.stashingLogLevel
+ if (logLevel != Logging.OffLevel) context.log.debug("Stashing message: {}", msg) // FIXME can be log(logLevel once missing method added
- protected val internalStash: StashBuffer[Any]
+ val internalStashOverflowStrategy: StashOverflowStrategy = setup.persistence.defaultInternalStashOverflowStrategy
- private lazy val logLevel = {
- val configuredLevel = extension.system.settings.config
- .getString("akka.persistence.typed.log-stashing")
- Logging.levelFor(configuredLevel).getOrElse(OffLevel) // this is OffLevel
- }
-
- /**
- * The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash
- * when the internal Stash capacity exceeded.
- */
- protected val internalStashOverflowStrategy: StashOverflowStrategy =
- extension.defaultInternalStashOverflowStrategy match {
- case ReplyToStrategy(_) ⇒
- throw new RuntimeException("ReplyToStrategy is not supported in Akka Typed, since there is no sender()!")
- case other ⇒
- other // the other strategies are supported
- }
-
- protected def stash(ctx: ActorContext[Any], msg: Any): Unit = {
- if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg)
-
- try internalStash.stash(msg) catch {
+ try stash.stash(msg) catch {
case e: StashOverflowException ⇒
internalStashOverflowStrategy match {
case DiscardToDeadLetterStrategy ⇒
val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow?
- ctx.system.deadLetters.tell(DeadLetter(msg, snd, ctx.self.toUntyped))
+ context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped))
- case ReplyToStrategy(response) ⇒
+ case ReplyToStrategy(_) ⇒
throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!")
case ThrowOverflowExceptionStrategy ⇒
@@ -61,15 +39,16 @@ private[akka] trait EventsourcedStashManagement {
}
}
- protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = {
+ // FIXME, yet we need to also stash not-commands, due to journal responses ...
+ protected def tryUnstash[C, E, S](
+ setup: EventsourcedSetup[C, E, S],
+ internalStash: StashBuffer[InternalProtocol], // TODO since may want to not have it inside setup
+ behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
if (internalStash.nonEmpty) {
- log.debug("Unstashing message: {}", internalStash.head.getClass)
- internalStash.unstash(ctx, behavior, 1, ConstantFun.scalaIdentityFunction)
+ setup.log.debug("Unstashing message: {}", internalStash.head.getClass)
+
+ internalStash.asInstanceOf[StashBuffer[InternalProtocol]].unstash(setup.context, behavior.asInstanceOf[Behavior[InternalProtocol]], 1, ConstantFun.scalaIdentityFunction)
} else behavior
}
}
-
-object EventsourcedStashManagement {
- private val OffLevel = LogLevel(Int.MinValue)
-}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala
index 442bd27f0a..058832f11a 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala
@@ -3,11 +3,15 @@
*/
package akka.persistence.typed.scaladsl
+import akka.actor.typed
+import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
-import akka.actor.typed.scaladsl.ActorContext
+import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.annotation.InternalApi
-import akka.persistence.SnapshotSelectionCriteria
+import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal._
+import akka.persistence._
+import akka.util.ConstantFun
import scala.language.implicitConversions
@@ -22,29 +26,9 @@ object PersistentBehaviors {
def immutable[Command, Event, State](
persistenceId: String,
initialState: State,
- commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State],
+ commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] =
- new EventsourcedSetup(
- persistenceId = persistenceId,
- initialState = initialState,
- commandHandler = commandHandler,
- eventHandler = eventHandler
- )
-
- /**
- * Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known
- * until the actor is started and typically based on the entityId, which
- * is the actor name.
- *
- * TODO This will not be needed when it can be wrapped in `Actor.deferred`.
- */
- @Deprecated // FIXME remove this
- def persistentEntity[Command, Event, State](
- persistenceIdFromActorName: String ⇒ String,
- initialState: State,
- commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State],
- eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] =
- ???
+ PersistentBehaviorImpl(persistenceId, initialState, commandHandler, eventHandler)
/**
* The `CommandHandler` defines how to act on commands.
@@ -108,7 +92,7 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
/**
* Change the journal plugin id that this actor should use.
*/
- def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State]
+ def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State]
/**
* Change the snapshot store plugin id that this actor should use.
@@ -130,3 +114,101 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
*/
def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State]
}
+
+@InternalApi
+private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
+ persistenceId: String,
+ initialState: State,
+ commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
+ eventHandler: (State, Event) ⇒ State,
+
+ journalPluginId: Option[String] = None,
+ snapshotPluginId: Option[String] = None,
+ // settings: Option[EventsourcedSettings], // FIXME can't because no context available yet
+
+ recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit,
+ tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String],
+ snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse,
+ recovery: Recovery = Recovery()
+) extends PersistentBehavior[Command, Event, State] {
+
+ override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
+ Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒
+ Behaviors.withTimers[EventsourcedBehavior.InternalProtocol] { timers ⇒
+ val setup = EventsourcedSetup(
+ ctx,
+ timers,
+ persistenceId,
+ initialState,
+ commandHandler,
+ eventHandler)
+ .withJournalPluginId(journalPluginId)
+ .withSnapshotPluginId(snapshotPluginId)
+
+ EventsourcedRequestingRecoveryPermit(setup)
+ }
+ }.widen[Command] { case c ⇒ InternalProtocol.IncomingCommand(c) } // TODO this is nice, same way applicable to mutable style
+ }
+
+ /**
+ * The `callback` function is called to notify the actor that the recovery process
+ * is finished.
+ */
+ def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] =
+ copy(recoveryCompleted = callback)
+
+ /**
+ * Initiates a snapshot if the given function returns true.
+ * When persisting multiple events at once the snapshot is triggered after all the events have
+ * been persisted.
+ *
+ * `predicate` receives the State, Event and the sequenceNr used for the Event
+ */
+ def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] =
+ copy(snapshotWhen = predicate)
+
+ /**
+ * Snapshot every N events
+ *
+ * `numberOfEvents` should be greater than 0
+ */
+ def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
+ require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
+ copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0)
+ }
+
+ /**
+ * Change the journal plugin id that this actor should use.
+ */
+ def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = {
+ require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
+ copy(journalPluginId = if (id != "") Some(id) else None)
+ }
+
+ /**
+ * Change the snapshot store plugin id that this actor should use.
+ */
+ def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
+ require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
+ copy(snapshotPluginId = if (id != "") Some(id) else None)
+ }
+
+ /**
+ * Changes the snapshot selection criteria used by this behavior.
+ * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
+ * from the sequence number up until which the snapshot reached.
+ *
+ * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
+ * performed by replaying all events -- which may take a long time.
+ */
+ def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
+ copy(recovery = Recovery(selection))
+ }
+
+ /**
+ * The `tagger` function should give event tags, which will be used in persistence query
+ */
+ def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] =
+ copy(tagger = tagger)
+
+}