Merge branch 'master' of github.com:jboner/akka

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-10-20 15:11:34 +02:00
commit 303d34692d
128 changed files with 1372 additions and 1180 deletions

View file

@ -4,22 +4,24 @@
package akka.remote
import akka.{ AkkaException, AkkaApplication }
import akka.actor._
import akka.routing._
import akka.actor.Actor._
import akka.actor.Status._
import akka.routing._
import akka.dispatch._
import akka.event.EventHandler
import akka.util.duration._
import akka.config.ConfigurationException
import akka.AkkaException
import RemoteProtocol._
import RemoteSystemDaemonMessageType._
import akka.event.{ DeathWatch, EventHandler }
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
import Compression.LZF
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import com.google.protobuf.ByteString
import akka.AkkaApplication
/**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -28,13 +30,10 @@ import akka.AkkaApplication
*/
class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
val local = new LocalActorRefProvider(app)
val remote = new Remote(app)
private val actors = new ConcurrentHashMap[String, Promise[ActorRef]]
private val actors = new ConcurrentHashMap[String, AnyRef]
private val remoteDaemonConnectionManager = new RemoteConnectionManager(app, remote)
@ -43,113 +42,109 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false)
def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = {
Address.validate(address)
def actorOf(props: Props, address: String, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(props, address, systemService)
else {
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
val oldFuture = actors.putIfAbsent(address, newFuture)
actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future
case null
val actor: ActorRef = try {
app.deployer.lookupDeploymentFor(address) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses)))
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
val actor: ActorRef = try {
app.deployer.lookupDeploymentFor(address) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses)))
// FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one
// val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {
// case FailureDetectorType.NoOp new NoOpFailureDetector
// case FailureDetectorType.RemoveConnectionOnFirstFailure new RemoveConnectionOnFirstFailureFailureDetector
// case FailureDetectorType.BannagePeriod(timeToBan) new BannagePeriodFailureDetector(timeToBan)
// case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
// }
val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {
case FailureDetectorType.NoOp new NoOpFailureDetector
case FailureDetectorType.RemoveConnectionOnFirstFailure new RemoveConnectionOnFirstFailureFailureDetector
case FailureDetectorType.BannagePeriod(timeToBan) new BannagePeriodFailureDetector(timeToBan)
case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
val thisHostname = remote.address.getHostName
val thisPort = remote.address.getPort
def isReplicaNode: Boolean = remoteAddresses exists { some some.hostname == thisHostname && some.port == thisPort }
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
new LocalActorRef(app, props, address, false)
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RandomRouter
case RouterType.RoundRobin
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RoundRobinRouter
case RouterType.ScatterGather
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
}
val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a)
val inetAddr = new InetSocketAddress(a.hostname, a.port)
conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None))
}
val connectionManager = new RemoteConnectionManager(app, remote, connections)
connections.keys foreach { useActorOnNode(_, address, props.creator) }
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address)
}
case deploy local.actorOf(props, address, systemService)
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
val thisHostname = remote.address.getHostName
val thisPort = remote.address.getPort
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
def isReplicaNode: Boolean = remoteAddresses exists { remoteAddress
remoteAddress.hostname == thisHostname && remoteAddress.port == thisPort
}
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
val localProps =
if (props.dispatcher == Props.defaultDispatcher) props.copy(dispatcher = app.dispatcher)
else props
new LocalActorRef(app, localProps, address, false)
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RandomRouter
case RouterType.RoundRobin
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new RoundRobinRouter
case RouterType.ScatterGather
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
}
var connections = Map.empty[InetSocketAddress, ActorRef]
remoteAddresses foreach { remoteAddress: DeploymentConfig.RemoteAddress
val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port)
connections += (inetSocketAddress -> RemoteActorRef(remote.server, inetSocketAddress, address, None))
}
val connectionManager = new RemoteConnectionManager(app, remote, connections)
connections.keys foreach { useActorOnNode(_, address, props.creator) }
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address)
}
case deploy local.actorOf(props, address, systemService)
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
throw e
newFuture completeWithResult actor
actors.replace(address, newFuture, actor)
actor
case actor: ActorRef actor
case future: Future[_] future.get.asInstanceOf[ActorRef]
}
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actor
} else { // we lost the race -- wait for future to complete
oldFuture.await.resultOrException.get
}
}
/**
* Copied from LocalActorRefProvider...
*/
def actorOf(props: RoutedProps, address: String): ActorRef = {
if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router")
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router")
new RoutedActorRef(props, address)
}
def actorFor(address: String): Option[ActorRef] = actors.get(address) match {
case null None
case future Some(future.get)
case null None
case actor: ActorRef Some(actor)
case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
}
/**
@ -193,11 +188,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
sendCommandToRemoteNode(connection, command, withACK = true) // ensure we get an ACK on the USE command
}
private def sendCommandToRemoteNode(
connection: ActorRef,
command: RemoteSystemDaemonMessageProtocol,
withACK: Boolean) {
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
if (withACK) {
try {
(connection ? (command, remote.remoteSystemDaemonAckTimeout)).as[Status] match {
@ -222,6 +213,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
connection ! command
}
}
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch
}
/**
@ -237,6 +230,8 @@ private[akka] case class RemoteActorRef private[akka] (
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
private[akka] val uuid: Uuid = newUuid
@volatile
private var running: Boolean = true
@ -268,7 +263,7 @@ private[akka] case class RemoteActorRef private[akka] (
synchronized {
if (running) {
running = false
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
postMessageToMailbox(Terminate, None)
}
}
}