diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index b6f8bb3b6f..2ac25cfb07 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -105,6 +105,7 @@ object Behavior { */ def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = BehaviorImpl.widened(behavior, matcher) + } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala index e2bb4f8034..121d27823b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala @@ -52,6 +52,12 @@ case object PreRestart extends PreRestart { * registered watchers after this signal has been processed. */ sealed abstract class PostStop extends Signal +// comment copied onto object for better hints in IDEs +/** + * Lifecycle signal that is fired after this actor and all its child actors + * (transitively) have terminated. The [[Terminated]] signal is only sent to + * registered watchers after this signal has been processed. + */ case object PostStop extends PostStop { def instance: PostStop = this } diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf index 465038ae15..cdccb7df71 100644 --- a/akka-persistence-typed/src/main/resources/reference.conf +++ b/akka-persistence-typed/src/main/resources/reference.conf @@ -2,14 +2,22 @@ akka.persistence.typed { # Persistent actors stash while recovering or persisting events, # this setting configures the default capacity of this stash. - stash-capacity = 2048 - # If negative (or zero) then an unbounded stash is used (default) - # If positive then a bounded stash is used and the capacity is set using - # the property + # + # Stashing is always bounded to the size that is defined in this setting. + # You can set it to large values, however "unbounded" buffering is not supported. + # Negative or 0 values are not allowed. + stash-capacity = 4096 - # enables automatic logging of messages stashed automatically by an PersistentBehavior, + # Configure how to react when the internal stash overflows; + # Implementation must be a sub class of `akka.persistence.StashOverflowStrategyConfigurator` + # + # Provided implementations: + # - akka.persistence.DiscardConfigurator -- discards the overflowing message + # - akka.persistence.ThrowExceptionConfigurator -- throws on overflow which will stop the actor + internal-stash-overflow-strategy = akka.persistence.ThrowExceptionConfigurator + + # enables automatic DEBUG level logging of messages stashed automatically by an PersistentBehavior, # this may happen while it receives commands while it is recovering events or while it is persisting events - # Set to a log-level (debug, info, warn, error) to log stashed messages on given log level; log-stashing = off } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala index 0dd64fe32c..158c9025cb 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala @@ -7,15 +7,9 @@ package akka.persistence.typed.internal import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import akka.actor.NoSerializationVerificationNeeded import akka.actor.typed.Behavior -import akka.actor.typed.Behavior.StoppedBehavior -import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler } +import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi -import akka.event.{ LogSource, Logging } -import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence.{ JournalProtocol, Persistence, RecoveryPermitter, SnapshotProtocol } -import akka.{ actor ⇒ a } /** INTERNAL API */ @InternalApi @@ -33,6 +27,24 @@ private[akka] object EventsourcedBehavior { } final case class WriterIdentity(instanceId: Int, writerUuid: String) + object MDC { + // format: OFF + val AwaitingPermit = "await-permit" + val RecoveringSnapshot = "recover-snap" + val RecoveringEvents = "recover-evts" + val RunningCmds = "running-cmnds" + val PersistingEvents = "persist-evts" + // format: ON + } + + def withMdc(setup: EventsourcedSetup[_, _, _], phaseName: String)(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → phaseName + ) + Behaviors.withMdc(_ ⇒ mdc, b) + } + /** Protocol used internally by the eventsourced behaviors, never exposed to user-land */ sealed trait InternalProtocol object InternalProtocol { @@ -40,63 +52,6 @@ private[akka] object EventsourcedBehavior { final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol - final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends InternalProtocol final case class IncomingCommand[C](c: C) extends InternalProtocol } - // implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] { - // override def genString(b: EventsourcedBehavior[_, _, _]): String = { - // val behaviorShortName = b match { - // case _: EventsourcedRunning[_, _, _] ⇒ "running" - // case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events" - // case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap" - // case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit" - // } - // s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]" - // } - // } - -} - -/** INTERNAL API */ -@InternalApi -private[akka] trait EventsourcedBehavior[Command, Event, State] { - import EventsourcedBehavior._ - import akka.actor.typed.scaladsl.adapter._ - - // protected def timers: TimerScheduler[Any] - - type C = Command - type AC = ActorContext[C] - type E = Event - type S = State - - // used for signaling intent in type signatures - type SeqNr = Long - - // def persistenceId: String = setup.persistenceId - // - // protected def setup: EventsourcedSetup[Command, Event, State] - // protected def initialState: State = setup.initialState - // protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler - // protected def eventHandler: (State, Event) ⇒ State = setup.eventHandler - // protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = setup.snapshotWhen - // protected def tagger: Event ⇒ Set[String] = setup.tagger - // - // protected final def journalPluginId: String = setup.journalPluginId - // protected final def snapshotPluginId: String = setup.snapshotPluginId - - // ------ common ------- - - // protected lazy val extension = Persistence(context.system.toUntyped) - // protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId) - // protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId) - // - // protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped - // protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { - // case res: JournalProtocol.Response ⇒ JournalResponse(res) - // case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted - // case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res) - // case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd - // }.toUntyped - } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala index 8bd2845141..afbfce41b7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala @@ -3,35 +3,27 @@ */ package akka.persistence.typed.internal -import akka.actor.typed.Behavior +import akka.actor.ActorRef +import akka.actor.typed.{ Behavior, PostStop, PreRestart, Signal } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.annotation.InternalApi -import akka.persistence.Eventsourced.StashingHandlerInvocation import akka.persistence.JournalProtocol.ReplayMessages -import akka.persistence._ import akka.persistence.SnapshotProtocol.LoadSnapshot +import akka.persistence._ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import scala.collection.immutable +/** INTERNAL API */ @InternalApi private[akka] trait EventsourcedJournalInteractions[C, E, S] { - import akka.actor.typed.scaladsl.adapter._ def setup: EventsourcedSetup[C, E, S] - private def context = setup.context + type EventOrTagged = Any // `Any` since can be `E` or `Tagged` // ---------- journal interactions --------- - protected def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = { - setup.log.debug("Returning recovery permit, on failure because: " + cause.getMessage) - // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - val permitter = setup.persistence.recoveryPermitter - permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped) - } - - type EventOrTagged = Any // `Any` since can be `E` or `Tagged` protected def internalPersist( state: EventsourcedRunning.EventsourcedState[S], event: EventOrTagged): EventsourcedRunning.EventsourcedState[S] = { @@ -47,8 +39,8 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { sender = senderNotKnownBecauseAkkaTyped ) - val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync - setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped) + val write = AtomicWrite(repr) :: Nil + setup.journal.tell(JournalProtocol.WriteMessages(write, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped) newState } @@ -57,16 +49,18 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { events: immutable.Seq[EventOrTagged], state: EventsourcedRunning.EventsourcedState[S]): EventsourcedRunning.EventsourcedState[S] = { if (events.nonEmpty) { - val newState = state.nextSequenceNr() + var newState = state - val senderNotKnownBecauseAkkaTyped = null - val write = AtomicWrite(events.map(event ⇒ PersistentRepr( - event, - persistenceId = setup.persistenceId, - sequenceNr = newState.seqNr, // FIXME increment for each event - writerUuid = setup.writerIdentity.writerUuid, - sender = senderNotKnownBecauseAkkaTyped) - )) + val writes = events.map { event ⇒ + newState = newState.nextSequenceNr() + PersistentRepr( + event, + persistenceId = setup.persistenceId, + sequenceNr = newState.seqNr, + writerUuid = setup.writerIdentity.writerUuid, + sender = ActorRef.noSender) + } + val write = AtomicWrite(writes) setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped) @@ -76,16 +70,41 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) - // reply is sent to `selfUntypedAdapted`, it is important to target that one setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntyped) } - protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = { - setup.log.debug("Returning recovery permit, reason: " + reason) - // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped) + protected def requestRecoveryPermit(): Unit = { + setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfUntyped) } + /** Intended to be used in .onSignal(returnPermitOnStop) by behaviors */ + protected def returnPermitOnStop: PartialFunction[(ActorContext[InternalProtocol], Signal), Behavior[InternalProtocol]] = { + case (_, PostStop) ⇒ + tryReturnRecoveryPermit("PostStop") + Behaviors.stopped + case (_, PreRestart) ⇒ + // TODO was not entirely sure if it's needed here as well + tryReturnRecoveryPermit("PostStop") + Behaviors.stopped + } + + /** Mutates setup, by setting the `holdingRecoveryPermit` to false */ + protected def tryReturnRecoveryPermit(reason: String): Unit = { + if (setup.holdingRecoveryPermit) { + setup.log.debug("Returning recovery permit, reason: " + reason) + setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped) + setup.holdingRecoveryPermit = false + } // else, no need to return the permit + } + + protected def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = + if (setup.holdingRecoveryPermit) { + setup.log.debug("Returning recovery permit, on failure because: {}", cause.getMessage) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + val permitter = setup.persistence.recoveryPermitter + permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped) + } else setup.log.info("Attempted return of recovery permit however was not holding a permit; ignoring") // TODO: make debug level once confident + // ---------- snapshot store interactions --------- /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala index b8d65c722c..3b3c96d0fd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala @@ -3,13 +3,13 @@ */ package akka.persistence.typed.internal -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } +import akka.actor.typed.{ Behavior, PostStop, Signal } +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler } import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse } +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.internal.EventsourcedBehavior._ import scala.concurrent.duration.FiniteDuration @@ -18,8 +18,16 @@ import scala.util.control.NonFatal /*** * INTERNAL API * - * See next behavior [[EventsourcedRunning]]. + * Third (of four) behavior of an PersistentBehavior. * + * In this behavior we finally start replaying events, beginning from the last applied sequence number + * (i.e. the one up-until-which the snapshot recovery has brought us). + * + * Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed + * and control is given to the user's handlers to drive the actors behavior from there. + * + * See next behavior [[EventsourcedRunning]]. + * See previous behavior [[EventsourcedRecoveringSnapshot]]. */ @InternalApi private[persistence] object EventsourcedRecoveringEvents { @@ -40,9 +48,9 @@ private[persistence] object EventsourcedRecoveringEvents { } @InternalApi -private[persistence] class EventsourcedRecoveringEvents[C, E, S]( - override val setup: EventsourcedSetup[C, E, S]) +private[persistence] class EventsourcedRecoveringEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S]) extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { + import setup.context.log import EventsourcedRecoveringEvents.RecoveringState def createBehavior(state: RecoveringState[S]): Behavior[InternalProtocol] = { @@ -51,67 +59,53 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S]( replayEvents(state.seqNr + 1L, setup.recovery.toSequenceNr) - withMdc { + withMdc(setup, MDC.RecoveringEvents) { stay(state) } } - } - private def stay( - state: RecoveringState[S] - ): Behavior[InternalProtocol] = - Behaviors.immutable { - case (_, JournalResponse(r)) ⇒ onJournalResponse(state, r) - case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) - case (_, RecoveryTickEvent(snap)) ⇒ onRecoveryTick(state, snap) - case (_, cmd @ IncomingCommand(_)) ⇒ onCommand(cmd) + private def stay(state: RecoveringState[S]): Behavior[InternalProtocol] = + withMdc(setup, MDC.RecoveringEvents) { + Behaviors.immutable[InternalProtocol] { + case (_, JournalResponse(r)) ⇒ onJournalResponse(state, r) + case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) + case (_, RecoveryTickEvent(snap)) ⇒ onRecoveryTick(state, snap) + case (_, cmd: IncomingCommand[C]) ⇒ onCommand(cmd) + }.onSignal(returnPermitOnStop) } - private def withMdc(wrapped: Behavior[InternalProtocol]) = { - val mdc = Map( - "persistenceId" → setup.persistenceId, - "phase" → "recover-evnts" - ) - - Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) - } - private def onJournalResponse( state: RecoveringState[S], response: JournalProtocol.Response): Behavior[InternalProtocol] = { - import setup.context.log try { response match { case ReplayedMessage(repr) ⇒ - // eventSeenInInterval = true - // updateLastSequenceNr(repr) - - val newState = state.copy( - seqNr = repr.sequenceNr, - state = setup.eventHandler(state.state, repr.payload.asInstanceOf[E]) - ) - - stay(newState) + val event = repr.payload.asInstanceOf[E] + try { + val newState = state.copy( + seqNr = repr.sequenceNr, + state = setup.eventHandler(state.state, event), + eventSeenInInterval = true) + stay(newState) + } catch { + case NonFatal(ex) ⇒ onRecoveryFailure(ex, repr.sequenceNr, Some(event)) + } case RecoverySuccess(highestSeqNr) ⇒ - log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr) + log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr) cancelRecoveryTimer(setup.timers) - try onRecoveryCompleted(state) - catch { case NonFatal(ex) ⇒ onRecoveryFailure(ex, highestSeqNr, Some(state)) } + onRecoveryCompleted(state) case ReplayMessagesFailure(cause) ⇒ - onRecoveryFailure(cause, state.seqNr, None) + onRecoveryFailure(cause, state.seqNr, Some(response)) - case other ⇒ - // stash(setup, setup.internalStash, other) - // Behaviors.same + case _ ⇒ Behaviors.unhandled } } catch { case NonFatal(cause) ⇒ - cancelRecoveryTimer(setup.timers) onRecoveryFailure(cause, state.seqNr, None) } } @@ -122,22 +116,13 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S]( Behaviors.same } - // FYI, have to keep carrying all [C,E,S] everywhere as otherwise ending up with: - // [error] /Users/ktoso/code/akka/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala:117:14: type mismatch; - // [error] found : akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,_$2] where type _$2, type _$1 - // [error] required: akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,Any] where type _$1 - // [error] Note: _$2 <: Any, but class EventsourcedSetup is invariant in type State. - // [error] You may wish to define State as +State instead. (SLS 4.5) - // [error] Error occurred in an application involving default arguments. - // [error] stay(setup, state.copy(eventSeenInInterval = false)) - // [error] ^ protected def onRecoveryTick(state: RecoveringState[S], snapshot: Boolean): Behavior[InternalProtocol] = if (!snapshot) { if (state.eventSeenInInterval) { stay(state.copy(eventSeenInInterval = false)) } else { cancelRecoveryTimer(setup.timers) - val msg = s"Recovery timed out, didn't get event within ${setup.settings.recoveryEventTimeout}, highest sequence number seen ${state.seqNr}" + val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this? } } else { @@ -146,7 +131,7 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S]( } def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { - setup.log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response)) + setup.log.warning("Unexpected [{}] from SnapshotStore, already in replaying events state.", Logging.simpleName(response)) Behaviors.unhandled // ignore the response } @@ -156,25 +141,24 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S]( * The actor is always stopped after this method has been invoked. * * @param cause failure cause. - * @param event the event that was processed in `receiveRecover`, if the exception was thrown there + * @param message the message that was being processed when the exception was thrown */ - protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, event: Option[Any]): Behavior[InternalProtocol] = { - returnRecoveryPermit(setup, "on recovery failure: " + cause.getMessage) + protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = { cancelRecoveryTimer(setup.timers) + tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) - event match { + message match { case Some(evt) ⇒ - setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) - Behaviors.stopped - + setup.log.error(cause, "Exception during recovery while handling [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) case None ⇒ - setup.log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", setup.persistenceId, sequenceNr) - Behaviors.stopped + setup.log.error(cause, "Exception during recovery. Last known sequence number [{}]", setup.persistenceId, sequenceNr) } + + Behaviors.stopped } protected def onRecoveryCompleted(state: RecoveringState[S]): Behavior[InternalProtocol] = try { - returnRecoveryPermit(setup, "recovery completed successfully") + tryReturnRecoveryPermit("replay completed successfully") setup.recoveryCompleted(setup.commandContext, state.state) val running = EventsourcedRunning[C, E, S]( @@ -187,8 +171,8 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S]( cancelRecoveryTimer(setup.timers) } - // protect against snapshot stalling forever because of journal overloaded and such - private val RecoveryTickTimerKey = "recovery-tick" + // protect against event recovery stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "event-recovery-tick" private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(RecoveryTickTimerKey) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala index 3822171c32..74edfb0561 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala @@ -3,24 +3,19 @@ */ package akka.persistence.typed.internal -import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors.same -import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler } +import akka.actor.typed.{ Behavior, PostStop, Signal } import akka.annotation.InternalApi import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult } import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ -import akka.{ actor ⇒ a } - -import scala.util.control.NonFatal -import scala.util.{ Failure, Success, Try } +import akka.persistence.typed.internal.EventsourcedBehavior._ /** * INTERNAL API * * Second (of four) behavior of an PersistentBehavior. - * See next behavior [[EventsourcedRecoveringEvents]]. * * In this behavior the recovery process is initiated. * We try to obtain a snapshot from the configured snapshot store, @@ -28,6 +23,9 @@ import scala.util.{ Failure, Success, Try } * * Once snapshot recovery is done (or no snapshot was selected), * recovery of events continues in [[EventsourcedRecoveringEvents]]. + * + * See next behavior [[EventsourcedRecoveringEvents]]. + * See previous behavior [[EventsourcedRequestingRecoveryPermit]]. */ @InternalApi private[akka] object EventsourcedRecoveringSnapshot { @@ -38,8 +36,7 @@ private[akka] object EventsourcedRecoveringSnapshot { } @InternalApi -private[akka] class EventsourcedRecoveringSnapshot[C, E, S]( - override val setup: EventsourcedSetup[C, E, S]) +private[akka] class EventsourcedRecoveringSnapshot[C, E, S](override val setup: EventsourcedSetup[C, E, S]) extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { def createBehavior(): Behavior[InternalProtocol] = { @@ -47,24 +44,16 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S]( loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr) - withMdc { - Behaviors.immutable { + withMdc(setup, MDC.RecoveringSnapshot) { + Behaviors.immutable[InternalProtocol] { case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) case (_, JournalResponse(r)) ⇒ onJournalResponse(r) case (_, RecoveryTickEvent(snapshot)) ⇒ onRecoveryTick(snapshot) case (_, cmd: IncomingCommand[C]) ⇒ onCommand(cmd) - } + }.onSignal(returnPermitOnStop) } } - def withMdc(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { - val mdc = Map( - "persistenceId" → setup.persistenceId, - "phase" → "recover-snap" - ) - Behaviors.withMdc(_ ⇒ mdc, b) - } - /** * Called whenever a message replay fails. By default it logs the error. * @@ -76,93 +65,64 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S]( private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = { cancelRecoveryTimer(setup.timers) - val lastSequenceNr = 0 // FIXME not needed since snapshot == 0 event match { case Some(evt) ⇒ - setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + - "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, setup.persistenceId) - Behaviors.stopped - - case None ⇒ - setup.log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " + - "Last known sequence number [{}]", setup.persistenceId, lastSequenceNr) - Behaviors.stopped + setup.log.error(cause, "Exception in receiveRecover when replaying snapshot [{}]", evt.getClass.getName) + case _ ⇒ + setup.log.error(cause, "Persistence failure when replaying snapshot") } + + Behaviors.stopped } private def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] = if (snapshot) { // we know we're in snapshotting mode; snapshot recovery timeout arrived val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") - onRecoveryFailure(ex, event = None) + onRecoveryFailure(ex, None) } else same // ignore, since we received the snapshot already - // protect against snapshot stalling forever because of journal overloaded and such - private val RecoveryTickTimerKey = "recovery-tick" - - private def startRecoveryTimer(): Unit = { - setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout) - } - - private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey) - def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands - setup.internalStash.stash(cmd) // TODO move stash out as it's mutable + setup.internalStash.stash(cmd) Behavior.same } - def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = try { - throw new Exception("Should not talk to journal yet! But got: " + response) - } catch { - case NonFatal(cause) ⇒ - returnRecoveryPermitOnlyOnFailure(cause) - throw cause + def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = { + setup.log.debug("Unexpected response from journal: [{}], may be due to an actor restart, ignoring...", response) + Behaviors.unhandled } - def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = try { + def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { response match { case LoadSnapshotResult(sso, toSnr) ⇒ var state: S = setup.initialState - val re: Try[Long] = Try { - sso match { - case Some(SelectedSnapshot(metadata, snapshot)) ⇒ - state = snapshot.asInstanceOf[S] - metadata.sequenceNr - case None ⇒ - 0 // from the start please - } + val seqNr: Long = sso match { + case Some(SelectedSnapshot(metadata, snapshot)) ⇒ + state = snapshot.asInstanceOf[S] + metadata.sequenceNr + case None ⇒ 0 // from the beginning please } - re match { - case Success(seqNr) ⇒ - replayMessages(state, seqNr, toSnr) - - case Failure(cause) ⇒ - // FIXME better exception type - val ex = new RuntimeException(s"Failed to recover state for [${setup.persistenceId}] from snapshot offer.", cause) - onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME - } + becomeReplayingEvents(state, seqNr, toSnr) case LoadSnapshotFailed(cause) ⇒ cancelRecoveryTimer(setup.timers) - onRecoveryFailure(cause, event = None) case _ ⇒ Behaviors.unhandled } - } catch { - case NonFatal(cause) ⇒ - returnRecoveryPermitOnlyOnFailure(cause) - throw cause } - private def replayMessages(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = { + private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = { cancelRecoveryTimer(setup.timers) - val rec = setup.recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types + val rec = setup.recovery.copy( + toSequenceNr = toSnr, + fromSnapshot = SnapshotSelectionCriteria.None + ) EventsourcedRecoveringEvents[C, E, S]( setup.copy(recovery = rec), @@ -171,4 +131,10 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S]( ) } + // protect against snapshot stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "snapshot-recovery-tick" + private def startRecoveryTimer(): Unit = + setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout) + private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey) + } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala index 82aa663bf1..0be1f2b8a1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -3,23 +3,21 @@ */ package akka.persistence.typed.internal -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.Behaviors.MutableBehavior -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.actor.typed.{ Behavior, PostStop, PreRestart } +import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi -import akka.event.Logging import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol /** * INTERNAL API * * First (of four) behaviour of an PersistentBehaviour. - * See next behavior [[EventsourcedRecoveringSnapshot]]. * * Requests a permit to start recovering this actor; this is tone to avoid * hammering the journal with too many concurrently recovering actors. * + * See next behavior [[EventsourcedRecoveringSnapshot]]. */ @InternalApi private[akka] object EventsourcedRequestingRecoveryPermit { @@ -30,9 +28,8 @@ private[akka] object EventsourcedRequestingRecoveryPermit { } @InternalApi -private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S]( - override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedStashManagement[C, E, S] { +private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val setup: EventsourcedSetup[C, E, S]) + extends EventsourcedStashManagement[C, E, S] with EventsourcedJournalInteractions[C, E, S] { def createBehavior(): Behavior[InternalProtocol] = { // request a permit, as only once we obtain one we can start recovering @@ -62,15 +59,8 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S]( private def becomeRecovering(): Behavior[InternalProtocol] = { setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery) + setup.holdingRecoveryPermit = true EventsourcedRecoveringSnapshot(setup) } - // ---------- journal interactions --------- - - private def requestRecoveryPermit(): Unit = { - import akka.actor.typed.scaladsl.adapter._ - // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfUntyped) - } - } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 2d7eb39b8a..322218d28b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -3,18 +3,17 @@ */ package akka.persistence.typed.internal -import akka.actor.typed.Behavior +import akka.actor.typed.{ Behavior, Signal } import akka.actor.typed.Behavior.StoppedBehavior +import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors.MutableBehavior -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } import akka.annotation.InternalApi -import akka.event.Logging import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation } import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse } +import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, SnapshotterResponse } import scala.annotation.tailrec import scala.collection.immutable @@ -22,20 +21,22 @@ import scala.collection.immutable /** * INTERNAL API * - * Fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior. + * Conceptually fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior. * * In this phase recovery has completed successfully and we continue handling incoming commands, * as well as persisting new events as dictated by the user handlers. * - * This behavior operates in two phases: + * This behavior operates in two phases (also behaviors): * - HandlingCommands - where the command handler is invoked for incoming commands * - PersistingEvents - where incoming commands are stashed until persistence completes * * This is implemented as such to avoid creating many EventsourcedRunning instances, * which perform the Persistence extension lookup on creation and similar things (config lookup) * + * See previous [[EventsourcedRecoveringEvents]]. */ -@InternalApi private[akka] object EventsourcedRunning { +@InternalApi +private[akka] object EventsourcedRunning { final case class EventsourcedState[State]( seqNr: Long, @@ -60,14 +61,21 @@ import scala.collection.immutable // =============================================== -@InternalApi private[akka] class EventsourcedRunning[C, E, S](override val setup: EventsourcedSetup[C, E, S]) +/** INTERNAL API */ +@InternalApi private[akka] class EventsourcedRunning[C, E, S]( + override val setup: EventsourcedSetup[C, E, S]) extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { import EventsourcedRunning.EventsourcedState + import EventsourcedBehavior.withMdc + + private def log = setup.log + private def commandContext = setup.commandContext + def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = { def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { - val effect = setup.commandHandler(setup.commandContext, state.state, cmd) + val effect = setup.commandHandler(commandContext, state.state, cmd) applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? } @@ -77,8 +85,6 @@ import scala.collection.immutable effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil ): Behavior[InternalProtocol] = { - import setup.log - if (log.isDebugEnabled) log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) @@ -158,12 +164,12 @@ import scala.collection.immutable if (tags.isEmpty) event else Tagged(event, tags) } - withMdc("run-cmnds") { + withMdc(setup, MDC.RunningCmds) { Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { - case (_, SnapshotterResponse(r)) ⇒ Behaviors.unhandled - case (_, JournalResponse(r)) ⇒ Behaviors.unhandled case (_, IncomingCommand(c: C @unchecked)) ⇒ onCommand(state, c) - } + case (_, SnapshotterResponse(_)) ⇒ Behaviors.unhandled + case (_, JournalResponse(_)) ⇒ Behaviors.unhandled + }.onSignal(returnPermitOnStop) } } @@ -175,8 +181,8 @@ import scala.collection.immutable pendingInvocations: immutable.Seq[PendingHandlerInvocation], sideEffects: immutable.Seq[ChainableEffect[_, S]] ): Behavior[InternalProtocol] = { - withMdc("run-persist-evnts") { - Behaviors.mutable[EventsourcedBehavior.InternalProtocol](_ ⇒ new PersistingEvents(state, pendingInvocations, sideEffects)) + withMdc(setup, MDC.PersistingEvents) { + new PersistingEvents(state, pendingInvocations, sideEffects) } } @@ -194,6 +200,9 @@ import scala.collection.immutable } } + override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = + { case signal ⇒ returnPermitOnStop((setup.context, signal)) } + def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { stash(cmd) this @@ -204,8 +213,6 @@ import scala.collection.immutable setup.log.debug("Received Journal response: {}", response) response match { case WriteMessageSuccess(p, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case we ignore the call to the handler if (id == setup.writerIdentity.instanceId) { state = state.updateLastSequenceNr(p) // FIXME is the order of pendingInvocations not reversed? @@ -218,8 +225,6 @@ import scala.collection.immutable } else this case WriteMessageRejected(p, cause, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case the handler has already been discarded if (id == setup.writerIdentity.instanceId) { state = state.updateLastSequenceNr(p) onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) @@ -227,10 +232,7 @@ import scala.collection.immutable } else this case WriteMessageFailure(p, cause, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case the handler has already been discarded if (id == setup.writerIdentity.instanceId) { - // onWriteMessageComplete() -> tryBecomeHandlingCommands onPersistFailureThenStop(cause, p.payload, p.sequenceNr) } else this @@ -248,9 +250,6 @@ import scala.collection.immutable } } - // private def onWriteMessageComplete(): Unit = - // tryBecomeHandlingCommands() - private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { setup.log.error( cause, @@ -269,10 +268,10 @@ import scala.collection.immutable private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { response match { case SaveSnapshotSuccess(meta) ⇒ - setup.context.log.debug("Save snapshot successful: " + meta) + setup.context.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) this case SaveSnapshotFailure(meta, ex) ⇒ - setup.context.log.error(ex, "Save snapshot failed! " + meta) + setup.context.log.error(ex, "Save snapshot failed, snapshot metadata: [{}]", meta) this // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop } } @@ -281,15 +280,6 @@ import scala.collection.immutable // -------------------------- - private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = { - val mdc = Map( - "persistenceId" → setup.persistenceId, - "phase" → phase - ) - - Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) - } - def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { var res: Behavior[InternalProtocol] = handlingCommands(state) val it = effects.iterator @@ -298,10 +288,8 @@ import scala.collection.immutable // manual loop implementation to avoid allocations and multiple scans while (it.hasNext) { val effect = it.next() - applySideEffect(effect, state) match { - case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped - case _ ⇒ // nothing to do - } + val stopped = !Behavior.isAlive(applySideEffect(effect, state)) + if (stopped) res = Behaviors.stopped } res @@ -320,286 +308,3 @@ import scala.collection.immutable } } - -//@InternalApi -//class EventsourcedRunning[Command, Event, State]( -// val setup: EventsourcedSetup[Command, Event, State], -// // internalStash: StashBuffer[Any], // FIXME separate or in settings? -// -// private var sequenceNr: Long, -// val writerIdentity: WriterIdentity, -// -// private var state: State -//) extends MutableBehavior[Any] -// with EventsourcedBehavior[Command, Event, State] -// with EventsourcedStashManagement { same ⇒ -// import setup._ -// -// import EventsourcedBehavior._ -// import akka.actor.typed.scaladsl.adapter._ -// -// // Holds callbacks for persist calls (note that we do not implement persistAsync currently) -// private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty -// private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it -// -// // ---------- -//// -//// private def snapshotSequenceNr: Long = sequenceNr -//// -//// private def updateLastSequenceNr(persistent: PersistentRepr): Unit = -//// if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr -//// private def nextSequenceNr(): Long = { -//// sequenceNr += 1L -//// sequenceNr -//// } -// // ---------- -// -// private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[C] = { -// response match { -// case SaveSnapshotSuccess(meta) ⇒ -// setup.context.log.debug("Save snapshot successful: " + meta) -// Behaviors.same -// case SaveSnapshotFailure(meta, ex) ⇒ -// setup.context.log.error(ex, "Save snapshot failed! " + meta) -// Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop -// } -// } -// -// // ---------- -// -// trait EventsourcedRunningPhase { -// def name: String -// def onCommand(c: Command): Behavior[Any] -// def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] -// } -// -//// object HandlingCommands extends EventsourcedRunningPhase { -//// def name = "HandlingCommands" -//// -//// final override def onCommand(command: Command): Behavior[Any] = { -//// val effect = commandHandler(commandContext, state, command) -//// applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? -//// } -//// final override def onJournalResponse(response: Response): Behavior[Any] = { -//// // ignore, could happen if actor was restarted? -//// } -//// } -// -// object PersistingEventsNoSideEffects extends PersistingEvents(Nil) -// -// sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase { -// def name = "PersistingEvents" -// -// final override def onCommand(c: Command): Behavior[Any] = { -// stash(setup, context, c) -// same -// } -// -// final override def onJournalResponse(response: Response): Behavior[Any] = { -// log.debug("Received Journal response: {}", response) -// response match { -// case WriteMessageSuccess(p, id) ⇒ -// // instanceId mismatch can happen for persistAsync and defer in case of actor restart -// // while message is in flight, in that case we ignore the call to the handler -// if (id == writerIdentity.instanceId) { -// updateLastSequenceNr(p) -// popApplyHandler(p.payload) -// onWriteMessageComplete() -// tryUnstash(setup, internalStash, applySideEffects(sideEffects)) -// } else same -// -// case WriteMessageRejected(p, cause, id) ⇒ -// // instanceId mismatch can happen for persistAsync and defer in case of actor restart -// // while message is in flight, in that case the handler has already been discarded -// if (id == writerIdentity.instanceId) { -// updateLastSequenceNr(p) -// onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop -// tryUnstash(setup, applySideEffects(sideEffects)) -// } else same -// -// case WriteMessageFailure(p, cause, id) ⇒ -// // instanceId mismatch can happen for persistAsync and defer in case of actor restart -// // while message is in flight, in that case the handler has already been discarded -// if (id == writerIdentity.instanceId) { -// onWriteMessageComplete() -// onPersistFailureThenStop(cause, p.payload, p.sequenceNr) -// } else same -// -// case WriteMessagesSuccessful ⇒ -// // ignore -// same -// -// case WriteMessagesFailed(_) ⇒ -// // ignore -// same // it will be stopped by the first WriteMessageFailure message; not applying side effects -// -// case _: LoopMessageSuccess ⇒ -// // ignore, should never happen as there is no persistAsync in typed -// same -// } -// } -// -// private def onWriteMessageComplete(): Unit = -// tryBecomeHandlingCommands() -// -// private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { -// log.error( -// cause, -// "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", -// event.getClass.getName, seqNr, persistenceId, cause.getMessage) -// } -// -// private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = { -// log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", -// event.getClass.getName, seqNr, persistenceId) -// -// // FIXME see #24479 for reconsidering the stopping behaviour -// Behaviors.stopped -// } -// -// } -// -// // the active phase switches between PersistingEvents and HandlingCommands; -// // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect -// private[this] var phase: EventsourcedRunningPhase = HandlingCommands -// -// override def onMessage(msg: Any): Behavior[Any] = { -// msg match { -// // TODO explore crazy hashcode hack to make this match quicker...? -// case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) -// case JournalResponse(r) ⇒ phase.onJournalResponse(r) -// case command: Command @unchecked ⇒ -// // the above type-check does nothing, since Command is tun -// // we cast explicitly to fail early in case of type mismatch -// val c = command.asInstanceOf[Command] -// phase.onCommand(c) -// } -// } -// -// // ---------- -// -// def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { -// var res: Behavior[Any] = same -// val it = effects.iterator -// -// // if at least one effect results in a `stop`, we need to stop -// // manual loop implementation to avoid allocations and multiple scans -// while (it.hasNext) { -// val effect = it.next() -// applySideEffect(effect) match { -// case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped -// case _ ⇒ // nothing to do -// } -// } -// -// res -// } -// -// def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match { -// case _: Stop.type @unchecked ⇒ -// Behaviors.stopped -// -// case SideEffect(sideEffects) ⇒ -// sideEffects(state) -// same -// -// case _ ⇒ -// throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") -// } -// -// def applyEvent(s: S, event: E): S = -// eventHandler(s, event) -// -// @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = { -// if (log.isDebugEnabled) -// log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) -// -// effect match { -// case CompositeEffect(e, currentSideEffects) ⇒ -// // unwrap and accumulate effects -// applyEffects(msg, e, currentSideEffects ++ sideEffects) -// -// case Persist(event) ⇒ -// // apply the event before persist so that validation exception is handled before persisting -// // the invalid event, in case such validation is implemented in the event handler. -// // also, ensure that there is an event handler for each single event -// state = applyEvent(state, event) -// val tags = tagger(event) -// val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) -// -// internalPersist(eventToPersist, sideEffects) { _ ⇒ -// if (snapshotWhen(state, event, sequenceNr)) -// internalSaveSnapshot(state) -// } -// -// case PersistAll(events) ⇒ -// if (events.nonEmpty) { -// // apply the event before persist so that validation exception is handled before persisting -// // the invalid event, in case such validation is implemented in the event handler. -// // also, ensure that there is an event handler for each single event -// var count = events.size -// var seqNr = sequenceNr -// val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { -// case ((currentState, snapshot), event) ⇒ -// seqNr += 1 -// val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr) -// (applyEvent(currentState, event), shouldSnapshot) -// } -// state = newState -// val eventsToPersist = events.map { event ⇒ -// val tags = tagger(event) -// if (tags.isEmpty) event else Tagged(event, tags) -// } -// -// internalPersistAll(eventsToPersist, sideEffects) { _ ⇒ -// count -= 1 -// if (count == 0) { -// sideEffects.foreach(applySideEffect) -// if (shouldSnapshotAfterPersist) -// internalSaveSnapshot(state) -// } -// } -// } else { -// // run side-effects even when no events are emitted -// tryUnstash(context, applySideEffects(sideEffects)) -// } -// -// case e: PersistNothing.type @unchecked ⇒ -// tryUnstash(context, applySideEffects(sideEffects)) -// -// case _: Unhandled.type @unchecked ⇒ -// applySideEffects(sideEffects) -// Behavior.unhandled -// -// case c: ChainableEffect[_, S] ⇒ -// applySideEffect(c) -// } -// } -// -// private def popApplyHandler(payload: Any): Unit = -// pendingInvocations.pop().handler(payload) -// -// private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { -// if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException( -// "Attempted to become PersistingEvents while already in this phase! Logic error?") -// -// phase = -// if (sideEffects.isEmpty) PersistingEventsNoSideEffects -// else new PersistingEvents(sideEffects) -// -// same -// } -// -// private def tryBecomeHandlingCommands(): Behavior[Any] = { -// if (phase == HandlingCommands) throw new IllegalArgumentException( -// "Attempted to become HandlingCommands while already in this phase! Logic error?") -// -// if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN? -// phase = HandlingCommands -// } -// -// same -// } -// -// override def toString = s"EventsourcedRunning($persistenceId,${phase.name})" -//} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala index 8f8b8191f5..1abf30347c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala @@ -7,23 +7,25 @@ import java.util.concurrent.TimeUnit import akka.actor.typed.ActorSystem import akka.annotation.InternalApi -import akka.event.Logging -import akka.event.Logging.LogLevel +import akka.persistence.Persistence import com.typesafe.config.Config import scala.concurrent.duration._ -trait EventsourcedSettings { +/** INTERNAL API */ +@InternalApi +private[akka] trait EventsourcedSettings { def stashCapacity: Int - // def stashOverflowStrategyName: String // TODO not supported, the stash just throws for now - def stashingLogLevel: LogLevel - def journalPluginId: String - def snapshotPluginId: String + def logOnStashing: Boolean + def stashOverflowStrategyConfigurator: String + def recoveryEventTimeout: FiniteDuration - def withJournalPluginId(id: Option[String]): EventsourcedSettings - def withSnapshotPluginId(id: Option[String]): EventsourcedSettings + def journalPluginId: String + def withJournalPluginId(id: String): EventsourcedSettings + def snapshotPluginId: String + def withSnapshotPluginId(id: String): EventsourcedSettings } object EventsourcedSettings { @@ -33,51 +35,54 @@ object EventsourcedSettings { def apply(config: Config): EventsourcedSettings = { val typedConfig = config.getConfig("akka.persistence.typed") - val untypedConfig = config.getConfig("akka.persistence") // StashOverflowStrategy - val internalStashOverflowStrategy = - untypedConfig.getString("internal-stash-overflow-strategy") // FIXME or copy it to typed? + val stashOverflowStrategyConfigurator = typedConfig.getString("internal-stash-overflow-strategy") val stashCapacity = typedConfig.getInt("stash-capacity") + require(stashCapacity > 0, "stash-capacity MUST be > 0, unbounded buffering is not supported.") - val stashingLogLevel = typedConfig.getString("log-stashing") match { - case "off" ⇒ Logging.OffLevel - case "on" | "true" ⇒ Logging.DebugLevel - case l ⇒ Logging.levelFor(l).getOrElse(Logging.OffLevel) - } - - // FIXME this is wrong I think - val recoveryEventTimeout = 10.seconds // untypedConfig.getDuration("plugin-journal-fallback.recovery-event-timeout", TimeUnit.MILLISECONDS).millis + val logOnStashing = typedConfig.getBoolean("log-stashing") EventsourcedSettingsImpl( + config, stashCapacity = stashCapacity, - internalStashOverflowStrategy, - stashingLogLevel = stashingLogLevel, + stashOverflowStrategyConfigurator, + logOnStashing = logOnStashing, journalPluginId = "", - snapshotPluginId = "", - recoveryEventTimeout = recoveryEventTimeout + snapshotPluginId = "" ) } + + /** + * INTERNAL API + */ + private[akka] final def journalConfigFor(config: Config, journalPluginId: String): Config = { + val defaultJournalPluginId = config.getString("akka.persistence.journal.plugin") + val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId + config.getConfig(configPath) + .withFallback(config.getConfig(Persistence.JournalFallbackConfigPath)) + } + } @InternalApi private[persistence] final case class EventsourcedSettingsImpl( - stashCapacity: Int, - stashOverflowStrategyName: String, - stashingLogLevel: LogLevel, - journalPluginId: String, - snapshotPluginId: String, - recoveryEventTimeout: FiniteDuration + private val config: Config, + stashCapacity: Int, + stashOverflowStrategyConfigurator: String, + logOnStashing: Boolean, + journalPluginId: String, + snapshotPluginId: String ) extends EventsourcedSettings { - def withJournalPluginId(id: Option[String]): EventsourcedSettings = id match { - case Some(identifier) ⇒ copy(journalPluginId = identifier) - case _ ⇒ this - } - def withSnapshotPluginId(id: Option[String]): EventsourcedSettings = id match { - case Some(identifier) ⇒ copy(snapshotPluginId = identifier) - case _ ⇒ this - } + def withJournalPluginId(id: String): EventsourcedSettings = + copy(journalPluginId = id) + def withSnapshotPluginId(id: String): EventsourcedSettings = + copy(snapshotPluginId = id) + + private val journalConfig = EventsourcedSettings.journalConfigFor(config, journalPluginId) + val recoveryEventTimeout = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis + } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index d63cf468c5..c00aef8ab7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -4,99 +4,46 @@ package akka.persistence.typed.internal import akka.actor.ActorRef -import akka.{ actor ⇒ a } import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler } import akka.annotation.InternalApi -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryPermitGranted +import akka.persistence._ import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence._ -import akka.util.ConstantFun +import akka.{ actor ⇒ a } +/** + * INTERNAL API: Carry state for the Persistent behavior implementation behaviors + */ @InternalApi -private[persistence] object EventsourcedSetup { - - def apply[Command, Event, State]( - context: ActorContext[InternalProtocol], - timers: TimerScheduler[InternalProtocol], - - persistenceId: String, - initialState: State, - commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - eventHandler: (State, Event) ⇒ State): EventsourcedSetup[Command, Event, State] = { - apply( - context, - timers, - persistenceId, - initialState, - commandHandler, - eventHandler, - // values dependent on context - EventsourcedSettings(context.system)) - } - - def apply[Command, Event, State]( - context: ActorContext[InternalProtocol], - timers: TimerScheduler[InternalProtocol], - - persistenceId: String, - initialState: State, - commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - eventHandler: (State, Event) ⇒ State, - settings: EventsourcedSettings): EventsourcedSetup[Command, Event, State] = { - new EventsourcedSetup[Command, Event, State]( - context, - timers, - - persistenceId, - initialState, - commandHandler, - eventHandler, - writerIdentity = WriterIdentity.newIdentity(), - recoveryCompleted = ConstantFun.scalaAnyTwoToUnit, - tagger = (_: Event) ⇒ Set.empty[String], - snapshotWhen = ConstantFun.scalaAnyThreeToFalse, - recovery = Recovery(), - settings, - StashBuffer(settings.stashCapacity) - ) - } -} - -/** INTERNAL API: Carry state for the Persistent behavior implementation behaviors */ -@InternalApi -private[persistence] final case class EventsourcedSetup[Command, Event, State]( - context: ActorContext[InternalProtocol], - timers: TimerScheduler[InternalProtocol], - - persistenceId: String, - initialState: State, - commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - - eventHandler: (State, Event) ⇒ State, - writerIdentity: WriterIdentity, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, - tagger: Event ⇒ Set[String], - snapshotWhen: (State, Event, Long) ⇒ Boolean, - recovery: Recovery, - - settings: EventsourcedSettings, - - internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\ +private[persistence] final case class EventsourcedSetup[C, E, S]( + context: ActorContext[InternalProtocol], + timers: TimerScheduler[InternalProtocol], + persistenceId: String, + initialState: S, + commandHandler: PersistentBehaviors.CommandHandler[C, E, S], + eventHandler: (S, E) ⇒ S, + writerIdentity: WriterIdentity, + recoveryCompleted: (ActorContext[C], S) ⇒ Unit, + tagger: E ⇒ Set[String], + snapshotWhen: (S, E, Long) ⇒ Boolean, + recovery: Recovery, + var holdingRecoveryPermit: Boolean, + settings: EventsourcedSettings, + internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\ ) { import akka.actor.typed.scaladsl.adapter._ - def withJournalPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = { + def withJournalPluginId(id: String): EventsourcedSetup[C, E, S] = { require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") copy(settings = settings.withJournalPluginId(id)) } - def withSnapshotPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = { + def withSnapshotPluginId(id: String): EventsourcedSetup[C, E, S] = { require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") copy(settings = settings.withSnapshotPluginId(id)) } - def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]] def log = context.log @@ -107,13 +54,5 @@ private[persistence] final case class EventsourcedSetup[Command, Event, State]( def selfUntyped = context.self.toUntyped - import EventsourcedBehavior.InternalProtocol - val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { - case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) - case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted - case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) - case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) - }.toUntyped - } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala index 020f4e4860..c4d4c5e4b6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala @@ -1,12 +1,15 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ package akka.persistence.typed.internal import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer } -import akka.actor.{ DeadLetter, StashOverflowException } +import akka.actor.{ DeadLetter, ExtendedActorSystem, StashOverflowException } import akka.annotation.InternalApi -import akka.event.Logging import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import akka.persistence.{ StashOverflowStrategy, _ } +import akka.util.Collections.EmptyImmutableSeq import akka.util.ConstantFun import akka.{ actor ⇒ a } @@ -17,23 +20,25 @@ private[akka] trait EventsourcedStashManagement[C, E, S] { def setup: EventsourcedSetup[C, E, S] - private def context = setup.context + private def context: ActorContext[InternalProtocol] = setup.context private def stashBuffer: StashBuffer[InternalProtocol] = setup.internalStash protected def stash(msg: InternalProtocol): Unit = { + if (setup.settings.logOnStashing) setup.log.debug("Stashing message: [{}]", msg) - val logLevel = setup.settings.stashingLogLevel - if (logLevel != Logging.OffLevel) context.log.debug("Stashing message: {}", msg) // FIXME can be log(logLevel once missing method added - - val internalStashOverflowStrategy: StashOverflowStrategy = setup.persistence.defaultInternalStashOverflowStrategy + val internalStashOverflowStrategy: StashOverflowStrategy = { + val system = context.system.toUntyped.asInstanceOf[ExtendedActorSystem] + system.dynamicAccess.createInstanceFor[StashOverflowStrategyConfigurator](setup.settings.stashOverflowStrategyConfigurator, EmptyImmutableSeq) + .map(_.create(system.settings.config)).get + } try stashBuffer.stash(msg) catch { case e: StashOverflowException ⇒ internalStashOverflowStrategy match { case DiscardToDeadLetterStrategy ⇒ - val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow? - context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped)) + val noSenderBecauseAkkaTyped: a.ActorRef = a.ActorRef.noSender + context.system.deadLetters.tell(DeadLetter(msg, noSenderBecauseAkkaTyped, context.self.toUntyped)) case ReplyToStrategy(_) ⇒ throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!") @@ -44,11 +49,10 @@ private[akka] trait EventsourcedStashManagement[C, E, S] { } } - // FIXME, yet we need to also stash not-commands, due to journal responses ... protected def tryUnstash( behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { if (stashBuffer.nonEmpty) { - setup.log.debug("Unstashing message: {}", stashBuffer.head.getClass) + if (setup.settings.logOnStashing) setup.log.debug("Unstashing message: [{}]", stashBuffer.head.getClass) stashBuffer.unstash(setup.context, behavior.asInstanceOf[Behavior[InternalProtocol]], 1, ConstantFun.scalaIdentityFunction) } else behavior diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala new file mode 100644 index 0000000000..5a0885090b --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer } +import akka.annotation.InternalApi +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } +import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors } +import akka.util.ConstantFun + +@InternalApi +private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( + persistenceId: String, + initialState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State, + journalPluginId: Option[String] = None, + snapshotPluginId: Option[String] = None, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + recovery: Recovery = Recovery() +) extends PersistentBehavior[Command, Event, State] { + + override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { + Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ + Behaviors.withTimers { timers ⇒ + val settings = EventsourcedSettings(ctx.system) + val setup = EventsourcedSetup( + ctx, + timers, + persistenceId, + initialState, + commandHandler, + eventHandler, + WriterIdentity.newIdentity(), + recoveryCompleted, + tagger, + snapshotWhen, + recovery, + holdingRecoveryPermit = false, + settings = settings, + internalStash = StashBuffer(settings.stashCapacity) + ).withJournalPluginId(journalPluginId.getOrElse("")) + .withSnapshotPluginId(snapshotPluginId.getOrElse("")) + + EventsourcedRequestingRecoveryPermit(setup) + } + }.widen[Any] { + case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) + case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted + case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) + }.narrow[Command] + } + + /** + * The `callback` function is called to notify the actor that the recovery process + * is finished. + */ + def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = + copy(recoveryCompleted = callback) + + /** + * Initiates a snapshot if the given function returns true. + * When persisting multiple events at once the snapshot is triggered after all the events have + * been persisted. + * + * `predicate` receives the State, Event and the sequenceNr used for the Event + */ + def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = + copy(snapshotWhen = predicate) + + /** + * Snapshot every N events + * + * `numberOfEvents` should be greater than 0 + */ + def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { + require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") + copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) + } + + /** + * Change the journal plugin id that this actor should use. + */ + def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") + copy(journalPluginId = if (id != "") Some(id) else None) + } + + /** + * Change the snapshot store plugin id that this actor should use. + */ + def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") + copy(snapshotPluginId = if (id != "") Some(id) else None) + } + + /** + * Changes the snapshot selection criteria used by this behavior. + * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events + * from the sequence number up until which the snapshot reached. + * + * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be + * performed by replaying all events -- which may take a long time. + */ + def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { + copy(recovery = Recovery(selection)) + } + + /** + * The `tagger` function should give event tags, which will be used in persistence query + */ + def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = + copy(tagger = tagger) + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 5bd4a5789d..1f8067cc1d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -3,17 +3,11 @@ */ package akka.persistence.typed.scaladsl -import akka.actor.typed -import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler } +import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol -import akka.persistence.typed.internal._ import akka.persistence._ -import akka.util.ConstantFun - -import scala.language.implicitConversions +import akka.persistence.typed.internal._ object PersistentBehaviors { @@ -114,106 +108,3 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command */ def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] } - -@InternalApi -private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( - persistenceId: String, - initialState: State, - commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - eventHandler: (State, Event) ⇒ State, - - journalPluginId: Option[String] = None, - snapshotPluginId: Option[String] = None, - // settings: Option[EventsourcedSettings], // FIXME can't because no context available yet - - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, - recovery: Recovery = Recovery() -) extends PersistentBehavior[Command, Event, State] { - - override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { - Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ - Behaviors.withTimers[EventsourcedBehavior.InternalProtocol] { timers ⇒ - val setup = EventsourcedSetup( - ctx, - timers, - persistenceId, - initialState, - commandHandler, - eventHandler) - .withJournalPluginId(journalPluginId) - .withSnapshotPluginId(snapshotPluginId) - - EventsourcedRequestingRecoveryPermit(setup) - } - }.widen[Any] { - case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) - case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted - case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) - case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) - }.narrow[Command] - } - - /** - * The `callback` function is called to notify the actor that the recovery process - * is finished. - */ - def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = - copy(recoveryCompleted = callback) - - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * `predicate` receives the State, Event and the sequenceNr used for the Event - */ - def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = - copy(snapshotWhen = predicate) - - /** - * Snapshot every N events - * - * `numberOfEvents` should be greater than 0 - */ - def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { - require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") - copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) - } - - /** - * Change the journal plugin id that this actor should use. - */ - def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = { - require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") - copy(journalPluginId = if (id != "") Some(id) else None) - } - - /** - * Change the snapshot store plugin id that this actor should use. - */ - def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { - require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") - copy(snapshotPluginId = if (id != "") Some(id) else None) - } - - /** - * Changes the snapshot selection criteria used by this behavior. - * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events - * from the sequence number up until which the snapshot reached. - * - * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be - * performed by replaying all events -- which may take a long time. - */ - def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { - copy(recovery = Recovery(selection)) - } - - /** - * The `tagger` function should give event tags, which will be used in persistence query - */ - def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = - copy(tagger = tagger) - -} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 102ef3d919..57d81795f5 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -3,11 +3,9 @@ */ package akka.persistence.typed.scaladsl -import akka.actor.ActorSystemImpl import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } import akka.persistence.snapshot.SnapshotStore -import akka.persistence.typed.scaladsl.PersistentBehaviors._ import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } import akka.testkit.typed.TestKitSettings import akka.testkit.typed.scaladsl._ @@ -38,7 +36,7 @@ object PersistentBehaviorSpec { val config = ConfigFactory.parseString( s""" - akka.loglevel = DEBUG + akka.loglevel = INFO # akka.persistence.typed.log-stashing = INFO akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore" @@ -319,7 +317,11 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "snapshot via predicate" in { - val alwaysSnapshot = counter("c9").snapshotWhen { (_, _, _) ⇒ true } + val alwaysSnapshot: Behavior[Command] = + Behaviors.setup { context ⇒ + counter("c9").snapshotWhen { (_, _, _) ⇒ true } + } + val c = spawn(alwaysSnapshot) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index c78d279f5a..510713d88e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -71,7 +71,7 @@ final class PersistenceSettings(config: Config) { } /** - * Identification of [[PersistentActor]] or [[PersistentView]]. + * Identification of [[PersistentActor]]. */ //#persistence-identity trait PersistenceIdentity { @@ -133,6 +133,12 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { /** INTERNAL API. */ private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config) extends Extension + + /** Config path to fall-back to if a setting is not defined in a specific plugin's config section */ + val JournalFallbackConfigPath = "akka.persistence.journal-plugin-fallback" + + /** Config path to fall-back to if a setting is not defined in a specific snapshot plugin's config section */ + val SnapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback" } /** @@ -190,9 +196,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { /** Discovered persistence journal and snapshot store plugins. */ private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) - private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback" - private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback" - config.getStringList("journal.auto-start-journals").forEach(new Consumer[String] { override def accept(id: String): Unit = { log.info(s"Auto-starting journal plugin `$id`") @@ -213,7 +216,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { */ final def adaptersFor(journalPluginId: String): EventAdapters = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - pluginHolderFor(configPath, journalFallbackConfigPath).adapters + pluginHolderFor(configPath, JournalFallbackConfigPath).adapters } /** @@ -237,7 +240,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { */ private[akka] final def journalConfigFor(journalPluginId: String): Config = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - pluginHolderFor(configPath, journalFallbackConfigPath).config + pluginHolderFor(configPath, JournalFallbackConfigPath).config } /** @@ -261,7 +264,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { */ private[akka] final def journalFor(journalPluginId: String): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - pluginHolderFor(configPath, journalFallbackConfigPath).actor + pluginHolderFor(configPath, JournalFallbackConfigPath).actor } /** @@ -274,7 +277,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { */ private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId - pluginHolderFor(configPath, snapshotStoreFallbackConfigPath).actor + pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath).actor } @tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = { diff --git a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala index fb3aa9869d..e552a0e4b7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala @@ -19,10 +19,11 @@ import akka.actor.Terminated Props(new RecoveryPermitter(maxPermits)) sealed trait Protocol + sealed trait Request extends Protocol sealed trait Reply extends Protocol - case object RequestRecoveryPermit extends Protocol + case object RequestRecoveryPermit extends Request case object RecoveryPermitGranted extends Reply - case object ReturnRecoveryPermit extends Protocol + case object ReturnRecoveryPermit extends Request } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index c89997ac8c..c4c637e42c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl.io -import java.io.{ BufferedOutputStream, ByteArrayOutputStream, IOException, InputStream } +import java.io.{ IOException, InputStream } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } import akka.annotation.InternalApi