2011-09-15 10:20:18 +02:00
|
|
|
|
/**
|
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
2011-09-20 21:44:50 +02:00
|
|
|
|
package akka.remote
|
2011-09-15 10:20:18 +02:00
|
|
|
|
|
|
|
|
|
|
import akka.actor._
|
2011-10-13 17:42:26 +02:00
|
|
|
|
import akka.dispatch._
|
2011-09-19 14:43:28 +02:00
|
|
|
|
import akka.util.duration._
|
2011-12-14 17:30:54 +01:00
|
|
|
|
import akka.util.Timeout
|
2011-09-19 14:43:28 +02:00
|
|
|
|
import akka.config.ConfigurationException
|
2011-10-27 12:23:01 +02:00
|
|
|
|
import akka.event.{ DeathWatch, Logging }
|
2011-10-20 15:11:34 +02:00
|
|
|
|
import akka.serialization.Compression.LZF
|
2011-09-19 14:43:28 +02:00
|
|
|
|
import com.google.protobuf.ByteString
|
2011-11-14 14:21:53 +01:00
|
|
|
|
import akka.event.EventStream
|
2011-11-18 11:16:23 +01:00
|
|
|
|
import akka.dispatch.Promise
|
2011-12-11 20:00:26 +01:00
|
|
|
|
import akka.config.ConfigurationException
|
2011-12-14 17:26:18 +01:00
|
|
|
|
import java.util.concurrent.{ TimeoutException }
|
2011-09-19 14:43:28 +02:00
|
|
|
|
|
2011-09-15 10:20:18 +02:00
|
|
|
|
/**
 * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
 *
 * Everything that is not remote-specific is delegated to a wrapped
 * `LocalActorRefProvider` rooted at this system's remote address. Actor
 * creation consults the `RemoteDeployer`; if the resulting deployment carries
 * a `RemoteScope` whose address differs from the local one, the actor is
 * instantiated on that node and represented here by a `RemoteActorRef`.
 */
class RemoteActorRefProvider(
  val systemName: String,
  val settings: ActorSystem.Settings,
  val eventStream: EventStream,
  val scheduler: Scheduler,
  _deadLetters: InternalActorRef) extends ActorRefProvider {

  val log = Logging(eventStream, "RemoteActorRefProvider")

  val remoteSettings = new RemoteSettings(settings.config, systemName)

  // Plain delegation to the wrapped local provider / the remote settings.
  def rootGuardian = local.rootGuardian
  def guardian = local.guardian
  def systemGuardian = local.systemGuardian
  def nodename = remoteSettings.NodeName
  def clustername = remoteSettings.ClusterName
  def terminationFuture = local.terminationFuture
  def dispatcher = local.dispatcher

  val deployer = new RemoteDeployer(settings)

  val remote = new Remote(settings, remoteSettings)
  implicit val transports = remote.transports

  // Root of all paths handed out by this provider: the address of the remote transport.
  val rootPath: ActorPath = RootActorPath(remote.remoteAddress)

  // NOTE: declared after `rootPath` and `deployer` because both are constructor arguments here.
  private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)

  val deathWatch = new RemoteDeathWatch(local.deathWatch, this)

  /**
   * Initializes the local provider first, then the remote support, registers
   * the remote daemon under the extra name "remote" and arranges for the
   * remote server to be shut down once `terminationFuture` completes.
   */
  def init(system: ActorSystemImpl): Unit = {
    local.init(system)
    remote.init(system, this)
    local.registerExtraNames(Map(("remote", remote.remoteDaemon)))
    terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
  }

  /**
   * Creates an actor at `path`. System services are always created locally.
   * For all other actors the effective deployment is the explicitly supplied
   * `deploy` or, failing that, whatever the deployer has configured for the
   * logical path; a `RemoteScope` pointing at another node results in remote
   * instantiation and a `RemoteActorRef`, everything else is created locally.
   *
   * @throws ConfigurationException if the remote address of the deployment cannot be parsed
   */
  def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef =
    if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy)
    else {

      /*
       * This needs to deal with “mangled” paths, which are created by remote
       * deployment, also in this method. The scheme is the following:
       *
       * Whenever a remote deployment is found, create a path on that remote
       * address below “remote”, including the current system’s identification
       * as “sys@host:port” (typically; it will use whatever the remote
       * transport uses). This means that on a path up an actor tree each node
       * change introduces one layer of “remote/sys@host:port/” within the URI.
       *
       * Example:
       *
       * akka://sys@home:1234/remote/sys@remote:6667/remote/sys@other:3333/user/a/b/c
       *
       * means that the logical parent originates from “sys@other:3333” with
       * one child (may be “a” or “b”) being deployed on “sys@remote:6667” and
       * finally either “b” or “c” being created on “sys@home:1234”, where
       * this whole thing actually resides. Thus, the logical path is
       * “/user/a/b/c” and the physical path contains all remote placement
       * information.
       *
       * Deployments are always looked up using the logical path, which is the
       * purpose of the lookupRemotes internal method.
       */
      @scala.annotation.tailrec
      def lookupRemotes(p: Iterable[String]): Option[Deploy] =
        p.headOption match {
          case None           ⇒ None
          // skip "remote" and the "sys@host:port" element following it
          case Some("remote") ⇒ lookupRemotes(p.drop(2))
          case Some("user")   ⇒ deployer.lookup(p.drop(1).mkString("/", "/", ""))
          case Some(_)        ⇒ None
        }

      // lookupRemotes already covers the "user", "remote" and "anything else"
      // cases and, being based on headOption, also copes with an empty element
      // list instead of throwing from `head`.
      val deployment = deploy orElse lookupRemotes(path.elements)

      deployment match {
        case Some(Deploy(_, _, _, _, RemoteScope(address))) ⇒
          // FIXME RK this should be done within the deployer, i.e. the whole parsing business
          address.parse(remote.transports) match {
            case Left(x) ⇒
              throw new ConfigurationException("cannot parse remote address: " + x)
            case Right(addr) ⇒
              if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment)
              else {
                // physical path on the target node: /remote/<this system's host:port>/<logical elements>
                val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements
                useActorOnNode(rpath, props.creator, supervisor)
                new RemoteActorRef(this, remote.server, rpath, supervisor, None)
              }
          }

        case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment)
      }
    }

  /**
   * Looks up the actor for `path`: paths rooted at this provider's `rootPath`
   * are resolved from the root guardian, paths rooted at a remote system
   * address yield a new `RemoteActorRef`, anything else is left to the local
   * provider.
   */
  def actorFor(path: ActorPath): InternalActorRef = path.root match {
    case `rootPath`                                    ⇒ actorFor(rootGuardian, path.elements)
    case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None)
    case _                                             ⇒ local.actorFor(path)
  }

  /**
   * Looks up the actor for a string path. Strings accepted by `ParsedActorPath`
   * are resolved from the root guardian when their address equals the local
   * one and are otherwise wrapped in a new `RemoteActorRef`; all other strings
   * are handed to the local provider together with `ref`.
   */
  def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
    case ParsedActorPath(address, elems) ⇒
      if (address == rootPath.address) actorFor(rootGuardian, elems)
      else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, Nobody, None)
    case _ ⇒ local.actorFor(ref, path)
  }

  def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)

  def ask(within: Timeout): Option[AskActorRef] = local.ask(within)

  /**
   * Using (checking out) actor on a specific node: sends a `DaemonMsgCreate`
   * for `path` to the "remote" actor at the root of that path's address.
   */
  def useActorOnNode(path: ActorPath, actorFactory: () ⇒ Actor, supervisor: ActorRef): Unit = {
    log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path)

    // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor
    actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(actorFactory, path.toString, supervisor)
  }
}
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
2011-12-29 16:27:32 +01:00
|
|
|
|
/**
 * Scope marker for actor references that do not live in the local system:
 * `isLocal` is fixed to `false` and cannot be overridden.
 */
trait RemoteRef extends ActorRefScope {
  final def isLocal: Boolean = false
}
|
|
|
|
|
|
|
2011-10-13 17:42:26 +02:00
|
|
|
|
/**
 * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
 * This reference is network-aware (remembers its origin) and immutable.
 *
 * All user and system messages are forwarded through the supplied
 * `RemoteSupport`; the reference serializes as a `SerializedActorRef`
 * carrying the string form of its path.
 */
private[akka] class RemoteActorRef private[akka] (
  provider: RemoteActorRefProvider,
  remote: RemoteSupport[ParsedTransportAddress],
  val path: ActorPath,
  val getParent: InternalActorRef,
  loader: Option[ClassLoader])
  extends InternalActorRef with RemoteRef {

  // NOTE(review): nothing in this class ever assigns `running`, so as written
  // `isTerminated` always answers false — confirm whether that is intended.
  @volatile
  private var running: Boolean = true

  def isTerminated: Boolean = !running

  /**
   * Resolves a relative path: an exhausted iterator yields this reference,
   * a leading ".." continues the lookup at the parent, and anything else
   * produces a new `RemoteActorRef` for the path below this one.
   */
  def getChild(name: Iterator[String]): InternalActorRef = {
    // Converting to a Stream pulls the first element out of `name`; the
    // remaining elements stay in the iterator until the stream's tail is forced.
    val segments = name.toStream
    if (segments.isEmpty) this
    // `name` is already positioned behind the "..", so the parent sees only the rest
    else if (segments.head == "..") getParent getChild name
    else new RemoteActorRef(provider, remote, path / segments, Nobody, loader)
  }

  def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this, loader)

  override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)

  /**
   * Sends `message` using the ask-reference obtained from the provider as the
   * sender and returns that reference's result. When the provider supplies no
   * ask-reference the message is sent without a sender and a fresh `Promise`
   * on the provider's dispatcher is returned.
   */
  override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
    val askRef = provider.ask(timeout)
    val replyTo: ActorRef = askRef getOrElse null
    this.!(message)(replyTo)
    askRef match {
      case Some(a) ⇒ a.result
      case None    ⇒ Promise[Any]()(provider.dispatcher)
    }
  }

  // Life-cycle operations are expressed as system messages to the remote side.
  def suspend(): Unit = sendSystemMessage(Suspend())

  def resume(): Unit = sendSystemMessage(Resume())

  def stop(): Unit = sendSystemMessage(Terminate())

  def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause))

  @throws(classOf[java.io.ObjectStreamException])
  private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
|
2011-12-26 18:23:55 +01:00
|
|
|
|
|
|
|
|
|
|
/**
 * DeathWatch that keeps all bookkeeping in the wrapped `LocalDeathWatch` and
 * additionally notifies the remote daemon when the watched reference is a
 * `RemoteRef`.
 */
class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {

  /**
   * Registers `watcher` for the termination of `watched`.
   *
   * For a remote target the subscription is recorded locally and a
   * `DaemonMsgWatch` is sent to the "remote" actor at the root of the target's
   * path; the result of the local subscription is returned. Local targets are
   * subscribed locally only. Any other kind of reference is logged as an error
   * and yields `false`.
   */
  def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
    case remoteTarget: RemoteRef ⇒
      val subscribed = local.subscribe(watcher, watched)
      val daemon = provider.actorFor(remoteTarget.path.root / "remote")
      daemon ! DaemonMsgWatch(watcher, watched)
      subscribed

    case _: LocalRef ⇒
      local.subscribe(watcher, watched)

    case _ ⇒
      provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass)
      false
  }

  // The remaining operations are handled by the local death watch alone.
  def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)

  def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)

  def publish(event: Terminated): Unit = local.publish(event)
}
|