diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index ff33a1f1dd..833e20ebc4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -16,7 +16,7 @@ import akka.actor.typed.Signal import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi +import akka.annotation._ import akka.persistence.JournalProtocol import akka.persistence.Recovery import akka.persistence.RecoveryPermitter @@ -34,7 +34,7 @@ import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.scaladsl.RetentionCriteria import akka.persistence.typed.scaladsl._ -import akka.util.ConstantFun +import akka.util.{ unused, ConstantFun } @InternalApi private[akka] object EventSourcedBehaviorImpl { @@ -105,6 +105,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( ctx.log.warning("Failed to delete events to sequence number [{}] due to [{}].", toSequenceNr, failure) } + // do this once, even if the actor is restarted + initialize(context.asScala) + Behaviors .supervise { Behaviors.setup[Command] { _ => @@ -158,6 +161,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( .onFailure[JournalFailureException](supervisionStrategy) } + @InternalStableApi + private[akka] def initialize(@unused context: ActorContext[_]): Unit = () + override def receiveSignal( handler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State] = copy(signalHandler = handler) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 680eb4601d..84bcbd69d5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -5,14 +5,12 @@ package akka.persistence.typed.internal import scala.util.control.NonFatal - -import akka.actor.typed.Behavior -import akka.actor.typed.Signal +import scala.concurrent.duration._ +import akka.actor.typed.{ Behavior, Signal } import akka.actor.typed.internal.PoisonPill import akka.actor.typed.internal.UnstashException -import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi +import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors } +import akka.annotation.{ InternalApi, InternalStableApi } import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ @@ -20,6 +18,8 @@ import akka.persistence.typed.RecoveryFailed import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible +import akka.util.unused +import akka.util.PrettyDuration._ /*** * INTERNAL API @@ -44,7 +44,8 @@ private[akka] object ReplayingEvents { state: State, eventSeenInInterval: Boolean, toSeqNr: Long, - receivedPoisonPill: Boolean) + receivedPoisonPill: Boolean, + recoveryStartTime: Long) def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = Behaviors.setup { _ => @@ -69,6 +70,15 @@ private[akka] final class ReplayingEvents[C, E, S]( import ReplayingEvents.ReplayingState replayEvents(state.seqNr + 1L, state.toSeqNr) + onRecoveryStart(setup.context) + + @InternalStableApi + def onRecoveryStart(@unused context: ActorContext[_]): Unit = () + @InternalStableApi + def onRecoveryComplete(@unused context: ActorContext[_]): Unit = () + @InternalStableApi + def onRecoveryFailed(@unused context: ActorContext[_], @unused reason: Throwable, @unused event: Option[Any]): Unit = + () override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { msg match { @@ -167,11 +177,18 @@ private[akka] final class ReplayingEvents[C, E, S]( * @param event the event that was being processed when the exception was thrown */ private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = { + onRecoveryFailed(setup.context, cause, event) setup.onSignal(state.state, RecoveryFailed(cause), catchAndLog = true) setup.cancelRecoveryTimer() tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) - + if (setup.log.isDebugEnabled) { + setup.log.debug( + "Recovery failure for persistenceId [{}] after {}", + setup.persistenceId, + (System.nanoTime() - state.recoveryStartTime).nanos.pretty) + } val sequenceNr = state.seqNr + val msg = event match { case Some(evt) => s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " + @@ -186,7 +203,14 @@ private[akka] final class ReplayingEvents[C, E, S]( private def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try { + onRecoveryComplete(setup.context) tryReturnRecoveryPermit("replay completed successfully") + if (setup.log.isDebugEnabled) { + setup.log.debug( + "Recovery for persistenceId [{}] took {}", + setup.persistenceId, + (System.nanoTime() - state.recoveryStartTime).nanos.pretty) + } setup.onSignal(state.state, RecoveryCompleted, catchAndLog = false) if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 96c14ba467..5d40a0697e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -6,11 +6,12 @@ package akka.persistence.typed.internal import akka.actor.typed.Behavior import akka.actor.typed.internal.PoisonPill -import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.annotation.{ InternalApi, InternalStableApi } import akka.persistence.SnapshotProtocol.LoadSnapshotFailed import akka.persistence.SnapshotProtocol.LoadSnapshotResult import akka.persistence._ +import akka.util.unused /** * INTERNAL API @@ -83,11 +84,15 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup * @param cause failure cause. */ private def onRecoveryFailure(cause: Throwable): Behavior[InternalProtocol] = { + onRecoveryFailed(setup.context, cause) setup.cancelRecoveryTimer() setup.log.error(cause, s"Persistence failure when replaying snapshot. ${cause.getMessage}") Behaviors.stopped } + @InternalStableApi + def onRecoveryFailed(@unused context: ActorContext[_], @unused reason: Throwable): Unit = () + private def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] = if (snapshot) { // we know we're in snapshotting mode; snapshot recovery timeout arrived @@ -142,7 +147,13 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup ReplayingEvents[C, E, S]( setup, - ReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill)) + ReplayingEvents.ReplayingState( + lastSequenceNr, + state, + eventSeenInInterval = false, + toSnr, + receivedPoisonPill, + System.nanoTime())) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 0db3416109..26b3cf0cc2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -6,14 +6,12 @@ package akka.persistence.typed.internal import scala.annotation.tailrec import scala.collection.immutable - import akka.actor.UnhandledMessage import akka.actor.typed.Behavior import akka.actor.typed.Signal import akka.actor.typed.internal.PoisonPill -import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi +import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors } +import akka.annotation.{ InternalApi, InternalStableApi } import akka.persistence.DeleteMessagesFailure import akka.persistence.DeleteMessagesSuccess import akka.persistence.DeleteSnapshotFailure @@ -39,6 +37,7 @@ import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.SnapshotMetadata import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.scaladsl.Effect +import akka.util.unused /** * INTERNAL API @@ -228,7 +227,8 @@ private[akka] object Running { var visibleState: RunningState[S], // previous state until write success numberOfEvents: Int, shouldSnapshotAfterPersist: SnapshotAfterPersist, - var sideEffects: immutable.Seq[SideEffect[S]]) + var sideEffects: immutable.Seq[SideEffect[S]], + persistStartTime: Long = System.nanoTime()) extends AbstractBehavior[InternalProtocol] with WithSeqNrAccessible { @@ -256,7 +256,9 @@ private[akka] object Running { } final def onJournalResponse(response: Response): Behavior[InternalProtocol] = { - setup.log.debug("Received Journal response: {}", response) + if (setup.log.isDebugEnabled) { + setup.log.debug("Received Journal response: {} after: {} nanos", response, System.nanoTime() - persistStartTime) + } def onWriteResponse(p: PersistentRepr): Behavior[InternalProtocol] = { state = state.updateLastSequenceNr(p) @@ -283,13 +285,17 @@ private[akka] object Running { case WriteMessageRejected(p, cause, id) => if (id == setup.writerIdentity.instanceId) { + // current + 1 as it is the inflight event that that has failed + onWriteRejected(setup.context, cause, p.payload, currentSequenceNumber + 1) throw new EventRejectedException(setup.persistenceId, p.sequenceNr, cause) } else this case WriteMessageFailure(p, cause, id) => - if (id == setup.writerIdentity.instanceId) + if (id == setup.writerIdentity.instanceId) { + // current + 1 as it is the inflight event that that has failed + onWriteFailed(setup.context, cause, p.payload, currentSequenceNumber + 1) throw new JournalFailureException(setup.persistenceId, p.sequenceNr, p.payload.getClass.getName, cause) - else this + } else this case WriteMessagesSuccessful => // ignore @@ -497,4 +503,17 @@ private[akka] object Running { } } + @InternalStableApi + private[akka] def onWriteFailed( + @unused ctx: ActorContext[_], + @unused reason: Throwable, + @unused event: Any, + @unused sequenceNr: Long): Unit = () + @InternalStableApi + private[akka] def onWriteRejected( + @unused ctx: ActorContext[_], + @unused reason: Throwable, + @unused event: Any, + @unused sequenceNr: Long): Unit = () + } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index e379bd7575..a35cf79735 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -8,7 +8,7 @@ import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import akka.actor.{ Actor, ActorCell, DeadLetter, StashOverflowException } -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.dispatch.Envelope import akka.event.{ Logging, LoggingAdapter } import akka.util.Helpers.ConfigOps @@ -184,6 +184,7 @@ private[persistence] trait Eventsourced * @param cause failure cause. * @param event the event that was to be persisted */ + @InternalStableApi protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = { log.error( cause, @@ -201,6 +202,7 @@ private[persistence] trait Eventsourced * @param cause failure cause * @param event the event that was to be persisted */ + @InternalStableApi protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { log.error( cause,