cleanup and make snapshots work, make event timeout work too

sadly does not work without MODULE$ did a ticket for it
This commit is contained in:
Konrad Malawski 2018-03-08 14:14:20 +09:00 committed by Konrad `ktoso` Malawski
parent 9e62f5b5d5
commit 5be89dea71
18 changed files with 440 additions and 839 deletions

View file

@ -105,6 +105,7 @@ object Behavior {
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
BehaviorImpl.widened(behavior, matcher)
}
/**

View file

@ -52,6 +52,12 @@ case object PreRestart extends PreRestart {
* registered watchers after this signal has been processed.
*/
sealed abstract class PostStop extends Signal
// comment copied onto object for better hints in IDEs
/**
* Lifecycle signal that is fired after this actor and all its child actors
* (transitively) have terminated. The [[Terminated]] signal is only sent to
* registered watchers after this signal has been processed.
*/
case object PostStop extends PostStop {
def instance: PostStop = this
}

View file

@ -2,14 +2,22 @@ akka.persistence.typed {
# Persistent actors stash while recovering or persisting events,
# this setting configures the default capacity of this stash.
stash-capacity = 2048
# If negative (or zero) then an unbounded stash is used (default)
# If positive then a bounded stash is used and the capacity is set using
# the property
#
# Stashing is always bounded to the size that is defined in this setting.
# You can set it to large values, however "unbounded" buffering is not supported.
# Negative or 0 values are not allowed.
stash-capacity = 4096
# enables automatic logging of messages stashed automatically by an PersistentBehavior,
# Configure how to react when the internal stash overflows;
# Implementation must be a sub class of `akka.persistence.StashOverflowStrategyConfigurator`
#
# Provided implementations:
# - akka.persistence.DiscardConfigurator -- discards the overflowing message
# - akka.persistence.ThrowExceptionConfigurator -- throws on overflow which will stop the actor
internal-stash-overflow-strategy = akka.persistence.ThrowExceptionConfigurator
# enables automatic DEBUG level logging of messages stashed automatically by an PersistentBehavior,
# this may happen while it receives commands while it is recovering events or while it is persisting events
# Set to a log-level (debug, info, warn, error) to log stashed messages on given log level;
log-stashing = off
}

View file

@ -7,15 +7,9 @@ package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.StoppedBehavior
import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler }
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.event.{ LogSource, Logging }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.{ JournalProtocol, Persistence, RecoveryPermitter, SnapshotProtocol }
import akka.{ actor a }
/** INTERNAL API */
@InternalApi
@ -33,6 +27,24 @@ private[akka] object EventsourcedBehavior {
}
final case class WriterIdentity(instanceId: Int, writerUuid: String)
object MDC {
// format: OFF
val AwaitingPermit = "await-permit"
val RecoveringSnapshot = "recover-snap"
val RecoveringEvents = "recover-evts"
val RunningCmds = "running-cmnds"
val PersistingEvents = "persist-evts"
// format: ON
}
def withMdc(setup: EventsourcedSetup[_, _, _], phaseName: String)(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" phaseName
)
Behaviors.withMdc(_ mdc, b)
}
/** Protocol used internally by the eventsourced behaviors, never exposed to user-land */
sealed trait InternalProtocol
object InternalProtocol {
@ -40,63 +52,6 @@ private[akka] object EventsourcedBehavior {
final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol
final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol
final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol
final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol
}
// implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] {
// override def genString(b: EventsourcedBehavior[_, _, _]): String = {
// val behaviorShortName = b match {
// case _: EventsourcedRunning[_, _, _] "running"
// case _: EventsourcedRecoveringEvents[_, _, _] "recover-events"
// case _: EventsourcedRecoveringSnapshot[_, _, _] "recover-snap"
// case _: EventsourcedRequestingRecoveryPermit[_, _, _] "awaiting-permit"
// }
// s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]"
// }
// }
}
/** INTERNAL API */
@InternalApi
private[akka] trait EventsourcedBehavior[Command, Event, State] {
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
// protected def timers: TimerScheduler[Any]
type C = Command
type AC = ActorContext[C]
type E = Event
type S = State
// used for signaling intent in type signatures
type SeqNr = Long
// def persistenceId: String = setup.persistenceId
//
// protected def setup: EventsourcedSetup[Command, Event, State]
// protected def initialState: State = setup.initialState
// protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = setup.commandHandler
// protected def eventHandler: (State, Event) State = setup.eventHandler
// protected def snapshotWhen: (State, Event, SeqNr) Boolean = setup.snapshotWhen
// protected def tagger: Event Set[String] = setup.tagger
//
// protected final def journalPluginId: String = setup.journalPluginId
// protected final def snapshotPluginId: String = setup.snapshotPluginId
// ------ common -------
// protected lazy val extension = Persistence(context.system.toUntyped)
// protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId)
// protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId)
//
// protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped
// protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
// case res: JournalProtocol.Response JournalResponse(res)
// case RecoveryPermitter.RecoveryPermitGranted RecoveryPermitGranted
// case res: SnapshotProtocol.Response SnapshotterResponse(res)
// case cmd: Command @unchecked cmd // if it was wrong, we'll realise when trying to onMessage the cmd
// }.toUntyped
}

View file

