!per #3618 Cleanup duplication of life cycle hooks in Processor
- introduce around life cycle hooks for symmetry with aroundReceive
- no custom processor-specific life cycle hooks needed any more
- preStart and preRestart can be overridden with empty implementation
(interceptors ensure that super.preXxx calls are still executed)
- all around life cycle hooks can be final
- standard life cycle hooks are non-final to preserve composability with existing traits (FSM, ...)
This commit is contained in:
parent
d0684c2f7e
commit
6c78629cdb
14 changed files with 97 additions and 120 deletions
|
|
@ -479,6 +479,26 @@ trait Actor {
|
|||
*/
|
||||
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
|
||||
|
||||
/**
|
||||
* Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
|
||||
*/
|
||||
protected[akka] def aroundPreStart(): Unit = preStart()
|
||||
|
||||
/**
|
||||
* Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
|
||||
*/
|
||||
protected[akka] def aroundPostStop(): Unit = postStop()
|
||||
|
||||
/**
|
||||
* Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
|
||||
*/
|
||||
protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)
|
||||
|
||||
/**
|
||||
* Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
|
||||
*/
|
||||
protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
|
||||
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
|
|
|
|||
|
|
@ -559,7 +559,7 @@ private[akka] class ActorCell(
|
|||
try {
|
||||
val created = newActor()
|
||||
actor = created
|
||||
created.preStart()
|
||||
created.aroundPreStart()
|
||||
checkReceiveTimeout
|
||||
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
|
||||
} catch {
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
val optionalMessage = if (currentMessage ne null) Some(currentMessage.message) else None
|
||||
try {
|
||||
// if the actor fails in preRestart, we can do nothing but log it: it’s best-effort
|
||||
if (failedActor.context ne null) failedActor.preRestart(cause, optionalMessage)
|
||||
if (failedActor.context ne null) failedActor.aroundPreRestart(cause, optionalMessage)
|
||||
} catch handleNonFatalOrInterruptedException { e ⇒
|
||||
val ex = new PreRestartException(self, e, cause, optionalMessage)
|
||||
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
|
||||
|
|
@ -198,7 +198,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
* Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
|
||||
* specific order.
|
||||
*/
|
||||
try if (a ne null) a.postStop()
|
||||
try if (a ne null) a.aroundPostStop()
|
||||
catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
|
||||
finally try dispatcher.detach(this)
|
||||
finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
|
||||
|
|
@ -226,7 +226,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
actor = freshActor // this must happen before postRestart has a chance to fail
|
||||
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
|
||||
|
||||
freshActor.postRestart(cause)
|
||||
freshActor.aroundPostRestart(cause)
|
||||
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
|
||||
|
||||
// only after parent is up and running again do restart the children which were not stopped
|
||||
|
|
|
|||
|
|
@ -65,19 +65,19 @@ public class PersistenceDocTest {
|
|||
abstract class MyProcessor1 extends UntypedProcessor {
|
||||
//#recover-on-start-disabled
|
||||
@Override
|
||||
public void preStartProcessor() {}
|
||||
public void preStart() {}
|
||||
//#recover-on-start-disabled
|
||||
|
||||
//#recover-on-restart-disabled
|
||||
@Override
|
||||
public void preRestartProcessor(Throwable reason, Option<Object> message) {}
|
||||
public void preRestart(Throwable reason, Option<Object> message) {}
|
||||
//#recover-on-restart-disabled
|
||||
}
|
||||
|
||||
abstract class MyProcessor2 extends UntypedProcessor {
|
||||
//#recover-on-start-custom
|
||||
@Override
|
||||
public void preStartProcessor() {
|
||||
public void preStart() {
|
||||
getSelf().tell(Recover.create(457L), null);
|
||||
}
|
||||
//#recover-on-start-custom
|
||||
|
|
@ -86,11 +86,11 @@ public class PersistenceDocTest {
|
|||
abstract class MyProcessor3 extends UntypedProcessor {
|
||||
//#deletion
|
||||
@Override
|
||||
public void preRestartProcessor(Throwable reason, Option<Object> message) throws Exception {
|
||||
public void preRestart(Throwable reason, Option<Object> message) {
|
||||
if (message.isDefined() && message.get() instanceof Persistent) {
|
||||
delete((Persistent) message.get());
|
||||
}
|
||||
super.preRestartProcessor(reason, message);
|
||||
super.preRestart(reason, message);
|
||||
}
|
||||
//#deletion
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ only be received by that processor after recovery completes.
|
|||
Recovery customization
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation.
|
||||
Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-disabled
|
||||
|
||||
|
|
@ -87,18 +87,15 @@ In this case, a processor must be recovered explicitly by sending it a ``Recover
|
|||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-explicit
|
||||
|
||||
If not overridden, ``preStartProcessor`` sends a ``Recover`` message to ``getSelf()``. Applications may also override
|
||||
``preStartProcessor`` to define further ``Recover`` parameters such as an upper sequence number bound, for example.
|
||||
If not overridden, ``preStart`` sends a ``Recover`` message to ``getSelf()``. Applications may also override
|
||||
``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom
|
||||
|
||||
Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation.
|
||||
Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled
|
||||
|
||||
This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal
|
||||
state and makes a message replay unnecessary).
|
||||
|
||||
Recovery status
|
||||
^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
@ -116,13 +113,6 @@ a replay of that message during recovery it can be marked as deleted.
|
|||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#deletion
|
||||
|
||||
Life cycle hooks
|
||||
----------------
|
||||
|
||||
``UntypedProcessor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``,
|
||||
``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``,
|
||||
``postRestart`` and ``postStop`` directly.
|
||||
|
||||
Identifiers
|
||||
-----------
|
||||
|
||||
|
|
|
|||
|
|
@ -44,16 +44,16 @@ trait PersistenceDocSpec {
|
|||
new AnyRef {
|
||||
trait MyProcessor1 extends Processor {
|
||||
//#recover-on-start-disabled
|
||||
override def preStartProcessor() = ()
|
||||
override def preStart() = ()
|
||||
//#recover-on-start-disabled
|
||||
//#recover-on-restart-disabled
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = ()
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) = ()
|
||||
//#recover-on-restart-disabled
|
||||
}
|
||||
|
||||
trait MyProcessor2 extends Processor {
|
||||
//#recover-on-start-custom
|
||||
override def preStartProcessor() {
|
||||
override def preStart() {
|
||||
self ! Recover(toSequenceNr = 457L)
|
||||
}
|
||||
//#recover-on-start-custom
|
||||
|
|
@ -61,12 +61,12 @@ trait PersistenceDocSpec {
|
|||
|
||||
trait MyProcessor3 extends Processor {
|
||||
//#deletion
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
message match {
|
||||
case Some(p: Persistent) ⇒ delete(p)
|
||||
case _ ⇒
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
super.preRestart(reason, message)
|
||||
}
|
||||
//#deletion
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ only be received by that processor after recovery completes.
|
|||
Recovery customization
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation.
|
||||
Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-disabled
|
||||
|
||||
|
|
@ -82,18 +82,15 @@ In this case, a processor must be recovered explicitly by sending it a ``Recover
|
|||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-explicit
|
||||
|
||||
If not overridden, ``preStartProcessor`` sends a ``Recover()`` message to ``self``. Applications may also override
|
||||
``preStartProcessor`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example.
|
||||
If not overridden, ``preStart`` sends a ``Recover()`` message to ``self``. Applications may also override
|
||||
``preStart`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom
|
||||
|
||||
Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation.
|
||||
Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled
|
||||
|
||||
This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal
|
||||
state and makes a message replay unnecessary).
|
||||
|
||||
Recovery status
|
||||
^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
@ -111,14 +108,6 @@ a replay of that message during recovery it can be marked as deleted.
|
|||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#deletion
|
||||
|
||||
Life cycle hooks
|
||||
----------------
|
||||
|
||||
``Processor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``,
|
||||
``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``,
|
||||
``postRestart`` and ``postStop`` directly. The latter are nevertheless non-final to allow composition with
|
||||
existing traits such as ``akka.actor.FSM``, for example.
|
||||
|
||||
Identifiers
|
||||
-----------
|
||||
|
||||
|
|
|
|||
|
|
@ -222,9 +222,35 @@ trait Processor extends Actor with Stash {
|
|||
_currentState.aroundReceive(receive, message)
|
||||
}
|
||||
|
||||
private def nextSequenceNr(): Long = {
|
||||
_sequenceNr += 1L
|
||||
_sequenceNr
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
final override protected[akka] def aroundPreStart(): Unit = {
|
||||
try preStart() finally super.preStart()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
final override protected[akka] def aroundPostStop(): Unit = {
|
||||
try unstashAll(unstashFilterPredicate) finally postStop()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||
try {
|
||||
unstashAll(unstashFilterPredicate)
|
||||
unstashAllInternal()
|
||||
} finally {
|
||||
message match {
|
||||
case Some(Written(m)) ⇒ preRestartDefault(reason, Some(m))
|
||||
case Some(Looped(m)) ⇒ preRestartDefault(reason, Some(m))
|
||||
case Some(Replayed(m)) ⇒ preRestartDefault(reason, Some(m))
|
||||
case mo ⇒ preRestartDefault(reason, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -232,80 +258,32 @@ trait Processor extends Actor with Stash {
|
|||
* a `Recover()` to `self`.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def preStartProcessor(): Unit = {
|
||||
override def preStart(): Unit = {
|
||||
self ! Recover()
|
||||
}
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called when a processor is stopped. Empty default implementation.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def postStopProcessor(): Unit = ()
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called before a processor is restarted. Default implementation sends
|
||||
* a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def preRestartProcessor(reason: Throwable, message: Option[Any]): Unit = message match {
|
||||
case Some(_) ⇒ self ! Recover(lastSequenceNr)
|
||||
case None ⇒ self ! Recover()
|
||||
}
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called after a processor has been restarted. Empty default implementation.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def postRestartProcessor(reason: Throwable): Unit = ()
|
||||
|
||||
/**
|
||||
* Calls [[preStartProcessor]].
|
||||
*/
|
||||
override def preStart() {
|
||||
preStartProcessor()
|
||||
super.preStart()
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[postStopProcessor]] and unstashes all messages from the ''user stash'' that cannot be
|
||||
* replayed. The user stash is empty afterwards.
|
||||
*/
|
||||
override def postStop() {
|
||||
postStopProcessor()
|
||||
try unstashAll(unstashFilterPredicate) finally super.postStop()
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[preRestartDefault]] and then `super.preRestart()`. If processor implementation
|
||||
* classes want to opt out from stopping child actors, they should override this method and
|
||||
* call [[preRestartDefault]] only.
|
||||
*/
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
try preRestartDefault(reason, message) finally super.preRestart(reason, message)
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[preRestartProcessor]] and unstashes all messages from the ''user stash'' that cannot be
|
||||
* replayed. The user stash is empty afterwards.
|
||||
*/
|
||||
protected def preRestartDefault(reason: Throwable, message: Option[Any]) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||
message match {
|
||||
case Some(Written(m)) ⇒ preRestartProcessor(reason, Some(m))
|
||||
case Some(Looped(m)) ⇒ preRestartProcessor(reason, Some(m))
|
||||
case Some(Replayed(m)) ⇒ preRestartProcessor(reason, Some(m))
|
||||
case mo ⇒ preRestartProcessor(reason, None)
|
||||
case Some(_) ⇒ self ! Recover(lastSequenceNr)
|
||||
case None ⇒ self ! Recover()
|
||||
}
|
||||
|
||||
unstashAll(unstashFilterPredicate)
|
||||
unstashAllInternal()
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[postRestartProcessor]].
|
||||
* Calls [[preRestart]] and then `super.preRestart()`. If processor implementation classes want to
|
||||
* opt out from stopping child actors, they should override this method and call [[preRestart]] only.
|
||||
*/
|
||||
override def postRestart(reason: Throwable) {
|
||||
postRestartProcessor(reason)
|
||||
super.postRestart(reason)
|
||||
def preRestartDefault(reason: Throwable, message: Option[Any]): Unit = {
|
||||
try preRestart(reason, message) finally super.preRestart(reason, message)
|
||||
}
|
||||
|
||||
private def nextSequenceNr(): Long = {
|
||||
_sequenceNr += 1L
|
||||
_sequenceNr
|
||||
}
|
||||
|
||||
// -----------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -47,5 +47,5 @@ abstract class NamedProcessor(name: String) extends Processor {
|
|||
}
|
||||
|
||||
trait TurnOffRecoverOnStart { this: Processor ⇒
|
||||
override def preStartProcessor(): Unit = ()
|
||||
override def preStart(): Unit = ()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,12 +22,12 @@ object ProcessorSpec {
|
|||
case GetState ⇒ sender ! state.reverse
|
||||
}
|
||||
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) = {
|
||||
message match {
|
||||
case Some(m: Persistent) ⇒ delete(m) // delete message from journal
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
super.preRestart(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -111,12 +111,12 @@ object ProcessorSpec {
|
|||
}
|
||||
|
||||
class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) = {
|
||||
message match {
|
||||
case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m)
|
||||
case _ ⇒
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
super.preRestart(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,12 +39,12 @@ object ProcessorStashSpec {
|
|||
}
|
||||
|
||||
class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) {
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) = {
|
||||
message match {
|
||||
case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m)
|
||||
case _ ⇒
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
super.preRestart(reason, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,11 +32,11 @@ public class ProcessorFailureExample {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preRestartProcessor(Throwable reason, Option<Object> message) throws Exception {
|
||||
public void preRestart(Throwable reason, Option<Object> message) {
|
||||
if (message.isDefined() && message.get() instanceof Persistent) {
|
||||
delete((Persistent) message.get());
|
||||
}
|
||||
super.preRestartProcessor(reason, message);
|
||||
super.preRestart(reason, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ object ConversationRecoveryExample extends App {
|
|||
case "init" ⇒ if (counter == 0) self forward Persistent(Ping)
|
||||
}
|
||||
|
||||
override def preStartProcessor() = ()
|
||||
override def preStart() = ()
|
||||
}
|
||||
|
||||
class Pong extends Processor {
|
||||
|
|
@ -43,7 +43,7 @@ object ConversationRecoveryExample extends App {
|
|||
}
|
||||
}
|
||||
|
||||
override def preStartProcessor() = ()
|
||||
override def preStart() = ()
|
||||
}
|
||||
|
||||
val system = ActorSystem("example")
|
||||
|
|
|
|||
|
|
@ -18,12 +18,12 @@ object ProcessorFailureExample extends App {
|
|||
case Persistent(payload: String, _) ⇒ received = payload :: received
|
||||
}
|
||||
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
message match {
|
||||
case Some(p: Persistent) if !recoveryRunning ⇒ delete(p) // mark failing message as deleted
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
super.preRestart(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue