diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 42396b3ec4..7dfb0b2047 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -479,6 +479,26 @@ trait Actor { */ protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled) + /** + * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default. + */ + protected[akka] def aroundPreStart(): Unit = preStart() + + /** + * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default. + */ + protected[akka] def aroundPostStop(): Unit = postStop() + + /** + * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default. + */ + protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message) + + /** + * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default. + */ + protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason) + /** * User overridable definition the strategy to use for supervising * child actors. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 8e8d6f6501..1cec62fbcd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -559,7 +559,7 @@ private[akka] class ActorCell( try { val created = newActor() actor = created - created.preStart() + created.aroundPreStart() checkReceiveTimeout if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index e5bd255a25..324559df6b 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -63,7 +63,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ val optionalMessage = if (currentMessage ne null) Some(currentMessage.message) else None try { // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort - if (failedActor.context ne null) failedActor.preRestart(cause, optionalMessage) + if (failedActor.context ne null) failedActor.aroundPreRestart(cause, optionalMessage) } catch handleNonFatalOrInterruptedException { e ⇒ val ex = new PreRestartException(self, e, cause, optionalMessage) publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) @@ -198,7 +198,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this * specific order. */ - try if (a ne null) a.postStop() + try if (a ne null) a.aroundPostStop() catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) } finally try dispatcher.detach(this) finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) @@ -226,7 +226,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ actor = freshActor // this must happen before postRestart has a chance to fail if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. - freshActor.postRestart(cause) + freshActor.aroundPostRestart(cause) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) // only after parent is up and running again do restart the children which were not stopped diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 57e09d633e..5adbcf0bfb 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -65,19 +65,19 @@ public class PersistenceDocTest { abstract class MyProcessor1 extends UntypedProcessor { //#recover-on-start-disabled @Override - public void preStartProcessor() {} + public void preStart() {} //#recover-on-start-disabled //#recover-on-restart-disabled @Override - public void preRestartProcessor(Throwable reason, Option message) {} + public void preRestart(Throwable reason, Option message) {} //#recover-on-restart-disabled } abstract class MyProcessor2 extends UntypedProcessor { //#recover-on-start-custom @Override - public void preStartProcessor() { + public void preStart() { getSelf().tell(Recover.create(457L), null); } //#recover-on-start-custom @@ -86,11 +86,11 @@ public class PersistenceDocTest { abstract class MyProcessor3 extends UntypedProcessor { //#deletion @Override - public void preRestartProcessor(Throwable reason, Option message) throws Exception { + public void preRestart(Throwable reason, Option message) { if (message.isDefined() && message.get() instanceof Persistent) { delete((Persistent) message.get()); } - super.preRestartProcessor(reason, message); + super.preRestart(reason, message); } //#deletion } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 8e9c8c1efa..f26af4239f 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -79,7 +79,7 @@ only be received by that processor after recovery completes. Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ -Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation. +Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-disabled @@ -87,18 +87,15 @@ In this case, a processor must be recovered explicitly by sending it a ``Recover .. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-explicit -If not overridden, ``preStartProcessor`` sends a ``Recover`` message to ``getSelf()``. Applications may also override -``preStartProcessor`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. +If not overridden, ``preStart`` sends a ``Recover`` message to ``getSelf()``. Applications may also override +``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom -Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation. +Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled -This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal -state and makes a message replay unnecessary). - Recovery status ^^^^^^^^^^^^^^^ @@ -116,13 +113,6 @@ a replay of that message during recovery it can be marked as deleted. .. includecode:: code/docs/persistence/PersistenceDocTest.java#deletion -Life cycle hooks ----------------- - -``UntypedProcessor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``, -``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``, -``postRestart`` and ``postStop`` directly. - Identifiers ----------- diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 8f3d56ddc1..b7f950a3f9 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -44,16 +44,16 @@ trait PersistenceDocSpec { new AnyRef { trait MyProcessor1 extends Processor { //#recover-on-start-disabled - override def preStartProcessor() = () + override def preStart() = () //#recover-on-start-disabled //#recover-on-restart-disabled - override def preRestartProcessor(reason: Throwable, message: Option[Any]) = () + override def preRestart(reason: Throwable, message: Option[Any]) = () //#recover-on-restart-disabled } trait MyProcessor2 extends Processor { //#recover-on-start-custom - override def preStartProcessor() { + override def preStart() { self ! Recover(toSequenceNr = 457L) } //#recover-on-start-custom @@ -61,12 +61,12 @@ trait PersistenceDocSpec { trait MyProcessor3 extends Processor { //#deletion - override def preRestartProcessor(reason: Throwable, message: Option[Any]) { + override def preRestart(reason: Throwable, message: Option[Any]) { message match { case Some(p: Persistent) ⇒ delete(p) case _ ⇒ } - super.preRestartProcessor(reason, message) + super.preRestart(reason, message) } //#deletion } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 899dfa30ef..5392d92f1f 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -74,7 +74,7 @@ only be received by that processor after recovery completes. Recovery customization ^^^^^^^^^^^^^^^^^^^^^^ -Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation. +Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-disabled @@ -82,18 +82,15 @@ In this case, a processor must be recovered explicitly by sending it a ``Recover .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-explicit -If not overridden, ``preStartProcessor`` sends a ``Recover()`` message to ``self``. Applications may also override -``preStartProcessor`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example. +If not overridden, ``preStart`` sends a ``Recover()`` message to ``self``. Applications may also override +``preStart`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom -Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation. +Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled -This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal -state and makes a message replay unnecessary). - Recovery status ^^^^^^^^^^^^^^^ @@ -111,14 +108,6 @@ a replay of that message during recovery it can be marked as deleted. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#deletion -Life cycle hooks ----------------- - -``Processor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``, -``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``, -``postRestart`` and ``postStop`` directly. The latter are nevertheless non-final to allow composition with -existing traits such as ``akka.actor.FSM``, for example. - Identifiers ----------- diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index b16f5681ba..b55c01d78c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -222,9 +222,35 @@ trait Processor extends Actor with Stash { _currentState.aroundReceive(receive, message) } - private def nextSequenceNr(): Long = { - _sequenceNr += 1L - _sequenceNr + /** + * INTERNAL API. + */ + final override protected[akka] def aroundPreStart(): Unit = { + try preStart() finally super.preStart() + } + + /** + * INTERNAL API. + */ + final override protected[akka] def aroundPostStop(): Unit = { + try unstashAll(unstashFilterPredicate) finally postStop() + } + + /** + * INTERNAL API. + */ + final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { + try { + unstashAll(unstashFilterPredicate) + unstashAllInternal() + } finally { + message match { + case Some(Written(m)) ⇒ preRestartDefault(reason, Some(m)) + case Some(Looped(m)) ⇒ preRestartDefault(reason, Some(m)) + case Some(Replayed(m)) ⇒ preRestartDefault(reason, Some(m)) + case mo ⇒ preRestartDefault(reason, None) + } + } } /** @@ -232,80 +258,32 @@ trait Processor extends Actor with Stash { * a `Recover()` to `self`. */ @throws(classOf[Exception]) - def preStartProcessor(): Unit = { + override def preStart(): Unit = { self ! Recover() } - /** - * User-overridable callback. Called when a processor is stopped. Empty default implementation. - */ - @throws(classOf[Exception]) - def postStopProcessor(): Unit = () - /** * User-overridable callback. Called before a processor is restarted. Default implementation sends * a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`. */ - @throws(classOf[Exception]) - def preRestartProcessor(reason: Throwable, message: Option[Any]): Unit = message match { - case Some(_) ⇒ self ! Recover(lastSequenceNr) - case None ⇒ self ! Recover() - } - - /** - * User-overridable callback. Called after a processor has been restarted. Empty default implementation. - */ - @throws(classOf[Exception]) - def postRestartProcessor(reason: Throwable): Unit = () - - /** - * Calls [[preStartProcessor]]. - */ - override def preStart() { - preStartProcessor() - super.preStart() - } - - /** - * Calls [[postStopProcessor]] and unstashes all messages from the ''user stash'' that cannot be - * replayed. The user stash is empty afterwards. - */ - override def postStop() { - postStopProcessor() - try unstashAll(unstashFilterPredicate) finally super.postStop() - } - - /** - * Calls [[preRestartDefault]] and then `super.preRestart()`. If processor implementation - * classes want to opt out from stopping child actors, they should override this method and - * call [[preRestartDefault]] only. - */ - override def preRestart(reason: Throwable, message: Option[Any]) { - try preRestartDefault(reason, message) finally super.preRestart(reason, message) - } - - /** - * Calls [[preRestartProcessor]] and unstashes all messages from the ''user stash'' that cannot be - * replayed. The user stash is empty afterwards. - */ - protected def preRestartDefault(reason: Throwable, message: Option[Any]) { + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { message match { - case Some(Written(m)) ⇒ preRestartProcessor(reason, Some(m)) - case Some(Looped(m)) ⇒ preRestartProcessor(reason, Some(m)) - case Some(Replayed(m)) ⇒ preRestartProcessor(reason, Some(m)) - case mo ⇒ preRestartProcessor(reason, None) + case Some(_) ⇒ self ! Recover(lastSequenceNr) + case None ⇒ self ! Recover() } - - unstashAll(unstashFilterPredicate) - unstashAllInternal() } /** - * Calls [[postRestartProcessor]]. + * Calls [[preRestart]] and then `super.preRestart()`. If processor implementation classes want to + * opt out from stopping child actors, they should override this method and call [[preRestart]] only. */ - override def postRestart(reason: Throwable) { - postRestartProcessor(reason) - super.postRestart(reason) + def preRestartDefault(reason: Throwable, message: Option[Any]): Unit = { + try preRestart(reason, message) finally super.preRestart(reason, message) + } + + private def nextSequenceNr(): Long = { + _sequenceNr += 1L + _sequenceNr } // ----------------------------------------------------- diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 5bb808a107..79b0a33e19 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -47,5 +47,5 @@ abstract class NamedProcessor(name: String) extends Processor { } trait TurnOffRecoverOnStart { this: Processor ⇒ - override def preStartProcessor(): Unit = () + override def preStart(): Unit = () } diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index b009812fc8..ac10967abf 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -22,12 +22,12 @@ object ProcessorSpec { case GetState ⇒ sender ! state.reverse } - override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { + override def preRestart(reason: Throwable, message: Option[Any]) = { message match { case Some(m: Persistent) ⇒ delete(m) // delete message from journal case _ ⇒ // ignore } - super.preRestartProcessor(reason, message) + super.preRestart(reason, message) } } @@ -111,12 +111,12 @@ object ProcessorSpec { } class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { - override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { + override def preRestart(reason: Throwable, message: Option[Any]) = { message match { case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) case _ ⇒ } - super.preRestartProcessor(reason, message) + super.preRestart(reason, message) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala index 6ff69569a5..35c8184536 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -39,12 +39,12 @@ object ProcessorStashSpec { } class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) { - override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { + override def preRestart(reason: Throwable, message: Option[Any]) = { message match { case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) case _ ⇒ } - super.preRestartProcessor(reason, message) + super.preRestart(reason, message) } } } diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java index 6999635638..e236887a5a 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java @@ -32,11 +32,11 @@ public class ProcessorFailureExample { } @Override - public void preRestartProcessor(Throwable reason, Option message) throws Exception { + public void preRestart(Throwable reason, Option message) { if (message.isDefined() && message.get() instanceof Persistent) { delete((Persistent) message.get()); } - super.preRestartProcessor(reason, message); + super.preRestart(reason, message); } } diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala index 23645b0125..67919ed0be 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala @@ -26,7 +26,7 @@ object ConversationRecoveryExample extends App { case "init" ⇒ if (counter == 0) self forward Persistent(Ping) } - override def preStartProcessor() = () + override def preStart() = () } class Pong extends Processor { @@ -43,7 +43,7 @@ object ConversationRecoveryExample extends App { } } - override def preStartProcessor() = () + override def preStart() = () } val system = ActorSystem("example") diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala index c7961238e0..3a781450eb 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala @@ -18,12 +18,12 @@ object ProcessorFailureExample extends App { case Persistent(payload: String, _) ⇒ received = payload :: received } - override def preRestartProcessor(reason: Throwable, message: Option[Any]) { + override def preRestart(reason: Throwable, message: Option[Any]) { message match { case Some(p: Persistent) if !recoveryRunning ⇒ delete(p) // mark failing message as deleted case _ ⇒ // ignore } - super.preRestartProcessor(reason, message) + super.preRestart(reason, message) } }