@ -3,35 +3,27 @@
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.ActorRef
import akka.actor.typed.{ Behavior, PostStop, PreRestart, Signal }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.InternalApi
import akka.persistence.Eventsourced.StashingHandlerInvocation
import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence._
import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import scala.collection.immutable
/** INTERNAL API */
@InternalApi
private[akka] trait EventsourcedJournalInteractions[C, E, S] {
import akka.actor.typed.scaladsl.adapter._
def setup: EventsourcedSetup[C, E, S]
private def context = setup.context
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
// ---------- journal interactions ---------
protected def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = {
setup.log.debug("Returning recovery permit, on failure because: " + cause.getMessage)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
val permitter = setup.persistence.recoveryPermitter
permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
}
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
protected def internalPersist(
state: EventsourcedRunning.EventsourcedState[S],
event: EventOrTagged): EventsourcedRunning.EventsourcedState[S] = {
@ -47,8 +39,8 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
sender = senderNotKnownBecauseAkkaTyped
)
val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped)
val write = AtomicWrite(repr) :: Nil
setup.journal.tell(JournalProtocol.WriteMessages(write, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped)
newState
}
@ -57,16 +49,18 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
events: immutable.Seq[EventOrTagged],
state: EventsourcedRunning.EventsourcedState[S]): EventsourcedRunning.EventsourcedState[S] = {
if (events.nonEmpty) {
val newState = state.nextSequenceNr()
var newState = state
val senderNotKnownBecauseAkkaTyped = null
val write = AtomicWrite(events.map(event PersistentRepr(
event,
persistenceId = setup.persistenceId,
sequenceNr = newState.seqNr, // FIXME increment for each event
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped)
))
val writes = events.map { event
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
}
val write = AtomicWrite(writes)
setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped)
@ -76,16 +70,41 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
// reply is sent to `selfUntypedAdapted`, it is important to target that one
setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntyped)
}
protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = {
setup.log.debug("Returning recovery permit, reason: " + reason)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
protected def requestRecoveryPermit(): Unit = {
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfUntyped)
}
/** Intended to be used in .onSignal(returnPermitOnStop) by behaviors */
protected def returnPermitOnStop: PartialFunction[(ActorContext[InternalProtocol], Signal), Behavior[InternalProtocol]] = {
case (_, PostStop)
tryReturnRecoveryPermit("PostStop")
Behaviors.stopped
case (_, PreRestart)
// TODO was not entirely sure if it's needed here as well
tryReturnRecoveryPermit("PostStop")
Behaviors.stopped
}
/** Mutates setup, by setting the `holdingRecoveryPermit` to false */
protected def tryReturnRecoveryPermit(reason: String): Unit = {
if (setup.holdingRecoveryPermit) {
setup.log.debug("Returning recovery permit, reason: " + reason)
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
setup.holdingRecoveryPermit = false
} // else, no need to return the permit
}
protected def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit =
if (setup.holdingRecoveryPermit) {
setup.log.debug("Returning recovery permit, on failure because: {}", cause.getMessage)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
val permitter = setup.persistence.recoveryPermitter
permitter.tell(RecoveryPermitter.ReturnRecoveryPermit, setup.selfUntyped)
} else setup.log.info("Attempted return of recovery permit however was not holding a permit; ignoring") // TODO: make debug level once confident
// ---------- snapshot store interactions ---------
/**

View file

@ -3,13 +3,13 @@
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.actor.typed.{ Behavior, PostStop, Signal }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.internal.EventsourcedBehavior._
import scala.concurrent.duration.FiniteDuration
@ -18,8 +18,16 @@ import scala.util.control.NonFatal
/***
* INTERNAL API
*
* See next behavior [[EventsourcedRunning]].
* Third (of four) behavior of an PersistentBehavior.
*
* In this behavior we finally start replaying events, beginning from the last applied sequence number
* (i.e. the one up-until-which the snapshot recovery has brought us).
*
* Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed
* and control is given to the user's handlers to drive the actors behavior from there.
*
* See next behavior [[EventsourcedRunning]].
* See previous behavior [[EventsourcedRecoveringSnapshot]].
*/
@InternalApi
private[persistence] object EventsourcedRecoveringEvents {
@ -40,9 +48,9 @@ private[persistence] object EventsourcedRecoveringEvents {
}
@InternalApi
private[persistence] class EventsourcedRecoveringEvents[C, E, S](
override val setup: EventsourcedSetup[C, E, S])
private[persistence] class EventsourcedRecoveringEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import setup.context.log
import EventsourcedRecoveringEvents.RecoveringState
def createBehavior(state: RecoveringState[S]): Behavior[InternalProtocol] = {
@ -51,67 +59,53 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S](
replayEvents(state.seqNr + 1L, setup.recovery.toSequenceNr)
withMdc {
withMdc(setup, MDC.RecoveringEvents) {
stay(state)
}
}
}
private def stay(
state: RecoveringState[S]
): Behavior[InternalProtocol] =
Behaviors.immutable {
case (_, JournalResponse(r)) onJournalResponse(state, r)
case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)
case (_, RecoveryTickEvent(snap)) onRecoveryTick(state, snap)
case (_, cmd @ IncomingCommand(_)) onCommand(cmd)
private def stay(state: RecoveringState[S]): Behavior[InternalProtocol] =
withMdc(setup, MDC.RecoveringEvents) {
Behaviors.immutable[InternalProtocol] {
case (_, JournalResponse(r)) onJournalResponse(state, r)
case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)
case (_, RecoveryTickEvent(snap)) onRecoveryTick(state, snap)
case (_, cmd: IncomingCommand[C]) onCommand(cmd)
}.onSignal(returnPermitOnStop)
}
private def withMdc(wrapped: Behavior[InternalProtocol]) = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" "recover-evnts"
)
Behaviors.withMdc((_: Any) mdc, wrapped)
}
private def onJournalResponse(
state: RecoveringState[S],
response: JournalProtocol.Response): Behavior[InternalProtocol] = {
import setup.context.log
try {
response match {
case ReplayedMessage(repr)
// eventSeenInInterval = true
// updateLastSequenceNr(repr)
val newState = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, repr.payload.asInstanceOf[E])
)
stay(newState)
val event = repr.payload.asInstanceOf[E]
try {
val newState = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
stay(newState)
} catch {
case NonFatal(ex) onRecoveryFailure(ex, repr.sequenceNr, Some(event))
}
case RecoverySuccess(highestSeqNr)
log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr)
log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
cancelRecoveryTimer(setup.timers)
try onRecoveryCompleted(state)
catch { case NonFatal(ex) onRecoveryFailure(ex, highestSeqNr, Some(state)) }
onRecoveryCompleted(state)
case ReplayMessagesFailure(cause)
onRecoveryFailure(cause, state.seqNr, None)
onRecoveryFailure(cause, state.seqNr, Some(response))
case other
// stash(setup, setup.internalStash, other)
// Behaviors.same
case _
Behaviors.unhandled
}
} catch {
case NonFatal(cause)
cancelRecoveryTimer(setup.timers)
onRecoveryFailure(cause, state.seqNr, None)
}
}
@ -122,22 +116,13 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S](
Behaviors.same
}
// FYI, have to keep carrying all [C,E,S] everywhere as otherwise ending up with:
// [error] /Users/ktoso/code/akka/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala:117:14: type mismatch;
// [error] found : akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,_$2] where type _$2, type _$1
// [error] required: akka.persistence.typed.internal.EventsourcedSetup[Command,_$1,Any] where type _$1
// [error] Note: _$2 <: Any, but class EventsourcedSetup is invariant in type State.
// [error] You may wish to define State as +State instead. (SLS 4.5)
// [error] Error occurred in an application involving default arguments.
// [error] stay(setup, state.copy(eventSeenInInterval = false))
// [error] ^
protected def onRecoveryTick(state: RecoveringState[S], snapshot: Boolean): Behavior[InternalProtocol] =
if (!snapshot) {
if (state.eventSeenInInterval) {
stay(state.copy(eventSeenInInterval = false))
} else {
cancelRecoveryTimer(setup.timers)
val msg = s"Recovery timed out, didn't get event within ${setup.settings.recoveryEventTimeout}, highest sequence number seen ${state.seqNr}"
val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]"
onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this?
}
} else {
@ -146,7 +131,7 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S](
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
setup.log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response))
setup.log.warning("Unexpected [{}] from SnapshotStore, already in replaying events state.", Logging.simpleName(response))
Behaviors.unhandled // ignore the response
}
@ -156,25 +141,24 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S](
* The actor is always stopped after this method has been invoked.
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
* @param message the message that was being processed when the exception was thrown
*/
protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, event: Option[Any]): Behavior[InternalProtocol] = {
returnRecoveryPermit(setup, "on recovery failure: " + cause.getMessage)
protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)
event match {
message match {
case Some(evt)
setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
Behaviors.stopped
setup.log.error(cause, "Exception during recovery while handling [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
case None
setup.log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", setup.persistenceId, sequenceNr)
Behaviors.stopped
setup.log.error(cause, "Exception during recovery. Last known sequence number [{}]", setup.persistenceId, sequenceNr)
}
Behaviors.stopped
}
protected def onRecoveryCompleted(state: RecoveringState[S]): Behavior[InternalProtocol] = try {
returnRecoveryPermit(setup, "recovery completed successfully")
tryReturnRecoveryPermit("replay completed successfully")
setup.recoveryCompleted(setup.commandContext, state.state)
val running = EventsourcedRunning[C, E, S](
@ -187,8 +171,8 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S](
cancelRecoveryTimer(setup.timers)
}
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
// protect against event recovery stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "event-recovery-tick"
private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit =
timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(RecoveryTickTimerKey)

View file

@ -3,24 +3,19 @@
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.actor.typed.{ Behavior, PostStop, Signal }
import akka.annotation.InternalApi
import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.{ actor a }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import akka.persistence.typed.internal.EventsourcedBehavior._
/**
* INTERNAL API
*
* Second (of four) behavior of an PersistentBehavior.
* See next behavior [[EventsourcedRecoveringEvents]].
*
* In this behavior the recovery process is initiated.
* We try to obtain a snapshot from the configured snapshot store,
@ -28,6 +23,9 @@ import scala.util.{ Failure, Success, Try }
*
* Once snapshot recovery is done (or no snapshot was selected),
* recovery of events continues in [[EventsourcedRecoveringEvents]].
*
* See next behavior [[EventsourcedRecoveringEvents]].
* See previous behavior [[EventsourcedRequestingRecoveryPermit]].
*/
@InternalApi
private[akka] object EventsourcedRecoveringSnapshot {
@ -38,8 +36,7 @@ private[akka] object EventsourcedRecoveringSnapshot {
}
@InternalApi
private[akka] class EventsourcedRecoveringSnapshot[C, E, S](
override val setup: EventsourcedSetup[C, E, S])
private[akka] class EventsourcedRecoveringSnapshot[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
def createBehavior(): Behavior[InternalProtocol] = {
@ -47,24 +44,16 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S](
loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr)
withMdc {
Behaviors.immutable {
withMdc(setup, MDC.RecoveringSnapshot) {
Behaviors.immutable[InternalProtocol] {
case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)
case (_, JournalResponse(r)) onJournalResponse(r)
case (_, RecoveryTickEvent(snapshot)) onRecoveryTick(snapshot)
case (_, cmd: IncomingCommand[C]) onCommand(cmd)
}
}.onSignal(returnPermitOnStop)
}
}
def withMdc(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" "recover-snap"
)
Behaviors.withMdc(_ mdc, b)
}
/**
* Called whenever a message replay fails. By default it logs the error.
*
@ -76,93 +65,64 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S](
private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
val lastSequenceNr = 0 // FIXME not needed since snapshot == 0
event match {
case Some(evt)
setup.log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
"persistenceId [{}].", evt.getClass.getName, lastSequenceNr, setup.persistenceId)
Behaviors.stopped
case None
setup.log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
"Last known sequence number [{}]", setup.persistenceId, lastSequenceNr)
Behaviors.stopped
setup.log.error(cause, "Exception in receiveRecover when replaying snapshot [{}]", evt.getClass.getName)
case _
setup.log.error(cause, "Persistence failure when replaying snapshot")
}
Behaviors.stopped
}
private def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] =
if (snapshot) {
// we know we're in snapshotting mode; snapshot recovery timeout arrived
val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}")
onRecoveryFailure(ex, event = None)
onRecoveryFailure(ex, None)
} else same // ignore, since we received the snapshot already
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
private def startRecoveryTimer(): Unit = {
setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout)
}
private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey)
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
setup.internalStash.stash(cmd) // TODO move stash out as it's mutable
setup.internalStash.stash(cmd)
Behavior.same
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = try {
throw new Exception("Should not talk to journal yet! But got: " + response)
} catch {
case NonFatal(cause)
returnRecoveryPermitOnlyOnFailure(cause)
throw cause
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
setup.log.debug("Unexpected response from journal: [{}], may be due to an actor restart, ignoring...", response)
Behaviors.unhandled
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = try {
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
response match {
case LoadSnapshotResult(sso, toSnr)
var state: S = setup.initialState
val re: Try[Long] = Try {
sso match {
case Some(SelectedSnapshot(metadata, snapshot))
state = snapshot.asInstanceOf[S]
metadata.sequenceNr
case None
0 // from the start please
}
val seqNr: Long = sso match {
case Some(SelectedSnapshot(metadata, snapshot))
state = snapshot.asInstanceOf[S]
metadata.sequenceNr
case None 0 // from the beginning please
}
re match {
case Success(seqNr)
replayMessages(state, seqNr, toSnr)
case Failure(cause)
// FIXME better exception type
val ex = new RuntimeException(s"Failed to recover state for [${setup.persistenceId}] from snapshot offer.", cause)
onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME
}
becomeReplayingEvents(state, seqNr, toSnr)
case LoadSnapshotFailed(cause)
cancelRecoveryTimer(setup.timers)
onRecoveryFailure(cause, event = None)
case _
Behaviors.unhandled
}
} catch {
case NonFatal(cause)
returnRecoveryPermitOnlyOnFailure(cause)
throw cause
}
private def replayMessages(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
val rec = setup.recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types
val rec = setup.recovery.copy(
toSequenceNr = toSnr,
fromSnapshot = SnapshotSelectionCriteria.None
)
EventsourcedRecoveringEvents[C, E, S](
setup.copy(recovery = rec),
@ -171,4 +131,10 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S](
)
}
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "snapshot-recovery-tick"
private def startRecoveryTimer(): Unit =
setup.timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), setup.settings.recoveryEventTimeout)
private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(RecoveryTickTimerKey)
}

View file

@ -3,23 +3,21 @@
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.actor.typed.{ Behavior, PostStop, PreRestart }
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
/**
* INTERNAL API
*
* First (of four) behaviour of an PersistentBehaviour.
* See next behavior [[EventsourcedRecoveringSnapshot]].
*
* Requests a permit to start recovering this actor; this is tone to avoid
* hammering the journal with too many concurrently recovering actors.
*
* See next behavior [[EventsourcedRecoveringSnapshot]].
*/
@InternalApi
private[akka] object EventsourcedRequestingRecoveryPermit {
@ -30,9 +28,8 @@ private[akka] object EventsourcedRequestingRecoveryPermit {
}
@InternalApi
private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](
override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedStashManagement[C, E, S] {
private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedStashManagement[C, E, S] with EventsourcedJournalInteractions[C, E, S] {
def createBehavior(): Behavior[InternalProtocol] = {
// request a permit, as only once we obtain one we can start recovering
@ -62,15 +59,8 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](
private def becomeRecovering(): Behavior[InternalProtocol] = {
setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)
setup.holdingRecoveryPermit = true
EventsourcedRecoveringSnapshot(setup)
}
// ---------- journal interactions ---------
private def requestRecoveryPermit(): Unit = {
import akka.actor.typed.scaladsl.adapter._
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfUntyped)
}
}

View file

@ -3,18 +3,17 @@
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.Behavior.StoppedBehavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation }
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, RecoveryTickEvent, SnapshotterResponse }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, JournalResponse, SnapshotterResponse }
import scala.annotation.tailrec
import scala.collection.immutable
@ -22,20 +21,22 @@ import scala.collection.immutable
/**
* INTERNAL API
*
* Fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior.
* Conceptually fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior.
*
* In this phase recovery has completed successfully and we continue handling incoming commands,
* as well as persisting new events as dictated by the user handlers.
*
* This behavior operates in two phases:
* This behavior operates in two phases (also behaviors):
* - HandlingCommands - where the command handler is invoked for incoming commands
* - PersistingEvents - where incoming commands are stashed until persistence completes
*
* This is implemented as such to avoid creating many EventsourcedRunning instances,
* which perform the Persistence extension lookup on creation and similar things (config lookup)
*
* See previous [[EventsourcedRecoveringEvents]].
*/
@InternalApi private[akka] object EventsourcedRunning {
@InternalApi
private[akka] object EventsourcedRunning {
final case class EventsourcedState[State](
seqNr: Long,
@ -60,14 +61,21 @@ import scala.collection.immutable
// ===============================================
@InternalApi private[akka] class EventsourcedRunning[C, E, S](override val setup: EventsourcedSetup[C, E, S])
/** INTERNAL API */
@InternalApi private[akka] class EventsourcedRunning[C, E, S](
override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import EventsourcedRunning.EventsourcedState
import EventsourcedBehavior.withMdc
private def log = setup.log
private def commandContext = setup.commandContext
def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = {
def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(setup.commandContext, state.state, cmd)
val effect = setup.commandHandler(commandContext, state.state, cmd)
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
@ -77,8 +85,6 @@ import scala.collection.immutable
effect: EffectImpl[E, S],
sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil
): Behavior[InternalProtocol] = {
import setup.log
if (log.isDebugEnabled)
log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
@ -158,12 +164,12 @@ import scala.collection.immutable
if (tags.isEmpty) event else Tagged(event, tags)
}
withMdc("run-cmnds") {
withMdc(setup, MDC.RunningCmds) {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
case (_, SnapshotterResponse(r)) Behaviors.unhandled
case (_, JournalResponse(r)) Behaviors.unhandled
case (_, IncomingCommand(c: C @unchecked)) onCommand(state, c)
}
case (_, SnapshotterResponse(_)) Behaviors.unhandled
case (_, JournalResponse(_)) Behaviors.unhandled
}.onSignal(returnPermitOnStop)
}
}
@ -175,8 +181,8 @@ import scala.collection.immutable
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]]
): Behavior[InternalProtocol] = {
withMdc("run-persist-evnts") {
Behaviors.mutable[EventsourcedBehavior.InternalProtocol](_ new PersistingEvents(state, pendingInvocations, sideEffects))
withMdc(setup, MDC.PersistingEvents) {
new PersistingEvents(state, pendingInvocations, sideEffects)
}
}
@ -194,6 +200,9 @@ import scala.collection.immutable
}
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] =
{ case signal returnPermitOnStop((setup.context, signal)) }
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
stash(cmd)
this
@ -204,8 +213,6 @@ import scala.collection.immutable
setup.log.debug("Received Journal response: {}", response)
response match {
case WriteMessageSuccess(p, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == setup.writerIdentity.instanceId) {
state = state.updateLastSequenceNr(p)
// FIXME is the order of pendingInvocations not reversed?
@ -218,8 +225,6 @@ import scala.collection.immutable
} else this
case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) {
state = state.updateLastSequenceNr(p)
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design)
@ -227,10 +232,7 @@ import scala.collection.immutable
} else this
case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) {
// onWriteMessageComplete() -> tryBecomeHandlingCommands
onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
} else this
@ -248,9 +250,6 @@ import scala.collection.immutable
}
}
// private def onWriteMessageComplete(): Unit =
// tryBecomeHandlingCommands()
private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
setup.log.error(
cause,
@ -269,10 +268,10 @@ import scala.collection.immutable
private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
response match {
case SaveSnapshotSuccess(meta)
setup.context.log.debug("Save snapshot successful: " + meta)
setup.context.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
this
case SaveSnapshotFailure(meta, ex)
setup.context.log.error(ex, "Save snapshot failed! " + meta)
setup.context.log.error(ex, "Save snapshot failed, snapshot metadata: [{}]", meta)
this // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
}
}
@ -281,15 +280,6 @@ import scala.collection.immutable
// --------------------------
private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" phase
)
Behaviors.withMdc((_: Any) mdc, wrapped)
}
def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
var res: Behavior[InternalProtocol] = handlingCommands(state)
val it = effects.iterator
@ -298,10 +288,8 @@ import scala.collection.immutable
// manual loop implementation to avoid allocations and multiple scans
while (it.hasNext) {
val effect = it.next()
applySideEffect(effect, state) match {
case _: StoppedBehavior[_] res = Behaviors.stopped
case _ // nothing to do
}
val stopped = !Behavior.isAlive(applySideEffect(effect, state))
if (stopped) res = Behaviors.stopped
}
res
@ -320,286 +308,3 @@ import scala.collection.immutable
}
}
//@InternalApi
//class EventsourcedRunning[Command, Event, State](
// val setup: EventsourcedSetup[Command, Event, State],
// // internalStash: StashBuffer[Any], // FIXME separate or in settings?
//
// private var sequenceNr: Long,
// val writerIdentity: WriterIdentity,
//
// private var state: State
//) extends MutableBehavior[Any]
// with EventsourcedBehavior[Command, Event, State]
// with EventsourcedStashManagement { same
// import setup._
//
// import EventsourcedBehavior._
// import akka.actor.typed.scaladsl.adapter._
//
// // Holds callbacks for persist calls (note that we do not implement persistAsync currently)
// private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty
// private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
//
// // ----------
////
//// private def snapshotSequenceNr: Long = sequenceNr
////
//// private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
//// if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
//// private def nextSequenceNr(): Long = {
//// sequenceNr += 1L
//// sequenceNr
//// }
// // ----------
//
// private def onSnapshotterResponse[C, E, S](setup: EventsourcedSetup[C, E, S], response: SnapshotProtocol.Response): Behavior[C] = {
// response match {
// case SaveSnapshotSuccess(meta)
// setup.context.log.debug("Save snapshot successful: " + meta)
// Behaviors.same
// case SaveSnapshotFailure(meta, ex)
// setup.context.log.error(ex, "Save snapshot failed! " + meta)
// Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
// }
// }
//
// // ----------
//
// trait EventsourcedRunningPhase {
// def name: String
// def onCommand(c: Command): Behavior[Any]
// def onJournalResponse(response: JournalProtocol.Response): Behavior[Any]
// }
//
//// object HandlingCommands extends EventsourcedRunningPhase {
//// def name = "HandlingCommands"
////
//// final override def onCommand(command: Command): Behavior[Any] = {
//// val effect = commandHandler(commandContext, state, command)
//// applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
//// }
//// final override def onJournalResponse(response: Response): Behavior[Any] = {
//// // ignore, could happen if actor was restarted?
//// }
//// }
//
// object PersistingEventsNoSideEffects extends PersistingEvents(Nil)
//
// sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase {
// def name = "PersistingEvents"
//
// final override def onCommand(c: Command): Behavior[Any] = {
// stash(setup, context, c)
// same
// }
//
// final override def onJournalResponse(response: Response): Behavior[Any] = {
// log.debug("Received Journal response: {}", response)
// response match {
// case WriteMessageSuccess(p, id)
// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// // while message is in flight, in that case we ignore the call to the handler
// if (id == writerIdentity.instanceId) {
// updateLastSequenceNr(p)
// popApplyHandler(p.payload)
// onWriteMessageComplete()
// tryUnstash(setup, internalStash, applySideEffects(sideEffects))
// } else same
//
// case WriteMessageRejected(p, cause, id)
// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// // while message is in flight, in that case the handler has already been discarded
// if (id == writerIdentity.instanceId) {
// updateLastSequenceNr(p)
// onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop
// tryUnstash(setup, applySideEffects(sideEffects))
// } else same
//
// case WriteMessageFailure(p, cause, id)
// // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// // while message is in flight, in that case the handler has already been discarded
// if (id == writerIdentity.instanceId) {
// onWriteMessageComplete()
// onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
// } else same
//
// case WriteMessagesSuccessful
// // ignore
// same
//
// case WriteMessagesFailed(_)
// // ignore
// same // it will be stopped by the first WriteMessageFailure message; not applying side effects
//
// case _: LoopMessageSuccess
// // ignore, should never happen as there is no persistAsync in typed
// same
// }
// }
//
// private def onWriteMessageComplete(): Unit =
// tryBecomeHandlingCommands()
//
// private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
// log.error(
// cause,
// "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
// event.getClass.getName, seqNr, persistenceId, cause.getMessage)
// }
//
// private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = {
// log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
// event.getClass.getName, seqNr, persistenceId)
//
// // FIXME see #24479 for reconsidering the stopping behaviour
// Behaviors.stopped
// }
//
// }
//
// // the active phase switches between PersistingEvents and HandlingCommands;
// // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect
// private[this] var phase: EventsourcedRunningPhase = HandlingCommands
//
// override def onMessage(msg: Any): Behavior[Any] = {
// msg match {
// // TODO explore crazy hashcode hack to make this match quicker...?
// case SnapshotterResponse(r) onSnapshotterResponse(r)
// case JournalResponse(r) phase.onJournalResponse(r)
// case command: Command @unchecked
// // the above type-check does nothing, since Command is tun
// // we cast explicitly to fail early in case of type mismatch
// val c = command.asInstanceOf[Command]
// phase.onCommand(c)
// }
// }
//
// // ----------
//
// def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
// var res: Behavior[Any] = same
// val it = effects.iterator
//
// // if at least one effect results in a `stop`, we need to stop
// // manual loop implementation to avoid allocations and multiple scans
// while (it.hasNext) {
// val effect = it.next()
// applySideEffect(effect) match {
// case _: StoppedBehavior[_] res = Behaviors.stopped
// case _ // nothing to do
// }
// }
//
// res
// }
//
// def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match {
// case _: Stop.type @unchecked
// Behaviors.stopped
//
// case SideEffect(sideEffects)
// sideEffects(state)
// same
//
// case _
// throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
// }
//
// def applyEvent(s: S, event: E): S =
// eventHandler(s, event)
//
// @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = {
// if (log.isDebugEnabled)
// log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
//
// effect match {
// case CompositeEffect(e, currentSideEffects)
// // unwrap and accumulate effects
// applyEffects(msg, e, currentSideEffects ++ sideEffects)
//
// case Persist(event)
// // apply the event before persist so that validation exception is handled before persisting
// // the invalid event, in case such validation is implemented in the event handler.
// // also, ensure that there is an event handler for each single event
// state = applyEvent(state, event)
// val tags = tagger(event)
// val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
//
// internalPersist(eventToPersist, sideEffects) { _
// if (snapshotWhen(state, event, sequenceNr))
// internalSaveSnapshot(state)
// }
//
// case PersistAll(events)
// if (events.nonEmpty) {
// // apply the event before persist so that validation exception is handled before persisting
// // the invalid event, in case such validation is implemented in the event handler.
// // also, ensure that there is an event handler for each single event
// var count = events.size
// var seqNr = sequenceNr
// val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
// case ((currentState, snapshot), event)
// seqNr += 1
// val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr)
// (applyEvent(currentState, event), shouldSnapshot)
// }
// state = newState
// val eventsToPersist = events.map { event
// val tags = tagger(event)
// if (tags.isEmpty) event else Tagged(event, tags)
// }
//
// internalPersistAll(eventsToPersist, sideEffects) { _
// count -= 1
// if (count == 0) {
// sideEffects.foreach(applySideEffect)
// if (shouldSnapshotAfterPersist)
// internalSaveSnapshot(state)
// }
// }
// } else {
// // run side-effects even when no events are emitted
// tryUnstash(context, applySideEffects(sideEffects))
// }
//
// case e: PersistNothing.type @unchecked
// tryUnstash(context, applySideEffects(sideEffects))
//
// case _: Unhandled.type @unchecked
// applySideEffects(sideEffects)
// Behavior.unhandled
//
// case c: ChainableEffect[_, S]
// applySideEffect(c)
// }
// }
//
// private def popApplyHandler(payload: Any): Unit =
// pendingInvocations.pop().handler(payload)
//
// private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
// if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException(
// "Attempted to become PersistingEvents while already in this phase! Logic error?")
//
// phase =
// if (sideEffects.isEmpty) PersistingEventsNoSideEffects
// else new PersistingEvents(sideEffects)
//
// same
// }
//
// private def tryBecomeHandlingCommands(): Behavior[Any] = {
// if (phase == HandlingCommands) throw new IllegalArgumentException(
// "Attempted to become HandlingCommands while already in this phase! Logic error?")
//
// if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN?
// phase = HandlingCommands
// }
//
// same
// }
//
// override def toString = s"EventsourcedRunning($persistenceId,${phase.name})"
//}

