removed the Init(config) life-cycle message and the config parameters to pre/postRestart instead calling init right after start has been invoked for doing post start initialization
This commit is contained in:
parent
3de15e3590
commit
6c4d05bbdc
5 changed files with 17 additions and 20 deletions
|
|
@ -413,13 +413,13 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
|
|||
throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
|
||||
}
|
||||
|
||||
override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def preRestart(reason: AnyRef) {
|
||||
try {
|
||||
if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
|
||||
} catch { case e: InvocationTargetException => throw e.getCause }
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
try {
|
||||
if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
|
||||
} catch { case e: InvocationTargetException => throw e.getCause }
|
||||
|
|
|
|||
|
|
@ -42,7 +42,6 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
|
|||
}
|
||||
|
||||
@serializable sealed trait LifeCycleMessage
|
||||
case class Init(config: AnyRef) extends LifeCycleMessage
|
||||
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage
|
||||
case class Restart(reason: AnyRef) extends LifeCycleMessage
|
||||
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
|
||||
|
|
@ -230,12 +229,10 @@ trait Actor extends TransactionManagement {
|
|||
@volatile private var _isShutDown: Boolean = false
|
||||
private var _isEventBased: Boolean = false
|
||||
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
|
||||
private var _config: Option[AnyRef] = None
|
||||
private val _remoteFlagLock = new ReadWriteLock
|
||||
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
|
||||
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
|
||||
private[akka] var _supervisor: Option[Actor] = None
|
||||
|
||||
private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
|
||||
|
||||
// ====================================
|
||||
|
|
@ -375,7 +372,7 @@ trait Actor extends TransactionManagement {
|
|||
* Optional callback method that is called during initialization.
|
||||
* To be implemented by subclassing actor.
|
||||
*/
|
||||
protected def init(config: AnyRef) = {}
|
||||
protected def init = {}
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -383,7 +380,7 @@ trait Actor extends TransactionManagement {
|
|||
* Mandatory callback method that is called during restart and reinitialization after a server crash.
|
||||
* To be implemented by subclassing actor.
|
||||
*/
|
||||
protected def preRestart(reason: AnyRef, config: Option[AnyRef]) = {}
|
||||
protected def preRestart(reason: AnyRef) = {}
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -391,7 +388,7 @@ trait Actor extends TransactionManagement {
|
|||
* Mandatory callback method that is called during restart and reinitialization after a server crash.
|
||||
* To be implemented by subclassing actor.
|
||||
*/
|
||||
protected def postRestart(reason: AnyRef, config: Option[AnyRef]) = {}
|
||||
protected def postRestart(reason: AnyRef) = {}
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -424,6 +421,7 @@ trait Actor extends TransactionManagement {
|
|||
_isRunning = true
|
||||
//if (isTransactional) this !! TransactionalInit
|
||||
}
|
||||
init // call user-defined init method
|
||||
Actor.log.debug("[%s] has started", toString)
|
||||
this
|
||||
}
|
||||
|
|
@ -887,7 +885,7 @@ trait Actor extends TransactionManagement {
|
|||
} else proceed
|
||||
} catch {
|
||||
case e =>
|
||||
Actor.log.error(e, "Exception when \ninvoking actor [%s] \nwith message [%s]", this, message)
|
||||
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
|
||||
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
|
||||
clearTransaction // need to clear currentTransaction before call to supervisor
|
||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||
|
|
@ -904,7 +902,6 @@ trait Actor extends TransactionManagement {
|
|||
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
|
||||
|
||||
private val lifeCycles: PartialFunction[Any, Unit] = {
|
||||
case Init(config) => _config = Some(config); init(config)
|
||||
case HotSwap(code) => _hotswap = code
|
||||
case Restart(reason) => restart(reason)
|
||||
case Exit(dead, reason) => handleTrapExit(dead, reason)
|
||||
|
|
@ -946,9 +943,9 @@ trait Actor extends TransactionManagement {
|
|||
}
|
||||
|
||||
private[Actor] def restart(reason: AnyRef) = _mailbox.synchronized {
|
||||
preRestart(reason, _config)
|
||||
preRestart(reason)
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
postRestart(reason, _config)
|
||||
postRestart(reason)
|
||||
}
|
||||
|
||||
private[akka] def registerSupervisorAsRemoteActor: Option[String] = _mailbox.synchronized {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ object Log {
|
|||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
Log.messageLog += reason.asInstanceOf[Exception].getMessage
|
||||
}
|
||||
}
|
||||
|
|
@ -48,7 +48,7 @@ object Log {
|
|||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
Log.messageLog += reason.asInstanceOf[Exception].getMessage
|
||||
}
|
||||
}
|
||||
|
|
@ -63,7 +63,7 @@ object Log {
|
|||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
Log.messageLog += reason.asInstanceOf[Exception].getMessage
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -556,7 +556,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
case Die =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
messageLog += reason.asInstanceOf[Exception].getMessage
|
||||
}
|
||||
}
|
||||
|
|
@ -569,7 +569,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
case Die =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
messageLog += reason.asInstanceOf[Exception].getMessage
|
||||
}
|
||||
}
|
||||
|
|
@ -583,7 +583,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
|
||||
override protected def postRestart(reason: AnyRef) {
|
||||
messageLog += reason.asInstanceOf[Exception].getMessage
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -557,8 +557,8 @@ object AMQP {
|
|||
}
|
||||
}
|
||||
|
||||
override def preRestart(reason: AnyRef, config: Option[AnyRef]) = disconnect
|
||||
override def preRestart(reason: AnyRef) = disconnect
|
||||
|
||||
override def postRestart(reason: AnyRef, config: Option[AnyRef]) = reconnect(initReconnectDelay)
|
||||
override def postRestart(reason: AnyRef) = reconnect(initReconnectDelay)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue