wrap up local routing

- pull some more generic stuff out of the individual routers, add
  factories which take only target lists
- add router parsing to Deployer, removing everything which is not
  strictly related to local scope, which left only few things, so move
  them to Deployer.scala and delete DeploymentConfig
- fix ConfiguredLocalRoutingSpec to use the new configuration mechanism
  and verify that configuration overrides code
- fix DeployerSpec by using (mostly) correct lookup paths and removing
  everything which was not local
- change config file layout, removing everything which is not local from
  akka-actor/.../reference.conf, putting the remote stuff into the
  akka-remote/.../reference.conf (unused as of yet); adapt comments
  according to changed functionality
This commit is contained in:
Roland 2011-12-12 23:31:15 +01:00
parent d8bc57dc17
commit 0a7e5fe296
22 changed files with 209 additions and 1427 deletions

View file

@ -5,31 +5,20 @@
package akka.actor
import akka.testkit.AkkaSpec
import DeploymentConfig._
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import akka.routing._
object DeployerSpec {
val deployerConf = ConfigFactory.parseString("""
akka.actor.deployment {
/user/service1 {
}
/user/service2 {
router = round-robin
nr-of-instances = 3
remote {
nodes = ["wallace:2552", "gromit:2552"]
}
}
/user/service3 {
create-as {
class = "akka.actor.DeployerSpec$RecipeActor"
}
}
/user/service-auto {
router = round-robin
nr-of-instances = auto
}
/user/service-direct {
router = direct
}
@ -47,31 +36,6 @@ object DeployerSpec {
/user/service-scatter-gather {
router = scatter-gather
}
/user/service-least-cpu {
router = least-cpu
}
/user/service-least-ram {
router = least-ram
}
/user/service-least-messages {
router = least-messages
}
/user/service-custom {
router = org.my.Custom
}
/user/service-cluster1 {
cluster {
preferred-nodes = ["node:wallace", "node:gromit"]
}
}
/user/service-cluster2 {
cluster {
preferred-nodes = ["node:wallace", "node:gromit"]
replication {
strategy = write-behind
}
}
}
}
""", ConfigParseOptions.defaults)
@ -94,9 +58,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
deployment must be(Some(
Deploy(
service,
deployment.get.config,
None,
NoRouting,
NrOfInstances(1),
NoRouter,
LocalScope)))
}
@ -114,28 +78,14 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
deployment must be(Some(
Deploy(
service,
deployment.get.config,
Some(ActorRecipe(classOf[DeployerSpec.RecipeActor])),
NoRouting,
NrOfInstances(1),
LocalScope)))
}
"be able to parse 'akka.actor.deployment._' with number-of-instances=auto" in {
val service = "/user/service-auto"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
Deploy(
service,
None,
RoundRobin,
AutoNrOfInstances,
NoRouter,
LocalScope)))
}
"detect invalid number-of-instances" in {
intercept[akka.config.ConfigurationException] {
intercept[com.typesafe.config.ConfigException.WrongType] {
val invalidDeployerConf = ConfigFactory.parseString("""
akka.actor.deployment {
/user/service-invalid-number-of-instances {
@ -150,50 +100,35 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
}
"be able to parse 'akka.actor.deployment._' with direct router" in {
assertRouting(NoRouting, "/user/service-direct")
assertRouting(NoRouter, "/user/service-direct")
}
"ignore nr-of-instances with direct router" in {
assertRouting(NoRouting, "/user/service-direct2")
assertRouting(NoRouter, "/user/service-direct2")
}
"be able to parse 'akka.actor.deployment._' with round-robin router" in {
assertRouting(RoundRobin, "/user/service-round-robin")
assertRouting(RoundRobinRouter(1), "/user/service-round-robin")
}
"be able to parse 'akka.actor.deployment._' with random router" in {
assertRouting(Random, "/user/service-random")
assertRouting(RandomRouter(1), "/user/service-random")
}
"be able to parse 'akka.actor.deployment._' with scatter-gather router" in {
assertRouting(ScatterGather, "/user/service-scatter-gather")
assertRouting(ScatterGatherFirstCompletedRouter(1), "/user/service-scatter-gather")
}
"be able to parse 'akka.actor.deployment._' with least-cpu router" in {
assertRouting(LeastCPU, "/user/service-least-cpu")
}
"be able to parse 'akka.actor.deployment._' with least-ram router" in {
assertRouting(LeastRAM, "/user/service-least-ram")
}
"be able to parse 'akka.actor.deployment._' with least-messages router" in {
assertRouting(LeastMessages, "/user/service-least-messages")
}
"be able to parse 'akka.actor.deployment._' with custom router" in {
assertRouting(CustomRouter("org.my.Custom"), "/user/service-custom")
}
def assertRouting(expected: Routing, service: String) {
def assertRouting(expected: RouterConfig, service: String) {
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
Deploy(
service,
deployment.get.config,
None,
expected,
NrOfInstances(1),
LocalScope)))
}

View file

@ -4,29 +4,31 @@ import akka.actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit.AkkaSpec
import akka.actor.DeploymentConfig._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
import akka.testkit._
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer
"RouterConfig" must {
"be overridable in config" in {
deployer.deploy(Deploy("/config", null, None, RandomRouter(4), LocalScope))
val actor = system.actorOf(Props(new Actor {
def receive = {
case "get" sender ! context.props
}
}).withRouting(RoundRobinRouter(12)), "config")
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4)
}
}
"round robin router" must {
"be able to shut down its instance" in {
val path = system / "round-robin-0"
deployer.deploy(
Deploy(
path.toString,
None,
RoundRobin,
NrOfInstances(5),
LocalScope))
val helloLatch = new CountDownLatch(5)
val stopLatch = new CountDownLatch(5)
@ -38,7 +40,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
override def postStop() {
stopLatch.countDown()
}
}), path.name)
}).withRouting(RoundRobinRouter(5)), "round-robin-shutdown")
actor ! "hello"
actor ! "hello"
@ -52,16 +54,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
}
"deliver messages in a round robin fashion" in {
val path = system / "round-robin-1"
deployer.deploy(
Deploy(
path.toString,
None,
RoundRobin,
NrOfInstances(10),
LocalScope))
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
@ -69,7 +61,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
val counter = new AtomicInteger
var replies = Map.empty[Int, Int]
for (i 0 until connectionCount) {
replies = replies + (i -> 0)
replies += i -> 0
}
val actor = system.actorOf(Props(new Actor {
@ -78,7 +70,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
case "hit" sender ! id
case "end" doneLatch.countDown()
}
}), path.name)
}).withRouting(RoundRobinRouter(connectionCount)), "round-robin")
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
@ -92,20 +84,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
actor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
replies.values foreach { _ must be(10) }
replies.values foreach { _ must be(iterationCount) }
}
"deliver a broadcast message using the !" in {
val path = system / "round-robin-2"
deployer.deploy(
Deploy(
path.toString,
None,
RoundRobin,
NrOfInstances(5),
LocalScope))
val helloLatch = new CountDownLatch(5)
val stopLatch = new CountDownLatch(5)
@ -117,7 +99,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
override def postStop() {
stopLatch.countDown()
}
}), path.name)
}).withRouting(RoundRobinRouter(5)), "round-robin-broadcast")
actor ! Broadcast("hello")
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
@ -130,27 +112,17 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
"random router" must {
"be able to shut down its instance" in {
val path = system / "random-0"
deployer.deploy(
Deploy(
path.toString,
None,
Random,
NrOfInstances(7),
LocalScope))
val stopLatch = new CountDownLatch(7)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "hello" {}
case "hello" sender ! "world"
}
override def postStop() {
stopLatch.countDown()
}
}), path.name)
}).withRouting(RandomRouter(7)), "random-shutdown")
actor ! "hello"
actor ! "hello"
@ -158,21 +130,15 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
actor ! "hello"
actor ! "hello"
within(2 seconds) {
for (i 1 to 5) expectMsg("world")
}
actor.stop()
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
}
"deliver messages in a random fashion" in {
val path = system / "random-1"
deployer.deploy(
Deploy(
path.toString,
None,
Random,
NrOfInstances(10),
LocalScope))
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
@ -189,7 +155,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
case "hit" sender ! id
case "end" doneLatch.countDown()
}
}), path.name)
}).withRouting(RandomRouter(connectionCount)), "random")
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
@ -204,19 +170,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
replies.values foreach { _ must be > (0) }
replies.values.sum must be === iterationCount * connectionCount
}
"deliver a broadcast message using the !" in {
val path = system / "random-2"
deployer.deploy(
Deploy(
path.toString,
None,
Random,
NrOfInstances(6),
LocalScope))
val helloLatch = new CountDownLatch(6)
val stopLatch = new CountDownLatch(6)
@ -228,7 +185,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
override def postStop() {
stopLatch.countDown()
}
}), path.name)
}).withRouting(RandomRouter(6)), "random-broadcast")
actor ! Broadcast("hello")
helloLatch.await(5, TimeUnit.SECONDS) must be(true)

View file

@ -3,7 +3,6 @@ package akka.routing
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing;
/**
* A Factory responsible for creating {@link Router} instances. It makes Java compatability possible for users that
* want to provide their own router instance.
*/
public interface RouterFactory {
/**
* Creates a new Router instance.
*
* @return the newly created Router instance.
*/
Router newRouter();
}

View file

@ -46,59 +46,32 @@ akka {
deployment {
default { # deployment id pattern, e.g. /app/service-ping
default { # deployment id pattern, e.g. /app/service-ping
router = "direct" # routing (load-balance) scheme to use
# available: "direct", "round-robin", "random", "scatter-gather"
# "least-cpu", "least-ram", "least-messages"
# or: fully qualified class name of the router class
# default is "direct";
# if 'replication' is used then the only available router is "direct"
router = "direct" # routing (load-balance) scheme to use
# available: "direct", "round-robin", "random", "scatter-gather"
# or: fully qualified class name of the router class
# default is "direct";
# In case of non-direct routing, the actors to be routed to can be specified
# in several ways:
# - nr-of-instances: will create that many children given the actor factory
# supplied in the source code (overridable using create-as below)
# - target.paths: will look the paths up using actorFor and route to
# them, i.e. will not create children
nr-of-instances = 1 # number of actor instances in the cluster
# available: positive integer (1-N) or the string "auto" for auto-scaling
# default is '1'
# if the "direct" router is used then this element is ignored (always '1')
nr-of-instances = 1 # number of children to create in case of a non-direct router; this setting
# is ignored if target.paths is given
# optional
create-as { # FIXME document 'create-as'
class = "" # fully qualified class name of recipe implementation
create-as { # FIXME document 'create-as'
class = "" # fully qualified class name of recipe implementation
}
remote = "" # if this is set to a valid remote address, the named actor will be deployed at that node
target {
nodes = [] # A list of hostnames and ports for instantiating the remote actor instances
# The format should be on "hostname:port", where:
# - hostname can be either hostname or IP address the remote actor should connect to
# - port should be the port for the remote server on the other node
paths = [] # Alternatively you can specify the full paths of those actors which should be routed to
paths = [] # Alternatively to giving nr-of-instances you can specify the full paths of
# those actors which should be routed to. This setting takes precedence over
# nr-of-instances
}
cluster { # defines the actor as a clustered actor
# default (if omitted) is local non-clustered actor
preferred-nodes = [] # a list of preferred nodes for instantiating the actor instances on
# on format "host:<hostname>", "ip:<ip address>" or "node:<node name>"
# optional
replication { # use replication or not? only makes sense for a stateful actor
# serialize-mailbox not implemented, ticket #1412
serialize-mailbox = off # should the actor mailbox be part of the serialized snapshot?
# default is 'off'
storage = "transaction-log" # storage model for replication
# available: "transaction-log" and "data-grid"
# default is "transaction-log"
strategy = "write-through" # guarantees for replication
# available: "write-through" and "write-behind"
# default is "write-through"
}
}
}
}

View file

@ -4,7 +4,6 @@
package akka.actor
import DeploymentConfig._
import akka.dispatch._
import akka.routing._
import akka.util.Duration

View file

@ -520,13 +520,13 @@ class LocalActorRefProvider(
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = {
props.routerConfig match {
case NoRouter new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
case routedActor new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path)
case NoRouter new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
case router new RoutedActorRef(system, props.withRouting(adaptFromDeploy(router, path)), supervisor, path)
}
}
private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = {
val lookupPath = p.elements.mkString("/", "/", "")
val lookupPath = p.elements.drop(1).mkString("/", "/", "")
r.adaptFromDeploy(deployer.lookup(lookupPath))
}

View file

@ -7,12 +7,20 @@ package akka.actor
import collection.immutable.Seq
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging
import akka.actor.DeploymentConfig._
import akka.AkkaException
import akka.config.ConfigurationException
import akka.util.Duration
import akka.event.EventStream
import com.typesafe.config._
import akka.routing._
case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope)
case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here
trait Scope
case class LocalScope() extends Scope
case object LocalScope extends Scope
/**
* Deployer maps actor paths to actor deployments.
@ -40,48 +48,20 @@ class Deployer(val settings: ActorSystem.Settings) {
import akka.util.ReflectiveAccess.getClassFor
val deployment = config.withFallback(default)
// --------------------------------
// akka.actor.deployment.<path>.router
// --------------------------------
val router: Routing = deployment.getString("router") match {
case "round-robin" RoundRobin
case "random" Random
case "scatter-gather" ScatterGather
case "least-cpu" LeastCPU
case "least-ram" LeastRAM
case "least-messages" LeastMessages
case routerClassName CustomRouter(routerClassName)
val targets = deployment.getStringList("target.paths").asScala.toSeq
val nrOfInstances = deployment.getInt("nr-of-instances")
val router: RouterConfig = deployment.getString("router") match {
case "direct" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, targets)
case "random" RandomRouter(nrOfInstances, targets)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, targets)
case "broadcast" BroadcastRouter(nrOfInstances, targets)
case x throw new ConfigurationException("unknown router type " + x + " for path " + key)
}
// --------------------------------
// akka.actor.deployment.<path>.nr-of-instances
// --------------------------------
val nrOfInstances = {
if (router == NoRouting) OneNrOfInstances
else {
def invalidNrOfInstances(wasValue: Any) = new ConfigurationException(
"Config option [akka.actor.deployment." + key +
".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" +
wasValue + "]")
deployment.getAnyRef("nr-of-instances").asInstanceOf[Any] match {
case "auto" AutoNrOfInstances
case 1 OneNrOfInstances
case 0 ZeroNrOfInstances
case nrOfReplicas: Number
try {
new NrOfInstances(nrOfReplicas.intValue)
} catch {
case e: Exception throw invalidNrOfInstances(nrOfReplicas)
}
case unknown throw invalidNrOfInstances(unknown)
}
}
}
// --------------------------------
// akka.actor.deployment.<path>.create-as
// --------------------------------
val recipe: Option[ActorRecipe] =
deployment.getString("create-as.class") match {
case "" None
@ -91,7 +71,7 @@ class Deployer(val settings: ActorSystem.Settings) {
Some(ActorRecipe(implementationClass))
}
Some(Deploy(key, recipe, router, nrOfInstances, LocalScope))
Some(Deploy(key, deployment, recipe, router, LocalScope))
}
}

View file

@ -1,236 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.util.Duration
import akka.routing.RouterType
object DeploymentConfig {
// --------------------------------
// --- Deploy
// --------------------------------
case class Deploy(
path: String,
recipe: Option[ActorRecipe],
routing: Routing = NoRouting,
nrOfInstances: NrOfInstances = ZeroNrOfInstances,
scope: Scope = LocalScope)
// --------------------------------
// --- Actor Recipe
// --------------------------------
case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here
// --------------------------------
// --- Routing
// --------------------------------
sealed trait Routing
case class CustomRouter(routerClassName: String) extends Routing
// For Java API
case class NoRouting() extends Routing
case class RoundRobin() extends Routing
case class Random() extends Routing
case class ScatterGather() extends Routing
case class LeastCPU() extends Routing
case class LeastRAM() extends Routing
case class LeastMessages() extends Routing
// For Scala API
case object NoRouting extends Routing
case object RoundRobin extends Routing
case object Random extends Routing
case object ScatterGather extends Routing
case object LeastCPU extends Routing
case object LeastRAM extends Routing
case object LeastMessages extends Routing
// --------------------------------
// --- Scope
// --------------------------------
trait Scope
// For Java API
case class LocalScope() extends Scope
// For Scala API
case object LocalScope extends Scope
// --------------------------------
// --- Home
// --------------------------------
sealed trait Home
// case class Host(hostName: String) extends Home
case class Node(nodeName: String) extends Home
// case class IP(ipAddress: String) extends Home
// --------------------------------
// --- Replicas
// --------------------------------
class NrOfInstances(val factor: Int) extends Serializable {
// note that -1 is used for AutoNrOfInstances
if (factor < -1) throw new IllegalArgumentException("nr-of-instances can not be negative")
override def hashCode = 0 + factor.##
override def equals(other: Any) = NrOfInstances.unapply(this) == NrOfInstances.unapply(other)
override def toString = if (factor == -1) "NrOfInstances(auto)" else "NrOfInstances(" + factor + ")"
}
object NrOfInstances {
def apply(factor: Int): NrOfInstances = factor match {
case -1 AutoNrOfInstances
case 0 ZeroNrOfInstances
case 1 OneNrOfInstances
case x new NrOfInstances(x)
}
def unapply(other: Any) = other match {
case x: NrOfInstances import x._; Some(factor)
case _ None
}
}
// For Java API
class AutoNrOfInstances extends NrOfInstances(-1)
class ZeroNrOfInstances extends NrOfInstances(0)
class OneNrOfInstances extends NrOfInstances(1)
// For Scala API
case object AutoNrOfInstances extends AutoNrOfInstances
case object ZeroNrOfInstances extends ZeroNrOfInstances
case object OneNrOfInstances extends OneNrOfInstances
// --------------------------------
// --- Replication
// --------------------------------
sealed trait ReplicationScheme
// For Java API
case class Transient() extends ReplicationScheme
// For Scala API
case object Transient extends ReplicationScheme
case class Replication(
storage: ReplicationStorage,
strategy: ReplicationStrategy) extends ReplicationScheme
// --------------------------------
// --- ReplicationStorage
// --------------------------------
sealed trait ReplicationStorage
// For Java API
case class TransactionLog() extends ReplicationStorage
case class DataGrid() extends ReplicationStorage
// For Scala API
case object TransactionLog extends ReplicationStorage
case object DataGrid extends ReplicationStorage
// --------------------------------
// --- ReplicationStrategy
// --------------------------------
sealed trait ReplicationStrategy
// For Java API
sealed class WriteBehind extends ReplicationStrategy
sealed class WriteThrough extends ReplicationStrategy
// For Scala API
case object WriteBehind extends WriteBehind
case object WriteThrough extends WriteThrough
// --------------------------------
// --- Helper methods for parsing
// --------------------------------
def nodeNameFor(home: Home): String = home match {
case Node(nodename) nodename
// case Host("localhost") Config.nodename
// case IP("0.0.0.0") Config.nodename
// case IP("127.0.0.1") Config.nodename
// case Host(hostname) throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
// case IP(address) throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
}
def routerTypeFor(routing: Routing): RouterType = routing match {
case _: NoRouting | NoRouting RouterType.NoRouter
case _: RoundRobin | RoundRobin RouterType.RoundRobin
case _: Random | Random RouterType.Random
case _: ScatterGather | ScatterGather RouterType.ScatterGather
case _: LeastCPU | LeastCPU RouterType.LeastCPU
case _: LeastRAM | LeastRAM RouterType.LeastRAM
case _: LeastMessages | LeastMessages RouterType.LeastMessages
case CustomRouter(implClass) RouterType.Custom(implClass)
}
def isReplicated(replicationScheme: ReplicationScheme): Boolean =
isReplicatedWithTransactionLog(replicationScheme) ||
isReplicatedWithDataGrid(replicationScheme)
def isWriteBehindReplication(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
case _: Transient | Transient false
case Replication(_, strategy)
strategy match {
case _: WriteBehind | WriteBehind true
case _: WriteThrough | WriteThrough false
}
}
def isWriteThroughReplication(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
case _: Transient | Transient false
case Replication(_, strategy)
strategy match {
case _: WriteBehind | WriteBehind true
case _: WriteThrough | WriteThrough false
}
}
def isReplicatedWithTransactionLog(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
case _: Transient | Transient false
case Replication(storage, _)
storage match {
case _: TransactionLog | TransactionLog true
case _: DataGrid | DataGrid throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet")
}
}
def isReplicatedWithDataGrid(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
case _: Transient | Transient false
case Replication(storage, _)
storage match {
case _: TransactionLog | TransactionLog false
case _: DataGrid | DataGrid throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet")
}
}
}
/**
* Module holding the programmatic deployment configuration classes.
* Defines the deployment specification.
* Most values have defaults and can be left out.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DeploymentConfig(val nodename: String) {
import DeploymentConfig._
case class ClusterScope(preferredNodes: Iterable[Home] = Vector(Node(nodename)), replication: ReplicationScheme = Transient) extends Scope
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home nodeNameFor(home) == nodename)
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
case Deploy(_, _, _, _, ClusterScope(_, replicationScheme)) Some(replicationScheme)
case _ None
}
def isReplicated(deployment: Deploy): Boolean = replicationSchemeFor(deployment) match {
case Some(replicationScheme) DeploymentConfig.isReplicated(replicationScheme)
case _ false
}
}

View file

@ -1,580 +0,0 @@
/**
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor._
import DeploymentConfig._
import akka.dispatch.Future
import akka.routing._
import akka.serialization.Serializer
import akka.cluster.metrics._
import akka.util.Duration
import akka.util.duration._
import akka.AkkaException
import com.eaio.uuid.UUID
import java.net.InetSocketAddress
import java.util.concurrent.{ ConcurrentSkipListSet }
class ClusterException(message: String) extends AkkaException(message)
object ChangeListener {
/**
* Cluster membership change listener.
* For Scala API.
*/
trait ChangeListener {
def notify(event: ChangeNotification, client: ClusterNode) {
event match {
case NodeConnected(name) nodeConnected(name, client)
case NodeDisconnected(name) nodeDisconnected(name, client)
case NewLeader(name: String) newLeader(name, client)
case NewSession thisNodeNewSession(client)
case ThisNode.Connected thisNodeConnected(client)
case ThisNode.Disconnected thisNodeDisconnected(client)
case ThisNode.Expired thisNodeExpired(client)
}
}
def nodeConnected(node: String, client: ClusterNode) {}
def nodeDisconnected(node: String, client: ClusterNode) {}
def newLeader(name: String, client: ClusterNode) {}
def thisNodeNewSession(client: ClusterNode) {}
def thisNodeConnected(client: ClusterNode) {}
def thisNodeDisconnected(client: ClusterNode) {}
def thisNodeExpired(client: ClusterNode) {}
}
/**
* Cluster membership change listener.
* For Java API.
*/
abstract class ChangeListenerAdapter extends ChangeListener
sealed trait ChangeNotification
case class NodeConnected(node: String) extends ChangeNotification
case class NodeDisconnected(node: String) extends ChangeNotification
case class NewLeader(name: String) extends ChangeNotification
case object NewSession extends ChangeNotification
object ThisNode {
case object Connected extends ChangeNotification
case object Disconnected extends ChangeNotification
case object Expired extends ChangeNotification
}
}
/**
* Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class NodeAddress(val clusterName: String, val nodeName: String) {
if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string")
if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string")
override def toString = "%s:%s".format(clusterName, nodeName)
override def hashCode = 0 + clusterName.## + nodeName.##
override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other)
}
/**
* NodeAddress companion object and factory.
*/
object NodeAddress {
def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName)
def apply(system: ActorSystem): NodeAddress = new NodeAddress(system.clustername, system.nodename)
def unapply(other: Any) = other match {
case address: NodeAddress Some((address.clusterName, address.nodeName))
case _ None
}
}
/*
* Allows user to access metrics of a different nodes in the cluster. Changing metrics can be monitored
* using {@link MetricsAlterationMonitor}
* Metrics of the cluster nodes are distributed through ZooKeeper. For better performance, metrics are
* cached internally, and refreshed from ZooKeeper after an interval
*/
trait NodeMetricsManager {
/*
* Gets metrics of a local node directly from JMX monitoring beans/Hyperic Sigar
*/
def getLocalMetrics: NodeMetrics
/*
* Gets metrics of a specified node
* @param nodeName metrics of the node specified by the name will be returned
* @param useCached if <code>true</code>, returns metrics cached in the metrics manager,
* gets metrics directly from ZooKeeper otherwise
*/
def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics]
/*
* Gets cached metrics of all nodes in the cluster
*/
def getAllMetrics: Array[NodeMetrics]
/*
* Adds monitor that reacts, when specific conditions are satisfied
*/
def addMonitor(monitor: MetricsAlterationMonitor): Unit
/*
* Removes monitor
*/
def removeMonitor(monitor: MetricsAlterationMonitor): Unit
/*
* Removes metrics of s specified node from ZooKeeper and metrics manager cache
*/
def removeNodeMetrics(nodeName: String): Unit
/*
* Sets timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
*/
def refreshTimeout_=(newValue: Duration): Unit
/*
* Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
*/
def refreshTimeout: Duration
/*
* Starts metrics manager. When metrics manager is started, it refreshes cache from ZooKeeper
* after <code>refreshTimeout</code>, and invokes plugged monitors
*/
def start(): NodeMetricsManager
/*
* Stops metrics manager. Stopped metrics manager doesn't refresh cache from ZooKeeper,
* and doesn't invoke plugged monitors
*/
def stop(): Unit
/*
* If the value is <code>true</code>, metrics manages is started and running. Stopped, otherwise
*/
def isRunning: Boolean
}
/**
* Interface for cluster node.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ClusterNode {
import ChangeListener._
private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
def membershipNodes: Array[String]
def nodeAddress: NodeAddress
def zkServerAddresses: String
def start()
def shutdown()
def isShutdown: Boolean
def disconnect(): ClusterNode
def reconnect(): ClusterNode
def metricsManager: NodeMetricsManager
/**
* Registers a cluster change listener.
*/
def register(listener: ChangeListener): ClusterNode
/**
* Returns the name of the current leader.
*/
def leader: String
/**
* Returns true if 'this' node is the current leader.
*/
def isLeader: Boolean
/**
* Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op.
*/
def resign()
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
/**
* Needed to have reflection through structural typing work.
*/
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode
/**
* Needed to have reflection through structural typing work.
*/
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/**
* Removes actor from the cluster.
*/
// def remove(actorRef: ActorRef)
/**
* Removes actor with address from the cluster.
*/
// def remove(address: String): ClusterNode
/**
* Is the actor with uuid clustered or not?
*/
def isClustered(actorAddress: String): Boolean
/**
* Is the actor with uuid in use on 'this' node or not?
*/
def isInUseOnNode(actorAddress: String): Boolean
/**
* Is the actor with uuid in use or not?
*/
def isInUseOnNode(actorAddress: String, nodeName: String): Boolean
/**
* Is the actor with uuid in use or not?
*/
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String): Option[LocalActorRef]
/**
* Using (checking out) actor on a specific set of nodes.
*/
def useActorOnNodes(nodes: Array[String], actorAddress: String, replicateFromUuid: Option[UUID])
/**
* Using (checking out) actor on all nodes in the cluster.
*/
def useActorOnAllNodes(actorAddress: String, replicateFromUuid: Option[UUID])
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(node: String, actorAddress: String, replicateFromUuid: Option[UUID])
/**
* Checks in an actor after done using it on this node.
*/
def release(actorRef: ActorRef)
/**
* Checks in an actor after done using it on this node.
*/
def release(actorAddress: String)
/**
* Creates an ActorRef with a Router to a set of clustered actors.
*/
def ref(actorAddress: String, router: RouterType): ActorRef
/**
* Returns the addresses of all actors checked out on this node.
*/
def addressesForActorsInUse: Array[String]
/**
* Returns the addresses of all actors registered in this cluster.
*/
def addressesForClusteredActors: Array[String]
/**
* Returns the addresses of all actors in use registered on a specific node.
*/
def addressesForActorsInUseOnNode(nodeName: String): Array[String]
/**
* Returns Serializer for actor with UUID.
*/
def serializerForActor(actorAddress: String): Serializer
/**
* Returns home address for actor with UUID.
*/
def inetSocketAddressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)]
/**
* Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument).
*/
def send(f: Function0[Unit], nrOfInstances: Int)
/**
* Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument).
* Returns an 'Array' with all the 'Future's from the computation.
*/
def send(f: Function0[Any], nrOfInstances: Int): List[Future[Any]]
/**
* Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument)
* with the argument speficied.
*/
def send(f: Function1[Any, Unit], arg: Any, nrOfInstances: Int)
/**
* Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument)
* with the argument speficied.
* Returns an 'Array' with all the 'Future's from the computation.
*/
def send(f: Function1[Any, Any], arg: Any, nrOfInstances: Int): List[Future[Any]]
/**
* Stores a configuration element under a specific key.
* If the key already exists then it will be overwritten.
*/
def setConfigElement(key: String, bytes: Array[Byte])
/**
* Returns the config element for the key or NULL if no element exists under the key.
* Returns <code>Some(element)</code> if it exists else <code>None</code>
*/
def getConfigElement(key: String): Option[Array[Byte]]
/**
* Removes configuration element for a specific key.
* Does nothing if the key does not exist.
*/
def removeConfigElement(key: String)
/**
* Returns a list with all config element keys.
*/
def getConfigElementKeys: Array[String]
// =============== PRIVATE METHODS ===============
// FIXME BAD BAD BAD - considering moving all these private[cluster] methods to a separate trait to get them out of the user's view
private[cluster] def remoteClientLifeCycleHandler: ActorRef
private[cluster] def remoteDaemon: ActorRef
/**
* Removes actor with uuid from the cluster.
*/
// private[cluster] def remove(uuid: UUID)
/**
* Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'.
*/
private[cluster] def releaseActorOnAllNodes(actorAddress: String)
/**
* Returns the UUIDs of all actors checked out on this node.
*/
private[cluster] def uuidsForActorsInUse: Array[UUID]
/**
* Returns the UUIDs of all actors registered in this cluster.
*/
private[cluster] def uuidsForClusteredActors: Array[UUID]
/**
* Returns the actor id for the actor with a specific UUID.
*/
private[cluster] def actorAddressForUuid(uuid: UUID): Option[String]
/**
* Returns the actor ids for all the actors with a specific UUID.
*/
private[cluster] def actorAddressForUuids(uuids: Array[UUID]): Array[String]
/**
* Returns the actor UUIDs for actor ID.
*/
private[cluster] def uuidsForActorAddress(actorAddress: String): Array[UUID]
/**
* Returns the UUIDs of all actors in use registered on a specific node.
*/
private[cluster] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID]
private[cluster] def boot()
private[cluster] def publish(change: ChangeNotification)
private[cluster] def joinCluster()
private[cluster] def joinLeaderElection: Boolean
private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress)
private[cluster] def migrateActorsOnFailedNodes(
failedNodes: List[String],
currentClusterNodes: List[String],
oldClusterNodes: List[String],
disconnectedConnections: Map[String, InetSocketAddress])
private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes: Traversable[String],
newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress]
private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress]
private[cluster] def membershipPathFor(node: String): String
private[cluster] def configurationPathFor(key: String): String
private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String
private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String
private[cluster] def nodeToUuidsPathFor(node: String): String
private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String
private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String
private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String
private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String
private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String
private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String
private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String
}

View file

@ -1,82 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
/*
* {@link NodeMetricsManager} periodically refershes internal cache with node metrics from MBeans / Sigar.
* Every time local cache is refreshed, monitors plugged to the metrics manager are invoked.
* If updated metrics satisfy conditions, specified in <code>reactsOn</code>,
* <code>react</code> is called
*
* @exampl {{{
* class PeakCPULoadMonitor extends LocalMetricsAlterationMonitor {
* val id = "peak-cpu-load-monitor"
*
* def reactsOn(metrics: NodeMetrics) =
* metrics.systemLoadAverage > 0.8
*
* def react(metrics: NodeMetrics) =
* println("Peak average system load at node [%s] is reached!" format (metrics.nodeName))
* }
* }}}
*
*/
trait LocalMetricsAlterationMonitor extends MetricsAlterationMonitor {
/*
* Definies conditions that must be satisfied in order to <code>react<code> on the changed metrics
*/
def reactsOn(metrics: NodeMetrics): Boolean
/*
* Reacts on the changed metrics
*/
def react(metrics: NodeMetrics): Unit
}
/*
* {@link NodeMetricsManager} periodically refershes internal cache with metrics of all nodes in the cluster
* from ZooKeeper. Every time local cache is refreshed, monitors plugged to the metrics manager are invoked.
* If updated metrics satisfy conditions, specified in <code>reactsOn</code>,
* <code>react</code> is called
*
* @exampl {{{
* class PeakCPULoadReached extends ClusterMetricsAlterationMonitor {
* val id = "peak-cpu-load-reached"
*
* def reactsOn(metrics: Array[NodeMetrics]) =
* metrics.forall(_.systemLoadAverage > 0.8)
*
* def react(metrics: Array[NodeMetrics]) =
* println("One of the nodes in the scluster has reached the peak system load!")
* }
* }}}
*
*/
trait ClusterMetricsAlterationMonitor extends MetricsAlterationMonitor {
/*
* Definies conditions that must be satisfied in order to <code>react<code> on the changed metrics
*/
def reactsOn(allMetrics: Array[NodeMetrics]): Boolean
/*
* Reacts on the changed metrics
*/
def react(allMetrics: Array[NodeMetrics]): Unit
}
sealed trait MetricsAlterationMonitor extends Comparable[MetricsAlterationMonitor] {
/*
* Unique identiifier of the monitor
*/
def id: String
def compareTo(otherMonitor: MetricsAlterationMonitor) = id.compareTo(otherMonitor.id)
}

View file

@ -1,44 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
/*
* Snapshot of the JVM / system that's the node is running on
*/
trait NodeMetrics {
/*
* Name of the node the metrics are gathered at
*/
def nodeName: String
/*
* Amount of heap memory currently used
*/
def usedHeapMemory: Long
/*
* Amount of heap memory guaranteed to be available
*/
def committedHeapMemory: Long
/*
* Maximum amount of heap memory that can be used
*/
def maxHeapMemory: Long
/*
* Number of the processors avalable to the JVM
*/
def avaiableProcessors: Int
/*
* If OS-specific Hyperic Sigar library is plugged, it's used to calculate
* average load on the CPUs in the system. Otherwise, value is retreived from monitoring MBeans.
* Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
*/
def systemLoadAverage: Double
}

View file

@ -1,7 +1,6 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.actor._
@ -9,88 +8,12 @@ import akka.actor._
import akka.japi.Creator
import java.lang.reflect.InvocationTargetException
import akka.config.ConfigurationException
import akka.actor.DeploymentConfig.Deploy
import java.util.concurrent.atomic.AtomicInteger
import akka.util.ReflectiveAccess
import akka.AkkaException
import scala.collection.JavaConversions._
import akka.routing.Routing.{ Destination, Broadcast }
import java.util.concurrent.TimeUnit
sealed trait RouterType
/**
* Used for declarative configuration of Routing.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RouterType {
/**
* A RouterType that indicates no routing - i.e. direct message.
*/
object NoRouter extends RouterType
/**
* A RouterType that randomly selects a connection to send a message to.
*/
object Random extends RouterType
/**
* A RouterType that selects the connection by using round robin.
*/
object RoundRobin extends RouterType
/**
* A RouterType that broadcasts the messages to all connections.
*/
object Broadcast extends RouterType
/**
* A RouterType that selects the connection by using scatter gather.
*/
object ScatterGather extends RouterType
/**
* A RouterType that selects the connection based on the least amount of cpu usage
*/
object LeastCPU extends RouterType
/**
* A RouterType that select the connection based on the least amount of ram used.
*/
object LeastRAM extends RouterType
/**
* A RouterType that select the connection where the actor has the least amount of messages in its mailbox.
*/
object LeastMessages extends RouterType
/**
* A user-defined custom RouterType.
*/
case class Custom(implClass: String) extends RouterType
}
/**
* An {@link AkkaException} thrown when something goes wrong while routing a message
*/
class RoutingException(message: String) extends AkkaException(message)
/**
* Contains the configuration to create local and clustered routed actor references.
* Routed ActorRef configuration object, this is thread safe and fully sharable.
*/
case class RoutedProps private[akka] (
routerFactory: () Router,
connectionManager: ConnectionManager) {
// Java API
def this(creator: Creator[Router], connectionManager: ConnectionManager) {
this(() creator.create(), connectionManager)
}
}
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
* send a message to on (or more) of these actors.
@ -98,13 +21,13 @@ case class RoutedProps private[akka] (
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
extends LocalActorRef(
_system,
_props.copy(creator = _props.routerConfig),
_props.copy(creator = () _props.routerConfig.createActor()),
_supervisor,
_path) {
val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext)
val route: Route = _props.routerConfig.createRoute(_props.creator, actorContext)
override def !(message: Any)(implicit sender: ActorRef = null) {
override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
val s = if (sender eq null) underlying.system.deadLetters else sender
val msg = message match {
@ -119,68 +42,76 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
}
}
trait RouterConfig extends Function0[Actor] {
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig
def createRoute(creator: () Actor, actorContext: ActorContext): Routing.Route
}
/**
* A Router is responsible for sending a message to one (or more) of its connections.
* This trait represents a router factory: it produces the actual router actor
* and creates the routing table (a function which determines the recipients
* for each message which is to be dispatched). The resulting RoutedActorRef
* optimizes the sending of the message so that it does NOT go through the
* routers mailbox unless the route returns an empty recipient set.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* '''Caution:''' This means
* that the route function is evaluated concurrently without protection by
* the RoutedActorRef: either provide a reentrant (i.e. pure) implementation or
* do the locking yourself!
*
* '''Caution:''' Please note that the [[akka.routing.Router]] which needs to
* be returned by `apply()` should not send a message to itself in its
* constructor or `preStart()` or publish its self reference from there: if
* someone tries sending a message to that reference before the constructor of
* RoutedActorRef has returned, there will be a `NullPointerException`!
*/
trait Router {
def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[ActorRef]): Vector[ActorRef] = (nrOfInstances, targets) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(props))(scala.collection.breakOut)
case (_, xs) Vector.empty[ActorRef] ++ xs
}
}
trait RouterConfig {
/**
* A Helper class to create actor references that use routing.
*/
object Routing {
def createActor(): Router = new Router {}
sealed trait RoutingMessage
/**
* Used to broadcast a message to all connections in a router. E.g. every connection gets the message
* regardless of their routing algorithm.
*/
case class Broadcast(message: Any) extends RoutingMessage
def createCustomRouter(implClass: String): Router = {
ReflectiveAccess.createInstance(implClass, Array[Class[_]](), Array[AnyRef]()) match {
case Right(router) router.asInstanceOf[Router]
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new ConfigurationException("Could not instantiate custom Router of [" +
implClass + "] due to: " + cause, cause)
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match {
case Some(Deploy(_, _, _, NoRouter, _)) this
case Some(Deploy(_, _, _, r, _)) r
case _ this
}
}
case class Destination(sender: ActorRef, recipient: ActorRef)
type Route = (ActorRef, Any) Iterable[Destination]
def createRoute(creator: () Actor, actorContext: ActorContext): Route
protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[String]): Vector[ActorRef] = (nrOfInstances, targets) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(props))(scala.collection.breakOut)
case (_, xs) Vector.empty[ActorRef] ++ xs.map(context.actorFor(_))
}
}
/**
* Base trait for `Router` actors. Override `receive` to handle custom
* messages which the corresponding [[akka.actor.RouterConfig]] lets
* through by returning an empty route.
*/
trait Router extends Actor {
def receive = {
case _
}
}
/**
* Used to broadcast a message to all connections in a router; only the
* contained message will be forwarded, i.e. the `Broadcast(...)`
* envelope will be stripped off.
*
* Router implementations may choose to handle this message differently.
*/
case class Broadcast(message: Any)
/**
* Routing configuration that indicates no routing.
* Oxymoron style.
*/
case object NoRouter extends RouterConfig {
def adaptFromDeploy(deploy: Option[Deploy]) = null
def createRoute(creator: () Actor, actorContext: ActorContext) = null
def apply(): Actor = null
}
object RoundRobinRouter {
def apply(targets: Iterable[ActorRef]) = new RoundRobinRouter(targets = targets map (_.path.toString))
}
/**
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
* <br>
@ -192,8 +123,7 @@ case object NoRouter extends RouterConfig {
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
extends Router with RouterConfig {
case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig {
/**
* Constructor that sets nrOfInstances to be created.
@ -207,26 +137,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
* Constructor that sets the targets to be used.
* Java API
*/
def this(t: java.util.Collection[ActorRef]) = {
def this(t: java.util.Collection[String]) = {
this(targets = collectionAsScalaIterable(t))
}
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match {
case Some(d)
// In case there is a config then use this over any programmed settings.
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
case _ this
}
}
def apply(): Actor = new Actor {
def receive = {
case _
}
}
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
@ -246,6 +161,9 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
}
}
object RandomRouter {
def apply(targets: Iterable[ActorRef]) = new RandomRouter(targets = targets map (_.path.toString))
}
/**
* A Router that randomly selects one of the target connections to send a message to.
* <br>
@ -257,8 +175,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
extends Router with RouterConfig {
case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig {
/**
* Constructor that sets nrOfInstances to be created.
@ -272,32 +189,17 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni
* Constructor that sets the targets to be used.
* Java API
*/
def this(t: java.util.Collection[ActorRef]) = {
def this(t: java.util.Collection[String]) = {
this(targets = collectionAsScalaIterable(t))
}
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match {
case Some(d)
// In case there is a config then use this over any programmed settings.
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
case _ this
}
}
def apply(): Actor = new Actor {
def receive = {
case _
}
}
import java.security.SecureRandom
private val random = new ThreadLocal[SecureRandom] {
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
}
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
@ -315,6 +217,9 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni
}
}
object BroadcastRouter {
def apply(targets: Iterable[ActorRef]) = new BroadcastRouter(targets = targets map (_.path.toString))
}
/**
* A Router that uses broadcasts a message to all its connections.
* <br>
@ -326,8 +231,7 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
extends Router with RouterConfig {
case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig {
/**
* Constructor that sets nrOfInstances to be created.
@ -341,26 +245,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] =
* Constructor that sets the targets to be used.
* Java API
*/
def this(t: java.util.Collection[ActorRef]) = {
def this(t: java.util.Collection[String]) = {
this(targets = collectionAsScalaIterable(t))
}
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match {
case Some(d)
// In case there is a config then use this over any programmed settings.
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
case _ this
}
}
def apply(): Actor = new Actor {
def receive = {
case _
}
}
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
@ -374,6 +263,9 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] =
}
}
object ScatterGatherFirstCompletedRouter {
def apply(targets: Iterable[ActorRef]) = new ScatterGatherFirstCompletedRouter(targets = targets map (_.path.toString))
}
/**
* Simple router that broadcasts the message to all routees, and replies with the first response.
* <br>
@ -385,7 +277,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] =
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig {
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig {
/**
* Constructor that sets nrOfInstances to be created.
@ -399,31 +291,13 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It
* Constructor that sets the targets to be used.
* Java API
*/
def this(t: java.util.Collection[ActorRef]) = {
def this(t: java.util.Collection[String]) = {
this(targets = collectionAsScalaIterable(t))
}
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match {
case Some(d)
// In case there is a config then use this over any programmed settings.
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
case _ this
}
}
def apply(): Actor = new Actor {
def receive = {
case _
}
}
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
case (x, xs) Vector.empty[ActorRef] ++ xs
}
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
{ (sender, message)
val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME!

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import akka.actor.ActorRef
package object routing {
case class Destination(sender: ActorRef, recipient: ActorRef)
type Route = (ActorRef, Any) Iterable[Destination]
}

View file

@ -7,6 +7,29 @@
akka {
actor {
deployment {
default {
remote = "" # if this is set to a valid remote address, the named actor will be deployed at that node
# e.g. "akka://sys@host:port"
target {
nodes = [] # A list of hostnames and ports for instantiating the children of a non-direct router
# The format should be on "akka://sys@host:port", where:
# - sys is the remote actor system name
# - hostname can be either hostname or IP address the remote actor should connect to
# - port should be the port for the remote server on the other node
# The number of actor instances to be spawned is still taken from the nr-of-instances
# setting as for local routers; the instances will be distributed round-robin among the
# given nodes.
}
}
}
}
remote {
transport = "akka.remote.netty.NettyRemoteSupport"

View file

@ -10,7 +10,6 @@ import akka.actor.Status._
import akka.util._
import akka.util.duration._
import akka.util.Helpers._
import akka.actor.DeploymentConfig._
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._

View file

@ -87,7 +87,7 @@ class RemoteActorRefProvider(
*/
@scala.annotation.tailrec
def lookupRemotes(p: Iterable[String]): Option[DeploymentConfig.Deploy] = {
def lookupRemotes(p: Iterable[String]): Option[Deploy] = {
p.headOption match {
case None None
case Some("remote") lookupRemotes(p.drop(2))
@ -104,7 +104,7 @@ class RemoteActorRefProvider(
})
deployment match {
case Some(DeploymentConfig.Deploy(_, _, _, _, RemoteDeploymentConfig.RemoteScope(address)))
case Some(Deploy(_, _, _, _, RemoteScope(address)))
if (address == rootPath.address) local.actorOf(system, props, supervisor, path, false)
else address.parse(remote.transports) match {
case Left(x)

View file

@ -4,21 +4,14 @@
package akka.remote
import akka.actor._
import akka.actor.DeploymentConfig._
import akka.event.EventStream
import com.typesafe.config._
import akka.config.ConfigurationException
object RemoteDeploymentConfig {
case class RemoteScope(node: UnparsedSystemAddress[UnparsedTransportAddress]) extends DeploymentConfig.Scope
}
case class RemoteScope(node: UnparsedSystemAddress[UnparsedTransportAddress]) extends Scope
class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) {
import RemoteDeploymentConfig._
override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
import scala.collection.JavaConverters._
import akka.util.ReflectiveAccess._

View file

@ -3,7 +3,6 @@ package akka.remote.random_routed
import akka.actor.Actor
import akka.remote._
import akka.routing._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
object RandomRoutedRemoteActorMultiJvmSpec {

View file

@ -3,7 +3,6 @@ package akka.remote.round_robin_routed
import akka.actor.Actor
import akka.remote._
import akka.routing._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
object RoundRobinRoutedRemoteActorMultiJvmSpec {

View file

@ -3,7 +3,6 @@ package akka.remote.scatter_gather_routed
import akka.actor.Actor
import akka.remote._
import akka.routing._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
object ScatterGatherRoutedRemoteActorMultiJvmSpec {

View file

@ -5,9 +5,8 @@ package akka.remote
import akka.testkit._
import akka.actor._
import akka.routing._
import com.typesafe.config._
import akka.actor.DeploymentConfig._
import akka.remote.RemoteDeploymentConfig.RemoteScope
object RemoteDeployerSpec {
val deployerConf = ConfigFactory.parseString("""
@ -41,9 +40,9 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
deployment must be(Some(
Deploy(
service,
deployment.get.config,
None,
RoundRobin,
NrOfInstances(3),
RoundRobinRouter(3),
RemoteScope(UnparsedSystemAddress(Some("sys"), UnparsedTransportAddress("akka", "wallace", 2552))))))
}