Merge branch 'master' of github.com:jboner/akka

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-11-24 21:05:41 +01:00
commit 0a1740cd6d
295 changed files with 12263 additions and 3513 deletions

View file

@ -13,54 +13,80 @@ import akka.dispatch._
import akka.util.duration._
import akka.config.ConfigurationException
import akka.event.{ DeathWatch, Logging }
import akka.serialization.{ Serialization, Serializer, Compression }
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import com.google.protobuf.ByteString
import java.util.concurrent.atomic.AtomicBoolean
import akka.event.EventStream
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
import java.net.InetAddress
import akka.serialization.SerializationExtension
/**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
class RemoteActorRefProvider(
val settings: ActorSystem.Settings,
val eventStream: EventStream,
val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(app, this)
val log = Logging(eventStream, "RemoteActorRefProvider")
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
val local = new LocalActorRefProvider(app)
val remote = new Remote(app)
def deathWatch = local.deathWatch
def guardian = local.guardian
def systemGuardian = local.systemGuardian
def nodename = local.nodename
def clustername = local.clustername
def tempName = local.tempName
private val actors = new ConcurrentHashMap[String, AnyRef]
private val remoteDaemonConnectionManager = new RemoteConnectionManager(app, remote)
/*
* The problem is that ActorRefs need a reference to the ActorSystem to
* provide their service. Hence they cannot be created while the
* constructors of ActorSystem and ActorRefProvider are still running.
* The solution is to split out that last part into an init() method,
* but it also requires these references to be @volatile and lazy.
*/
@volatile
private var system: ActorSystemImpl = _
private lazy val remoteExtension = RemoteExtension(system)
private lazy val serializationExtension = SerializationExtension(system)
lazy val rootPath: ActorPath = {
val remoteAddress = RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
new RootActorPath(remoteAddress)
}
private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath,
remoteExtension.settings.NodeName, remoteExtension.settings.ClusterName)
private[akka] lazy val remote = new Remote(system, nodename)
private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
def init(_system: ActorSystemImpl) {
system = _system
local.init(_system)
terminationFuture.onComplete(_ remote.server.shutdown())
}
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime
private[akka] def terminationFuture = local.terminationFuture
private[akka] def deployer: Deployer = local.deployer
def defaultDispatcher = app.dispatcher
def defaultTimeout = app.AkkaConfig.ActorTimeout
def dispatcher = local.dispatcher
def defaultTimeout = settings.ActorTimeout
def scheduler: Scheduler = local.scheduler
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
actorOf(system, props, supervisor, supervisor.path / name, systemService)
private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
actorOf(props, supervisor, supervisor.path / name, systemService)
private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(props, supervisor, path, systemService)
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
else {
val name = path.name
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
case null
@ -68,17 +94,17 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
deployer.lookupDeploymentFor(path.toString) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses)))
def isReplicaNode: Boolean = remoteAddresses exists { _ == app.address }
def isReplicaNode: Boolean = remoteAddresses exists { _ == rootPath.remoteAddress }
//app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
//system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
local.actorOf(props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
} else {
implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher
implicit val timeout = app.AkkaConfig.ActorTimeout
implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
implicit val timeout = system.settings.ActorTimeout
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
@ -110,7 +136,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(name, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
() new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
@ -120,17 +146,17 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a)
val remoteAddress = RemoteAddress(a.hostname, a.port)
conns + (remoteAddress -> RemoteActorRef(remote.server, remoteAddress, path, None))
conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None))
}
val connectionManager = new RemoteConnectionManager(app, remote, connections)
val connectionManager = new RemoteConnectionManager(system, remote, connections)
connections.keys foreach { useActorOnNode(_, path.toString, props.creator) }
connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
}
case deploy local.actorOf(props, supervisor, name, systemService)
case deploy local.actorOf(system, props, supervisor, name, systemService)
}
} catch {
case e: Exception
@ -138,7 +164,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
throw e
}
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
// actor foreach system.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actors.replace(path.toString, newFuture, actor)
@ -152,9 +178,9 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
* Copied from LocalActorRefProvider...
*/
// FIXME: implement supervision
def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
new RoutedActorRef(app, props, supervisor, name)
new RoutedActorRef(system, props, supervisor, name)
}
def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match {
@ -163,6 +189,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
}
// TODO remove me
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
@ -178,24 +205,24 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
val remoteAddress = RemoteAddress(actor.hostname, actor.port)
if (optimizeLocalScoped_? && remoteAddress == app.address) {
if (optimizeLocalScoped_? && remoteAddress == rootPath.remoteAddress) {
local.actorFor(ActorPath.split(actor.path))
} else {
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.address, actor.path, remoteAddress)
Some(RemoteActorRef(remote.server, remoteAddress, ActorPath(app, actor.path), None)) //Should it be None here
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", rootPath.remoteAddress, actor.path, remoteAddress)
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here
}
}
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(remoteAddress: RemoteAddress, actorPath: String, actorFactory: () Actor) {
log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.address, actorPath, remoteAddress)
def useActorOnNode(system: ActorSystem, remoteAddress: RemoteAddress, actorPath: String, actorFactory: () Actor) {
log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
val actorFactoryBytes =
app.serialization.serialize(actorFactory) match {
serializationExtension.serialization.serialize(actorFactory) match {
case Left(error) throw error
case Right(bytes) if (remote.shouldCompressData) LZF.compress(bytes) else bytes
case Right(bytes) if (remoteExtension.settings.ShouldCompressData) LZF.compress(bytes) else bytes
}
val command = RemoteSystemDaemonMessageProtocol.newBuilder
@ -215,7 +242,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
if (withACK) {
try {
val f = connection ? (command, remote.remoteSystemDaemonAckTimeout)
val f = connection ? (command, remoteExtension.settings.RemoteSystemDaemonAckTimeout)
(try f.await.value catch { case _: FutureTimeoutException None }) match {
case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver)
@ -243,8 +270,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
private[akka] def dummyAskSender = local.dummyAskSender
private[akka] def tempPath = local.tempPath
}
@ -255,6 +280,7 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
provider: ActorRefProvider,
remote: RemoteSupport,
remoteAddress: RemoteAddress,
path: ActorPath,
@ -268,13 +294,13 @@ private[akka] case class RemoteActorRef private[akka] (
def address = remoteAddress + path.toString
def isShutdown: Boolean = !running
def isTerminated: Boolean = !running
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader)
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout)
def suspend(): Unit = ()
@ -290,7 +316,7 @@ private[akka] case class RemoteActorRef private[akka] (
}
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = remote.app.provider.serialize(this)
private def writeReplace(): AnyRef = provider.serialize(this)
def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement