Hooks for Akka Persistence Typed lifecycle (#26999)

This commit is contained in:
Christopher Batey 2019-05-23 15:10:32 +01:00 committed by GitHub
parent 55ae1ad5c4
commit 138ffe25d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 84 additions and 22 deletions

View file

@ -16,7 +16,7 @@ import akka.actor.typed.Signal
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.annotation._
import akka.persistence.JournalProtocol
import akka.persistence.Recovery
import akka.persistence.RecoveryPermitter
@ -34,7 +34,7 @@ import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.persistence.typed.scaladsl._
import akka.util.ConstantFun
import akka.util.{ unused, ConstantFun }
@InternalApi
private[akka] object EventSourcedBehaviorImpl {
@ -105,6 +105,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
ctx.log.warning("Failed to delete events to sequence number [{}] due to [{}].", toSequenceNr, failure)
}
// do this once, even if the actor is restarted
initialize(context.asScala)
Behaviors
.supervise {
Behaviors.setup[Command] { _ =>
@ -158,6 +161,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
.onFailure[JournalFailureException](supervisionStrategy)
}
@InternalStableApi
private[akka] def initialize(@unused context: ActorContext[_]): Unit = ()
override def receiveSignal(
handler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State] =
copy(signalHandler = handler)

View file

@ -5,14 +5,12 @@
package akka.persistence.typed.internal
import scala.util.control.NonFatal
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
import scala.concurrent.duration._
import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.UnstashException
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
@ -20,6 +18,8 @@ import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.util.unused
import akka.util.PrettyDuration._
/***
* INTERNAL API
@ -44,7 +44,8 @@ private[akka] object ReplayingEvents {
state: State,
eventSeenInInterval: Boolean,
toSeqNr: Long,
receivedPoisonPill: Boolean)
receivedPoisonPill: Boolean,
recoveryStartTime: Long)
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
Behaviors.setup { _ =>
@ -69,6 +70,15 @@ private[akka] final class ReplayingEvents[C, E, S](
import ReplayingEvents.ReplayingState
replayEvents(state.seqNr + 1L, state.toSeqNr)
onRecoveryStart(setup.context)
@InternalStableApi
def onRecoveryStart(@unused context: ActorContext[_]): Unit = ()
@InternalStableApi
def onRecoveryComplete(@unused context: ActorContext[_]): Unit = ()
@InternalStableApi
def onRecoveryFailed(@unused context: ActorContext[_], @unused reason: Throwable, @unused event: Option[Any]): Unit =
()
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
msg match {
@ -167,11 +177,18 @@ private[akka] final class ReplayingEvents[C, E, S](
* @param event the event that was being processed when the exception was thrown
*/
private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = {
onRecoveryFailed(setup.context, cause, event)
setup.onSignal(state.state, RecoveryFailed(cause), catchAndLog = true)
setup.cancelRecoveryTimer()
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)
if (setup.log.isDebugEnabled) {
setup.log.debug(
"Recovery failure for persistenceId [{}] after {}",
setup.persistenceId,
(System.nanoTime() - state.recoveryStartTime).nanos.pretty)
}
val sequenceNr = state.seqNr
val msg = event match {
case Some(evt) =>
s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " +
@ -186,7 +203,14 @@ private[akka] final class ReplayingEvents[C, E, S](
private def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] =
try {
onRecoveryComplete(setup.context)
tryReturnRecoveryPermit("replay completed successfully")
if (setup.log.isDebugEnabled) {
setup.log.debug(
"Recovery for persistenceId [{}] took {}",
setup.persistenceId,
(System.nanoTime() - state.recoveryStartTime).nanos.pretty)
}
setup.onSignal(state.state, RecoveryCompleted, catchAndLog = false)
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)

View file

@ -6,11 +6,12 @@ package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.persistence.SnapshotProtocol.LoadSnapshotFailed
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
import akka.persistence._
import akka.util.unused
/**
* INTERNAL API
@ -83,11 +84,15 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
* @param cause failure cause.
*/
private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = {
onRecoveryFailed(setup.context, cause)
setup.cancelRecoveryTimer()
setup.log.error(cause, s"Persistence failure when replaying snapshot. ${cause.getMessage}")
Behaviors.stopped
}
@InternalStableApi
def onRecoveryFailed(@unused context: ActorContext[_], @unused reason: Throwable): Unit = ()
private def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] =
if (snapshot) {
// we know we're in snapshotting mode; snapshot recovery timeout arrived
@ -142,7 +147,13 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
ReplayingEvents[C, E, S](
setup,
ReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill))
ReplayingEvents.ReplayingState(
lastSequenceNr,
state,
eventSeenInInterval = false,
toSnr,
receivedPoisonPill,
System.nanoTime()))
}
}

View file

@ -6,14 +6,12 @@ package akka.persistence.typed.internal
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.UnhandledMessage
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.persistence.DeleteMessagesFailure
import akka.persistence.DeleteMessagesSuccess
import akka.persistence.DeleteSnapshotFailure
@ -39,6 +37,7 @@ import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl.Effect
import akka.util.unused
/**
* INTERNAL API
@ -228,7 +227,8 @@ private[akka] object Running {
var visibleState: RunningState[S], // previous state until write success
numberOfEvents: Int,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
var sideEffects: immutable.Seq[SideEffect[S]])
var sideEffects: immutable.Seq[SideEffect[S]],
persistStartTime: Long = System.nanoTime())
extends AbstractBehavior[InternalProtocol]
with WithSeqNrAccessible {
@ -256,7 +256,9 @@ private[akka] object Running {
}
final def onJournalResponse(response: Response): Behavior[InternalProtocol] = {
setup.log.debug("Received Journal response: {}", response)
if (setup.log.isDebugEnabled) {
setup.log.debug("Received Journal response: {} after: {} nanos", response, System.nanoTime() - persistStartTime)
}
def onWriteResponse(p: PersistentRepr): Behavior[InternalProtocol] = {
state = state.updateLastSequenceNr(p)
@ -283,13 +285,17 @@ private[akka] object Running {
case WriteMessageRejected(p, cause, id) =>
if (id == setup.writerIdentity.instanceId) {
// current + 1 as it is the inflight event that that has failed
onWriteRejected(setup.context, cause, p.payload, currentSequenceNumber + 1)
throw new EventRejectedException(setup.persistenceId, p.sequenceNr, cause)
} else this
case WriteMessageFailure(p, cause, id) =>
if (id == setup.writerIdentity.instanceId)
if (id == setup.writerIdentity.instanceId) {
// current + 1 as it is the inflight event that that has failed
onWriteFailed(setup.context, cause, p.payload, currentSequenceNumber + 1)
throw new JournalFailureException(setup.persistenceId, p.sequenceNr, p.payload.getClass.getName, cause)
else this
} else this
case WriteMessagesSuccessful =>
// ignore
@ -497,4 +503,17 @@ private[akka] object Running {
}
}
@InternalStableApi
private[akka] def onWriteFailed(
@unused ctx: ActorContext[_],
@unused reason: Throwable,
@unused event: Any,
@unused sequenceNr: Long): Unit = ()
@InternalStableApi
private[akka] def onWriteRejected(
@unused ctx: ActorContext[_],
@unused reason: Throwable,
@unused event: Any,
@unused sequenceNr: Long): Unit = ()
}

View file

@ -8,7 +8,7 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ Actor, ActorCell, DeadLetter, StashOverflowException }
import akka.annotation.InternalApi
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.dispatch.Envelope
import akka.event.{ Logging, LoggingAdapter }
import akka.util.Helpers.ConfigOps
@ -184,6 +184,7 @@ private[persistence] trait Eventsourced
* @param cause failure cause.
* @param event the event that was to be persisted
*/
@InternalStableApi
protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
log.error(
cause,
@ -201,6 +202,7 @@ private[persistence] trait Eventsourced
* @param cause failure cause
* @param event the event that was to be persisted
*/
@InternalStableApi
protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
log.error(
cause,