View file

@ -7,23 +7,25 @@ import java.util.concurrent.TimeUnit
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.persistence.Persistence
import com.typesafe.config.Config
import scala.concurrent.duration._
trait EventsourcedSettings {
/** INTERNAL API */
@InternalApi
private[akka] trait EventsourcedSettings {
def stashCapacity: Int
// def stashOverflowStrategyName: String // TODO not supported, the stash just throws for now
def stashingLogLevel: LogLevel
def journalPluginId: String
def snapshotPluginId: String
def logOnStashing: Boolean
def stashOverflowStrategyConfigurator: String
def recoveryEventTimeout: FiniteDuration
def withJournalPluginId(id: Option[String]): EventsourcedSettings
def withSnapshotPluginId(id: Option[String]): EventsourcedSettings
def journalPluginId: String
def withJournalPluginId(id: String): EventsourcedSettings
def snapshotPluginId: String
def withSnapshotPluginId(id: String): EventsourcedSettings
}
object EventsourcedSettings {
@ -33,51 +35,54 @@ object EventsourcedSettings {
def apply(config: Config): EventsourcedSettings = {
val typedConfig = config.getConfig("akka.persistence.typed")
val untypedConfig = config.getConfig("akka.persistence")
// StashOverflowStrategy
val internalStashOverflowStrategy =
untypedConfig.getString("internal-stash-overflow-strategy") // FIXME or copy it to typed?
val stashOverflowStrategyConfigurator = typedConfig.getString("internal-stash-overflow-strategy")
val stashCapacity = typedConfig.getInt("stash-capacity")
require(stashCapacity > 0, "stash-capacity MUST be > 0, unbounded buffering is not supported.")
val stashingLogLevel = typedConfig.getString("log-stashing") match {
case "off" Logging.OffLevel
case "on" | "true" Logging.DebugLevel
case l Logging.levelFor(l).getOrElse(Logging.OffLevel)
}
// FIXME this is wrong I think
val recoveryEventTimeout = 10.seconds // untypedConfig.getDuration("plugin-journal-fallback.recovery-event-timeout", TimeUnit.MILLISECONDS).millis
val logOnStashing = typedConfig.getBoolean("log-stashing")
EventsourcedSettingsImpl(
config,
stashCapacity = stashCapacity,
internalStashOverflowStrategy,
stashingLogLevel = stashingLogLevel,
stashOverflowStrategyConfigurator,
logOnStashing = logOnStashing,
journalPluginId = "",
snapshotPluginId = "",
recoveryEventTimeout = recoveryEventTimeout
snapshotPluginId = ""
)
}
/**
* INTERNAL API
*/
private[akka] final def journalConfigFor(config: Config, journalPluginId: String): Config = {
val defaultJournalPluginId = config.getString("akka.persistence.journal.plugin")
val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId
config.getConfig(configPath)
.withFallback(config.getConfig(Persistence.JournalFallbackConfigPath))
}
}
@InternalApi
private[persistence] final case class EventsourcedSettingsImpl(
stashCapacity: Int,
stashOverflowStrategyName: String,
stashingLogLevel: LogLevel,
journalPluginId: String,
snapshotPluginId: String,
recoveryEventTimeout: FiniteDuration
private val config: Config,
stashCapacity: Int,
stashOverflowStrategyConfigurator: String,
logOnStashing: Boolean,
journalPluginId: String,
snapshotPluginId: String
) extends EventsourcedSettings {
def withJournalPluginId(id: Option[String]): EventsourcedSettings = id match {
case Some(identifier) copy(journalPluginId = identifier)
case _ this
}
def withSnapshotPluginId(id: Option[String]): EventsourcedSettings = id match {
case Some(identifier) copy(snapshotPluginId = identifier)
case _ this
}
def withJournalPluginId(id: String): EventsourcedSettings =
copy(journalPluginId = id)
def withSnapshotPluginId(id: String): EventsourcedSettings =
copy(snapshotPluginId = id)
private val journalConfig = EventsourcedSettings.journalConfigFor(config, journalPluginId)
val recoveryEventTimeout = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis
}

View file

@ -4,99 +4,46 @@
package akka.persistence.typed.internal
import akka.actor.ActorRef
import akka.{ actor a }
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryPermitGranted
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence._
import akka.util.ConstantFun
import akka.{ actor a }
/**
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors
*/
@InternalApi
private[persistence] object EventsourcedSetup {
def apply[Command, Event, State](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State): EventsourcedSetup[Command, Event, State] = {
apply(
context,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler,
// values dependent on context
EventsourcedSettings(context.system))
}
def apply[Command, Event, State](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
settings: EventsourcedSettings): EventsourcedSetup[Command, Event, State] = {
new EventsourcedSetup[Command, Event, State](
context,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler,
writerIdentity = WriterIdentity.newIdentity(),
recoveryCompleted = ConstantFun.scalaAnyTwoToUnit,
tagger = (_: Event) Set.empty[String],
snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
recovery = Recovery(),
settings,
StashBuffer(settings.stashCapacity)
)
}
}
/** INTERNAL API: Carry state for the Persistent behavior implementation behaviors */
@InternalApi
private[persistence] final case class EventsourcedSetup[Command, Event, State](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
writerIdentity: WriterIdentity,
recoveryCompleted: (ActorContext[Command], State) Unit,
tagger: Event Set[String],
snapshotWhen: (State, Event, Long) Boolean,
recovery: Recovery,
settings: EventsourcedSettings,
internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\
private[persistence] final case class EventsourcedSetup[C, E, S](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: S,
commandHandler: PersistentBehaviors.CommandHandler[C, E, S],
eventHandler: (S, E) S,
writerIdentity: WriterIdentity,
recoveryCompleted: (ActorContext[C], S) Unit,
tagger: E Set[String],
snapshotWhen: (S, E, Long) Boolean,
recovery: Recovery,
var holdingRecoveryPermit: Boolean,
settings: EventsourcedSettings,
internalStash: StashBuffer[InternalProtocol] // FIXME would be nice here... but stash is mutable :\\\\\\\
) {
import akka.actor.typed.scaladsl.adapter._
def withJournalPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = {
def withJournalPluginId(id: String): EventsourcedSetup[C, E, S] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(settings = settings.withJournalPluginId(id))
}
def withSnapshotPluginId(id: Option[String]): EventsourcedSetup[Command, Event, State] = {
def withSnapshotPluginId(id: String): EventsourcedSetup[C, E, S] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(settings = settings.withSnapshotPluginId(id))
}
def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]]
def log = context.log
@ -107,13 +54,5 @@ private[persistence] final case class EventsourcedSetup[Command, Event, State](
def selfUntyped = context.self.toUntyped
import EventsourcedBehavior.InternalProtocol
val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.toUntyped
}

View file

@ -1,12 +1,15 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer }
import akka.actor.{ DeadLetter, StashOverflowException }
import akka.actor.{ DeadLetter, ExtendedActorSystem, StashOverflowException }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.{ StashOverflowStrategy, _ }
import akka.util.Collections.EmptyImmutableSeq
import akka.util.ConstantFun
import akka.{ actor a }
@ -17,23 +20,25 @@ private[akka] trait EventsourcedStashManagement[C, E, S] {
def setup: EventsourcedSetup[C, E, S]
private def context = setup.context
private def context: ActorContext[InternalProtocol] = setup.context
private def stashBuffer: StashBuffer[InternalProtocol] = setup.internalStash
protected def stash(msg: InternalProtocol): Unit = {
if (setup.settings.logOnStashing) setup.log.debug("Stashing message: [{}]", msg)
val logLevel = setup.settings.stashingLogLevel
if (logLevel != Logging.OffLevel) context.log.debug("Stashing message: {}", msg) // FIXME can be log(logLevel once missing method added
val internalStashOverflowStrategy: StashOverflowStrategy = setup.persistence.defaultInternalStashOverflowStrategy
val internalStashOverflowStrategy: StashOverflowStrategy = {
val system = context.system.toUntyped.asInstanceOf[ExtendedActorSystem]
system.dynamicAccess.createInstanceFor[StashOverflowStrategyConfigurator](setup.settings.stashOverflowStrategyConfigurator, EmptyImmutableSeq)
.map(_.create(system.settings.config)).get
}
try stashBuffer.stash(msg) catch {
case e: StashOverflowException
internalStashOverflowStrategy match {
case DiscardToDeadLetterStrategy
val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow?
context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped))
val noSenderBecauseAkkaTyped: a.ActorRef = a.ActorRef.noSender
context.system.deadLetters.tell(DeadLetter(msg, noSenderBecauseAkkaTyped, context.self.toUntyped))
case ReplyToStrategy(_)
throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!")
@ -44,11 +49,10 @@ private[akka] trait EventsourcedStashManagement[C, E, S] {
}
}
// FIXME, yet we need to also stash not-commands, due to journal responses ...
protected def tryUnstash(
behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
if (stashBuffer.nonEmpty) {
setup.log.debug("Unstashing message: {}", stashBuffer.head.getClass)
if (setup.settings.logOnStashing) setup.log.debug("Unstashing message: [{}]", stashBuffer.head.getClass)
stashBuffer.unstash(setup.context, behavior.asInstanceOf[Behavior[InternalProtocol]], 1, ConstantFun.scalaIdentityFunction)
} else behavior

View file

@ -0,0 +1,122 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors }
import akka.util.ConstantFun
@InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery()
) extends PersistentBehavior[Command, Event, State] {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
Behaviors.withTimers { timers
val settings = EventsourcedSettings(ctx.system)
val setup = EventsourcedSetup(
ctx,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler,
WriterIdentity.newIdentity(),
recoveryCompleted,
tagger,
snapshotWhen,
recovery,
holdingRecoveryPermit = false,
settings = settings,
internalStash = StashBuffer(settings.stashCapacity)
).withJournalPluginId(journalPluginId.getOrElse(""))
.withSnapshotPluginId(snapshotPluginId.getOrElse(""))
EventsourcedRequestingRecoveryPermit(setup)
}
}.widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.narrow[Command]
}
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
* Snapshot every N events
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
/**
* Change the journal plugin id that this actor should use.
*/
def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = if (id != "") Some(id) else None)
}
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = if (id != "") Some(id) else None)
}
/**
* Changes the snapshot selection criteria used by this behavior.
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
copy(tagger = tagger)
}

View file

@ -3,17 +3,11 @@
*/
package akka.persistence.typed.scaladsl
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal._
import akka.persistence._
import akka.util.ConstantFun
import scala.language.implicitConversions
import akka.persistence.typed.internal._
object PersistentBehaviors {
@ -114,106 +108,3 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State]
}
@InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
// settings: Option[EventsourcedSettings], // FIXME can't because no context available yet
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery()
) extends PersistentBehavior[Command, Event, State] {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
Behaviors.withTimers[EventsourcedBehavior.InternalProtocol] { timers
val setup = EventsourcedSetup(
ctx,
timers,
persistenceId,
initialState,
commandHandler,
eventHandler)
.withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId)
EventsourcedRequestingRecoveryPermit(setup)
}
}.widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.narrow[Command]
}
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
* Snapshot every N events
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
/**
* Change the journal plugin id that this actor should use.
*/
def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = if (id != "") Some(id) else None)
}
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = if (id != "") Some(id) else None)
}
/**
* Changes the snapshot selection criteria used by this behavior.
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
copy(tagger = tagger)
}

View file

@ -3,11 +3,9 @@
*/
package akka.persistence.typed.scaladsl
import akka.actor.ActorSystemImpl
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown }
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.scaladsl.PersistentBehaviors._
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.scaladsl._
@ -38,7 +36,7 @@ object PersistentBehaviorSpec {
val config = ConfigFactory.parseString(
s"""
akka.loglevel = DEBUG
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = INFO
akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore"
@ -319,7 +317,11 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"snapshot via predicate" in {
val alwaysSnapshot = counter("c9").snapshotWhen { (_, _, _) true }
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { context
counter("c9").snapshotWhen { (_, _, _) true }
}
val c = spawn(alwaysSnapshot)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()

View file

@ -71,7 +71,7 @@ final class PersistenceSettings(config: Config) {
}
/**
* Identification of [[PersistentActor]] or [[PersistentView]].
* Identification of [[PersistentActor]].
*/
//#persistence-identity
trait PersistenceIdentity {
@ -133,6 +133,12 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config)
extends Extension
/** Config path to fall-back to if a setting is not defined in a specific plugin's config section */
val JournalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
/** Config path to fall-back to if a setting is not defined in a specific snapshot plugin's config section */
val SnapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
}
/**
@ -190,9 +196,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
/** Discovered persistence journal and snapshot store plugins. */
private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
config.getStringList("journal.auto-start-journals").forEach(new Consumer[String] {
override def accept(id: String): Unit = {
log.info(s"Auto-starting journal plugin `$id`")
@ -213,7 +216,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
*/
final def adaptersFor(journalPluginId: String): EventAdapters = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, journalFallbackConfigPath).adapters
pluginHolderFor(configPath, JournalFallbackConfigPath).adapters
}
/**
@ -237,7 +240,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
*/
private[akka] final def journalConfigFor(journalPluginId: String): Config = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, journalFallbackConfigPath).config
pluginHolderFor(configPath, JournalFallbackConfigPath).config
}
/**
@ -261,7 +264,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
*/
private[akka] final def journalFor(journalPluginId: String): ActorRef = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, journalFallbackConfigPath).actor
pluginHolderFor(configPath, JournalFallbackConfigPath).actor
}
/**
@ -274,7 +277,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
*/
private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = {
val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId
pluginHolderFor(configPath, snapshotStoreFallbackConfigPath).actor
pluginHolderFor(configPath, SnapshotStoreFallbackConfigPath).actor
}
@tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = {

View file

@ -19,10 +19,11 @@ import akka.actor.Terminated
Props(new RecoveryPermitter(maxPermits))
sealed trait Protocol
sealed trait Request extends Protocol
sealed trait Reply extends Protocol
case object RequestRecoveryPermit extends Protocol
case object RequestRecoveryPermit extends Request
case object RecoveryPermitGranted extends Reply
case object ReturnRecoveryPermit extends Protocol
case object ReturnRecoveryPermit extends Request
}

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl.io
import java.io.{ BufferedOutputStream, ByteArrayOutputStream, IOException, InputStream }
import java.io.{ IOException, InputStream }
import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit }
import akka.annotation.InternalApi