Merging with master

This commit is contained in:
Viktor Klang 2011-11-10 18:06:16 +01:00
commit 1fb1309dc3
48 changed files with 757 additions and 541 deletions

View file

@ -51,15 +51,19 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
def defaultDispatcher = app.dispatcher
def defaultTimeout = app.AkkaConfig.ActorTimeout
def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(props, supervisor, address, systemService)
private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
actorOf(props, supervisor, supervisor.path / name, systemService)
private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(props, supervisor, path, systemService)
else {
val name = path.name
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
case null
val actor: ActorRef = try {
deployer.lookupDeploymentFor(address) match {
deployer.lookupDeploymentFor(path.toString) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses)))
// FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one
@ -76,7 +80,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
local.actorOf(props, supervisor, address, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
local.actorOf(props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
@ -84,25 +88,25 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
case RouterType.Direct
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
.format(name, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
.format(name, remoteAddresses.mkString(", ")))
() new RandomRouter
case RouterType.RoundRobin
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
.format(name, remoteAddresses.mkString(", ")))
() new RoundRobinRouter
case RouterType.ScatterGather
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(address, remoteAddresses.mkString(", ")))
.format(name, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
@ -113,17 +117,17 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a)
val remoteAddress = RemoteAddress(a.hostname, a.port)
conns + (remoteAddress -> RemoteActorRef(remote.server, remoteAddress, address, None))
conns + (remoteAddress -> RemoteActorRef(remote.server, remoteAddress, path, None))
}
val connectionManager = new RemoteConnectionManager(app, remote, connections)
connections.keys foreach { useActorOnNode(_, address, props.creator) }
connections.keys foreach { useActorOnNode(_, path.toString, props.creator) }
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, address)
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
}
case deploy local.actorOf(props, supervisor, address, systemService)
case deploy local.actorOf(props, supervisor, name, systemService)
}
} catch {
case e: Exception
@ -134,7 +138,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actors.replace(address, newFuture, actor)
actors.replace(path.toString, newFuture, actor)
actor
case actor: ActorRef actor
case future: Future[_] future.get.asInstanceOf[ActorRef]
@ -145,13 +149,13 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
* Copied from LocalActorRefProvider...
*/
// FIXME: implement supervision
def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef = {
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router")
new RoutedActorRef(app, props, address)
def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
new RoutedActorRef(app, props, supervisor, name)
}
def actorFor(address: String): Option[ActorRef] = actors.get(address) match {
case null local.actorFor(address)
def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match {
case null local.actorFor(path)
case actor: ActorRef Some(actor)
case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
}
@ -162,27 +166,28 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
/**
* Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/
private[akka] def evict(address: String): Boolean = actors.remove(address) ne null
private[akka] def evict(path: String): Boolean = actors.remove(path) ne null
private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match {
case r: RemoteActorRef new SerializedActorRef(actor.address, r.remoteAddress)
case r: RemoteActorRef new SerializedActorRef(r.remoteAddress, actor.path.toString)
case other local.serialize(actor)
}
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
val remoteAddress = RemoteAddress(actor.hostname, actor.port)
if (optimizeLocalScoped_? && remoteAddress == app.defaultAddress) {
local.actorFor(actor.address)
local.actorFor(ActorPath.split(actor.path))
} else {
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.address, remoteAddress)
Some(RemoteActorRef(remote.server, remoteAddress, actor.address, None)) //Should it be None here
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.path, remoteAddress)
Some(RemoteActorRef(remote.server, remoteAddress, ActorPath(app, actor.path), None)) //Should it be None here
}
}
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(remoteAddress: RemoteAddress, actorAddress: String, actorFactory: () Actor) {
log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorAddress, remoteAddress)
def useActorOnNode(remoteAddress: RemoteAddress, actorPath: String, actorFactory: () Actor) {
log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorPath, remoteAddress)
val actorFactoryBytes =
app.serialization.serialize(actorFactory) match {
@ -192,11 +197,11 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
val command = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
.setActorPath(actorPath)
.setPayload(ByteString.copyFrom(actorFactoryBytes))
.build()
val connectionFactory = () deserialize(new SerializedActorRef(remote.remoteDaemonServiceName, remoteAddress)).get
val connectionFactory = () deserialize(new SerializedActorRef(remoteAddress, remote.remoteDaemon.path.toString)).get
// try to get the connection for the remote address, if not already there then create it
val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory)
@ -245,12 +250,17 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
private[akka] case class RemoteActorRef private[akka] (
remote: RemoteSupport,
remoteAddress: RemoteAddress,
address: String,
path: ActorPath,
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
@volatile
private var running: Boolean = true
def name = path.name
def address = remoteAddress + path.toString
def isShutdown: Boolean = !running
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported