diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 77a09bbaca..ebc70586e6 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -120,6 +120,10 @@ When persisting events with ``persist`` it is guaranteed that the persistent act the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. +If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`. +This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining +``supervisorStrategy`` in parent actor. + The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples in Java with Lambdas `_. It contains instructions on how to run the ``PersistentActorExample``. @@ -192,11 +196,12 @@ recovery has completed, before processing any other message sent to the persiste The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery and before any other received messages. +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed + If there is a problem with recovering the state of the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the -actor doesn't handle the :class:`RecoveryFailure` message it will be stopped. +actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed Relaxed local consistency requirements and high throughput use-cases -------------------------------------------------------------------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 8d2e408498..7afebf6c27 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -122,6 +122,10 @@ When persisting events with ``persist`` it is guaranteed that the persistent act the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. +If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`. +This can be customized by handling ``PersistenceFailure`` message in ``onReceiveCommand`` and/or defining +``supervisorStrategy`` in parent actor. + The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Java `_. It contains instructions on how to run the ``PersistentActorExample``. @@ -195,11 +199,11 @@ recovery has completed, before processing any other message sent to the persiste The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery and before any other received messages. +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed + If there is a problem with recovering the state of the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the -actor doesn't handle the :class:`RecoveryFailure` message it will be stopped. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed +actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`. .. _persist-async-java: diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 5c5b55dddc..4f7e724e47 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -114,6 +114,10 @@ When persisting events with ``persist`` it is guaranteed that the persistent act the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. +If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`. +This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining +``supervisorStrategy`` in parent actor. + The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Scala `_. It contains instructions on how to run the ``PersistentActorExample``. @@ -186,11 +190,11 @@ recovery has completed, before processing any other message sent to the persiste The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery and before any other received messages. +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed + If there is a problem with recovering the state of the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the -actor doesn't handle the :class:`RecoveryFailure` message it will be stopped. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed +actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`. .. _persist-async-scala: diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 8a1ae46ecf..b06a546c41 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -8,10 +8,10 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable import scala.util.control.NonFatal import akka.actor.ActorKilledException -import akka.actor.ActorLogging import akka.actor.Stash import akka.actor.StashFactory -import akka.dispatch.Envelope +import akka.event.Logging +import akka.event.LoggingAdapter /** * INTERNAL API @@ -105,8 +105,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas private[persistence] def onReplayFailure(cause: Throwable): Unit = () /** - * User-overridable callback. Called when a persistent actor is started. Default implementation sends - * a `Recover()` to `self`. + * User-overridable callback. Called when a persistent actor is started or restarted. + * Default implementation sends a `Recover()` to `self`. Note that if you override + * `preStart` (or `preRestart`) and not call `super.preStart` you must send + * a `Recover()` message to `self` to activate the persistent actor. */ @throws(classOf[Exception]) override def preStart(): Unit = @@ -143,18 +145,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } } - /** - * User-overridable callback. Called before a persistent actor is restarted. Default implementation sends - * a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`. - */ - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - super.preRestart(reason, message) - message match { - case Some(_) ⇒ self ! Recover(toSequenceNr = lastSequenceNr) - case None ⇒ self ! Recover() - } - } - /** * INTERNAL API. */ @@ -166,7 +156,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def unhandled(message: Any): Unit = { message match { - case RecoveryCompleted ⇒ // mute + case RecoveryCompleted | ReadHighestSequenceNrSuccess | ReadHighestSequenceNrFailure ⇒ // mute case RecoveryFailure(cause) ⇒ val errorMsg = s"PersistentActor killed after recovery failure (persisten id = [${persistenceId}]). " + "To avoid killing persistent actors on recovery failure, a PersistentActor must handle RecoveryFailure messages. " + @@ -203,6 +193,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas writeInProgress = true } + private def log: LoggingAdapter = Logging(context.system, this) + /** * Recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -212,8 +204,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * should not perform actions that may fail, such as interacting with external services, * for example. * - * If recovery fails, the actor will be stopped. This can be customized by - * handling [[RecoveryFailure]]. + * If recovery fails, the actor will be stopped by throwing ActorKilledException. + * This can be customized by handling [[RecoveryFailure]] message in `receiveRecover` + * and/or defining `supervisorStrategy` in parent actor. * * @see [[Recover]] */ @@ -241,8 +234,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the persistent actor will be stopped. This can be customized by - * handling [[PersistenceFailure]] in [[receiveCommand]]. + * If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException. + * This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]] + * and/or defining `supervisorStrategy` in parent actor. * * @param event event to be persisted * @param handler handler for each persisted `event` @@ -268,7 +262,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * Asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. * - * Unlike `persist` the persistent actor will continue to receive incomming commands between the + * Unlike `persist` the persistent actor will continue to receive incoming commands between the * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the "command-2 only processed after * command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method. @@ -277,8 +271,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * If persistence of an event fails, the persistent actor will be stopped. This can be customized by - * handling [[PersistenceFailure]] in [[receiveCommand]]. + * If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException. + * This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]] + * and/or defining `supervisorStrategy` in parent actor. * * @param event event to be persisted * @param handler handler for each persisted `event` @@ -308,14 +303,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, * if the given event should possible to replay. * - * If there are no pending persist handler calls, the handler will be called immediatly. + * If there are no pending persist handler calls, the handler will be called immediately. * * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers - * will not be run. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by + * throwing ActorKilledException, thus the handlers will not be run. * - * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` */ final def defer[A](event: A)(handler: A ⇒ Unit): Unit = { @@ -336,14 +331,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, * if the given event should possible to replay. * - * If there are no pending persist handler calls, the handler will be called immediatly. + * If there are no pending persist handler calls, the handler will be called immediately. * * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers - * will not be run. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by + * throwing ActorKilledException, thus the handlers will not be run. * - * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param events event to be handled in the future, when preceding persist operations have been processes * @param handler handler for each `event` */ final def defer[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = @@ -483,8 +478,11 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas changeState(initializing(recoveryBehavior)) journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) case ReplayMessagesFailure(cause) ⇒ + // in case the actor resumes the state must be initializing + changeState(initializing(recoveryBehavior)) + journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) + onReplayFailure(cause) // callback for subclass implementation - // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None)) case other ⇒ internalStash.stash() @@ -502,19 +500,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def stateReceive(receive: Receive, message: Any) = message match { case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) - case ReplayMessagesFailure(_) ⇒ - replayCompleted() - // journal couldn't tell the maximum stored sequence number, hence the next - // replay must be a full replay (up to the highest stored sequence number) - // Recover(lastSequenceNr) is sent by preRestart - setLastSequenceNr(Long.MaxValue) - case ReplayMessagesSuccess ⇒ replayCompleted() - case r: Recover ⇒ // ignore - case _ ⇒ internalStash.stash() + case ReplayMessagesSuccess | ReplayMessagesFailure(_) ⇒ replayCompleted() + case r: Recover ⇒ // ignore + case _ ⇒ internalStash.stash() } def replayCompleted(): Unit = { - // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? + // in case the actor resumes the state must be initializing + changeState(initializing(recoveryBehavior)) + journal ! ReadHighestSequenceNr(failed.sequenceNr, persistenceId, self) + Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload)))) } @@ -533,10 +528,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case ReadHighestSequenceNrSuccess(highest) ⇒ changeState(processingCommands) sequenceNr = highest + setLastSequenceNr(highest) internalStash.unstashAll() Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) case ReadHighestSequenceNrFailure(cause) ⇒ - Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None)) + log.error(cause, "PersistentActor could not retrieve highest sequence number and must " + + "therefore be stopped. (persisten id = [{}]).", persistenceId) + context.stop(self) case other ⇒ internalStash.stash() } @@ -552,26 +550,34 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas // while message is in flight, in that case we ignore the call to the handler if (id == instanceId) { updateLastSequenceNr(p) - try pendingInvocations.peek().handler(p.payload) finally onWriteMessageComplete() + try { + pendingInvocations.peek().handler(p.payload) + onWriteMessageComplete(err = false) + } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } case WriteMessageFailure(p, cause, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case the handler has already been discarded if (id == instanceId) { - Eventsourced.super.aroundReceive(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) // stops actor by default - onWriteMessageComplete() + try { + Eventsourced.super.aroundReceive(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) // stops actor by default + onWriteMessageComplete(err = false) + } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } case LoopMessageSuccess(l, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case we ignore the call to the handler if (id == instanceId) { - try pendingInvocations.peek().handler(l) finally onWriteMessageComplete() + try { + pendingInvocations.peek().handler(l) + onWriteMessageComplete(err = false) + } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } - case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ + case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ // FIXME PN: WriteMessagesFailed? if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch() } - def onWriteMessageComplete(): Unit = + def onWriteMessageComplete(err: Boolean): Unit = pendingInvocations.pop() } @@ -585,15 +591,20 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def stateReceive(receive: Receive, message: Any) = if (common.isDefinedAt(message)) common(message) - else doAroundReceive(receive, message) - - private def doAroundReceive(receive: Receive, message: Any): Unit = { - Eventsourced.super.aroundReceive(receive, message) + else try { + Eventsourced.super.aroundReceive(receive, message) + aroundReceiveComplete(err = false) + } catch { case NonFatal(e) ⇒ aroundReceiveComplete(err = true); throw e } + private def aroundReceiveComplete(err: Boolean): Unit = { if (eventBatch.nonEmpty) flushBatch() - if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) - else internalStash.unstash() + if (pendingStashingPersistInvocations > 0) + changeState(persistingEvents) + else if (err) + internalStash.unstashAll() + else + internalStash.unstash() } private def flushBatch() { @@ -637,7 +648,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas if (common.isDefinedAt(message)) common(message) else internalStash.stash() - override def onWriteMessageComplete(): Unit = { + override def onWriteMessageComplete(err: Boolean): Unit = { pendingInvocations.pop() match { case _: StashingHandlerInvocation ⇒ // enables an early return to `processingCommands`, because if this counter hits `0`, @@ -649,7 +660,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas if (pendingStashingPersistInvocations == 0) { changeState(processingCommands) - internalStash.unstash() + if (err) internalStash.unstashAll() + else internalStash.unstash() } } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index b2ff043146..2d9b9f76ed 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -174,7 +174,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced { * JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. * - * Unlike `persist` the persistent actor will continue to receive incomming commands between the + * Unlike `persist` the persistent actor will continue to receive incoming commands between the * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the "command-2 only processed after * command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method. @@ -212,14 +212,14 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced { * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, * if the given event should possible to replay. * - * If there are no pending persist handler calls, the handler will be called immediatly. + * If there are no pending persist handler calls, the handler will be called immediately. * * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers * will not be run. * - * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` */ final def defer[A](event: A)(handler: Procedure[A]): Unit = @@ -234,14 +234,14 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced { * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, * if the given event should possible to replay. * - * If there are no pending persist handler calls, the handler will be called immediatly. + * If there are no pending persist handler calls, the handler will be called immediately. * * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers * will not be run. * - * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param events event to be handled in the future, when preceding persist operations have been processes * @param handler handler for each `event` */ final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = @@ -317,7 +317,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. * - * Unlike `persist` the persistent actor will continue to receive incomming commands between the + * Unlike `persist` the persistent actor will continue to receive incoming commands between the * call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees. * @@ -339,14 +339,14 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, * if the given event should possible to replay. * - * If there are no pending persist handler calls, the handler will be called immediatly. + * If there are no pending persist handler calls, the handler will be called immediately. * * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers * will not be run. * - * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` */ final def defer[A](event: A)(handler: Procedure[A]): Unit = @@ -361,14 +361,14 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, * if the given event should possible to replay. * - * If there are no pending persist handler calls, the handler will be called immediatly. + * If there are no pending persist handler calls, the handler will be called immediately. * * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers * will not be run. * - * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param events event to be handled in the future, when preceding persist operations have been processes * @param handler handler for each `event` */ final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index 0a113c4ae8..141befac73 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -50,6 +50,13 @@ object Update { Update(await, replayMax) } +/** + * INTERNAL API + */ +private[akka] object PersistentView { + private final case class ScheduledUpdate(replayMax: Long) +} + /** * A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive * the message stream directly from the Journal. These messages can be processed to update internal state @@ -74,6 +81,7 @@ object Update { * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle */ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory { + import PersistentView._ import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult import context.dispatcher @@ -175,6 +183,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory override def preStart(): Unit = { super.preStart() self ! Recover(replayMax = autoUpdateReplayMax) + if (autoUpdate) schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, + self, ScheduledUpdate(autoUpdateReplayMax))) } /** @@ -282,7 +292,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory private var stashUpdate = await override def stateReceive(receive: Receive, message: Any) = message match { - case Update(false, _) ⇒ // ignore + case ScheduledUpdate(_) ⇒ // ignore + case Update(false, _) ⇒ // ignore case u @ Update(true, _) if !stashUpdate ⇒ stashUpdate = true internalStash.stash() @@ -299,7 +310,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory onReplayComplete(await) case ReplayMessagesFailure(cause) ⇒ onReplayComplete(await) - // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(None)) case other ⇒ internalStash.stash() @@ -310,8 +320,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory */ private def onReplayComplete(await: Boolean): Unit = { changeState(idle) - if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, - Update(await = false, autoUpdateReplayMax))) if (await) internalStash.unstashAll() } } @@ -339,7 +347,9 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory } def replayCompleted(receive: Receive): Unit = { - // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? + // in case the actor resumes the state must be `idle` + changeState(idle) + PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload)))) } } @@ -354,11 +364,15 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory override def recoveryRunning: Boolean = false override def stateReceive(receive: Receive, message: Any): Unit = message match { - case r: Recover ⇒ // ignore - case Update(awaitUpdate, replayMax) ⇒ - changeState(replayStarted(await = awaitUpdate)) - journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) - case other ⇒ PersistentView.super.aroundReceive(receive, other) + case r: Recover ⇒ // ignore + case ScheduledUpdate(replayMax) ⇒ changeStateToReplayStarted(await = false, replayMax) + case Update(awaitUpdate, replayMax) ⇒ changeStateToReplayStarted(awaitUpdate, replayMax) + case other ⇒ PersistentView.super.aroundReceive(receive, other) + } + + def changeStateToReplayStarted(await: Boolean, replayMax: Long): Unit = { + changeState(replayStarted(await)) + journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala index e07ffd0000..9322fc3e2b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -34,6 +34,7 @@ object AtLeastOnceDeliveryFailureSpec { case object Start case class Done(ints: Vector[Int]) + case class Ack(i: Int) case class ProcessingFailure(i: Int) case class JournalingFailure(i: Int) @@ -77,9 +78,11 @@ object AtLeastOnceDeliveryFailureSpec { val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate if (contains(i)) { log.debug(debugMessage(s"ignored duplicate ${i}")) + sender() ! Ack(i) } else { persist(MsgSent(i)) { evt ⇒ updateState(evt) + sender() ! Ack(i) if (shouldFail(failureRate)) throw new TestException(debugMessage(s"failed at payload ${i}")) else @@ -142,16 +145,26 @@ object AtLeastOnceDeliveryFailureSpec { class ChaosApp(probe: ActorRef) extends Actor with ActorLogging { val destination = context.actorOf(Props(classOf[ChaosDestination], probe), "destination") - val snd = context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender") + var snd = createSender() + var acks = Set.empty[Int] + + def createSender(): ActorRef = + context.watch(context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender")) def receive = { - case Start ⇒ 1 to numMessages foreach (snd ! _) + case Start ⇒ 1 to numMessages foreach (snd ! _) + case Ack(i) ⇒ acks += i case ProcessingFailure(i) ⇒ snd ! i log.debug(s"resent ${i} after processing failure") case JournalingFailure(i) ⇒ snd ! i log.debug(s"resent ${i} after journaling failure") + case Terminated(_) ⇒ + // snd will be stopped if ReadHighestSequenceNr fails + log.debug(s"sender stopped, starting it again") + snd = createSender() + 1 to numMessages foreach (i ⇒ if (!acks(i)) snd ! i) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index a65a49b859..013fec0b45 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -23,6 +23,8 @@ object PersistentActorFailureSpec { import PersistentActorSpec.Evt import PersistentActorSpec.ExamplePersistentActor + class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace + class FailingInmemJournal extends AsyncWriteProxy { import AsyncWriteProxy.SetStore @@ -38,13 +40,13 @@ object PersistentActorFailureSpec { class FailingInmemStore extends InmemStore { def failingReceive: Receive = { case w: WriteMessages if isWrong(w) ⇒ - throw new RuntimeException("Simulated Store failure") with NoStackTrace + throw new SimulatedException("Simulated Store failure") case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ val readFromStore = read(pid, fromSnr, toSnr, max) if (readFromStore.length == 0) sender() ! ReplaySuccess else if (isCorrupt(readFromStore)) - sender() ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr")) + sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr")) else { readFromStore.foreach(sender() ! _) sender() ! ReplaySuccess @@ -79,6 +81,15 @@ object PersistentActorFailureSpec { } } + class ResumingSupervisor(testActor: ActorRef) extends Supervisor(testActor) { + override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { + case e ⇒ + testActor ! e + SupervisorStrategy.Resume + } + + } + class FailingRecovery(name: String, recoveryFailureProbe: Option[ActorRef]) extends ExamplePersistentActor(name) { def this(name: String) = this(name, None) @@ -88,17 +99,34 @@ object PersistentActorFailureSpec { val failingRecover: Receive = { case Evt(data) if data == "bad" ⇒ - throw new RuntimeException("Simulated exception from receiveRecover") + throw new SimulatedException("Simulated exception from receiveRecover") case r @ RecoveryFailure(cause) if recoveryFailureProbe.isDefined ⇒ recoveryFailureProbe.foreach { _ ! r } - throw new ActorKilledException(cause.getMessage) } override def receiveRecover: Receive = failingRecover orElse super.receiveRecover - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - super.preRestart(reason, message) + } + + class ThrowingActor1(name: String) extends ExamplePersistentActor(name) { + override val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + persist(Evt(s"${data}"))(updateState) + if (data == "err") + throw new SimulatedException("Simulated exception 1") + } + } + + class ThrowingActor2(name: String) extends ExamplePersistentActor(name) { + override val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + persist(Evt(s"${data}")) { evt ⇒ + if (data == "err") + throw new SimulatedException("Simulated exception 1") + updateState(evt) + } + } } } @@ -164,11 +192,75 @@ class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem" val probe = TestProbe() system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref)) expectMsgType[ActorRef] - expectMsgType[ActorKilledException] val recoveryFailure = probe.expectMsgType[RecoveryFailure] recoveryFailure.payload should be(Some(Evt("bad"))) recoveryFailure.sequenceNr should be(Some(3L)) } + "continue by handling RecoveryFailure" in { + prepareFailingRecovery() + + // recover by creating another with same name + val probe = TestProbe() + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref)) + val persistentActor = expectMsgType[ActorRef] + val recoveryFailure = probe.expectMsgType[RecoveryFailure] + // continue + persistentActor ! Cmd("d") + persistentActor ! GetState + // "bad" failed, and "c" was not replayed + expectMsg(List("a", "b", "d")) + } + "support resume after recovery failure" in { + prepareFailingRecovery() + + // recover by creating another with same name + system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[FailingRecovery], name) + val persistentActor = expectMsgType[ActorRef] + expectMsgType[ActorKilledException] // from supervisor + // resume + persistentActor ! Cmd("d") + persistentActor ! GetState + // "bad" failed, and "c" was not replayed + expectMsg(List("a", "b", "d")) + } + "support resume after persist failure" in { + system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("wrong") + persistentActor ! Cmd("b") + // Behavior1PersistentActor persists 2 events per Cmd, + // and therefore 2 exceptions from supervisor + expectMsgType[ActorKilledException] + expectMsgType[ActorKilledException] + persistentActor ! Cmd("c") + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2", "c-1", "c-2")) + } + "support resume when persist followed by exception" in { + system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor1], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("err") + persistentActor ! Cmd("b") + expectMsgType[SimulatedException] // from supervisor + persistentActor ! Cmd("c") + persistentActor ! GetState + expectMsg(List("a", "err", "b", "c")) + } + "support resume when persist handler throws exception" in { + system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor2], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("b") + persistentActor ! Cmd("err") + persistentActor ! Cmd("c") + expectMsgType[SimulatedException] // from supervisor + persistentActor ! Cmd("d") + persistentActor ! GetState + expectMsg(List("a", "b", "c", "d")) + } + } }