Added support for custom user-defined routers
This commit is contained in:
parent
e779690aa1
commit
d31057dacf
7 changed files with 49 additions and 36 deletions
|
|
@ -158,14 +158,14 @@ class LocalActorRefProvider extends ActorRefProvider {
|
||||||
// create a routed actor ref
|
// create a routed actor ref
|
||||||
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒
|
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒
|
||||||
val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
|
val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
|
||||||
case RouterType.Direct ⇒ () ⇒ new DirectRouter
|
case RouterType.Direct ⇒ () ⇒ new DirectRouter
|
||||||
case RouterType.Random ⇒ () ⇒ new RandomRouter
|
case RouterType.Random ⇒ () ⇒ new RandomRouter
|
||||||
case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter
|
case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter
|
||||||
case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter
|
case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter
|
||||||
case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
||||||
case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
||||||
case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
||||||
case RouterType.Custom ⇒ sys.error("Router Custom not supported yet")
|
case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
val connections: Iterable[ActorRef] =
|
val connections: Iterable[ActorRef] =
|
||||||
|
|
|
||||||
|
|
@ -138,12 +138,7 @@ object Deployer extends ActorDeployer {
|
||||||
case "least-cpu" ⇒ LeastCPU
|
case "least-cpu" ⇒ LeastCPU
|
||||||
case "least-ram" ⇒ LeastRAM
|
case "least-ram" ⇒ LeastRAM
|
||||||
case "least-messages" ⇒ LeastMessages
|
case "least-messages" ⇒ LeastMessages
|
||||||
case customRouterClassName ⇒
|
case routerClassName ⇒ CustomRouter(routerClassName)
|
||||||
createInstance[AnyRef](customRouterClassName, emptyParams, emptyArguments).fold(
|
|
||||||
e ⇒ throw new ConfigurationException(
|
|
||||||
"Config option [" + addressPath + ".router] needs to be one of " +
|
|
||||||
"[\"direct\", \"round-robin\", \"random\", \"scatter-gather\", \"least-cpu\", \"least-ram\", \"least-messages\" or the fully qualified name of Router class]", e),
|
|
||||||
CustomRouter(_))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------------------
|
// --------------------------------
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ object DeploymentConfig {
|
||||||
// --- Routing
|
// --- Routing
|
||||||
// --------------------------------
|
// --------------------------------
|
||||||
sealed trait Routing
|
sealed trait Routing
|
||||||
case class CustomRouter(router: AnyRef) extends Routing
|
case class CustomRouter(routerClassName: String) extends Routing
|
||||||
|
|
||||||
// For Java API
|
// For Java API
|
||||||
case class Direct() extends Routing
|
case class Direct() extends Routing
|
||||||
|
|
@ -194,21 +194,21 @@ object DeploymentConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
def routerTypeFor(routing: Routing): RouterType = routing match {
|
def routerTypeFor(routing: Routing): RouterType = routing match {
|
||||||
case Direct ⇒ RouterType.Direct
|
case Direct ⇒ RouterType.Direct
|
||||||
case Direct() ⇒ RouterType.Direct
|
case Direct() ⇒ RouterType.Direct
|
||||||
case RoundRobin ⇒ RouterType.RoundRobin
|
case RoundRobin ⇒ RouterType.RoundRobin
|
||||||
case RoundRobin() ⇒ RouterType.RoundRobin
|
case RoundRobin() ⇒ RouterType.RoundRobin
|
||||||
case Random ⇒ RouterType.Random
|
case Random ⇒ RouterType.Random
|
||||||
case Random() ⇒ RouterType.Random
|
case Random() ⇒ RouterType.Random
|
||||||
case ScatterGather ⇒ RouterType.ScatterGather
|
case ScatterGather ⇒ RouterType.ScatterGather
|
||||||
case ScatterGather() ⇒ RouterType.ScatterGather
|
case ScatterGather() ⇒ RouterType.ScatterGather
|
||||||
case LeastCPU ⇒ RouterType.LeastCPU
|
case LeastCPU ⇒ RouterType.LeastCPU
|
||||||
case LeastCPU() ⇒ RouterType.LeastCPU
|
case LeastCPU() ⇒ RouterType.LeastCPU
|
||||||
case LeastRAM ⇒ RouterType.LeastRAM
|
case LeastRAM ⇒ RouterType.LeastRAM
|
||||||
case LeastRAM() ⇒ RouterType.LeastRAM
|
case LeastRAM() ⇒ RouterType.LeastRAM
|
||||||
case LeastMessages ⇒ RouterType.LeastMessages
|
case LeastMessages ⇒ RouterType.LeastMessages
|
||||||
case LeastMessages() ⇒ RouterType.LeastMessages
|
case LeastMessages() ⇒ RouterType.LeastMessages
|
||||||
case c: CustomRouter ⇒ throw new UnsupportedOperationException("Unknown Router [" + c + "]")
|
case CustomRouter(implClass) ⇒ RouterType.Custom(implClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
|
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ object RouterType {
|
||||||
/**
|
/**
|
||||||
* A user-defined custom RouterType.
|
* A user-defined custom RouterType.
|
||||||
*/
|
*/
|
||||||
object Custom extends RouterType
|
case class Custom(implClass: String) extends RouterType
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -89,6 +89,24 @@ object Routing {
|
||||||
if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
|
if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
|
||||||
else new RoutedActorRef(props, address)
|
else new RoutedActorRef(props, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def createCustomRouter(implClass: String): Router = {
|
||||||
|
ReflectiveAccess.createInstance(
|
||||||
|
implClass,
|
||||||
|
Array[Class[_]](),
|
||||||
|
Array[AnyRef]()) match {
|
||||||
|
case Right(router) ⇒ router.asInstanceOf[Router]
|
||||||
|
case Left(exception) ⇒
|
||||||
|
val cause = exception match {
|
||||||
|
case i: InvocationTargetException ⇒ i.getTargetException
|
||||||
|
case _ ⇒ exception
|
||||||
|
}
|
||||||
|
throw new ConfigurationException(
|
||||||
|
"Could not instantiate custom Router of [" +
|
||||||
|
implClass + "] due to: " +
|
||||||
|
cause, cause)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ object FailureDetector {
|
||||||
implClass,
|
implClass,
|
||||||
Array[Class[_]](),
|
Array[Class[_]](),
|
||||||
Array[AnyRef]()) match {
|
Array[AnyRef]()) match {
|
||||||
case Right(actor) ⇒ actor
|
case Right(failureDetector) ⇒ failureDetector
|
||||||
case Left(exception) ⇒
|
case Left(exception) ⇒
|
||||||
val cause = exception match {
|
val cause = exception match {
|
||||||
case i: InvocationTargetException ⇒ i.getTargetException
|
case i: InvocationTargetException ⇒ i.getTargetException
|
||||||
|
|
|
||||||
|
|
@ -91,10 +91,10 @@ class RemoteActorRefProvider extends ActorRefProvider {
|
||||||
.format(address, remoteAddresses.mkString(", ")))
|
.format(address, remoteAddresses.mkString(", ")))
|
||||||
() ⇒ new ScatterGatherFirstCompletedRouter
|
() ⇒ new ScatterGatherFirstCompletedRouter
|
||||||
|
|
||||||
case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
||||||
case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
||||||
case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
||||||
case RouterType.Custom ⇒ sys.error("Router Custom not supported yet")
|
case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
var connections = Map.empty[InetSocketAddress, ActorRef]
|
var connections = Map.empty[InetSocketAddress, ActorRef]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue