merge system-cleanup into master
This commit is contained in:
commit
62032cb1ff
139 changed files with 1109 additions and 928 deletions
|
|
@ -17,47 +17,63 @@ import akka.serialization.{ Serialization, Serializer, Compression }
|
|||
import akka.serialization.Compression.LZF
|
||||
import akka.remote.RemoteProtocol._
|
||||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import com.google.protobuf.ByteString
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import akka.event.EventStream
|
||||
|
||||
/**
|
||||
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
||||
class RemoteActorRefProvider(
|
||||
val settings: ActorSystem.Settings,
|
||||
val rootPath: ActorPath,
|
||||
val eventStream: EventStream,
|
||||
val dispatcher: MessageDispatcher,
|
||||
val scheduler: Scheduler) extends ActorRefProvider {
|
||||
|
||||
val log = Logging(app, this)
|
||||
val log = Logging(eventStream, this)
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.dispatch.Promise
|
||||
|
||||
val local = new LocalActorRefProvider(app)
|
||||
val remote = new Remote(app)
|
||||
val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler)
|
||||
def deathWatch = local.deathWatch
|
||||
def guardian = local.guardian
|
||||
def systemGuardian = local.systemGuardian
|
||||
def nodename = local.nodename
|
||||
def tempName = local.tempName
|
||||
|
||||
@volatile
|
||||
var remote: Remote = _
|
||||
|
||||
private val actors = new ConcurrentHashMap[String, AnyRef]
|
||||
|
||||
private val remoteDaemonConnectionManager = new RemoteConnectionManager(app, remote)
|
||||
@volatile
|
||||
private var remoteDaemonConnectionManager: RemoteConnectionManager = _
|
||||
|
||||
def init(system: ActorSystemImpl) {
|
||||
local.init(system)
|
||||
remote = new Remote(system, nodename)
|
||||
remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
|
||||
}
|
||||
|
||||
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime
|
||||
private[akka] def terminationFuture = local.terminationFuture
|
||||
|
||||
private[akka] def deployer: Deployer = local.deployer
|
||||
|
||||
def defaultDispatcher = app.dispatcher
|
||||
def defaultTimeout = app.AkkaConfig.ActorTimeout
|
||||
def defaultDispatcher = dispatcher
|
||||
def defaultTimeout = settings.ActorTimeout
|
||||
|
||||
def scheduler: Scheduler = local.scheduler
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
|
||||
actorOf(system, props, supervisor, supervisor.path / name, systemService)
|
||||
|
||||
private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
|
||||
actorOf(props, supervisor, supervisor.path / name, systemService)
|
||||
|
||||
private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
|
||||
if (systemService) local.actorOf(props, supervisor, path, systemService)
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
|
||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
|
||||
else {
|
||||
val name = path.name
|
||||
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
|
||||
|
|
@ -76,13 +92,13 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
// case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass)
|
||||
// }
|
||||
|
||||
def isReplicaNode: Boolean = remoteAddresses exists { _ == app.address }
|
||||
def isReplicaNode: Boolean = remoteAddresses exists { _ == system.address }
|
||||
|
||||
//app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
//system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
|
||||
if (isReplicaNode) {
|
||||
// we are on one of the replica node for this remote actor
|
||||
local.actorOf(props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
|
||||
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
|
||||
} else {
|
||||
|
||||
// we are on the single "reference" node uses the remote actors on the replica nodes
|
||||
|
|
@ -119,17 +135,17 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
|
||||
val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
|
||||
val remoteAddress = RemoteAddress(a.hostname, a.port)
|
||||
conns + (remoteAddress -> RemoteActorRef(remote.server, remoteAddress, path, None))
|
||||
conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None))
|
||||
}
|
||||
|
||||
val connectionManager = new RemoteConnectionManager(app, remote, connections)
|
||||
val connectionManager = new RemoteConnectionManager(system, remote, connections)
|
||||
|
||||
connections.keys foreach { useActorOnNode(_, path.toString, props.creator) }
|
||||
connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
|
||||
|
||||
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
|
||||
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
|
||||
}
|
||||
|
||||
case deploy ⇒ local.actorOf(props, supervisor, name, systemService)
|
||||
case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
|
|
@ -137,7 +153,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
throw e
|
||||
}
|
||||
|
||||
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
|
||||
// actor foreach system.registry.register // only for ActorRegistry backward compat, will be removed later
|
||||
|
||||
newFuture completeWithResult actor
|
||||
actors.replace(path.toString, newFuture, actor)
|
||||
|
|
@ -151,9 +167,9 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
* Copied from LocalActorRefProvider...
|
||||
*/
|
||||
// FIXME: implement supervision
|
||||
def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
|
||||
def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
|
||||
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
|
||||
new RoutedActorRef(app, props, supervisor, name)
|
||||
new RoutedActorRef(system, props, supervisor, name)
|
||||
}
|
||||
|
||||
def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match {
|
||||
|
|
@ -162,6 +178,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef])
|
||||
}
|
||||
|
||||
// TODO remove me
|
||||
val optimizeLocal = new AtomicBoolean(true)
|
||||
def optimizeLocalScoped_?() = optimizeLocal.get
|
||||
|
||||
|
|
@ -177,22 +194,22 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
|
||||
val remoteAddress = RemoteAddress(actor.hostname, actor.port)
|
||||
if (optimizeLocalScoped_? && remoteAddress == app.address) {
|
||||
if (optimizeLocalScoped_? && remoteAddress == rootPath.remoteAddress) {
|
||||
local.actorFor(ActorPath.split(actor.path))
|
||||
} else {
|
||||
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.address, actor.path, remoteAddress)
|
||||
Some(RemoteActorRef(remote.server, remoteAddress, ActorPath(app, actor.path), None)) //Should it be None here
|
||||
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", rootPath.remoteAddress, actor.path, remoteAddress)
|
||||
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using (checking out) actor on a specific node.
|
||||
*/
|
||||
def useActorOnNode(remoteAddress: RemoteAddress, actorPath: String, actorFactory: () ⇒ Actor) {
|
||||
log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.address, actorPath, remoteAddress)
|
||||
def useActorOnNode(system: ActorSystem, remoteAddress: RemoteAddress, actorPath: String, actorFactory: () ⇒ Actor) {
|
||||
log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
|
||||
|
||||
val actorFactoryBytes =
|
||||
app.serialization.serialize(actorFactory) match {
|
||||
system.serialization.serialize(actorFactory) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(bytes) ⇒ if (remote.shouldCompressData) LZF.compress(bytes) else bytes
|
||||
}
|
||||
|
|
@ -242,8 +259,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
|
||||
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
|
||||
|
||||
private[akka] def dummyAskSender = local.dummyAskSender
|
||||
|
||||
private[akka] def tempPath = local.tempPath
|
||||
}
|
||||
|
||||
|
|
@ -254,6 +269,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] case class RemoteActorRef private[akka] (
|
||||
provider: ActorRefProvider,
|
||||
remote: RemoteSupport,
|
||||
remoteAddress: RemoteAddress,
|
||||
path: ActorPath,
|
||||
|
|
@ -273,7 +289,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader)
|
||||
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout)
|
||||
|
||||
def suspend(): Unit = ()
|
||||
|
||||
|
|
@ -289,7 +305,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
}
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = remote.app.provider.serialize(this)
|
||||
private def writeReplace(): AnyRef = provider.serialize(this)
|
||||
|
||||
def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue