Switched to server managed for Supervisor config
This commit is contained in:
parent
453fa4dd1c
commit
d54eb18228
4 changed files with 23 additions and 6 deletions
|
|
@ -131,7 +131,6 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
|
|||
servers.map(server =>
|
||||
server match {
|
||||
case Supervise(actorRef, lifeCycle, remoteAddress) =>
|
||||
remoteAddress.foreach { address => actorRef.makeRemote(address.hostname, address.port) }
|
||||
actorRef.start
|
||||
val className = actorRef.actor.getClass.getName
|
||||
val currentActors = {
|
||||
|
|
@ -142,6 +141,10 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
|
|||
_childActors.put(className, actorRef :: currentActors)
|
||||
actorRef.lifeCycle = lifeCycle
|
||||
supervisor.link(actorRef)
|
||||
if (remoteAddress.isDefined) {
|
||||
val address = remoteAddress.get
|
||||
RemoteServerModule.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef)
|
||||
}
|
||||
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
|
||||
val childSupervisor = Supervisor(supervisorConfig)
|
||||
supervisor.link(childSupervisor.supervisor)
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ object ReflectiveAccess extends Logging {
|
|||
val PORT = Config.config.getInt("akka.remote.server.port", 2552)
|
||||
|
||||
type RemoteServerObject = {
|
||||
def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef): Unit
|
||||
def registerActor(address: InetSocketAddress, actor: ActorRef): Unit
|
||||
def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
|
||||
}
|
||||
|
||||
|
|
@ -128,9 +128,9 @@ object ReflectiveAccess extends Logging {
|
|||
val remoteNodeObjectInstance: Option[RemoteNodeObject] =
|
||||
getObjectFor("akka.remote.RemoteNode$")
|
||||
|
||||
def registerActor(address: InetSocketAddress, uuid: Uuid, actorRef: ActorRef) = {
|
||||
def registerActor(address: InetSocketAddress, actorRef: ActorRef) = {
|
||||
ensureRemotingEnabled
|
||||
remoteServerObjectInstance.get.registerActor(address, uuid, actorRef)
|
||||
remoteServerObjectInstance.get.registerActor(address, actorRef)
|
||||
}
|
||||
|
||||
def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {
|
||||
|
|
|
|||
|
|
@ -139,10 +139,24 @@ object RemoteServer {
|
|||
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in REflectiveAccess
|
||||
*/
|
||||
private[akka] def registerActor(address: InetSocketAddress, actorRef: ActorRef) {
|
||||
serverFor(address) foreach { _.register(actorRef) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in Reflective
|
||||
*/
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) {
|
||||
serverFor(address) foreach { _.registerTypedActor(implementationClassName,proxy)}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteServer.
|
||||
* Life-cycle events for RemoteServer.
|
||||
*/
|
||||
sealed trait RemoteServerLifeCycleEvent
|
||||
case class RemoteServerError(@BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
|
|
|
|||
|
|
@ -919,7 +919,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
|
|||
* Unregisters initialization and stops its ActorRef.
|
||||
*/
|
||||
def unregister(proxy: AnyRef): AspectInit = {
|
||||
val init = initializations.remove(proxy)
|
||||
val init = if (proxy ne null) initializations.remove(proxy) else null
|
||||
if (init ne null) {
|
||||
notifyListeners(AspectInitUnregistered(proxy, init))
|
||||
init.actorRef.stop
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue