merge master into wip-ActorPath-rk

This commit is contained in:
Roland 2011-11-29 21:52:18 +01:00
commit afda539324
102 changed files with 1837 additions and 2902 deletions

View file

@ -41,13 +41,13 @@ class RemoteActorRefProvider(
def deathWatch = local.deathWatch
def guardian = local.guardian
def systemGuardian = local.systemGuardian
def nodename = remoteExtension.settings.NodeName
def clustername = remoteExtension.settings.ClusterName
def nodename = remoteExtension.NodeName
def clustername = remoteExtension.ClusterName
private val actors = new ConcurrentHashMap[String, AnyRef]
/*
* The problem is that ActorRefs need a reference to the ActorSystem to
* The problem is that ActorRefs need a reference to the ActorSystem to
* provide their service. Hence they cannot be created while the
* constructors of ActorSystem and ActorRefProvider are still running.
* The solution is to split out that last part into an init() method,
@ -56,9 +56,9 @@ class RemoteActorRefProvider(
@volatile
private var system: ActorSystemImpl = _
private lazy val remoteExtension = RemoteExtension(system)
private lazy val serializationExtension = SerializationExtension(system)
private lazy val serialization = SerializationExtension(system)
lazy val rootPath: ActorPath = {
val remoteAddress = RemoteAddress(system.name, remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
val remoteAddress = RemoteAddress(system.name, remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port)
new RootActorPath(remoteAddress)
}
private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters)
@ -91,14 +91,6 @@ class RemoteActorRefProvider(
deployer.lookupDeploymentFor(path.toString) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses)))
// FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one
// val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {
// case FailureDetectorType.NoOp new NoOpFailureDetector
// case FailureDetectorType.RemoveConnectionOnFirstFailure new RemoveConnectionOnFirstFailureFailureDetector
// case FailureDetectorType.BannagePeriod(timeToBan) new BannagePeriodFailureDetector(timeToBan)
// case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
// }
def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
//system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
@ -108,6 +100,9 @@ class RemoteActorRefProvider(
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
} else {
implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
implicit val timeout = system.settings.ActorTimeout
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct
@ -116,6 +111,12 @@ class RemoteActorRefProvider(
.format(name, remoteAddresses.mkString(", ")))
() new DirectRouter
case RouterType.Broadcast
if (remoteAddresses.size != 1) throw new ConfigurationException(
"Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
.format(name, remoteAddresses.mkString(", ")))
() new BroadcastRouter
case RouterType.Random
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
@ -214,9 +215,9 @@ class RemoteActorRefProvider(
log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
val actorFactoryBytes =
serializationExtension.serialization.serialize(actorFactory) match {
serialization.serialize(actorFactory) match {
case Left(error) throw error
case Right(bytes) if (remoteExtension.settings.ShouldCompressData) LZF.compress(bytes) else bytes
case Right(bytes) if (remoteExtension.ShouldCompressData) LZF.compress(bytes) else bytes
}
val command = RemoteSystemDaemonMessageProtocol.newBuilder
@ -236,7 +237,7 @@ class RemoteActorRefProvider(
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
if (withACK) {
try {
val f = connection ? (command, remoteExtension.settings.RemoteSystemDaemonAckTimeout)
val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout)
(try f.await.value catch { case _: FutureTimeoutException None }) match {
case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver)