2011-09-15 10:20:18 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
|
2011-09-20 21:44:50 +02:00
|
|
|
package akka.remote
|
2011-09-15 10:20:18 +02:00
|
|
|
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import DeploymentConfig._
|
|
|
|
|
import Actor._
|
|
|
|
|
import Status._
|
|
|
|
|
import akka.event.EventHandler
|
2011-09-19 14:43:28 +02:00
|
|
|
import akka.util.duration._
|
|
|
|
|
import akka.config.ConfigurationException
|
2011-09-15 10:20:18 +02:00
|
|
|
import akka.AkkaException
|
|
|
|
|
import RemoteProtocol._
|
2011-09-19 14:43:28 +02:00
|
|
|
import RemoteDaemonMessageType._
|
|
|
|
|
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
|
|
|
|
|
import Compression.LZF
|
2011-09-15 10:20:18 +02:00
|
|
|
|
|
|
|
|
import java.net.InetSocketAddress
|
|
|
|
|
|
2011-09-19 14:43:28 +02:00
|
|
|
import com.google.protobuf.ByteString
|
|
|
|
|
|
2011-09-15 10:20:18 +02:00
|
|
|
/**
 * Remote ActorRefProvider. Starts up an actor on a remote node and creates a RemoteActorRef representing it.
 *
 * An actor is only handled by this provider when its deployment configuration resolves to a
 * `RemoteScope`; for every other deployment `actorOf` returns `None`.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteActorRefProvider extends ActorRefProvider {

  private val failureDetector = new BannagePeriodFailureDetector(timeToBan = 60 seconds) // FIXME make timeToBan configurable

  /**
   * Looks up, or creates, the actor reference for `address`.
   *
   * If a reference is already registered under `address` in `Actor.remote.actors` it is returned as is.
   * Otherwise the deployment for the actor is looked up and, if it has a `RemoteScope`, the actor is
   * instantiated on that node and a new remote reference to it is returned.
   *
   * @param props   actor properties; `props.deployId` selects the deployment and `props.creator` is the
   *                factory shipped to the remote node
   * @param address the actor address; also used as deployment id when `props.deployId` is the default
   *                id or `null`
   * @return `Some(ref)` for an already registered or remotely deployed actor, `None` for any other deployment
   */
  def actorOf(props: Props, address: String): Option[ActorRef] = {
    Address.validate(address)

    // 'actors.get' yields null for an unknown address; Option(...) turns that into None
    Option(Actor.remote.actors.get(address)) orElse {
      // if 'Props.deployId' is not specified then use 'address' as 'deployId'
      val deployId = props.deployId match {
        case Props.`defaultDeployId` | null ⇒ address
        case other                         ⇒ other
      }

      Deployer.lookupDeploymentFor(deployId) match {
        case Some(Deploy(_, _, router, _, RemoteScope(host, port))) ⇒
          // FIXME create RoutedActorRef if 'router' is specified
          val remoteAddress = new InetSocketAddress(host, port)
          useActorOnNode(remoteAddress, address, props.creator)
          Some(newRemoteActorRef(address, remoteAddress)) // create a remote actor

        case _ ⇒ None // non-remote actor
      }
    }
  }

  /**
   * Not supported by this provider.
   *
   * @throws UnsupportedOperationException always
   */
  def findActorRef(address: String): Option[ActorRef] = throw new UnsupportedOperationException

  /**
   * Using (checking out) actor on a specific node.
   *
   * Serializes `actorFactory` (compressing the bytes with LZF when `Remote.shouldCompressData` is set),
   * wraps it in a `USE` daemon message and sends that message to the node, waiting for the reply.
   *
   * @param remoteAddress the node on which the actor should be instantiated
   * @param actorAddress  the address of the actor to instantiate
   * @param actorFactory  the factory creating the actor; it is serialized and sent as the message payload
   */
  def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () ⇒ Actor): Unit = {
    EventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress))

    val actorFactoryBytes =
      Serialization.serialize(actorFactory) match {
        case Left(error) ⇒ throw error
        case Right(bytes) ⇒
          if (Remote.shouldCompressData) LZF.compress(bytes)
          else bytes
      }

    val command = RemoteDaemonMessageProtocol.newBuilder
      .setMessageType(USE)
      .setActorAddress(actorAddress)
      .setPayload(ByteString.copyFrom(actorFactoryBytes))
      .build()

    val connectionFactory = () ⇒ newRemoteActorRef(actorAddress, remoteAddress)

    // try to get the connection for the remote address, if not already there then create it
    val connection = failureDetector.putIfAbsent(remoteAddress, connectionFactory)

    sendCommandToRemoteNode(connection, command, async = false) // ensure we get an ACK on the USE command
  }

  /**
   * Creates a new `RemoteActorRef` for `actorAddress` on the node at `inetSocketAddress`,
   * using `Actor.TIMEOUT` as its timeout.
   */
  private def newRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
    RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
  }

  /**
   * Sends a daemon command over `connection`.
   *
   * With `async = true` the command is sent fire-and-forget. With `async = false` the reply is awaited
   * for up to `Remote.remoteDaemonAckTimeout`; a `Failure` reply rethrows its cause and a missing reply
   * raises a `RemoteException`. Every failure is logged exactly once before it is thrown.
   *
   * @param connection the reference the command is sent to
   * @param command    the daemon message to send
   * @param async      `true` (the default) to send without waiting for a reply
   */
  private def sendCommandToRemoteNode(
    connection: ActorRef,
    command: RemoteDaemonMessageProtocol,
    async: Boolean = true): Unit = {

    if (async) {
      connection ! command
    } else {
      // Only the ask itself is guarded here. Reply handling is kept outside of the try block so that
      // the failures raised below are not caught, and logged a second time, by this handler.
      val reply =
        try {
          (connection ? (command, Remote.remoteDaemonAckTimeout)).as[Status]
        } catch {
          case e: Exception ⇒
            EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString))
            throw e
        }

      reply match {
        case Some(Success(receiver)) ⇒
          EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(receiver))

        case Some(Failure(cause)) ⇒
          EventHandler.error(cause, this, cause.toString)
          throw cause

        case None ⇒
          val error = new RemoteException("Remote command to [%s] timed out".format(connection.address))
          EventHandler.error(error, this, error.toString)
          throw error
      }
    }
  }
}
|