Added configuration based routing (direct, random and round-robin) to the remote actors created by the RemoteActorRefProvider, also changed the configuration to allow specifying multiple remote nodes for a remotely configured actor.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-10-05 18:44:27 +02:00
parent 0957e41d19
commit d124f6e781
8 changed files with 82 additions and 119 deletions

View file

@ -22,7 +22,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
LeastCPU,
NrOfInstances(3),
BannagePeriodFailureDetector(10),
RemoteScope("localhost", 2552))))
RemoteScope(List(RemoteAddress("localhost", 2552))))))
// ClusterScope(
// List(Node("node1")),
// new NrOfInstances(3),

View file

@ -14,103 +14,6 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers {
// "direct router" must {
// "be able to shut down its instance" in {
// val address = "direct-0"
// Deployer.deploy(
// Deploy(
// address,
// None,
// Direct,
// NrOfInstances(1),
// RemoveConnectionOnFirstFailureLocalFailureDetector,
// LocalScope))
// val helloLatch = new CountDownLatch(1)
// val stopLatch = new CountDownLatch(1)
// val actor = actorOf(new Actor {
// def receive = {
// case "hello" helloLatch.countDown()
// }
// override def postStop() {
// stopLatch.countDown()
// }
// }, address)
// actor ! "hello"
// helloLatch.await(5, TimeUnit.SECONDS) must be(true)
// actor.stop()
// stopLatch.await(5, TimeUnit.SECONDS) must be(true)
// }
// "send message to connection" in {
// val address = "direct-1"
// Deployer.deploy(
// Deploy(
// address,
// None,
// Direct,
// NrOfInstances(1),
// RemoveConnectionOnFirstFailureLocalFailureDetector,
// LocalScope))
// val doneLatch = new CountDownLatch(1)
// val counter = new AtomicInteger(0)
// val actor = actorOf(new Actor {
// def receive = {
// case "end" doneLatch.countDown()
// case _ counter.incrementAndGet()
// }
// }, address)
// actor ! "hello"
// actor ! "end"
// doneLatch.await(5, TimeUnit.SECONDS) must be(true)
// counter.get must be(1)
// }
// "deliver a broadcast message" in {
// val address = "direct-2"
// Deployer.deploy(
// Deploy(
// address,
// None,
// Direct,
// NrOfInstances(1),
// RemoveConnectionOnFirstFailureLocalFailureDetector,
// LocalScope))
// val doneLatch = new CountDownLatch(1)
// val counter1 = new AtomicInteger
// val actor = actorOf(new Actor {
// def receive = {
// case "end" doneLatch.countDown()
// case msg: Int counter1.addAndGet(msg)
// }
// }, address)
// actor ! Broadcast(1)
// actor ! "end"
// doneLatch.await(5, TimeUnit.SECONDS) must be(true)
// counter1.get must be(1)
// }
// }
"round robin router" must {
"be able to shut down its instance" in {

View file

@ -231,10 +231,29 @@ object Deployer extends ActorDeployer {
if (addressConfig.getSection("cluster").isDefined) throw new ConfigurationException(
"Configuration for deployment ID [" + address + "] can not have both 'remote' and 'cluster' sections.")
val hostname = remoteConfig.getString("hostname", "localhost")
val port = remoteConfig.getInt("port", 2552)
// --------------------------------
// akka.actor.deployment.<address>.remote.nodes
// --------------------------------
val remoteAddresses = remoteConfig.getList("nodes") match {
case Nil Nil
case nodes
def raiseRemoteNodeParsingError() = throw new ConfigurationException(
"Config option [" + addressPath +
".remote.nodes] needs to be a list with elements on format \"<hostname>:<port>\", was [" + nodes.mkString(", ") + "]")
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(hostname, port)))
nodes map { node
val tokenizer = new java.util.StringTokenizer(node, ":")
val hostname = tokenizer.nextElement.toString
if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError()
val port = try tokenizer.nextElement.toString.toInt catch {
case e: Exception raiseRemoteNodeParsingError()
}
if (port == 0) raiseRemoteNodeParsingError()
RemoteAddress(hostname, port)
}
}
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(remoteAddresses)))
case None // check for 'cluster' config section

View file

@ -79,9 +79,7 @@ object DeploymentConfig {
preferredNodes: Iterable[Home] = Vector(Node(Config.nodename)),
replication: ReplicationScheme = Transient) extends Scope
case class RemoteScope(
hostname: String = "localhost",
port: Int = 2552) extends Scope
case class RemoteScope(nodes: Iterable[RemoteAddress]) extends Scope
// For Java API
case class LocalScope() extends Scope
@ -89,6 +87,8 @@ object DeploymentConfig {
// For Scala API
case object LocalScope extends Scope
case class RemoteAddress(hostname: String, port: Int)
// --------------------------------
// --- Home
// --------------------------------

View file

@ -5,6 +5,7 @@
package akka.remote
import akka.actor._
import akka.routing._
import DeploymentConfig._
import Actor._
import Status._
@ -44,18 +45,58 @@ class RemoteActorRefProvider extends ActorRefProvider {
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
val actor = try {
Deployer.lookupDeploymentFor(address) match {
case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(host, port)))
// FIXME create RoutedActorRef if 'router' is specified
case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(remoteAddresses)))
val serverAddress = Remote.address
if (serverAddress.getHostName == host && serverAddress.getPort == port) {
// home node for this remote actor
val thisHostname = Remote.address.getHostName
val thisPort = Remote.address.getPort
def isReplicaNode: Boolean = remoteAddresses exists { remoteAddress
remoteAddress.hostname == thisHostname && remoteAddress.port == thisPort
}
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
Some(new LocalActorRef(props, address, false)) // create a local actor
} else {
// not home node, need to provision it
val remoteAddress = new InetSocketAddress(host, port)
useActorOnNode(remoteAddress, address, props.creator)
Some(RemoteActorRef(remoteAddress, address, Actor.TIMEOUT, None)) // create a remote actor
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerType = DeploymentConfig.routerTypeFor(router)
val routerFactory: () Router = routerType match {
case RouterType.Direct
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RandomRouter
case RouterType.RoundRobin
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RoundRobinRouter
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom sys.error("Router Custom not supported yet")
}
def provisionActorToNode(remoteAddress: RemoteAddress): RemoteActorRef = {
val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port)
useActorOnNode(inetSocketAddress, address, props.creator)
RemoteActorRef(inetSocketAddress, address, Actor.TIMEOUT, None)
}
val connections: Iterable[ActorRef] = remoteAddresses map { provisionActorToNode(_) }
Some(Routing.actorOf(RoutedProps(
routerFactory = routerFactory,
connections = connections)))
}
case deploy None // non-remote actor

View file

@ -1,4 +1,3 @@
akka.enabled-modules = ["remote"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.remote.hostname = "localhost"
akka.actor.deployment.service-hello.remote.port = 9991
akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"]

View file

@ -1,4 +1,3 @@
akka.enabled-modules = ["remote"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.remote.hostname = "localhost"
akka.actor.deployment.service-hello.remote.port = 9991
akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"]

View file

@ -94,8 +94,10 @@ akka {
#}
remote {
hostname = "localhost" # The remote server hostname or IP address the remote actor should connect to
port = 2552 # The remote server port the remote actor should connect to
nodes = ["wallace:2552", "gromit:2552"] # A list of hostnames and ports for instantiating the remote actor instances
# The format should be on "hostname:port", where:
# - hostname can be either hostname or IP address the remote actor should connect to
# - port should be the port for the remote server on the other node
}
#cluster { # defines the actor as a clustered actor