Added RemoteActorRefProvider which deploys and instantiates actor on a remote host.
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
990d49236b
commit
cd4a3c3a1a
12 changed files with 503 additions and 131 deletions
|
|
@ -21,7 +21,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
|
|||
None,
|
||||
LeastCPU,
|
||||
RemoveConnectionOnFirstFailureRemoteFailureDetector,
|
||||
Clustered(
|
||||
ClusterScope(
|
||||
List(Node("node1")),
|
||||
new ReplicationFactor(3),
|
||||
Replication(
|
||||
|
|
|
|||
|
|
@ -194,13 +194,13 @@ object Actor {
|
|||
/**
|
||||
* Handle to the ClusterNode. API for the cluster client.
|
||||
*/
|
||||
lazy val cluster: ClusterNode = ClusterModule.node
|
||||
// lazy val cluster: ClusterNode = ClusterModule.node
|
||||
|
||||
/**
|
||||
* Handle to the RemoteSupport. API for the remote client/server.
|
||||
* Only for internal use.
|
||||
*/
|
||||
private[akka] lazy val remote: RemoteSupport = cluster.remoteService
|
||||
private[akka] lazy val remote: RemoteSupport = RemoteModule.remoteService.server
|
||||
|
||||
/**
|
||||
* This decorator adds invocation logging to a Receive function.
|
||||
|
|
@ -248,10 +248,6 @@ object Actor {
|
|||
* actor ! message
|
||||
* actor.stop()
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf[MyActor]
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor: Manifest](address: String): ActorRef =
|
||||
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address)
|
||||
|
|
@ -265,10 +261,6 @@ object Actor {
|
|||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf[MyActor]
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor: Manifest]: ActorRef =
|
||||
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString)
|
||||
|
|
@ -282,10 +274,6 @@ object Actor {
|
|||
* actor ! message
|
||||
* actor.stop()
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(classOf[MyActor])
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor](clazz: Class[T]): ActorRef = actorOf(clazz, new UUID().toString)
|
||||
|
||||
|
|
@ -297,10 +285,6 @@ object Actor {
|
|||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(classOf[MyActor])
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = actorOf(Props(clazz), address)
|
||||
|
||||
|
|
@ -316,10 +300,6 @@ object Actor {
|
|||
* actor ! message
|
||||
* actor.stop()
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(new MyActor)
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor](factory: ⇒ T): ActorRef = actorOf(factory, newUuid().toString)
|
||||
|
||||
|
|
@ -335,10 +315,6 @@ object Actor {
|
|||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(new MyActor)
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor](creator: ⇒ T, address: String): ActorRef = actorOf(Props(creator), address)
|
||||
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class LocalActorRefProvider extends ActorRefProvider {
|
|||
|
||||
Deployer.lookupDeploymentFor(deployId) match { // see if the deployment already exists, if so use it, if not create actor
|
||||
|
||||
case Some(Deploy(_, _, router, _, Local)) ⇒
|
||||
case Some(Deploy(_, _, router, _, LocalScope)) ⇒
|
||||
// FIXME create RoutedActorRef if 'router' is specified
|
||||
Some(new LocalActorRef(props, address, systemService)) // create a local actor
|
||||
|
||||
|
|
@ -134,7 +134,7 @@ class LocalActorRefProvider extends ActorRefProvider {
|
|||
|
||||
// def actorOf(props: Props, address: String): Option[ActorRef] = {
|
||||
// deploy match {
|
||||
// case Deploy(configAddress, recipe, router, failureDetector, Clustered(preferredHomeNodes, replicas, replication)) ⇒
|
||||
// case Deploy(configAddress, recipe, router, failureDetector, Cluster(preferredHomeNodes, replicas, replication)) ⇒
|
||||
|
||||
// ClusterModule.ensureEnabled()
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ object Deployer extends ActorDeployer {
|
|||
def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
|
||||
|
||||
def isLocal(deployment: Deploy): Boolean = deployment match {
|
||||
case Deploy(_, _, _, _, Local) | Deploy(_, _, _, _, _: Local) ⇒ true
|
||||
case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
|
|
@ -122,7 +122,7 @@ object Deployer extends ActorDeployer {
|
|||
val addressPath = "akka.actor.deployment." + address
|
||||
configuration.getSection(addressPath) match {
|
||||
case None ⇒
|
||||
Some(Deploy(address, None, Direct, RemoveConnectionOnFirstFailureLocalFailureDetector, Local))
|
||||
Some(Deploy(address, None, Direct, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope))
|
||||
|
||||
case Some(addressConfig) ⇒
|
||||
|
||||
|
|
@ -163,91 +163,105 @@ object Deployer extends ActorDeployer {
|
|||
ActorRecipe(implementationClass)
|
||||
}
|
||||
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster
|
||||
// --------------------------------
|
||||
addressConfig.getSection("cluster") match {
|
||||
case None ⇒
|
||||
Some(Deploy(address, recipe, router, RemoveConnectionOnFirstFailureLocalFailureDetector, Local)) // deploy locally
|
||||
addressConfig.getSection("remote") match {
|
||||
case Some(remoteConfig) ⇒ // we have a 'remote' config section
|
||||
|
||||
case Some(clusterConfig) ⇒
|
||||
if (addressConfig.getSection("cluster").isDefined) throw new ConfigurationException(
|
||||
"Configuration for deployment ID [" + address + "] can not have both 'remote' and 'cluster' sections.")
|
||||
|
||||
val hostname = remoteConfig.getString("hostname", "localhost")
|
||||
val port = remoteConfig.getInt("port", 2552)
|
||||
|
||||
Some(Deploy(address, recipe, router, failureDetector, RemoteScope(hostname, port)))
|
||||
|
||||
case None ⇒ // check for 'cluster' config section
|
||||
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster.preferred-nodes
|
||||
// akka.actor.deployment.<address>.cluster
|
||||
// --------------------------------
|
||||
addressConfig.getSection("cluster") match {
|
||||
case None ⇒
|
||||
Some(Deploy(address, recipe, router, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) // deploy locally
|
||||
|
||||
val preferredNodes = clusterConfig.getList("preferred-nodes") match {
|
||||
case Nil ⇒ Nil
|
||||
case homes ⇒
|
||||
def raiseHomeConfigError() = throw new ConfigurationException(
|
||||
"Config option [" + addressPath +
|
||||
".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
|
||||
homes + "]")
|
||||
case Some(clusterConfig) ⇒
|
||||
|
||||
homes map { home ⇒
|
||||
if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster.preferred-nodes
|
||||
// --------------------------------
|
||||
|
||||
val tokenizer = new java.util.StringTokenizer(home, ":")
|
||||
val protocol = tokenizer.nextElement
|
||||
val address = tokenizer.nextElement.asInstanceOf[String]
|
||||
val preferredNodes = clusterConfig.getList("preferred-nodes") match {
|
||||
case Nil ⇒ Nil
|
||||
case homes ⇒
|
||||
def raiseHomeConfigError() = throw new ConfigurationException(
|
||||
"Config option [" + addressPath +
|
||||
".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
|
||||
homes + "]")
|
||||
|
||||
protocol match {
|
||||
//case "host" ⇒ Host(address)
|
||||
case "node" ⇒ Node(address)
|
||||
//case "ip" ⇒ IP(address)
|
||||
case _ ⇒ raiseHomeConfigError()
|
||||
}
|
||||
}
|
||||
}
|
||||
homes map { home ⇒
|
||||
if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()
|
||||
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster.replicas
|
||||
// --------------------------------
|
||||
val replicationFactor = {
|
||||
if (router == Direct) new ReplicationFactor(1)
|
||||
else {
|
||||
clusterConfig.getAny("replication-factor", "0") match {
|
||||
case "auto" ⇒ AutoReplicationFactor
|
||||
case "0" ⇒ ZeroReplicationFactor
|
||||
case nrOfReplicas: String ⇒
|
||||
try {
|
||||
new ReplicationFactor(nrOfReplicas.toInt)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Config option [" + addressPath +
|
||||
".cluster.replicas] needs to be either [\"auto\"] or [0-N] - was [" +
|
||||
nrOfReplicas + "]")
|
||||
val tokenizer = new java.util.StringTokenizer(home, ":")
|
||||
val protocol = tokenizer.nextElement
|
||||
val address = tokenizer.nextElement.asInstanceOf[String]
|
||||
|
||||
protocol match {
|
||||
//case "host" ⇒ Host(address)
|
||||
case "node" ⇒ Node(address)
|
||||
//case "ip" ⇒ IP(address)
|
||||
case _ ⇒ raiseHomeConfigError()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster.replication
|
||||
// --------------------------------
|
||||
clusterConfig.getSection("replication") match {
|
||||
case None ⇒
|
||||
Some(Deploy(address, recipe, router, failureDetector, Clustered(preferredNodes, replicationFactor, Transient)))
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster.replicas
|
||||
// --------------------------------
|
||||
val replicationFactor = {
|
||||
if (router == Direct) new ReplicationFactor(1)
|
||||
else {
|
||||
clusterConfig.getAny("replication-factor", "0") match {
|
||||
case "auto" ⇒ AutoReplicationFactor
|
||||
case "0" ⇒ ZeroReplicationFactor
|
||||
case nrOfReplicas: String ⇒
|
||||
try {
|
||||
new ReplicationFactor(nrOfReplicas.toInt)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Config option [" + addressPath +
|
||||
".cluster.replicas] needs to be either [\"auto\"] or [0-N] - was [" +
|
||||
nrOfReplicas + "]")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case Some(replicationConfig) ⇒
|
||||
val storage = replicationConfig.getString("storage", "transaction-log") match {
|
||||
case "transaction-log" ⇒ TransactionLog
|
||||
case "data-grid" ⇒ DataGrid
|
||||
case unknown ⇒
|
||||
throw new ConfigurationException("Config option [" + addressPath +
|
||||
".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" +
|
||||
unknown + "]")
|
||||
// --------------------------------
|
||||
// akka.actor.deployment.<address>.cluster.replication
|
||||
// --------------------------------
|
||||
clusterConfig.getSection("replication") match {
|
||||
case None ⇒
|
||||
Some(Deploy(address, recipe, router, failureDetector, ClusterScope(preferredNodes, replicationFactor, Transient)))
|
||||
|
||||
case Some(replicationConfig) ⇒
|
||||
val storage = replicationConfig.getString("storage", "transaction-log") match {
|
||||
case "transaction-log" ⇒ TransactionLog
|
||||
case "data-grid" ⇒ DataGrid
|
||||
case unknown ⇒
|
||||
throw new ConfigurationException("Config option [" + addressPath +
|
||||
".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" +
|
||||
unknown + "]")
|
||||
}
|
||||
val strategy = replicationConfig.getString("strategy", "write-through") match {
|
||||
case "write-through" ⇒ WriteThrough
|
||||
case "write-behind" ⇒ WriteBehind
|
||||
case unknown ⇒
|
||||
throw new ConfigurationException("Config option [" + addressPath +
|
||||
".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
|
||||
unknown + "]")
|
||||
}
|
||||
Some(Deploy(address, recipe, router, failureDetector, ClusterScope(preferredNodes, replicationFactor, Replication(storage, strategy))))
|
||||
}
|
||||
val strategy = replicationConfig.getString("strategy", "write-through") match {
|
||||
case "write-through" ⇒ WriteThrough
|
||||
case "write-behind" ⇒ WriteBehind
|
||||
case unknown ⇒
|
||||
throw new ConfigurationException("Config option [" + addressPath +
|
||||
".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
|
||||
unknown + "]")
|
||||
}
|
||||
Some(Deploy(address, recipe, router, failureDetector, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ object DeploymentConfig {
|
|||
recipe: Option[ActorRecipe],
|
||||
routing: Routing = Direct,
|
||||
failureDetector: FailureDetector = RemoveConnectionOnFirstFailureLocalFailureDetector,
|
||||
scope: Scope = Local) {
|
||||
scope: Scope = LocalScope) {
|
||||
Address.validate(address)
|
||||
}
|
||||
|
||||
|
|
@ -73,16 +73,20 @@ object DeploymentConfig {
|
|||
// --- Scope
|
||||
// --------------------------------
|
||||
sealed trait Scope
|
||||
case class Clustered(
|
||||
case class ClusterScope(
|
||||
preferredNodes: Iterable[Home] = Vector(Node(Config.nodename)),
|
||||
replicas: ReplicationFactor = ZeroReplicationFactor,
|
||||
replication: ReplicationScheme = Transient) extends Scope
|
||||
|
||||
case class RemoteScope(
|
||||
hostname: String = "localhost",
|
||||
port: Int = 2552) extends Scope
|
||||
|
||||
// For Java API
|
||||
case class Local() extends Scope
|
||||
case class LocalScope() extends Scope
|
||||
|
||||
// For Scala API
|
||||
case object Local extends Scope
|
||||
case object LocalScope extends Scope
|
||||
|
||||
// --------------------------------
|
||||
// --- Home
|
||||
|
|
@ -200,7 +204,7 @@ object DeploymentConfig {
|
|||
}
|
||||
|
||||
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
|
||||
case Deploy(_, _, _, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme)
|
||||
case Deploy(_, _, _, _, ClusterScope(_, _, replicationScheme)) ⇒ Some(replicationScheme)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -197,10 +197,6 @@ trait ClusterNode {
|
|||
|
||||
def zkServerAddresses: String
|
||||
|
||||
def remoteService: RemoteSupport
|
||||
|
||||
def remoteServerAddress: InetSocketAddress
|
||||
|
||||
def start()
|
||||
|
||||
def shutdown()
|
||||
|
|
@ -390,12 +386,6 @@ trait ClusterNode {
|
|||
*/
|
||||
def use[T <: Actor](actorAddress: String): Option[LocalActorRef]
|
||||
|
||||
/**
|
||||
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
|
||||
* for remote access through lookup by its UUID.
|
||||
*/
|
||||
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef]
|
||||
|
||||
/**
|
||||
* Using (checking out) actor on a specific set of nodes.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -19,6 +19,13 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
import java.io.{ PrintWriter, PrintStream }
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
|
||||
class RemoteException(message: String) extends AkkaException(message)
|
||||
|
||||
trait RemoteService {
|
||||
def server: RemoteSupport
|
||||
def address: InetSocketAddress
|
||||
}
|
||||
|
||||
trait RemoteModule {
|
||||
val UUID_PREFIX = "uuid:".intern
|
||||
|
||||
|
|
@ -49,7 +56,7 @@ trait RemoteModule {
|
|||
else {
|
||||
val actorRef =
|
||||
Deployer.lookupDeploymentFor(address) match {
|
||||
case Some(Deploy(_, router, _, Clustered(home, _, _))) ⇒
|
||||
case Some(Deploy(_, router, _, Cluster(home, _, _))) ⇒
|
||||
|
||||
if (DeploymentConfig.isHomeNode(home)) { // on home node
|
||||
Actor.registry.actorFor(address) match { // try to look up in actor registry
|
||||
|
|
|
|||
|
|
@ -4,16 +4,15 @@
|
|||
|
||||
package akka.util
|
||||
|
||||
import akka.dispatch.MessageInvocation
|
||||
import akka.config.{ Config, ModuleNotAvailableException }
|
||||
import akka.cluster.RemoteSupport
|
||||
import akka.actor._
|
||||
import DeploymentConfig.ReplicationScheme
|
||||
import akka.dispatch.MessageInvocation
|
||||
import akka.config.{ Config, ModuleNotAvailableException }
|
||||
import akka.event.EventHandler
|
||||
import akka.cluster.ClusterNode
|
||||
import akka.cluster.{ RemoteSupport, ClusterNode, RemoteService }
|
||||
import akka.routing.{ RoutedProps, Router }
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import akka.routing.{ RoutedProps, Router }
|
||||
|
||||
/**
|
||||
* Helper class for reflective access to different modules in order to allow optional loading of modules.
|
||||
|
|
@ -152,6 +151,18 @@ object ReflectiveAccess {
|
|||
}
|
||||
}
|
||||
|
||||
lazy val remoteInstance: Option[RemoteService] = getObjectFor("akka.cluster.Remote$") match {
|
||||
case Right(value) ⇒ Some(value)
|
||||
case Left(exception) ⇒
|
||||
EventHandler.debug(this, exception.toString)
|
||||
None
|
||||
}
|
||||
|
||||
lazy val remoteService: RemoteService = {
|
||||
ensureEnabled()
|
||||
remoteInstance.get
|
||||
}
|
||||
|
||||
val remoteSupportClass = getClassFor[RemoteSupport](TRANSPORT) match {
|
||||
case Right(value) ⇒ Some(value)
|
||||
case Left(exception) ⇒
|
||||
|
|
|
|||
|
|
@ -734,13 +734,7 @@ class DefaultClusterNode private[akka] (
|
|||
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
|
||||
* for remote access through lookup by its UUID.
|
||||
*/
|
||||
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, serializerForActor(actorAddress))
|
||||
|
||||
/**
|
||||
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
|
||||
* for remote access through lookup by its UUID.
|
||||
*/
|
||||
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = {
|
||||
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = {
|
||||
val nodeName = nodeAddress.nodeName
|
||||
|
||||
val actorFactoryPath = actorAddressRegistryPathFor(actorAddress)
|
||||
|
|
@ -1233,7 +1227,7 @@ class DefaultClusterNode private[akka] (
|
|||
if (actorAddress.isDefined) {
|
||||
// use 'preferred-nodes' in deployment config for the actor
|
||||
Deployer.deploymentFor(actorAddress.get) match {
|
||||
case Deploy(_, _, _, _, Clustered(nodes, _, _)) ⇒
|
||||
case Deploy(_, _, _, _, Cluster(nodes, _, _)) ⇒
|
||||
nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor
|
||||
case _ ⇒
|
||||
throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import akka.actor._
|
||||
import DeploymentConfig._
|
||||
import Actor._
|
||||
import Status._
|
||||
import akka.event.EventHandler
|
||||
import akka.AkkaException
|
||||
import RemoteProtocol._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
/**
|
||||
* Remote ActorRefProvider.
|
||||
*/
|
||||
class RemoteActorRefProvider extends ActorRefProvider {
|
||||
|
||||
def actorOf(props: Props, address: String): Option[ActorRef] = {
|
||||
Address.validate(address)
|
||||
|
||||
val actorRef = Actor.remote.actors.get(address)
|
||||
if (actorRef ne null) Some(actorRef)
|
||||
else {
|
||||
// if 'Props.deployId' is not specified then use 'address' as 'deployId'
|
||||
val deployId = props.deployId match {
|
||||
case Props.`defaultDeployId` | null ⇒ address
|
||||
case other ⇒ other
|
||||
}
|
||||
|
||||
Deployer.lookupDeploymentFor(deployId) match {
|
||||
case Some(Deploy(_, _, router, _, RemoteConfig(host, port))) ⇒
|
||||
// FIXME create RoutedActorRef if 'router' is specified
|
||||
|
||||
val inetSocketAddress = null
|
||||
Some(createRemoteActorRef(address, inetSocketAddress)) // create a remote actor
|
||||
|
||||
case deploy ⇒ None // non-remote actor
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def findActorRef(address: String): Option[ActorRef] = throw new UnsupportedOperationException
|
||||
|
||||
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
|
||||
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
|
||||
}
|
||||
|
||||
private def sendCommandToConnection(
|
||||
connection: ActorRef,
|
||||
command: RemoteDaemonMessageProtocol,
|
||||
async: Boolean = true) {
|
||||
|
||||
if (async) {
|
||||
connection ! command
|
||||
} else {
|
||||
try {
|
||||
(connection ? (command, Remote.remoteDaemonAckTimeout)).as[Status] match {
|
||||
case Some(Success(status)) ⇒
|
||||
EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(status))
|
||||
|
||||
case Some(Failure(cause)) ⇒
|
||||
EventHandler.error(cause, this, cause.toString)
|
||||
throw cause
|
||||
|
||||
case None ⇒
|
||||
val error = new RemoteException("Remote command to [%s] timed out".format(connection.address))
|
||||
EventHandler.error(error, this, error.toString)
|
||||
throw error
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
291
akka-remote/src/main/scala/akka/cluster/RemoteDaemon.scala
Normal file
291
akka-remote/src/main/scala/akka/cluster/RemoteDaemon.scala
Normal file
|
|
@ -0,0 +1,291 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
|
||||
import akka.config.{ Config, Supervision }
|
||||
import Supervision._
|
||||
import Status._
|
||||
import Config._
|
||||
import akka.util._
|
||||
import duration._
|
||||
import Helpers._
|
||||
import DeploymentConfig._
|
||||
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
|
||||
import ActorSerialization._
|
||||
import Compression.LZF
|
||||
import RemoteProtocol._
|
||||
import RemoteDaemonMessageType._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import com.eaio.uuid.UUID
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Remote extends RemoteService {
|
||||
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
|
||||
val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
|
||||
|
||||
val hostname = Config.hostname
|
||||
val port = Config.remoteServerPort
|
||||
|
||||
val remoteAddress = "akka-remote-daemon".intern
|
||||
|
||||
// FIXME configure computeGridDispatcher to what?
|
||||
val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build
|
||||
|
||||
private[cluster] lazy val remoteDaemon = new LocalActorRef(
|
||||
Props(new RemoteDaemon).copy(dispatcher = new PinnedDispatcher()),
|
||||
Remote.remoteAddress,
|
||||
systemService = true)
|
||||
|
||||
private[cluster] lazy val remoteDaemonSupervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
|
||||
Supervise(
|
||||
remoteDaemon,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
|
||||
private[cluster] lazy val remoteClientLifeCycleHandler = actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule()
|
||||
case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule()
|
||||
case _ ⇒ //ignore other
|
||||
}
|
||||
}), "akka.cluster.RemoteClientLifeCycleListener")
|
||||
|
||||
lazy val server: RemoteSupport = {
|
||||
val remote = new akka.cluster.netty.NettyRemoteSupport
|
||||
remote.start(hostname, port)
|
||||
remote.register(Remote.remoteAddress, remoteDaemon)
|
||||
remote.addListener(RemoteFailureDetector.channel)
|
||||
remote.addListener(remoteClientLifeCycleHandler)
|
||||
remote
|
||||
}
|
||||
|
||||
lazy val address = server.address
|
||||
|
||||
def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow)
|
||||
|
||||
def uuidToUuidProtocol(uuid: UUID): UuidProtocol =
|
||||
UuidProtocol.newBuilder
|
||||
.setHigh(uuid.getTime)
|
||||
.setLow(uuid.getClockSeqAndNode)
|
||||
.build
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal "daemon" actor for cluster internal communication.
|
||||
*
|
||||
* It acts as the brain of the cluster that responds to cluster events (messages) and undertakes action.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteDaemon extends Actor {
|
||||
|
||||
import Remote._
|
||||
|
||||
override def preRestart(reason: Throwable, msg: Option[Any]) {
|
||||
EventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason))
|
||||
}
|
||||
|
||||
def receive: Receive = {
|
||||
case message: RemoteDaemonMessageProtocol ⇒
|
||||
EventHandler.debug(this,
|
||||
"Received command [\n%s] to RemoteDaemon on [%s]".format(message, address))
|
||||
|
||||
message.getMessageType match {
|
||||
case USE ⇒ handleUse(message)
|
||||
case RELEASE ⇒ handleRelease(message)
|
||||
// case STOP ⇒ cluster.shutdown()
|
||||
// case DISCONNECT ⇒ cluster.disconnect()
|
||||
// case RECONNECT ⇒ cluster.reconnect()
|
||||
// case RESIGN ⇒ cluster.resign()
|
||||
// case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message)
|
||||
case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
|
||||
case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message)
|
||||
case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
|
||||
case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message)
|
||||
//TODO: should we not deal with unrecognized message types?
|
||||
}
|
||||
|
||||
case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown))
|
||||
}
|
||||
|
||||
def handleUse(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
try {
|
||||
if (message.hasActorAddress) {
|
||||
val props =
|
||||
Serialization.deserialize(propsBytes, classOf[Props], None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[Props]
|
||||
}
|
||||
|
||||
val actorAddress = message.getActorAddress
|
||||
val newActorRef = actorOf(props)
|
||||
|
||||
Remote.server.register(actorAddress, newActorRef)
|
||||
|
||||
// if (message.hasReplicateActorFromUuid) {
|
||||
|
||||
// def deserializeMessages(entriesAsBytes: Vector[Array[Byte]]): Vector[AnyRef] = {
|
||||
// import akka.cluster.RemoteProtocol._
|
||||
// import akka.cluster.MessageSerializer
|
||||
|
||||
// entriesAsBytes map { bytes ⇒
|
||||
// val messageBytes =
|
||||
// if (shouldCompressData) LZF.uncompress(bytes)
|
||||
// else bytes
|
||||
// MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
|
||||
// }
|
||||
// }
|
||||
// def createActorRefToUseForReplay(snapshotAsBytes: Option[Array[Byte]], actorAddress: String, newActorRef: LocalActorRef): ActorRef = {
|
||||
// snapshotAsBytes match {
|
||||
|
||||
// // we have a new actor ref - the snapshot
|
||||
// case Some(bytes) ⇒
|
||||
// // stop the new actor ref and use the snapshot instead
|
||||
// //TODO: What if that actor already has been retrieved and is being used??
|
||||
// //So do we have a race here?
|
||||
// server.unregister(actorAddress)
|
||||
|
||||
// // deserialize the snapshot actor ref and register it as remote actor
|
||||
// val uncompressedBytes =
|
||||
// if (shouldCompressData) LZF.uncompress(bytes)
|
||||
// else bytes
|
||||
|
||||
// val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid)
|
||||
// server.register(actorAddress, snapshotActorRef)
|
||||
|
||||
// // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently
|
||||
// //shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef
|
||||
// //have the same UUID (which they should)
|
||||
// //newActorRef.stop()
|
||||
|
||||
// snapshotActorRef
|
||||
|
||||
// // we have no snapshot - use the new actor ref
|
||||
// case None ⇒
|
||||
// newActorRef
|
||||
// }
|
||||
// }
|
||||
|
||||
// // replication is used - fetch the messages and replay them
|
||||
// val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
|
||||
// val deployment = Deployer.deploymentFor(actorAddress)
|
||||
// val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
|
||||
// throw new IllegalStateException(
|
||||
// "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
|
||||
// val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
|
||||
|
||||
// try {
|
||||
// // get the transaction log for the actor UUID
|
||||
// val readonlyTxLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
|
||||
|
||||
// // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
|
||||
// val (snapshotAsBytes, entriesAsBytes) = readonlyTxLog.latestSnapshotAndSubsequentEntries
|
||||
|
||||
// // deserialize and restore actor snapshot. This call will automatically recreate a transaction log.
|
||||
// val actorRef = createActorRefToUseForReplay(snapshotAsBytes, actorAddress, newActorRef)
|
||||
|
||||
// // deserialize the messages
|
||||
// val messages: Vector[AnyRef] = deserializeMessages(entriesAsBytes)
|
||||
|
||||
// EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
|
||||
|
||||
// // replay all messages
|
||||
// messages foreach { message ⇒
|
||||
// EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
|
||||
|
||||
// // FIXME how to handle '?' messages?
|
||||
// // We can *not* replay them with the correct semantics. Should we:
|
||||
// // 1. Ignore/drop them and log warning?
|
||||
// // 2. Throw exception when about to log them?
|
||||
// // 3. Other?
|
||||
// actorRef ! message
|
||||
// }
|
||||
|
||||
// } catch {
|
||||
// case e: Throwable ⇒
|
||||
// EventHandler.error(e, this, e.toString)
|
||||
// throw e
|
||||
// }
|
||||
// }
|
||||
|
||||
} else {
|
||||
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote daemon command [%s]".format(message))
|
||||
}
|
||||
|
||||
self.reply(Success(address.toString))
|
||||
} catch {
|
||||
case error: Throwable ⇒
|
||||
self.reply(Failure(error))
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
def handleRelease(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
if (message.hasActorUuid) {
|
||||
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
|
||||
cluster.release(address)
|
||||
}
|
||||
} else if (message.hasActorAddress) {
|
||||
cluster release message.getActorAddress
|
||||
} else {
|
||||
EventHandler.warning(this,
|
||||
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message))
|
||||
}
|
||||
}
|
||||
|
||||
def handle_fun0_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
new LocalActorRef(
|
||||
Props(
|
||||
self ⇒ {
|
||||
case f: Function0[_] ⇒ try { f() } finally { self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
|
||||
}
|
||||
|
||||
def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
new LocalActorRef(
|
||||
Props(
|
||||
self ⇒ {
|
||||
case f: Function0[_] ⇒ try { self.reply(f()) } finally { self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
|
||||
}
|
||||
|
||||
def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
new LocalActorRef(
|
||||
Props(
|
||||
self ⇒ {
|
||||
case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
|
||||
}
|
||||
|
||||
def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
new LocalActorRef(
|
||||
Props(
|
||||
self ⇒ {
|
||||
case (fun: Function[_, _], param: Any) ⇒ try { self.reply(fun.asInstanceOf[Any ⇒ Any](param)) } finally { self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
|
||||
}
|
||||
|
||||
def handleFailover(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
|
||||
// val (from, to) = payloadFor(message, classOf[(InetSocketremoteAddress, InetSocketremoteAddress)])
|
||||
// cluster.failOverClusterActorRefConnections(from, to)
|
||||
}
|
||||
|
||||
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
|
||||
Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[T]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -53,6 +53,10 @@ akka {
|
|||
# "circuit-breaker"
|
||||
# or: fully qualified class name of the router class
|
||||
# default is "remove-connection-on-first-remote-failure";
|
||||
remote {
|
||||
hostname = "localhost" # The remote server hostname or IP address the remote actor should connect to
|
||||
port = 2552 # The remote server port the remote actor should connect to
|
||||
}
|
||||
|
||||
cluster { # defines the actor as a clustered actor
|
||||
# default (if omitted) is local non-clustered actor
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue