From ffb4419c4e60b1998cdb6fc6e9ed09e2897a1dec Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Fri, 2 Mar 2018 14:17:38 +0900 Subject: [PATCH] WIP towards immutable style compiles, does not work.. --- .../scala/akka/actor/typed/BehaviorSpec.scala | 2 - .../scala/akka/actor/typed/DeferredSpec.scala | 1 - .../scala/akka/actor/typed/Behavior.scala | 36 +- .../akka/actor/typed/javadsl/Behaviors.scala | 2 +- .../akka/actor/typed/scaladsl/Behaviors.scala | 20 - .../src/main/scala/akka/event/Logging.scala | 5 +- .../main/scala/akka/util/ConstantFun.scala | 3 + .../src/main/resources/reference.conf | 8 +- .../typed/internal/EventsourcedBehavior.scala | 87 +-- .../EventsourcedJournalInteractions.scala | 112 +++ .../EventsourcedRecoveringEvents.scala | 299 ++++---- .../EventsourcedRecoveringSnapshot.scala | 211 +++--- ...EventsourcedRequestingRecoveryPermit.scala | 88 +-- .../typed/internal/EventsourcedRunning.scala | 685 ++++++++++++------ .../typed/internal/EventsourcedSettings.scala | 83 +++ .../typed/internal/EventsourcedSetup.scala | 202 +++--- .../EventsourcedStashManagement.scala | 63 +- .../typed/scaladsl/PersistentBehaviors.scala | 132 +++- 18 files changed, 1218 insertions(+), 821 deletions(-) create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 4530ef9359..ed35320814 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -481,8 +481,6 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon { - import SBehaviors.BehaviorDecorators - override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") super.behavior(monitor)._1.widen[Command] { case c ⇒ inbox.ref ! c; c } → inbox diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala index f8fad66dc0..85e9aceb1e 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala @@ -4,7 +4,6 @@ package akka.actor.typed import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.Behaviors.BehaviorDecorators import akka.testkit.typed.TestKitSettings import akka.testkit.typed.scaladsl._ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 3788ca63f3..b6f8bb3b6f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -4,12 +4,14 @@ package akka.actor.typed import akka.actor.InvalidMessageException +import akka.actor.typed.internal.BehaviorImpl import scala.annotation.tailrec -import akka.util.LineNumbers +import akka.util.{ ConstantFun, LineNumbers, OptionVal } import akka.annotation.{ DoNotInherit, InternalApi } import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC } -import akka.util.OptionVal + +import scala.reflect.ClassTag /** * The behavior of an actor defines how it reacts to the messages that it @@ -33,7 +35,7 @@ import akka.util.OptionVal */ @InternalApi @DoNotInherit -sealed abstract class Behavior[T] { +sealed abstract class Behavior[T] { behavior ⇒ /** * Narrow the type of this Behavior, which is always a safe operation. This * method is necessary to implement the contravariant nature of Behavior @@ -85,6 +87,26 @@ abstract class ExtensibleBehavior[T] extends Behavior[T] { object Behavior { + final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal { + /** + * Widen the wrapped Behavior by placing a funnel in front of it: the supplied + * PartialFunction decides which message to pull in (those that it is defined + * at) and may transform the incoming message to place them into the wrapped + * Behavior’s type hierarchy. Signals are not transformed. + * + * Example: + * {{{ + * immutable[String] { (ctx, msg) => println(msg); same }.widen[Number] { + * case b: BigDecimal => s"BigDecimal($b)" + * case i: BigInteger => s"BigInteger($i)" + * // drop all other kinds of Number + * } + * }}} + */ + def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = + BehaviorImpl.widened(behavior, matcher) + } + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to @@ -175,7 +197,7 @@ object Behavior { } /** - * INTERNAL API. + * INTERNAL API * Not placed in internal.BehaviorImpl because Behavior is sealed. */ @InternalApi @@ -185,7 +207,7 @@ object Behavior { /** INTERNAL API */ @InternalApi private[akka] object DeferredBehavior { - def apply[T](factory: SAC[T] ⇒ Behavior[T]) = + def apply[T](factory: SAC[T] ⇒ Behavior[T]): Behavior[T] = new DeferredBehavior[T] { def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala) override def toString: String = s"Deferred(${LineNumbers(factory)})" @@ -193,14 +215,14 @@ object Behavior { } /** - * INTERNAL API. + * INTERNAL API */ private[akka] object SameBehavior extends Behavior[Nothing] { override def toString = "Same" } /** - * INTERNAL API. + * INTERNAL API */ private[akka] object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index 6f3095e1c0..6b5e4b296a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -320,7 +320,7 @@ object Behaviors { /** * Provide a MDC ("Mapped Diagnostic Context") for logging from the actor. * - * @param mdcForMessage Is invoked before each message to setup MDC which is then attachd to each logging statement + * @param mdcForMessage Is invoked before each message to setup MDC which is then attached to each logging statement * done for that message through the [[ActorContext.getLog]]. After the message has been processed * the MDC is cleared. * @param behavior The behavior that this should be applied to. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index f6641f21ea..5bc86e2941 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -19,26 +19,6 @@ object Behaviors { private val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ () private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)] - final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal { - /** - * Widen the wrapped Behavior by placing a funnel in front of it: the supplied - * PartialFunction decides which message to pull in (those that it is defined - * at) and may transform the incoming message to place them into the wrapped - * Behavior’s type hierarchy. Signals are not transformed. - * - * Example: - * {{{ - * immutable[String] { (ctx, msg) => println(msg); same }.widen[Number] { - * case b: BigDecimal => s"BigDecimal($b)" - * case i: BigInteger => s"BigInteger($i)" - * // drop all other kinds of Number - * } - * }}} - */ - def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = - BehaviorImpl.widened(behavior, matcher) - } - /** * `setup` is a factory for a behavior. Creation of the behavior instance is deferred until * the actor is started, as opposed to [[Behaviors.immutable]] that creates the behavior instance diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 46545f9de3..101cde9204 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -444,12 +444,13 @@ object Logging { final val DebugLevel = LogLevel(4) /** - * Internal Akka use only + * INTERNAL API * * Don't include the OffLevel in the AllLogLevels since we should never subscribe * to some kind of OffEvent. */ - private final val OffLevel = LogLevel(Int.MinValue) + @InternalApi + private[akka] final val OffLevel = LogLevel(Int.MinValue) /** * Returns the LogLevel associated with the given string, diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index 2a97f764c9..7931b5e35c 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -28,6 +28,7 @@ import akka.japi.{ Pair ⇒ JPair } def scalaAnyToNone[A, B]: A ⇒ Option[B] = none def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none def scalaAnyTwoToUnit[A, B]: (A, B) ⇒ Unit = two2unit + def scalaAnyTwoToTrue[A, B]: (A, B) ⇒ Boolean = two2true def scalaAnyThreeToFalse[A, B, C]: (A, B, C) ⇒ Boolean = three2false def javaAnyToNone[A, B]: A ⇒ Option[B] = none def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] @@ -46,6 +47,8 @@ import akka.japi.{ Pair ⇒ JPair } private val two2none = (_: Any, _: Any) ⇒ None + private val two2true = (_: Any, _: Any) ⇒ true + private val two2unit = (_: Any, _: Any) ⇒ () private val three2false = (_: Any, _: Any, _: Any) ⇒ false diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf index 15febc5cb0..465038ae15 100644 --- a/akka-persistence-typed/src/main/resources/reference.conf +++ b/akka-persistence-typed/src/main/resources/reference.conf @@ -1,7 +1,11 @@ akka.persistence.typed { - # default stash buffer size for incoming messages to persistent actors - stash-buffer-size = 1024 + # Persistent actors stash while recovering or persisting events, + # this setting configures the default capacity of this stash. + stash-capacity = 2048 + # If negative (or zero) then an unbounded stash is used (default) + # If positive then a bounded stash is used and the capacity is set using + # the property # enables automatic logging of messages stashed automatically by an PersistentBehavior, # this may happen while it receives commands while it is recovering events or while it is persisting events diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala index 1fc7b3cbf0..0dd64fe32c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala @@ -24,34 +24,36 @@ private[akka] object EventsourcedBehavior { // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) private[akka] val instanceIdCounter = new AtomicInteger(1) - @InternalApi private[akka] object WriterIdentity { + object WriterIdentity { def newIdentity(): WriterIdentity = { val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement() val writerUuid: String = UUID.randomUUID.toString WriterIdentity(instanceId, writerUuid) } } - private[akka] final case class WriterIdentity(instanceId: Int, writerUuid: String) + final case class WriterIdentity(instanceId: Int, writerUuid: String) /** Protocol used internally by the eventsourced behaviors, never exposed to user-land */ - private[akka] sealed trait EventsourcedProtocol - private[akka] case object RecoveryPermitGranted extends EventsourcedProtocol - private[akka] final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends EventsourcedProtocol - private[akka] final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends EventsourcedProtocol - private[akka] final case class RecoveryTickEvent(snapshot: Boolean) extends EventsourcedProtocol - private[akka] final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends EventsourcedProtocol - - implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] { - override def genString(b: EventsourcedBehavior[_, _, _]): String = { - val behaviorShortName = b match { - case _: EventsourcedRunning[_, _, _] ⇒ "running" - case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events" - case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap" - case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit" - } - s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]" - } + sealed trait InternalProtocol + object InternalProtocol { + case object RecoveryPermitGranted extends InternalProtocol + final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol + final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol + final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol + final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends InternalProtocol + final case class IncomingCommand[C](c: C) extends InternalProtocol } + // implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] { + // override def genString(b: EventsourcedBehavior[_, _, _]): String = { + // val behaviorShortName = b match { + // case _: EventsourcedRunning[_, _, _] ⇒ "running" + // case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events" + // case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap" + // case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit" + // } + // s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]" + // } + // } } @@ -61,8 +63,7 @@ private[akka] trait EventsourcedBehavior[Command, Event, State] { import EventsourcedBehavior._ import akka.actor.typed.scaladsl.adapter._ - protected def context: ActorContext[Any] - protected def timers: TimerScheduler[Any] + // protected def timers: TimerScheduler[Any] type C = Command type AC = ActorContext[C] @@ -72,30 +73,30 @@ private[akka] trait EventsourcedBehavior[Command, Event, State] { // used for signaling intent in type signatures type SeqNr = Long - def persistenceId: String = setup.persistenceId - - protected def setup: EventsourcedSetup[Command, Event, State] - protected def initialState: State = setup.initialState - protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler - protected def eventHandler: (State, Event) ⇒ State = setup.eventHandler - protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = setup.snapshotWhen - protected def tagger: Event ⇒ Set[String] = setup.tagger - - protected final def journalPluginId: String = setup.journalPluginId - protected final def snapshotPluginId: String = setup.snapshotPluginId + // def persistenceId: String = setup.persistenceId + // + // protected def setup: EventsourcedSetup[Command, Event, State] + // protected def initialState: State = setup.initialState + // protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler + // protected def eventHandler: (State, Event) ⇒ State = setup.eventHandler + // protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = setup.snapshotWhen + // protected def tagger: Event ⇒ Set[String] = setup.tagger + // + // protected final def journalPluginId: String = setup.journalPluginId + // protected final def snapshotPluginId: String = setup.snapshotPluginId // ------ common ------- - protected lazy val extension = Persistence(context.system.toUntyped) - protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId) - protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId) - - protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped - protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { - case res: JournalProtocol.Response ⇒ JournalResponse(res) - case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted - case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res) - case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd - }.toUntyped + // protected lazy val extension = Persistence(context.system.toUntyped) + // protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId) + // protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId) + // + // protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped + // protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { + // case res: JournalProtocol.Response ⇒ JournalResponse(res) + // case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted + // case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res) + // case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd + // }.toUntyped } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala new file mode 100644 index 0000000000..e8faad0a66 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.annotation.InternalApi +import akka.persistence.Eventsourced.StashingHandlerInvocation +import akka.persistence.JournalProtocol.ReplayMessages +import akka.persistence._ +import akka.persistence.SnapshotProtocol.LoadSnapshot +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol + +import scala.collection.immutable + +@InternalApi +private[akka] trait EventsourcedJournalInteractions { + import akka.actor.typed.scaladsl.adapter._ + + // ---------- journal interactions --------- + + protected def returnRecoveryPermitOnlyOnFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable): Unit = { + import setup.context + setup.log.debug("Returning recovery permit, on failure because: " + cause.getMessage) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + val permitter = setup.persistence.recoveryPermitter + permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped) + } + + type EventOrTagged = Any // `Any` since can be `E` or `Tagged` + protected def internalPersist[C, E, S]( + setup: EventsourcedSetup[C, E, S], + state: EventsourcedRunning.EventsourcedState[S], + event: EventOrTagged, + sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[InternalProtocol] = { + // pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + val pendingInvocations = StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) :: Nil + + val newState = state.nextSequenceNr() + + val senderNotKnownBecauseAkkaTyped = null + val repr = PersistentRepr( + event, + persistenceId = setup.persistenceId, + sequenceNr = newState.seqNr, + writerUuid = setup.writerIdentity.writerUuid, + sender = senderNotKnownBecauseAkkaTyped + ) + + val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync + setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) + + EventsourcedRunning.PersistingEvents[C, E, S](setup, state, pendingInvocations, sideEffects) + } + + protected def internalPersistAll[C, E, S]( + setup: EventsourcedSetup[C, E, S], + events: immutable.Seq[EventOrTagged], + state: EventsourcedRunning.EventsourcedState[S], + sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[InternalProtocol] = { + if (events.nonEmpty) { + + val pendingInvocations = events map { event ⇒ + // pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + + val newState = state.nextSequenceNr() + + val senderNotKnownBecauseAkkaTyped = null + val write = AtomicWrite(events.map(event ⇒ PersistentRepr( + event, + persistenceId = setup.persistenceId, + sequenceNr = newState.seqNr, + writerUuid = setup.writerIdentity.writerUuid, + sender = senderNotKnownBecauseAkkaTyped) + )) + + setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) + + EventsourcedRunning.PersistingEvents(setup, state, pendingInvocations, sideEffects) + } else Behaviors.same + } + + protected def replayEvents[C, E, S](setup: EventsourcedSetup[C, E, S], fromSeqNr: Long, toSeqNr: Long): Unit = { + setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) + // reply is sent to `selfUntypedAdapted`, it is important to target that one + setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntypedAdapted) + } + + protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = { + setup.log.debug("Returning recovery permit, reason: " + reason) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped) + } + + // ---------- snapshot store interactions --------- + + /** + * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] + * to the running [[PersistentActor]]. + */ + protected def loadSnapshot[Command](setup: EventsourcedSetup[Command, _, _], criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { + setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntypedAdapted) + } + + protected def internalSaveSnapshot[S](setup: EventsourcedSetup[_, _, S], state: EventsourcedRunning.EventsourcedState[S]): Unit = { + setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntypedAdapted) + } + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala index 3fc92c8a23..8a339d4e8e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala @@ -4,121 +4,139 @@ package akka.persistence.typed.internal import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.Behaviors.MutableBehavior -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity -import akka.persistence.typed.scaladsl.PersistentBehaviors._ -import akka.util.Helpers._ +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse } +import akka.persistence.typed.internal.EventsourcedBehavior._ +import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal -/** +/*** * INTERNAL API * - * Third (of four) behavior of an PersistentBehavior. - * - * In this behavior we finally start replaying events, beginning from the last applied sequence number - * (i.e. the one up-until-which the snapshot recovery has brought us). - * - * Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed - * and control is given to the user's handlers to drive the actors behavior from there. + * See next behavior [[EventsourcedRunning]]. * */ @InternalApi -private[akka] class EventsourcedRecoveringEvents[Command, Event, State]( - val setup: EventsourcedSetup[Command, Event, State], - override val context: ActorContext[Any], - override val timers: TimerScheduler[Any], - override val internalStash: StashBuffer[Any], +private[persistence] object EventsourcedRecoveringEvents extends EventsourcedJournalInteractions with EventsourcedStashManagement { - private var sequenceNr: Long, - val writerIdentity: WriterIdentity, + @InternalApi + private[persistence] final case class RecoveringState[State]( + seqNr: Long, + state: State, + eventSeenInInterval: Boolean = false + ) - private var state: State -) extends MutableBehavior[Any] - with EventsourcedBehavior[Command, Event, State] - with EventsourcedStashManagement { - import setup._ - import Behaviors.same - import EventsourcedBehavior._ - import akka.actor.typed.scaladsl.adapter._ + def apply[Command, Event, State]( + setup: EventsourcedSetup[Command, Event, State], + state: RecoveringState[State] + ): Behavior[InternalProtocol] = + Behaviors.setup { _ ⇒ + startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout) - protected val log = Logging(context.system.toUntyped, this) + replayEvents(setup, state.seqNr + 1L, setup.recovery.toSequenceNr) - // -------- initialize -------- - startRecoveryTimer() - - replayEvents(sequenceNr + 1L, recovery.toSequenceNr) - // ---- end of initialize ---- - - private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] - - // ---------- - - def snapshotSequenceNr: Long = sequenceNr - - private def updateLastSequenceNr(persistent: PersistentRepr): Unit = - if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr - - private def setLastSequenceNr(value: Long): Unit = - sequenceNr = value - - // ---------- - - // FIXME it's a bit of a pain to have those lazy vals, change everything to constructor parameters - lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") - - // protect against snapshot stalling forever because of journal overloaded and such - private val RecoveryTickTimerKey = "recovery-tick" - private def startRecoveryTimer(): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) - private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey) - - private var eventSeenInInterval = false - - def onCommand(cmd: Command): Behavior[Any] = { - // during recovery, stash all incoming commands - stash(context, cmd) - same - } - - def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try { - response match { - case ReplayedMessage(repr) ⇒ - eventSeenInInterval = true - updateLastSequenceNr(repr) - // TODO we need some state adapters here? - val newState = eventHandler(state, repr.payload.asInstanceOf[Event]) - state = newState - same - - case RecoverySuccess(highestSeqNr) ⇒ - log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr) - cancelRecoveryTimer() - setLastSequenceNr(highestSeqNr) - - try onRecoveryCompleted(state) - catch { case NonFatal(ex) ⇒ onRecoveryFailure(ex, Some(state)) } - - case ReplayMessagesFailure(cause) ⇒ - onRecoveryFailure(cause, event = None) - - case other ⇒ - stash(context, other) - Behaviors.same + withMdc(setup) { + stay(setup, state) + } } - } catch { - case NonFatal(e) ⇒ - cancelRecoveryTimer() - onRecoveryFailure(e, None) + + private def stay[Command, Event, State]( + setup: EventsourcedSetup[Command, Event, State], + state: RecoveringState[State] + ): Behavior[InternalProtocol] = + Behaviors.immutable { + case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, state, r) + case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r) + case (_, RecoveryTickEvent(snap)) ⇒ onRecoveryTick(setup, state, snap) + case (_, cmd @ IncomingCommand(_)) ⇒ onCommand(setup, cmd) + } + + private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(wrapped: Behavior[InternalProtocol]) = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → "recover-evnts" + ) + + Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) } - def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = { - log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response)) - Behaviors.same // ignore the response + private def onJournalResponse[Command, Event, State]( + setup: EventsourcedSetup[Command, Event, State], + state: RecoveringState[State], + response: JournalProtocol.Response): Behavior[InternalProtocol] = { + import setup.context.log + try { + response match { + case ReplayedMessage(repr) ⇒ + // eventSeenInInterval = true + // updateLastSequenceNr(repr) + + val newState = state.copy( + seqNr = repr.sequenceNr, + state = setup.eventHandler(state.state, repr.payload.asInstanceOf[Event]) + ) + + stay(setup, newState) + + case RecoverySuccess(highestSeqNr) ⇒ + log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr) + cancelRecoveryTimer(setup.timers) + + try onRecoveryCompleted(setup, state) + catch { case NonFatal(ex) ⇒ onRecoveryFailure(setup, ex, highestSeqNr, Some(state)) } + + case ReplayMessagesFailure(cause) ⇒ + onRecoveryFailure(setup, cause, state.seqNr, None) + + case other ⇒ + // stash(setup, setup.internalStash, other) + // Behaviors.same + Behaviors.unhandled + } + } catch { + case NonFatal(cause) ⇒ + cancelRecoveryTimer(setup.timers) + onRecoveryFailure(setup, cause, state.seqNr, None) + } + } + + private def onCommand[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], cmd: InternalProtocol): Behavior[InternalProtocol] = { + // during recovery, stash all incoming commands + stash(setup, setup.internalStash, cmd) + Behaviors.same + } + + // FYI, have to keep carrying all [C,E,S] everywhere as otherwise ending up with: + // [error] /Users/ktoso/code/akka/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala:117:14: type mismatch; + // [error] found : akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,_$2] where type _$2, type _$1 + // [error] required: akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,Any] where type _$1 + // [error] Note: _$2 <: Any, but class EventsourcedSetup is invariant in type State. + // [error] You may wish to define State as +State instead. (SLS 4.5) + // [error] Error occurred in an application involving default arguments. + // [error] stay(setup, state.copy(eventSeenInInterval = false)) + // [error] ^ + protected def onRecoveryTick[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], state: RecoveringState[State], snapshot: Boolean): Behavior[InternalProtocol] = + if (!snapshot) { + if (state.eventSeenInInterval) { + stay(setup, state.copy(eventSeenInInterval = false)) + } else { + cancelRecoveryTimer(setup.timers) + val msg = s"Recovery timed out, didn't get event within ${setup.settings.recoveryEventTimeout}, highest sequence number seen ${state.seqNr}" + onRecoveryFailure(setup, new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this? + } + } else { + // snapshot timeout, but we're already in the events recovery phase + Behavior.unhandled + } + + def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { + setup.log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response)) + Behaviors.unhandled // ignore the response } /** @@ -129,87 +147,40 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State]( * @param cause failure cause. * @param event the event that was processed in `receiveRecover`, if the exception was thrown there */ - protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = { - returnRecoveryPermit("on recovery failure: " + cause.getMessage) - cancelRecoveryTimer() + protected def onRecoveryFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, sequenceNr: Long, event: Option[Any]): Behavior[InternalProtocol] = { + returnRecoveryPermit(setup, "on recovery failure: " + cause.getMessage) + cancelRecoveryTimer(setup.timers) event match { case Some(evt) ⇒ - log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) + setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) Behaviors.stopped case None ⇒ - log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", persistenceId, sequenceNr) + setup.log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", setup.persistenceId, sequenceNr) Behaviors.stopped } } - protected def onRecoveryCompleted(state: State): Behavior[Any] = { - try { - returnRecoveryPermit("recovery completed successfully") - recoveryCompleted(commandContext, state) + protected def onRecoveryCompleted[C, E, S](setup: EventsourcedSetup[C, E, S], state: RecoveringState[S]): Behavior[InternalProtocol] = try { + returnRecoveryPermit(setup, "recovery completed successfully") + setup.recoveryCompleted(setup.commandContext, state.state) - val running = new EventsourcedRunning[Command, Event, State]( - setup, - context, - timers, - internalStash, + val running = EventsourcedRunning.HandlingCommands[C, E, S]( + setup, + EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state) + ) - sequenceNr, - writerIdentity, - - state - ) - - tryUnstash(context, running) - } finally { - cancelRecoveryTimer() - } + tryUnstash(setup, setup.internalStash, running) + } finally { + cancelRecoveryTimer(setup.timers) } - protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] = - if (!snapshot) { - if (!eventSeenInInterval) { - cancelRecoveryTimer() - val msg = s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr" - onRecoveryFailure(new RecoveryTimedOut(msg), event = None) // TODO allow users to hook into this? - } else { - eventSeenInInterval = false - same - } - } else { - // snapshot timeout, but we're already in the events recovery phase - Behavior.unhandled - } - - // ---------- - - override def onMessage(msg: Any): Behavior[Any] = { - msg match { - // TODO explore crazy hashcode hack to make this match quicker...? - case JournalResponse(r) ⇒ onJournalResponse(r) - case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot) - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly - } - } - - // ---------- - - // ---------- journal interactions --------- - - private def replayEvents(fromSeqNr: SeqNr, toSeqNr: SeqNr): Unit = { - log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) - // reply is sent to `selfUntypedAdapted`, it is important to target that one - journal ! ReplayMessages(fromSeqNr, toSeqNr, recovery.replayMax, persistenceId, selfUntypedAdapted) - } - - private def returnRecoveryPermit(reason: String): Unit = { - log.debug("Returning recovery permit, reason: " + reason) - // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped) - } - - override def toString = s"EventsourcedRecoveringEvents($persistenceId)" + // protect against snapshot stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "recovery-tick" + private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit = + timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) + private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(RecoveryTickTimerKey) } + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala index 65a3c1d452..6bcf3401fe 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala @@ -4,14 +4,14 @@ package akka.persistence.typed.internal import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.Behaviors.MutableBehavior -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.actor.typed.scaladsl.Behaviors.same +import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } import akka.annotation.InternalApi -import akka.event.Logging -import akka.persistence.SnapshotProtocol.{ LoadSnapshot, LoadSnapshotFailed, LoadSnapshotResult } +import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult } import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity -import akka.util.Helpers._ +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ +import akka.{ actor ⇒ a } import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } @@ -20,6 +20,7 @@ import scala.util.{ Failure, Success, Try } * INTERNAL API * * Second (of four) behavior of an PersistentBehavior. + * See next behavior [[EventsourcedRecoveringEvents]]. * * In this behavior the recovery process is initiated. * We try to obtain a snapshot from the configured snapshot store, @@ -29,71 +30,89 @@ import scala.util.{ Failure, Success, Try } * recovery of events continues in [[EventsourcedRecoveringEvents]]. */ @InternalApi -final class EventsourcedRecoveringSnapshot[Command, Event, State]( - val setup: EventsourcedSetup[Command, Event, State], - override val context: ActorContext[Any], - override val timers: TimerScheduler[Any], - override val internalStash: StashBuffer[Any], +object EventsourcedRecoveringSnapshot extends EventsourcedJournalInteractions with EventsourcedStashManagement { - val writerIdentity: WriterIdentity -) extends MutableBehavior[Any] - with EventsourcedBehavior[Command, Event, State] - with EventsourcedStashManagement { - import setup._ + def apply[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = { + startRecoveryTimer(setup) - import Behaviors.same - import EventsourcedBehavior._ - import akka.actor.typed.scaladsl.adapter._ + withMdc(setup) { + Behaviors.immutable { + case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r) + case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, r) + case (_, RecoveryTickEvent(snapshot)) ⇒ onRecoveryTick(setup, snapshot) + case (_, cmd: IncomingCommand[Command]) ⇒ onCommand(setup, cmd) + } + } + } - protected val log = Logging(context.system.toUntyped, this) + def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → "recover-snap" + ) + Behaviors.withMdc(_ ⇒ mdc, b) + } - // -------- initialize -------- - startRecoveryTimer() + /** + * Called whenever a message replay fails. By default it logs the error. + * + * The actor is always stopped after this method has been invoked. + * + * @param cause failure cause. + * @param event the event that was processed in `receiveRecover`, if the exception was thrown there + */ + private def onRecoveryFailure[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = { + cancelRecoveryTimer(setup.timers) - loadSnapshot(persistenceId, recovery.fromSnapshot, recovery.toSequenceNr) - // ---- end of initialize ---- + val lastSequenceNr = 0 // FIXME not needed since snapshot == 0 + event match { + case Some(evt) ⇒ + setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + + "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, setup.persistenceId) + Behaviors.stopped - val commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + case None ⇒ + setup.log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " + + "Last known sequence number [{}]", setup.persistenceId, lastSequenceNr) + Behaviors.stopped + } + } - // ---------- - - protected var awaitingSnapshot: Boolean = true - - // ---------- - - private var lastSequenceNr: Long = 0L - def snapshotSequenceNr: Long = lastSequenceNr - - // ---------- - - lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") + private def onRecoveryTick[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], snapshot: Boolean): Behavior[InternalProtocol] = + if (snapshot) { + // we know we're in snapshotting mode; snapshot recovery timeout arrived + val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") + onRecoveryFailure(setup, ex, event = None) + } else same // ignore, since we received the snapshot already // protect against snapshot stalling forever because of journal overloaded and such private val RecoveryTickTimerKey = "recovery-tick" - private def startRecoveryTimer(): Unit = { - timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) - } - private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey) - def onCommand(cmd: Command): Behavior[Any] = { + private def startRecoveryTimer(setup: EventsourcedSetup[_, _, _]): Unit = { + setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout) + } + + private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey) + + def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands - stash(context, cmd) + setup.internalStash.stash(cmd) // TODO move stash out as it's mutable Behavior.same } - def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try { + def onJournalResponse[Command](setup: EventsourcedSetup[_, _, _], response: JournalProtocol.Response): Behavior[InternalProtocol] = try { throw new Exception("Should not talk to journal yet! But got: " + response) } catch { case NonFatal(cause) ⇒ - returnRecoveryPermitOnlyOnFailure(cause) + returnRecoveryPermitOnlyOnFailure(setup, cause) throw cause } - def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = try { + def onSnapshotterResponse[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = try { response match { case LoadSnapshotResult(sso, toSnr) ⇒ - var state: S = initialState - val re: Try[SeqNr] = Try { + var state: State = setup.initialState + val re: Try[Long] = Try { sso match { case Some(SelectedSnapshot(metadata, snapshot)) ⇒ state = snapshot.asInstanceOf[State] @@ -106,106 +125,38 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State]( re match { case Success(seqNr) ⇒ - lastSequenceNr = seqNr - replayMessages(state, toSnr) + replayMessages(setup, state, seqNr, toSnr) case Failure(cause) ⇒ // FIXME better exception type - val ex = new RuntimeException(s"Failed to recover state for [$persistenceId] from snapshot offer.", cause) - onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME + val ex = new RuntimeException(s"Failed to recover state for [${setup.persistenceId}] from snapshot offer.", cause) + onRecoveryFailure(setup, ex, event = None) // FIXME the failure logs has bad messages... FIXME } case LoadSnapshotFailed(cause) ⇒ - cancelRecoveryTimer() + cancelRecoveryTimer(setup.timers) - onRecoveryFailure(cause, event = None) + onRecoveryFailure(setup, cause, event = None) - case other ⇒ - stash(context, other) - same + case _ ⇒ + Behaviors.unhandled } } catch { case NonFatal(cause) ⇒ - returnRecoveryPermitOnlyOnFailure(cause) + returnRecoveryPermitOnlyOnFailure(setup, cause) throw cause } - private def replayMessages(state: State, toSnr: SeqNr): Behavior[Any] = { - cancelRecoveryTimer() + private def replayMessages[Command, Event, State](setup: EventsourcedSetup[Command, Event, State], state: State, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = { + cancelRecoveryTimer(setup.timers) - val rec = recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types + val rec = setup.recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types - new EventsourcedRecoveringEvents[Command, Event, State]( + EventsourcedRecoveringEvents[Command, Event, State]( setup.copy(recovery = rec), - context, - timers, - internalStash, - - lastSequenceNr, - writerIdentity, - - state + // setup.internalStash, // TODO move it out of setup + EventsourcedRecoveringEvents.RecoveringState(lastSequenceNr, state) ) } - /** - * Called whenever a message replay fails. By default it logs the error. - * - * The actor is always stopped after this method has been invoked. - * - * @param cause failure cause. - * @param event the event that was processed in `receiveRecover`, if the exception was thrown there - */ - protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = { - cancelRecoveryTimer() - event match { - case Some(evt) ⇒ - log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + - "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId) - Behaviors.stopped - - case None ⇒ - log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " + - "Last known sequence number [{}]", persistenceId, lastSequenceNr) - Behaviors.stopped - } - } - - protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] = - // we know we're in snapshotting mode - if (snapshot) onRecoveryFailure(new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), event = None) - else same // ignore, since we received the snapshot already - - // ---------- - - override def onMessage(msg: Any): Behavior[Any] = { - msg match { - // TODO explore crazy hashcode hack to make this match quicker...? - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case JournalResponse(r) ⇒ onJournalResponse(r) - case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot) - case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly - } - } - - // ---------- - - // ---------- journal interactions --------- - - /** - * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] - * to the running [[PersistentActor]]. - */ - private def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { - snapshotStore.tell(LoadSnapshot(persistenceId, criteria, toSequenceNr), selfUntypedAdapted) - } - - private def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = { - log.debug("Returning recovery permit, on failure because: " + cause.getMessage) - // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped) - } - - override def toString = s"EventsourcedRecoveringSnapshot($persistenceId)" - } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala index 80d7705a85..aaccc403a6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -9,81 +9,59 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerSc import akka.annotation.InternalApi import akka.event.Logging import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity +import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } /** * INTERNAL API * * First (of four) behaviour of an PersistentBehaviour. + * See next behavior [[EventsourcedRecoveringSnapshot]]. * * Requests a permit to start recovering this actor; this is tone to avoid * hammering the journal with too many concurrently recovering actors. + * */ @InternalApi -private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, State]( - val setup: EventsourcedSetup[Command, Event, State], - override val context: ActorContext[Any], - override val timers: TimerScheduler[Any] - -) extends MutableBehavior[Any] - with EventsourcedBehavior[Command, Event, State] - with EventsourcedStashManagement { - import setup._ - +private[akka] object EventsourcedRequestingRecoveryPermit extends EventsourcedStashManagement { import akka.actor.typed.scaladsl.adapter._ - // has to be lazy, since we want to obtain the persistenceId - protected lazy val log = Logging(context.system.toUntyped, this) + def apply[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = { + // request a permit, as only once we obtain one we can start recovering + requestRecoveryPermit(setup.context, setup.persistence) - override protected val internalStash: StashBuffer[Any] = { - val stashSize = context.system.settings.config - .getInt("akka.persistence.typed.stash-buffer-size") - StashBuffer[Any](stashSize) - } + withMdc(setup) { + Behaviors.immutable[InternalProtocol] { + case (_, InternalProtocol.RecoveryPermitGranted) ⇒ // FIXME types + becomeRecovering(setup) - // --- initialization --- - // only once we have a permit, we can become active: - requestRecoveryPermit() - - val writerIdentity: WriterIdentity = WriterIdentity.newIdentity() - - // --- end of initialization --- - - // ---------- - - def becomeRecovering(): Behavior[Any] = { - log.debug(s"Initializing snapshot recovery: {}", recovery) - - new EventsourcedRecoveringSnapshot( - setup, - context, - timers, - internalStash, - - writerIdentity - ) - } - - // ---------- - - override def onMessage(msg: Any): Behavior[Any] = { - msg match { - case RecoveryPermitter.RecoveryPermitGranted ⇒ - log.debug("Awaiting permit, received: RecoveryPermitGranted") - becomeRecovering() - - case other ⇒ - stash(context, other) - Behaviors.same + case (_, other) ⇒ + stash(setup, setup.internalStash, other) + Behaviors.same + } } } + private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S])(wrapped: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → "awaiting-permit" + ) + + Behaviors.withMdc(_ ⇒ mdc, wrapped) + } + + private def becomeRecovering[Command, Event, State](setup: EventsourcedSetup[Command, Event, State]): Behavior[InternalProtocol] = { + setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery) + + EventsourcedRecoveringSnapshot(setup) + } + // ---------- journal interactions --------- - private def requestRecoveryPermit(): Unit = { + private def requestRecoveryPermit[Command](context: ActorContext[Command], persistence: Persistence): Unit = { // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped) + val selfUntyped = context.self.toUntyped + persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped) } - override def toString = s"EventsourcedRequestingRecoveryPermit($persistenceId)" } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 586159f0ac..690365c62e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -13,7 +13,8 @@ import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandler import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged -import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse } import scala.annotation.tailrec import scala.collection.immutable @@ -34,224 +35,206 @@ import scala.collection.immutable * which perform the Persistence extension lookup on creation and similar things (config lookup) * */ -@InternalApi -class EventsourcedRunning[Command, Event, State]( - val setup: EventsourcedSetup[Command, Event, State], - override val context: ActorContext[Any], - override val timers: TimerScheduler[Any], - override val internalStash: StashBuffer[Any], +@InternalApi object EventsourcedRunning extends EventsourcedJournalInteractions with EventsourcedStashManagement { - private var sequenceNr: Long, - val writerIdentity: WriterIdentity, + final case class EventsourcedState[State]( + seqNr: Long, + state: State, + pendingInvocations: immutable.Seq[PendingHandlerInvocation] = Nil + ) { - private var state: State -) extends MutableBehavior[Any] - with EventsourcedBehavior[Command, Event, State] - with EventsourcedStashManagement { same ⇒ - import setup._ + def nextSequenceNr(): EventsourcedState[State] = + copy(seqNr = seqNr + 1) - import EventsourcedBehavior._ - import akka.actor.typed.scaladsl.adapter._ + def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] = + if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this - protected val log = Logging(context.system.toUntyped, this) + def popApplyPendingInvocation(repr: PersistentRepr): EventsourcedState[State] = { + val (headSeq, remainingInvocations) = pendingInvocations.splitAt(1) + headSeq.head.handler(repr.payload) - private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + copy( + pendingInvocations = remainingInvocations, + seqNr = repr.sequenceNr + ) + } - // ---------- - - // Holds callbacks for persist calls (note that we do not implement persistAsync currently) - private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty - private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it - - // ---------- - - private def snapshotSequenceNr: Long = sequenceNr - - private def updateLastSequenceNr(persistent: PersistentRepr): Unit = - if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr - private def nextSequenceNr(): Long = { - sequenceNr += 1L - sequenceNr - } - // ---------- - - private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = { - response match { - case SaveSnapshotSuccess(meta) ⇒ - log.debug("Save snapshot successful: " + meta) - same - case SaveSnapshotFailure(meta, ex) ⇒ - log.error(ex, "Save snapshot failed! " + meta) - same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop + def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = { + val updated = setup.eventHandler(state, event) + copy(state = updated) } } - // ---------- + // =============================================== - trait EventsourcedRunningPhase { - def name: String - def onCommand(c: Command): Behavior[Any] - def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] - } - - object HandlingCommands extends EventsourcedRunningPhase { - def name = "HandlingCommands" - - final override def onCommand(command: Command): Behavior[Any] = { - val effect = commandHandler(commandContext, state, command) - applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + object HandlingCommands { + def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = { + withMdc(setup, "run-cmnds") { + Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { + case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r) + case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, r) + case (_, IncomingCommand(c: C @unchecked)) ⇒ onCommand(setup, state, c) + } + } } - final override def onJournalResponse(response: Response): Behavior[Any] = { - // should not happen, what would it reply? - throw new RuntimeException("Received message which should not happen in Running state!") + + private def onJournalResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: Response): Behavior[InternalProtocol] = { + // TODO ignore, could happen if actor was restarted? + Behaviors.unhandled + } + + private def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { + val effect = setup.commandHandler(setup.commandContext, state.state, cmd) + applyEffects(setup, cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? } } - object PersistingEventsNoSideEffects extends PersistingEvents(Nil) + // =============================================== - sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase { - def name = "PersistingEvents" + object PersistingEvents { - final override def onCommand(c: Command): Behavior[Any] = { - stash(context, c) - same + def apply[C, E, S]( + setup: EventsourcedSetup[C, E, S], + state: EventsourcedState[S], + pendingInvocations: immutable.Seq[PendingHandlerInvocation], + sideEffects: immutable.Seq[ChainableEffect[_, S]] + ): Behavior[InternalProtocol] = { + withMdc(setup, "run-persist-evnts") { + Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { + case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(setup, r) + case (_, JournalResponse(r)) ⇒ onJournalResponse(setup, state, pendingInvocations, sideEffects, r) + case (_, in: IncomingCommand[C @unchecked]) ⇒ onCommand(setup, state, in) + } + } } - final override def onJournalResponse(response: Response): Behavior[Any] = { - log.debug("Received Journal response: {}", response) + def onCommand[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { + stash(setup, setup.internalStash, cmd) + Behaviors.same + } + + final def onJournalResponse[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], + pendingInvocations: immutable.Seq[PendingHandlerInvocation], + sideEffects: immutable.Seq[ChainableEffect[_, S]], + response: Response): Behavior[InternalProtocol] = { + setup.log.debug("Received Journal response: {}", response) response match { case WriteMessageSuccess(p, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case we ignore the call to the handler - if (id == writerIdentity.instanceId) { - updateLastSequenceNr(p) - popApplyHandler(p.payload) - onWriteMessageComplete() - tryUnstash(context, applySideEffects(sideEffects)) - } else same + if (id == setup.writerIdentity.instanceId) { + val newState = state.popApplyPendingInvocation(p) + + // only once all things are applied we can revert back + if (newState.pendingInvocations.nonEmpty) Behaviors.same + else tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, newState)) + } else Behaviors.same case WriteMessageRejected(p, cause, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case the handler has already been discarded - if (id == writerIdentity.instanceId) { - updateLastSequenceNr(p) - onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop - tryUnstash(context, applySideEffects(sideEffects)) - } else same + if (id == setup.writerIdentity.instanceId) { + val newState = state.updateLastSequenceNr(p) + onPersistRejected(setup, cause, p.payload, p.sequenceNr) // does not stop (by design) + tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, newState)) + } else Behaviors.same case WriteMessageFailure(p, cause, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case the handler has already been discarded - if (id == writerIdentity.instanceId) { - onWriteMessageComplete() - onPersistFailureThenStop(cause, p.payload, p.sequenceNr) - } else same + if (id == setup.writerIdentity.instanceId) { + // onWriteMessageComplete() -> tryBecomeHandlingCommands + onPersistFailureThenStop(setup, cause, p.payload, p.sequenceNr) + } else Behaviors.same case WriteMessagesSuccessful ⇒ // ignore - same + Behaviors.same case WriteMessagesFailed(_) ⇒ // ignore - same // it will be stopped by the first WriteMessageFailure message; not applying side effects + Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects case _: LoopMessageSuccess ⇒ // ignore, should never happen as there is no persistAsync in typed - same + Behaviors.same } } - private def onWriteMessageComplete(): Unit = - tryBecomeHandlingCommands() + // private def onWriteMessageComplete(): Unit = + // tryBecomeHandlingCommands() - private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { - log.error( + private def onPersistRejected[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Any, seqNr: Long): Unit = { + setup.log.error( cause, "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", - event.getClass.getName, seqNr, persistenceId, cause.getMessage) + event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage) } - private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = { - log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", - event.getClass.getName, seqNr, persistenceId) + private def onPersistFailureThenStop[C, E, S](setup: EventsourcedSetup[C, E, S], cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = { + setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", + event.getClass.getName, seqNr, setup.persistenceId) // FIXME see #24479 for reconsidering the stopping behaviour Behaviors.stopped } } + // -------------------------- - // the active phase switches between PersistingEvents and HandlingCommands; - // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect - private[this] var phase: EventsourcedRunningPhase = HandlingCommands + private def withMdc[C, E, S](setup: EventsourcedSetup[C, E, S], phase: String)(wrapped: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → phase + ) - override def onMessage(msg: Any): Behavior[Any] = { - msg match { - // TODO explore crazy hashcode hack to make this match quicker...? - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case JournalResponse(r) ⇒ phase.onJournalResponse(r) - case command: Command @unchecked ⇒ - // the above type-check does nothing, since Command is tun - // we cast explicitly to fail early in case of type mismatch - val c = command.asInstanceOf[Command] - phase.onCommand(c) + // FIXME remove need for class tag!!! + Behaviors.withMdc[Any]((_: Any) ⇒ mdc, wrapped.asInstanceOf[Behavior[Any]]).asInstanceOf[Behavior[InternalProtocol]] + } + + // -------------------------- + + private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { + response match { + case SaveSnapshotSuccess(meta) ⇒ + setup.context.log.debug("Save snapshot successful: " + meta) + Behaviors.same + case SaveSnapshotFailure(meta, ex) ⇒ + setup.context.log.error(ex, "Save snapshot failed! " + meta) + Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop } } - // ---------- + private def applyEvent[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S], event: E): S = + setup.eventHandler(state.state, event) - def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { - var res: Behavior[Any] = same - val it = effects.iterator + @tailrec private def applyEffects[C, E, S]( + setup: EventsourcedSetup[C, E, S], + msg: Any, + state: EventsourcedState[S], + effect: EffectImpl[E, S], + sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil + ): Behavior[InternalProtocol] = { + import setup.log - // if at least one effect results in a `stop`, we need to stop - // manual loop implementation to avoid allocations and multiple scans - while (it.hasNext) { - val effect = it.next() - applySideEffect(effect) match { - case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped - case _ ⇒ // nothing to do - } - } - - res - } - - def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match { - case _: Stop.type @unchecked ⇒ - Behaviors.stopped - - case SideEffect(sideEffects) ⇒ - sideEffects(state) - same - - case _ ⇒ - throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") - } - - def applyEvent(s: S, event: E): S = - eventHandler(s, event) - - @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = { if (log.isDebugEnabled) log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) effect match { - case CompositeEffect(e, currentSideEffects) ⇒ + case CompositeEffect(eff, currentSideEffects) ⇒ // unwrap and accumulate effects - applyEffects(msg, e, currentSideEffects ++ sideEffects) + applyEffects(setup, msg, state, eff, currentSideEffects ++ sideEffects) case Persist(event) ⇒ // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event - state = applyEvent(state, event) - val tags = tagger(event) - val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) + val newState = state.applyEvent(setup, event) + val eventToPersist = tagEvent(setup, event) - internalPersist(eventToPersist, sideEffects) { _ ⇒ - if (snapshotWhen(state, event, sequenceNr)) - internalSaveSnapshot(state) + internalPersist(setup, state, eventToPersist, sideEffects) { _ ⇒ + if (setup.snapshotWhen(newState.state, event, newState.seqNr)) + internalSaveSnapshot(setup, state) } case PersistAll(events) ⇒ @@ -260,104 +243,364 @@ class EventsourcedRunning[Command, Event, State]( // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event var count = events.size - var seqNr = sequenceNr - val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { - case ((currentState, snapshot), event) ⇒ - seqNr += 1 - val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr) - (applyEvent(currentState, event), shouldSnapshot) - } - state = newState - val eventsToPersist = events.map { event ⇒ - val tags = tagger(event) - if (tags.isEmpty) event else Tagged(event, tags) - } + // var seqNr = state.seqNr + val (newState, shouldSnapshotAfterPersist) = + events.foldLeft((state, false)) { + case ((currentState, snapshot), event) ⇒ + val value = currentState + .nextSequenceNr() + .applyEvent(setup, event) - internalPersistAll(eventsToPersist, sideEffects) { _ ⇒ + val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr) + (value, shouldSnapshot) + } + // state = newState + + val eventsToPersist = events.map { tagEvent(setup, _) } + + internalPersistAll(setup, eventsToPersist, newState, sideEffects) { _ ⇒ count -= 1 if (count == 0) { - sideEffects.foreach(applySideEffect) + // FIXME the result of applying side effects is ignored + val b = applySideEffects(sideEffects, newState) + if (shouldSnapshotAfterPersist) - internalSaveSnapshot(state) + internalSaveSnapshot(setup, newState) } } } else { // run side-effects even when no events are emitted - tryUnstash(context, applySideEffects(sideEffects)) + tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, state)) } - case e: PersistNothing.type @unchecked ⇒ - tryUnstash(context, applySideEffects(sideEffects)) + case _: PersistNothing.type @unchecked ⇒ + tryUnstash(setup, setup.internalStash, applySideEffects(sideEffects, state)) case _: Unhandled.type @unchecked ⇒ - applySideEffects(sideEffects) + applySideEffects(sideEffects, state) Behavior.unhandled case c: ChainableEffect[_, S] ⇒ - applySideEffect(c) + applySideEffect(c, state) } } - private def popApplyHandler(payload: Any): Unit = - pendingInvocations.pop().handler(payload) - - private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { - if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException( - "Attempted to become PersistingEvents while already in this phase! Logic error?") - - phase = - if (sideEffects.isEmpty) PersistingEventsNoSideEffects - else new PersistingEvents(sideEffects) - - same +/***/ + private def tagEvent[S, E, C](setup: EventsourcedSetup[C, E, S], event: E): Any = { + val tags = setup.tagger(event) + if (tags.isEmpty) event else Tagged(event, tags) } - private def tryBecomeHandlingCommands(): Behavior[Any] = { - if (phase == HandlingCommands) throw new IllegalArgumentException( - "Attempted to become HandlingCommands while already in this phase! Logic error?") + def applySideEffects[S](effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { + var res: Behavior[InternalProtocol] = Behaviors.same + val it = effects.iterator - if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN? - phase = HandlingCommands - } - - same - } - - // ---------- journal interactions --------- - - // Any since can be `E` or `Tagged` - private def internalPersist(event: Any, sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = { - pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - - val senderNotKnownBecauseAkkaTyped = null - val repr = PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped) - - val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync - journal.tell(JournalProtocol.WriteMessages(eventBatch, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted) - - becomePersistingEvents(sideEffects) - } - - private def internalPersistAll(events: immutable.Seq[Any], sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = { - if (events.nonEmpty) { - val senderNotKnownBecauseAkkaTyped = null - - events.foreach { event ⇒ - pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + // if at least one effect results in a `stop`, we need to stop + // manual loop implementation to avoid allocations and multiple scans + while (it.hasNext) { + val effect = it.next() + applySideEffect(effect, state) match { + case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped + case _ ⇒ // nothing to do } + } - val write = AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId, - sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped))) - - journal.tell(JournalProtocol.WriteMessages(write :: Nil, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted) - - becomePersistingEvents(sideEffects) - } else same + res } - private def internalSaveSnapshot(snapshot: State): Unit = { - snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(persistenceId, snapshotSequenceNr), snapshot), selfUntypedAdapted) + def applySideEffect[S](effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { + case _: Stop.type @unchecked ⇒ + Behaviors.stopped + + case SideEffect(sideEffects) ⇒ + sideEffects(state.state) + Behaviors.same + + case _ ⇒ + throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") } - override def toString = s"EventsourcedRunning($persistenceId,${phase.name})" } + +//@InternalApi +//class EventsourcedRunning[Command, Event, State]( +// val setup: EventsourcedSetup[Command, Event, State], +// // internalStash: StashBuffer[Any], // FIXME separate or in settings? +// +// private var sequenceNr: Long, +// val writerIdentity: WriterIdentity, +// +// private var state: State +//) extends MutableBehavior[Any] +// with EventsourcedBehavior[Command, Event, State] +// with EventsourcedStashManagement { same ⇒ +// import setup._ +// +// import EventsourcedBehavior._ +// import akka.actor.typed.scaladsl.adapter._ +// +// // Holds callbacks for persist calls (note that we do not implement persistAsync currently) +// private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty +// private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it +// +// // ---------- +//// +//// private def snapshotSequenceNr: Long = sequenceNr +//// +//// private def updateLastSequenceNr(persistent: PersistentRepr): Unit = +//// if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr +//// private def nextSequenceNr(): Long = { +//// sequenceNr += 1L +//// sequenceNr +//// } +// // ---------- +// +// private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[C] = { +// response match { +// case SaveSnapshotSuccess(meta) ⇒ +// setup.context.log.debug("Save snapshot successful: " + meta) +// Behaviors.same +// case SaveSnapshotFailure(meta, ex) ⇒ +// setup.context.log.error(ex, "Save snapshot failed! " + meta) +// Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop +// } +// } +// +// // ---------- +// +// trait EventsourcedRunningPhase { +// def name: String +// def onCommand(c: Command): Behavior[Any] +// def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] +// } +// +//// object HandlingCommands extends EventsourcedRunningPhase { +//// def name = "HandlingCommands" +//// +//// final override def onCommand(command: Command): Behavior[Any] = { +//// val effect = commandHandler(commandContext, state, command) +//// applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? +//// } +//// final override def onJournalResponse(response: Response): Behavior[Any] = { +//// // ignore, could happen if actor was restarted? +//// } +//// } +// +// object PersistingEventsNoSideEffects extends PersistingEvents(Nil) +// +// sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase { +// def name = "PersistingEvents" +// +// final override def onCommand(c: Command): Behavior[Any] = { +// stash(setup, context, c) +// same +// } +// +// final override def onJournalResponse(response: Response): Behavior[Any] = { +// log.debug("Received Journal response: {}", response) +// response match { +// case WriteMessageSuccess(p, id) ⇒ +// // instanceId mismatch can happen for persistAsync and defer in case of actor restart +// // while message is in flight, in that case we ignore the call to the handler +// if (id == writerIdentity.instanceId) { +// updateLastSequenceNr(p) +// popApplyHandler(p.payload) +// onWriteMessageComplete() +// tryUnstash(setup, internalStash, applySideEffects(sideEffects)) +// } else same +// +// case WriteMessageRejected(p, cause, id) ⇒ +// // instanceId mismatch can happen for persistAsync and defer in case of actor restart +// // while message is in flight, in that case the handler has already been discarded +// if (id == writerIdentity.instanceId) { +// updateLastSequenceNr(p) +// onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop +// tryUnstash(setup, applySideEffects(sideEffects)) +// } else same +// +// case WriteMessageFailure(p, cause, id) ⇒ +// // instanceId mismatch can happen for persistAsync and defer in case of actor restart +// // while message is in flight, in that case the handler has already been discarded +// if (id == writerIdentity.instanceId) { +// onWriteMessageComplete() +// onPersistFailureThenStop(cause, p.payload, p.sequenceNr) +// } else same +// +// case WriteMessagesSuccessful ⇒ +// // ignore +// same +// +// case WriteMessagesFailed(_) ⇒ +// // ignore +// same // it will be stopped by the first WriteMessageFailure message; not applying side effects +// +// case _: LoopMessageSuccess ⇒ +// // ignore, should never happen as there is no persistAsync in typed +// same +// } +// } +// +// private def onWriteMessageComplete(): Unit = +// tryBecomeHandlingCommands() +// +// private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { +// log.error( +// cause, +// "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", +// event.getClass.getName, seqNr, persistenceId, cause.getMessage) +// } +// +// private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = { +// log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", +// event.getClass.getName, seqNr, persistenceId) +// +// // FIXME see #24479 for reconsidering the stopping behaviour +// Behaviors.stopped +// } +// +// } +// +// // the active phase switches between PersistingEvents and HandlingCommands; +// // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect +// private[this] var phase: EventsourcedRunningPhase = HandlingCommands +// +// override def onMessage(msg: Any): Behavior[Any] = { +// msg match { +// // TODO explore crazy hashcode hack to make this match quicker...? +// case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) +// case JournalResponse(r) ⇒ phase.onJournalResponse(r) +// case command: Command @unchecked ⇒ +// // the above type-check does nothing, since Command is tun +// // we cast explicitly to fail early in case of type mismatch +// val c = command.asInstanceOf[Command] +// phase.onCommand(c) +// } +// } +// +// // ---------- +// +// def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { +// var res: Behavior[Any] = same +// val it = effects.iterator +// +// // if at least one effect results in a `stop`, we need to stop +// // manual loop implementation to avoid allocations and multiple scans +// while (it.hasNext) { +// val effect = it.next() +// applySideEffect(effect) match { +// case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped +// case _ ⇒ // nothing to do +// } +// } +// +// res +// } +// +// def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match { +// case _: Stop.type @unchecked ⇒ +// Behaviors.stopped +// +// case SideEffect(sideEffects) ⇒ +// sideEffects(state) +// same +// +// case _ ⇒ +// throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") +// } +// +// def applyEvent(s: S, event: E): S = +// eventHandler(s, event) +// +// @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = { +// if (log.isDebugEnabled) +// log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) +// +// effect match { +// case CompositeEffect(e, currentSideEffects) ⇒ +// // unwrap and accumulate effects +// applyEffects(msg, e, currentSideEffects ++ sideEffects) +// +// case Persist(event) ⇒ +// // apply the event before persist so that validation exception is handled before persisting +// // the invalid event, in case such validation is implemented in the event handler. +// // also, ensure that there is an event handler for each single event +// state = applyEvent(state, event) +// val tags = tagger(event) +// val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) +// +// internalPersist(eventToPersist, sideEffects) { _ ⇒ +// if (snapshotWhen(state, event, sequenceNr)) +// internalSaveSnapshot(state) +// } +// +// case PersistAll(events) ⇒ +// if (events.nonEmpty) { +// // apply the event before persist so that validation exception is handled before persisting +// // the invalid event, in case such validation is implemented in the event handler. +// // also, ensure that there is an event handler for each single event +// var count = events.size +// var seqNr = sequenceNr +// val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { +// case ((currentState, snapshot), event) ⇒ +// seqNr += 1 +// val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr) +// (applyEvent(currentState, event), shouldSnapshot) +// } +// state = newState +// val eventsToPersist = events.map { event ⇒ +// val tags = tagger(event) +// if (tags.isEmpty) event else Tagged(event, tags) +// } +// +// internalPersistAll(eventsToPersist, sideEffects) { _ ⇒ +// count -= 1 +// if (count == 0) { +// sideEffects.foreach(applySideEffect) +// if (shouldSnapshotAfterPersist) +// internalSaveSnapshot(state) +// } +// } +// } else { +// // run side-effects even when no events are emitted +// tryUnstash(context, applySideEffects(sideEffects)) +// } +// +// case e: PersistNothing.type @unchecked ⇒ +// tryUnstash(context, applySideEffects(sideEffects)) +// +// case _: Unhandled.type @unchecked ⇒ +// applySideEffects(sideEffects) +// Behavior.unhandled +// +// case c: ChainableEffect[_, S] ⇒ +// applySideEffect(c) +// } +// } +// +// private def popApplyHandler(payload: Any): Unit = +// pendingInvocations.pop().handler(payload) +// +// private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { +// if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException( +// "Attempted to become PersistingEvents while already in this phase! Logic error?") +// +// phase = +// if (sideEffects.isEmpty) PersistingEventsNoSideEffects +// else new PersistingEvents(sideEffects) +// +// same +// } +// +// private def tryBecomeHandlingCommands(): Behavior[Any] = { +// if (phase == HandlingCommands) throw new IllegalArgumentException( +// "Attempted to become HandlingCommands while already in this phase! Logic error?") +// +// if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN? +// phase = HandlingCommands +// } +// +// same +// } +// +// override def toString = s"EventsourcedRunning($persistenceId,${phase.name})" +//} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala new file mode 100644 index 0000000000..8f8b8191f5 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import java.util.concurrent.TimeUnit + +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.event.Logging +import akka.event.Logging.LogLevel +import com.typesafe.config.Config + +import scala.concurrent.duration._ + +trait EventsourcedSettings { + + def stashCapacity: Int + // def stashOverflowStrategyName: String // TODO not supported, the stash just throws for now + def stashingLogLevel: LogLevel + def journalPluginId: String + def snapshotPluginId: String + def recoveryEventTimeout: FiniteDuration + + def withJournalPluginId(id: Option[String]): EventsourcedSettings + def withSnapshotPluginId(id: Option[String]): EventsourcedSettings +} + +object EventsourcedSettings { + + def apply(system: ActorSystem[_]): EventsourcedSettings = + apply(system.settings.config) + + def apply(config: Config): EventsourcedSettings = { + val typedConfig = config.getConfig("akka.persistence.typed") + val untypedConfig = config.getConfig("akka.persistence") + + // StashOverflowStrategy + val internalStashOverflowStrategy = + untypedConfig.getString("internal-stash-overflow-strategy") // FIXME or copy it to typed? + + val stashCapacity = typedConfig.getInt("stash-capacity") + + val stashingLogLevel = typedConfig.getString("log-stashing") match { + case "off" ⇒ Logging.OffLevel + case "on" | "true" ⇒ Logging.DebugLevel + case l ⇒ Logging.levelFor(l).getOrElse(Logging.OffLevel) + } + + // FIXME this is wrong I think + val recoveryEventTimeout = 10.seconds // untypedConfig.getDuration("plugin-journal-fallback.recovery-event-timeout", TimeUnit.MILLISECONDS).millis + + EventsourcedSettingsImpl( + stashCapacity = stashCapacity, + internalStashOverflowStrategy, + stashingLogLevel = stashingLogLevel, + journalPluginId = "", + snapshotPluginId = "", + recoveryEventTimeout = recoveryEventTimeout + ) + } +} + +@InternalApi +private[persistence] final case class EventsourcedSettingsImpl( + stashCapacity: Int, + stashOverflowStrategyName: String, + stashingLogLevel: LogLevel, + journalPluginId: String, + snapshotPluginId: String, + recoveryEventTimeout: FiniteDuration +) extends EventsourcedSettings { + + def withJournalPluginId(id: Option[String]): EventsourcedSettings = id match { + case Some(identifier) ⇒ copy(journalPluginId = identifier) + case _ ⇒ this + } + def withSnapshotPluginId(id: Option[String]): EventsourcedSettings = id match { + case Some(identifier) ⇒ copy(snapshotPluginId = identifier) + case _ ⇒ this + } +} + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index 2191caa8cc..d63cf468c5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -1,129 +1,119 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ package akka.persistence.typed.internal -import akka.actor.typed -import akka.actor.typed.Behavior -import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.internal.TimerSchedulerImpl -import akka.actor.typed.scaladsl.ActorContext -import akka.actor.typed.scaladsl.TimerScheduler +import akka.actor.ActorRef +import akka.{ actor ⇒ a } +import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler } import akka.annotation.InternalApi -import akka.persistence.Recovery -import akka.persistence.SnapshotSelectionCriteria -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryPermitGranted +import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler +import akka.persistence._ import akka.util.ConstantFun -/** INTERNAL API */ @InternalApi -private[persistence] case class EventsourcedSetup[Command, Event, State]( +private[persistence] object EventsourcedSetup { + + def apply[Command, Event, State]( + context: ActorContext[InternalProtocol], + timers: TimerScheduler[InternalProtocol], + + persistenceId: String, + initialState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State): EventsourcedSetup[Command, Event, State] = { + apply( + context, + timers, + persistenceId, + initialState, + commandHandler, + eventHandler, + // values dependent on context + EventsourcedSettings(context.system)) + } + + def apply[Command, Event, State]( + context: ActorContext[InternalProtocol], + timers: TimerScheduler[InternalProtocol], + + persistenceId: String, + initialState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State, + settings: EventsourcedSettings): EventsourcedSetup[Command, Event, State] = { + new EventsourcedSetup[Command, Event, State]( + context, + timers, + + persistenceId, + initialState, + commandHandler, + eventHandler, + writerIdentity = WriterIdentity.newIdentity(), + recoveryCompleted = ConstantFun.scalaAnyTwoToUnit, + tagger = (_: Event) ⇒ Set.empty[String], + snapshotWhen = ConstantFun.scalaAnyThreeToFalse, + recovery = Recovery(), + settings, + StashBuffer(settings.stashCapacity) + ) + } +} + +/** INTERNAL API: Carry state for the Persistent behavior implementation behaviors */ +@InternalApi +private[persistence] final case class EventsourcedSetup[Command, Event, State]( + context: ActorContext[InternalProtocol], + timers: TimerScheduler[InternalProtocol], + persistenceId: String, initialState: State, commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - eventHandler: (State, Event) ⇒ State, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - journalPluginId: String = "", - snapshotPluginId: String = "", - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, - recovery: Recovery = Recovery() -) extends PersistentBehavior[Command, Event, State] { + eventHandler: (State, Event) ⇒ State, + writerIdentity: WriterIdentity, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, + tagger: Event ⇒ Set[String], + snapshotWhen: (State, Event, Long) ⇒ Boolean, + recovery: Recovery, - override def apply(ctx: typed.ActorContext[Command]): Behavior[Command] = { - DeferredBehavior[Command](ctx ⇒ - TimerSchedulerImpl.wrapWithTimers[Command] { timers ⇒ - new EventsourcedRequestingRecoveryPermit( - this, - ctx.asInstanceOf[ActorContext[Any]], // sorry - timers.asInstanceOf[TimerScheduler[Any]] // sorry - ).narrow[Command] + settings: EventsourcedSettings, - }(ctx)) + internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\ +) { + import akka.actor.typed.scaladsl.adapter._ + + def withJournalPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = { + require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") + copy(settings = settings.withJournalPluginId(id)) } - /** - * The `callback` function is called to notify the actor that the recovery process - * is finished. - */ - def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = - copy(recoveryCompleted = callback) - - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * `predicate` receives the State, Event and the sequenceNr used for the Event - */ - def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = - copy(snapshotWhen = predicate) - - /** - * Snapshot every N events - * - * `numberOfEvents` should be greater than 0 - */ - def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { - require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") - copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) - } - - /** - * Change the journal plugin id that this actor should use. - */ - def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] = { - require(id != null, "persistence plugin id must not be null; use empty string for 'default' journal") - copy(journalPluginId = id) - } - - /** - * Change the snapshot store plugin id that this actor should use. - */ - def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { + def withSnapshotPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = { require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") - copy(snapshotPluginId = id) + copy(settings = settings.withSnapshotPluginId(id)) } - /** - * Changes the snapshot selection criteria used by this behavior. - * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events - * from the sequence number up until which the snapshot reached. - * - * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be - * performed by replaying all events -- which may take a long time. - */ - def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { - copy(recovery = Recovery(selection)) - } + def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] - /** - * The `tagger` function should give event tags, which will be used in persistence query - */ - def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = - copy(tagger = tagger) + def log = context.log - def copy( - initialState: State = initialState, - commandHandler: CommandHandler[Command, Event, State] = commandHandler, - eventHandler: (State, Event) ⇒ State = eventHandler, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted, - tagger: Event ⇒ Set[String] = tagger, - snapshotWhen: (State, Event, Long) ⇒ Boolean = snapshotWhen, - journalPluginId: String = journalPluginId, - snapshotPluginId: String = snapshotPluginId, - recovery: Recovery = recovery): EventsourcedSetup[Command, Event, State] = - new EventsourcedSetup[Command, Event, State]( - persistenceId = persistenceId, - initialState = initialState, - commandHandler = commandHandler, - eventHandler = eventHandler, - recoveryCompleted = recoveryCompleted, - tagger = tagger, - journalPluginId = journalPluginId, - snapshotPluginId = snapshotPluginId, - snapshotWhen = snapshotWhen, - recovery = recovery) + val persistence: Persistence = Persistence(context.system.toUntyped) + + val journal: ActorRef = persistence.journalFor(settings.journalPluginId) + val snapshotStore: ActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) + + def selfUntyped = context.self.toUntyped + + import EventsourcedBehavior.InternalProtocol + val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { + case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted + case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) + case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) + }.toUntyped } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala index 9c2781d0a1..7579384d07 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala @@ -1,58 +1,36 @@ package akka.persistence.typed.internal -import java.util.Locale - -import akka.actor.typed.{ ActorSystem, Behavior } -import akka.actor.{ DeadLetter, StashOverflowException } +import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer } +import akka.actor.{ DeadLetter, StashOverflowException } import akka.annotation.InternalApi -import akka.event.Logging.LogLevel -import akka.event.{ Logging, LoggingAdapter } -import akka.persistence._ +import akka.event.Logging +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol +import akka.persistence.{ StashOverflowStrategy, _ } import akka.util.ConstantFun import akka.{ actor ⇒ a } /** INTERNAL API: Stash management for persistent behaviors */ @InternalApi private[akka] trait EventsourcedStashManagement { - import EventsourcedStashManagement._ import akka.actor.typed.scaladsl.adapter._ - protected def log: LoggingAdapter + protected def stash(setup: EventsourcedSetup[_, _, _], stash: StashBuffer[InternalProtocol], msg: InternalProtocol): Unit = { + import setup.context - protected def extension: Persistence + val logLevel = setup.settings.stashingLogLevel + if (logLevel != Logging.OffLevel) context.log.debug("Stashing message: {}", msg) // FIXME can be log(logLevel once missing method added - protected val internalStash: StashBuffer[Any] + val internalStashOverflowStrategy: StashOverflowStrategy = setup.persistence.defaultInternalStashOverflowStrategy - private lazy val logLevel = { - val configuredLevel = extension.system.settings.config - .getString("akka.persistence.typed.log-stashing") - Logging.levelFor(configuredLevel).getOrElse(OffLevel) // this is OffLevel - } - - /** - * The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash - * when the internal Stash capacity exceeded. - */ - protected val internalStashOverflowStrategy: StashOverflowStrategy = - extension.defaultInternalStashOverflowStrategy match { - case ReplyToStrategy(_) ⇒ - throw new RuntimeException("ReplyToStrategy is not supported in Akka Typed, since there is no sender()!") - case other ⇒ - other // the other strategies are supported - } - - protected def stash(ctx: ActorContext[Any], msg: Any): Unit = { - if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg) - - try internalStash.stash(msg) catch { + try stash.stash(msg) catch { case e: StashOverflowException ⇒ internalStashOverflowStrategy match { case DiscardToDeadLetterStrategy ⇒ val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow? - ctx.system.deadLetters.tell(DeadLetter(msg, snd, ctx.self.toUntyped)) + context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped)) - case ReplyToStrategy(response) ⇒ + case ReplyToStrategy(_) ⇒ throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!") case ThrowOverflowExceptionStrategy ⇒ @@ -61,15 +39,16 @@ private[akka] trait EventsourcedStashManagement { } } - protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = { + // FIXME, yet we need to also stash not-commands, due to journal responses ... + protected def tryUnstash[C, E, S]( + setup: EventsourcedSetup[C, E, S], + internalStash: StashBuffer[InternalProtocol], // TODO since may want to not have it inside setup + behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { if (internalStash.nonEmpty) { - log.debug("Unstashing message: {}", internalStash.head.getClass) - internalStash.unstash(ctx, behavior, 1, ConstantFun.scalaIdentityFunction) + setup.log.debug("Unstashing message: {}", internalStash.head.getClass) + + internalStash.asInstanceOf[StashBuffer[InternalProtocol]].unstash(setup.context, behavior.asInstanceOf[Behavior[InternalProtocol]], 1, ConstantFun.scalaIdentityFunction) } else behavior } } - -object EventsourcedStashManagement { - private val OffLevel = LogLevel(Int.MinValue) -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 442bd27f0a..058832f11a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -3,11 +3,15 @@ */ package akka.persistence.typed.scaladsl +import akka.actor.typed +import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler } import akka.annotation.InternalApi -import akka.persistence.SnapshotSelectionCriteria +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import akka.persistence.typed.internal._ +import akka.persistence._ +import akka.util.ConstantFun import scala.language.implicitConversions @@ -22,29 +26,9 @@ object PersistentBehaviors { def immutable[Command, Event, State]( persistenceId: String, initialState: State, - commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State], + commandHandler: CommandHandler[Command, Event, State], eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = - new EventsourcedSetup( - persistenceId = persistenceId, - initialState = initialState, - commandHandler = commandHandler, - eventHandler = eventHandler - ) - - /** - * Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known - * until the actor is started and typically based on the entityId, which - * is the actor name. - * - * TODO This will not be needed when it can be wrapped in `Actor.deferred`. - */ - @Deprecated // FIXME remove this - def persistentEntity[Command, Event, State]( - persistenceIdFromActorName: String ⇒ String, - initialState: State, - commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State], - eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = - ??? + PersistentBehaviorImpl(persistenceId, initialState, commandHandler, eventHandler) /** * The `CommandHandler` defines how to act on commands. @@ -108,7 +92,7 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command /** * Change the journal plugin id that this actor should use. */ - def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] + def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] /** * Change the snapshot store plugin id that this actor should use. @@ -130,3 +114,101 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command */ def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] } + +@InternalApi +private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( + persistenceId: String, + initialState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State, + + journalPluginId: Option[String] = None, + snapshotPluginId: Option[String] = None, + // settings: Option[EventsourcedSettings], // FIXME can't because no context available yet + + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + recovery: Recovery = Recovery() +) extends PersistentBehavior[Command, Event, State] { + + override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { + Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ + Behaviors.withTimers[EventsourcedBehavior.InternalProtocol] { timers ⇒ + val setup = EventsourcedSetup( + ctx, + timers, + persistenceId, + initialState, + commandHandler, + eventHandler) + .withJournalPluginId(journalPluginId) + .withSnapshotPluginId(snapshotPluginId) + + EventsourcedRequestingRecoveryPermit(setup) + } + }.widen[Command] { case c ⇒ InternalProtocol.IncomingCommand(c) } // TODO this is nice, same way applicable to mutable style + } + + /** + * The `callback` function is called to notify the actor that the recovery process + * is finished. + */ + def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = + copy(recoveryCompleted = callback) + + /** + * Initiates a snapshot if the given function returns true. + * When persisting multiple events at once the snapshot is triggered after all the events have + * been persisted. + * + * `predicate` receives the State, Event and the sequenceNr used for the Event + */ + def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = + copy(snapshotWhen = predicate) + + /** + * Snapshot every N events + * + * `numberOfEvents` should be greater than 0 + */ + def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { + require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") + copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) + } + + /** + * Change the journal plugin id that this actor should use. + */ + def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") + copy(journalPluginId = if (id != "") Some(id) else None) + } + + /** + * Change the snapshot store plugin id that this actor should use. + */ + def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") + copy(snapshotPluginId = if (id != "") Some(id) else None) + } + + /** + * Changes the snapshot selection criteria used by this behavior. + * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events + * from the sequence number up until which the snapshot reached. + * + * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be + * performed by replaying all events -- which may take a long time. + */ + def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { + copy(recovery = Recovery(selection)) + } + + /** + * The `tagger` function should give event tags, which will be used in persistence query + */ + def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = + copy(tagger = tagger) + +}