Making sure that the RemoteActorRefProvider delegates systemServices down to the LocalActorRefProvider

This commit is contained in:
Viktor Klang 2011-10-18 14:21:48 +02:00
parent 1c3b9a389b
commit 65868d7c96

View file

@ -46,93 +46,95 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false)
def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = {
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
def actorOf(props: Props, address: String, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(props, address, systemService)
else {
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future
case null
val actor: ActorRef = try {
app.deployer.lookupDeploymentFor(address) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses)))
actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future
case null
val actor: ActorRef = try {
app.deployer.lookupDeploymentFor(address) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses)))
val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {
case FailureDetectorType.NoOp new NoOpFailureDetector
case FailureDetectorType.RemoveConnectionOnFirstFailure new RemoveConnectionOnFirstFailureFailureDetector
case FailureDetectorType.BannagePeriod(timeToBan) new BannagePeriodFailureDetector(timeToBan)
case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
}
val thisHostname = remote.address.getHostName
val thisPort = remote.address.getPort
def isReplicaNode: Boolean = remoteAddresses exists { some some.hostname == thisHostname && some.port == thisPort }
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
new LocalActorRef(app, props, address, false)
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RandomRouter
case RouterType.RoundRobin
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RoundRobinRouter
case RouterType.ScatterGather
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {
case FailureDetectorType.NoOp new NoOpFailureDetector
case FailureDetectorType.RemoveConnectionOnFirstFailure new RemoveConnectionOnFirstFailureFailureDetector
case FailureDetectorType.BannagePeriod(timeToBan) new BannagePeriodFailureDetector(timeToBan)
case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
}
val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a)
val inetAddr = new InetSocketAddress(a.hostname, a.port)
conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None))
val thisHostname = remote.address.getHostName
val thisPort = remote.address.getPort
def isReplicaNode: Boolean = remoteAddresses exists { some some.hostname == thisHostname && some.port == thisPort }
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
new LocalActorRef(app, props, address, false)
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RandomRouter
case RouterType.RoundRobin
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RoundRobinRouter
case RouterType.ScatterGather
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
}
val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a)
val inetAddr = new InetSocketAddress(a.hostname, a.port)
conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None))
}
val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector)
connections.keys foreach { useActorOnNode(_, address, props.creator) }
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address)
}
val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector)
connections.keys foreach { useActorOnNode(_, address, props.creator) }
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address)
}
case deploy local.actorOf(props, address, systemService)
case deploy local.actorOf(props, address, systemService)
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actors.replace(address, newFuture, actor)
actor
case actor: ActorRef actor
case future: Future[_] future.get.asInstanceOf[ActorRef]
newFuture completeWithResult actor
actors.replace(address, newFuture, actor)
actor
case actor: ActorRef actor
case future: Future[_] future.get.asInstanceOf[ActorRef]
}
}
}
/**
* Copied from LocalActorRefProvider...