diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 1edfea0055..c64816aebb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -21,7 +21,7 @@ class DeployerSpec extends WordSpec with MustMatchers { None, LeastCPU, RemoveConnectionOnFirstFailureRemoteFailureDetector, - Clustered( + ClusterScope( List(Node("node1")), new ReplicationFactor(3), Replication( diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d610ca6102..364501f01e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -194,13 +194,13 @@ object Actor { /** * Handle to the ClusterNode. API for the cluster client. */ - lazy val cluster: ClusterNode = ClusterModule.node + // lazy val cluster: ClusterNode = ClusterModule.node /** * Handle to the RemoteSupport. API for the remote client/server. * Only for internal use. */ - private[akka] lazy val remote: RemoteSupport = cluster.remoteService + private[akka] lazy val remote: RemoteSupport = RemoteModule.remoteService.server /** * This decorator adds invocation logging to a Receive function. @@ -248,10 +248,6 @@ object Actor { * actor ! message * actor.stop() * - * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]
-   * 
*/ def actorOf[T <: Actor: Manifest](address: String): ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) @@ -265,10 +261,6 @@ object Actor { * actor ! message * actor.stop * - * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]
-   * 
*/ def actorOf[T <: Actor: Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString) @@ -282,10 +274,6 @@ object Actor { * actor ! message * actor.stop() * - * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor])
-   * 
*/ def actorOf[T <: Actor](clazz: Class[T]): ActorRef = actorOf(clazz, new UUID().toString) @@ -297,10 +285,6 @@ object Actor { * actor ! message * actor.stop * - * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor])
-   * 
*/ def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = actorOf(Props(clazz), address) @@ -316,10 +300,6 @@ object Actor { * actor ! message * actor.stop() * - * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(new MyActor)
-   * 
*/ def actorOf[T <: Actor](factory: ⇒ T): ActorRef = actorOf(factory, newUuid().toString) @@ -335,10 +315,6 @@ object Actor { * actor ! message * actor.stop * - * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(new MyActor)
-   * 
*/ def actorOf[T <: Actor](creator: ⇒ T, address: String): ActorRef = actorOf(Props(creator), address) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index edb67e55ac..cb9b856955 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -120,7 +120,7 @@ class LocalActorRefProvider extends ActorRefProvider { Deployer.lookupDeploymentFor(deployId) match { // see if the deployment already exists, if so use it, if not create actor - case Some(Deploy(_, _, router, _, Local)) ⇒ + case Some(Deploy(_, _, router, _, LocalScope)) ⇒ // FIXME create RoutedActorRef if 'router' is specified Some(new LocalActorRef(props, address, systemService)) // create a local actor @@ -134,7 +134,7 @@ class LocalActorRefProvider extends ActorRefProvider { // def actorOf(props: Props, address: String): Option[ActorRef] = { // deploy match { -// case Deploy(configAddress, recipe, router, failureDetector, Clustered(preferredHomeNodes, replicas, replication)) ⇒ +// case Deploy(configAddress, recipe, router, failureDetector, Cluster(preferredHomeNodes, replicas, replication)) ⇒ // ClusterModule.ensureEnabled() diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 5a70d4c806..03c059a836 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -46,7 +46,7 @@ object Deployer extends ActorDeployer { def deploy(deployment: Deploy): Unit = instance.deploy(deployment) def isLocal(deployment: Deploy): Boolean = deployment match { - case Deploy(_, _, _, _, Local) | Deploy(_, _, _, _, _: Local) ⇒ true + case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) ⇒ true case _ ⇒ false } @@ -122,7 +122,7 @@ object Deployer extends ActorDeployer { val addressPath = "akka.actor.deployment." + address configuration.getSection(addressPath) match { case None ⇒ - Some(Deploy(address, None, Direct, RemoveConnectionOnFirstFailureLocalFailureDetector, Local)) + Some(Deploy(address, None, Direct, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) case Some(addressConfig) ⇒ @@ -163,91 +163,105 @@ object Deployer extends ActorDeployer { ActorRecipe(implementationClass) } - // -------------------------------- - // akka.actor.deployment.
.cluster - // -------------------------------- - addressConfig.getSection("cluster") match { - case None ⇒ - Some(Deploy(address, recipe, router, RemoveConnectionOnFirstFailureLocalFailureDetector, Local)) // deploy locally + addressConfig.getSection("remote") match { + case Some(remoteConfig) ⇒ // we have a 'remote' config section - case Some(clusterConfig) ⇒ + if (addressConfig.getSection("cluster").isDefined) throw new ConfigurationException( + "Configuration for deployment ID [" + address + "] can not have both 'remote' and 'cluster' sections.") + + val hostname = remoteConfig.getString("hostname", "localhost") + val port = remoteConfig.getInt("port", 2552) + + Some(Deploy(address, recipe, router, failureDetector, RemoteScope(hostname, port))) + + case None ⇒ // check for 'cluster' config section // -------------------------------- - // akka.actor.deployment.
.cluster.preferred-nodes + // akka.actor.deployment.
.cluster // -------------------------------- + addressConfig.getSection("cluster") match { + case None ⇒ + Some(Deploy(address, recipe, router, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) // deploy locally - val preferredNodes = clusterConfig.getList("preferred-nodes") match { - case Nil ⇒ Nil - case homes ⇒ - def raiseHomeConfigError() = throw new ConfigurationException( - "Config option [" + addressPath + - ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:', 'ip:' or 'node:', was [" + - homes + "]") + case Some(clusterConfig) ⇒ - homes map { home ⇒ - if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError() + // -------------------------------- + // akka.actor.deployment.
.cluster.preferred-nodes + // -------------------------------- - val tokenizer = new java.util.StringTokenizer(home, ":") - val protocol = tokenizer.nextElement - val address = tokenizer.nextElement.asInstanceOf[String] + val preferredNodes = clusterConfig.getList("preferred-nodes") match { + case Nil ⇒ Nil + case homes ⇒ + def raiseHomeConfigError() = throw new ConfigurationException( + "Config option [" + addressPath + + ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:', 'ip:' or 'node:', was [" + + homes + "]") - protocol match { - //case "host" ⇒ Host(address) - case "node" ⇒ Node(address) - //case "ip" ⇒ IP(address) - case _ ⇒ raiseHomeConfigError() - } - } - } + homes map { home ⇒ + if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError() - // -------------------------------- - // akka.actor.deployment.
.cluster.replicas - // -------------------------------- - val replicationFactor = { - if (router == Direct) new ReplicationFactor(1) - else { - clusterConfig.getAny("replication-factor", "0") match { - case "auto" ⇒ AutoReplicationFactor - case "0" ⇒ ZeroReplicationFactor - case nrOfReplicas: String ⇒ - try { - new ReplicationFactor(nrOfReplicas.toInt) - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Config option [" + addressPath + - ".cluster.replicas] needs to be either [\"auto\"] or [0-N] - was [" + - nrOfReplicas + "]") + val tokenizer = new java.util.StringTokenizer(home, ":") + val protocol = tokenizer.nextElement + val address = tokenizer.nextElement.asInstanceOf[String] + + protocol match { + //case "host" ⇒ Host(address) + case "node" ⇒ Node(address) + //case "ip" ⇒ IP(address) + case _ ⇒ raiseHomeConfigError() + } } } - } - } - // -------------------------------- - // akka.actor.deployment.
.cluster.replication - // -------------------------------- - clusterConfig.getSection("replication") match { - case None ⇒ - Some(Deploy(address, recipe, router, failureDetector, Clustered(preferredNodes, replicationFactor, Transient))) + // -------------------------------- + // akka.actor.deployment.
.cluster.replicas + // -------------------------------- + val replicationFactor = { + if (router == Direct) new ReplicationFactor(1) + else { + clusterConfig.getAny("replication-factor", "0") match { + case "auto" ⇒ AutoReplicationFactor + case "0" ⇒ ZeroReplicationFactor + case nrOfReplicas: String ⇒ + try { + new ReplicationFactor(nrOfReplicas.toInt) + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Config option [" + addressPath + + ".cluster.replicas] needs to be either [\"auto\"] or [0-N] - was [" + + nrOfReplicas + "]") + } + } + } + } - case Some(replicationConfig) ⇒ - val storage = replicationConfig.getString("storage", "transaction-log") match { - case "transaction-log" ⇒ TransactionLog - case "data-grid" ⇒ DataGrid - case unknown ⇒ - throw new ConfigurationException("Config option [" + addressPath + - ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + - unknown + "]") + // -------------------------------- + // akka.actor.deployment.
.cluster.replication + // -------------------------------- + clusterConfig.getSection("replication") match { + case None ⇒ + Some(Deploy(address, recipe, router, failureDetector, ClusterScope(preferredNodes, replicationFactor, Transient))) + + case Some(replicationConfig) ⇒ + val storage = replicationConfig.getString("storage", "transaction-log") match { + case "transaction-log" ⇒ TransactionLog + case "data-grid" ⇒ DataGrid + case unknown ⇒ + throw new ConfigurationException("Config option [" + addressPath + + ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + + unknown + "]") + } + val strategy = replicationConfig.getString("strategy", "write-through") match { + case "write-through" ⇒ WriteThrough + case "write-behind" ⇒ WriteBehind + case unknown ⇒ + throw new ConfigurationException("Config option [" + addressPath + + ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + + unknown + "]") + } + Some(Deploy(address, recipe, router, failureDetector, ClusterScope(preferredNodes, replicationFactor, Replication(storage, strategy)))) } - val strategy = replicationConfig.getString("strategy", "write-through") match { - case "write-through" ⇒ WriteThrough - case "write-behind" ⇒ WriteBehind - case unknown ⇒ - throw new ConfigurationException("Config option [" + addressPath + - ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + - unknown + "]") - } - Some(Deploy(address, recipe, router, failureDetector, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy)))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 73e1067239..4d5fdbfc10 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -24,7 +24,7 @@ object DeploymentConfig { recipe: Option[ActorRecipe], routing: Routing = Direct, failureDetector: FailureDetector = RemoveConnectionOnFirstFailureLocalFailureDetector, - scope: Scope = Local) { + scope: Scope = LocalScope) { Address.validate(address) } @@ -73,16 +73,20 @@ object DeploymentConfig { // --- Scope // -------------------------------- sealed trait Scope - case class Clustered( + case class ClusterScope( preferredNodes: Iterable[Home] = Vector(Node(Config.nodename)), replicas: ReplicationFactor = ZeroReplicationFactor, replication: ReplicationScheme = Transient) extends Scope + case class RemoteScope( + hostname: String = "localhost", + port: Int = 2552) extends Scope + // For Java API - case class Local() extends Scope + case class LocalScope() extends Scope // For Scala API - case object Local extends Scope + case object LocalScope extends Scope // -------------------------------- // --- Home @@ -200,7 +204,7 @@ object DeploymentConfig { } def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match { - case Deploy(_, _, _, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme) + case Deploy(_, _, _, _, ClusterScope(_, _, replicationScheme)) ⇒ Some(replicationScheme) case _ ⇒ None } diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 38ab4ad6df..390f734da1 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -197,10 +197,6 @@ trait ClusterNode { def zkServerAddresses: String - def remoteService: RemoteSupport - - def remoteServerAddress: InetSocketAddress - def start() def shutdown() @@ -390,12 +386,6 @@ trait ClusterNode { */ def use[T <: Actor](actorAddress: String): Option[LocalActorRef] - /** - * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available - * for remote access through lookup by its UUID. - */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] - /** * Using (checking out) actor on a specific set of nodes. */ diff --git a/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala b/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala index be56c3b0b0..bbd14df088 100644 --- a/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala @@ -19,6 +19,13 @@ import java.util.concurrent.ConcurrentHashMap import java.io.{ PrintWriter, PrintStream } import java.lang.reflect.InvocationTargetException +class RemoteException(message: String) extends AkkaException(message) + +trait RemoteService { + def server: RemoteSupport + def address: InetSocketAddress +} + trait RemoteModule { val UUID_PREFIX = "uuid:".intern @@ -49,7 +56,7 @@ trait RemoteModule { else { val actorRef = Deployer.lookupDeploymentFor(address) match { - case Some(Deploy(_, router, _, Clustered(home, _, _))) ⇒ + case Some(Deploy(_, router, _, Cluster(home, _, _))) ⇒ if (DeploymentConfig.isHomeNode(home)) { // on home node Actor.registry.actorFor(address) match { // try to look up in actor registry diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 7938966770..6f15bcc1b6 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,16 +4,15 @@ package akka.util -import akka.dispatch.MessageInvocation -import akka.config.{ Config, ModuleNotAvailableException } -import akka.cluster.RemoteSupport import akka.actor._ import DeploymentConfig.ReplicationScheme +import akka.dispatch.MessageInvocation +import akka.config.{ Config, ModuleNotAvailableException } import akka.event.EventHandler -import akka.cluster.ClusterNode +import akka.cluster.{ RemoteSupport, ClusterNode, RemoteService } +import akka.routing.{ RoutedProps, Router } import java.net.InetSocketAddress -import akka.routing.{ RoutedProps, Router } /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -152,6 +151,18 @@ object ReflectiveAccess { } } + lazy val remoteInstance: Option[RemoteService] = getObjectFor("akka.cluster.Remote$") match { + case Right(value) ⇒ Some(value) + case Left(exception) ⇒ + EventHandler.debug(this, exception.toString) + None + } + + lazy val remoteService: RemoteService = { + ensureEnabled() + remoteInstance.get + } + val remoteSupportClass = getClassFor[RemoteSupport](TRANSPORT) match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index aa5675ff4e..de0b8527c0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -734,13 +734,7 @@ class DefaultClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, serializerForActor(actorAddress)) - - /** - * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available - * for remote access through lookup by its UUID. - */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = { + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = { val nodeName = nodeAddress.nodeName val actorFactoryPath = actorAddressRegistryPathFor(actorAddress) @@ -1233,7 +1227,7 @@ class DefaultClusterNode private[akka] ( if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor Deployer.deploymentFor(actorAddress.get) match { - case Deploy(_, _, _, _, Clustered(nodes, _, _)) ⇒ + case Deploy(_, _, _, _, Cluster(nodes, _, _)) ⇒ nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor case _ ⇒ throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered") diff --git a/akka-remote/src/main/scala/akka/cluster/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/cluster/RemoteActorRefProvider.scala new file mode 100644 index 0000000000..174832aa76 --- /dev/null +++ b/akka-remote/src/main/scala/akka/cluster/RemoteActorRefProvider.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor._ +import DeploymentConfig._ +import Actor._ +import Status._ +import akka.event.EventHandler +import akka.AkkaException +import RemoteProtocol._ + +import java.net.InetSocketAddress + +/** + * Remote ActorRefProvider. + */ +class RemoteActorRefProvider extends ActorRefProvider { + + def actorOf(props: Props, address: String): Option[ActorRef] = { + Address.validate(address) + + val actorRef = Actor.remote.actors.get(address) + if (actorRef ne null) Some(actorRef) + else { + // if 'Props.deployId' is not specified then use 'address' as 'deployId' + val deployId = props.deployId match { + case Props.`defaultDeployId` | null ⇒ address + case other ⇒ other + } + + Deployer.lookupDeploymentFor(deployId) match { + case Some(Deploy(_, _, router, _, RemoteConfig(host, port))) ⇒ + // FIXME create RoutedActorRef if 'router' is specified + + val inetSocketAddress = null + Some(createRemoteActorRef(address, inetSocketAddress)) // create a remote actor + + case deploy ⇒ None // non-remote actor + } + } + } + + def findActorRef(address: String): Option[ActorRef] = throw new UnsupportedOperationException + + private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = { + RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) + } + + private def sendCommandToConnection( + connection: ActorRef, + command: RemoteDaemonMessageProtocol, + async: Boolean = true) { + + if (async) { + connection ! command + } else { + try { + (connection ? (command, Remote.remoteDaemonAckTimeout)).as[Status] match { + case Some(Success(status)) ⇒ + EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(status)) + + case Some(Failure(cause)) ⇒ + EventHandler.error(cause, this, cause.toString) + throw cause + + case None ⇒ + val error = new RemoteException("Remote command to [%s] timed out".format(connection.address)) + EventHandler.error(error, this, error.toString) + throw error + } + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString)) + throw e + } + } + } +} diff --git a/akka-remote/src/main/scala/akka/cluster/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/cluster/RemoteDaemon.scala new file mode 100644 index 0000000000..c9e1ca3bbd --- /dev/null +++ b/akka-remote/src/main/scala/akka/cluster/RemoteDaemon.scala @@ -0,0 +1,291 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor._ +import Actor._ +import akka.event.EventHandler +import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } +import akka.config.{ Config, Supervision } +import Supervision._ +import Status._ +import Config._ +import akka.util._ +import duration._ +import Helpers._ +import DeploymentConfig._ +import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } +import ActorSerialization._ +import Compression.LZF +import RemoteProtocol._ +import RemoteDaemonMessageType._ + +import java.net.InetSocketAddress + +import com.eaio.uuid.UUID + +/** + * @author Jonas Bonér + */ +object Remote extends RemoteService { + val shouldCompressData = config.getBool("akka.cluster.use-compression", false) + val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt + + val hostname = Config.hostname + val port = Config.remoteServerPort + + val remoteAddress = "akka-remote-daemon".intern + + // FIXME configure computeGridDispatcher to what? + val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build + + private[cluster] lazy val remoteDaemon = new LocalActorRef( + Props(new RemoteDaemon).copy(dispatcher = new PinnedDispatcher()), + Remote.remoteAddress, + systemService = true) + + private[cluster] lazy val remoteDaemonSupervisor = Supervisor( + SupervisorConfig( + OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want? + Supervise( + remoteDaemon, + Permanent) + :: Nil)) + + private[cluster] lazy val remoteClientLifeCycleHandler = actorOf(Props(new Actor { + def receive = { + case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule() + case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule() + case _ ⇒ //ignore other + } + }), "akka.cluster.RemoteClientLifeCycleListener") + + lazy val server: RemoteSupport = { + val remote = new akka.cluster.netty.NettyRemoteSupport + remote.start(hostname, port) + remote.register(Remote.remoteAddress, remoteDaemon) + remote.addListener(RemoteFailureDetector.channel) + remote.addListener(remoteClientLifeCycleHandler) + remote + } + + lazy val address = server.address + + def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow) + + def uuidToUuidProtocol(uuid: UUID): UuidProtocol = + UuidProtocol.newBuilder + .setHigh(uuid.getTime) + .setLow(uuid.getClockSeqAndNode) + .build +} + +/** + * Internal "daemon" actor for cluster internal communication. + * + * It acts as the brain of the cluster that responds to cluster events (messages) and undertakes action. + * + * @author Jonas Bonér + */ +class RemoteDaemon extends Actor { + + import Remote._ + + override def preRestart(reason: Throwable, msg: Option[Any]) { + EventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason)) + } + + def receive: Receive = { + case message: RemoteDaemonMessageProtocol ⇒ + EventHandler.debug(this, + "Received command [\n%s] to RemoteDaemon on [%s]".format(message, address)) + + message.getMessageType match { + case USE ⇒ handleUse(message) + case RELEASE ⇒ handleRelease(message) + // case STOP ⇒ cluster.shutdown() + // case DISCONNECT ⇒ cluster.disconnect() + // case RECONNECT ⇒ cluster.reconnect() + // case RESIGN ⇒ cluster.resign() + // case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message) + case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message) + case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message) + case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message) + case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message) + //TODO: should we not deal with unrecognized message types? + } + + case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown)) + } + + def handleUse(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + try { + if (message.hasActorAddress) { + val props = + Serialization.deserialize(propsBytes, classOf[Props], None) match { + case Left(error) ⇒ throw error + case Right(instance) ⇒ instance.asInstanceOf[Props] + } + + val actorAddress = message.getActorAddress + val newActorRef = actorOf(props) + + Remote.server.register(actorAddress, newActorRef) + + // if (message.hasReplicateActorFromUuid) { + + // def deserializeMessages(entriesAsBytes: Vector[Array[Byte]]): Vector[AnyRef] = { + // import akka.cluster.RemoteProtocol._ + // import akka.cluster.MessageSerializer + + // entriesAsBytes map { bytes ⇒ + // val messageBytes = + // if (shouldCompressData) LZF.uncompress(bytes) + // else bytes + // MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) + // } + // } + // def createActorRefToUseForReplay(snapshotAsBytes: Option[Array[Byte]], actorAddress: String, newActorRef: LocalActorRef): ActorRef = { + // snapshotAsBytes match { + + // // we have a new actor ref - the snapshot + // case Some(bytes) ⇒ + // // stop the new actor ref and use the snapshot instead + // //TODO: What if that actor already has been retrieved and is being used?? + // //So do we have a race here? + // server.unregister(actorAddress) + + // // deserialize the snapshot actor ref and register it as remote actor + // val uncompressedBytes = + // if (shouldCompressData) LZF.uncompress(bytes) + // else bytes + + // val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid) + // server.register(actorAddress, snapshotActorRef) + + // // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently + // //shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef + // //have the same UUID (which they should) + // //newActorRef.stop() + + // snapshotActorRef + + // // we have no snapshot - use the new actor ref + // case None ⇒ + // newActorRef + // } + // } + + // // replication is used - fetch the messages and replay them + // val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid) + // val deployment = Deployer.deploymentFor(actorAddress) + // val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse( + // throw new IllegalStateException( + // "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme")) + // val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme) + + // try { + // // get the transaction log for the actor UUID + // val readonlyTxLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) + + // // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) + // val (snapshotAsBytes, entriesAsBytes) = readonlyTxLog.latestSnapshotAndSubsequentEntries + + // // deserialize and restore actor snapshot. This call will automatically recreate a transaction log. + // val actorRef = createActorRefToUseForReplay(snapshotAsBytes, actorAddress, newActorRef) + + // // deserialize the messages + // val messages: Vector[AnyRef] = deserializeMessages(entriesAsBytes) + + // EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) + + // // replay all messages + // messages foreach { message ⇒ + // EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) + + // // FIXME how to handle '?' messages? + // // We can *not* replay them with the correct semantics. Should we: + // // 1. Ignore/drop them and log warning? + // // 2. Throw exception when about to log them? + // // 3. Other? + // actorRef ! message + // } + + // } catch { + // case e: Throwable ⇒ + // EventHandler.error(e, this, e.toString) + // throw e + // } + // } + + } else { + EventHandler.error(this, "Actor 'address' is not defined, ignoring remote daemon command [%s]".format(message)) + } + + self.reply(Success(address.toString)) + } catch { + case error: Throwable ⇒ + self.reply(Failure(error)) + throw error + } + } + + def handleRelease(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + if (message.hasActorUuid) { + cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒ + cluster.release(address) + } + } else if (message.hasActorAddress) { + cluster release message.getActorAddress + } else { + EventHandler.warning(this, + "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message)) + } + } + + def handle_fun0_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + new LocalActorRef( + Props( + self ⇒ { + case f: Function0[_] ⇒ try { f() } finally { self.stop() } + }).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) + } + + def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + new LocalActorRef( + Props( + self ⇒ { + case f: Function0[_] ⇒ try { self.reply(f()) } finally { self.stop() } + }).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) + } + + def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + new LocalActorRef( + Props( + self ⇒ { + case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { self.stop() } + }).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) + } + + def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + new LocalActorRef( + Props( + self ⇒ { + case (fun: Function[_, _], param: Any) ⇒ try { self.reply(fun.asInstanceOf[Any ⇒ Any](param)) } finally { self.stop() } + }).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) + } + + def handleFailover(message: RemoteProtocol.RemoteDaemonMessageProtocol) { + // val (from, to) = payloadFor(message, classOf[(InetSocketremoteAddress, InetSocketremoteAddress)]) + // cluster.failOverClusterActorRefConnections(from, to) + } + + private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { + Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { + case Left(error) ⇒ throw error + case Right(instance) ⇒ instance.asInstanceOf[T] + } + } +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index e29f2dacd8..fa410192a6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -53,6 +53,10 @@ akka { # "circuit-breaker" # or: fully qualified class name of the router class # default is "remove-connection-on-first-remote-failure"; + remote { + hostname = "localhost" # The remote server hostname or IP address the remote actor should connect to + port = 2552 # The remote server port the remote actor should connect to + } cluster { # defines the actor as a clustered actor # default (if omitted) is local non-clustered actor