2011-09-15 10:20:18 +02:00
|
|
|
|
/**
|
2016-02-23 12:58:39 +01:00
|
|
|
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
2011-09-15 10:20:18 +02:00
|
|
|
|
*/
|
|
|
|
|
|
|
2011-09-20 21:44:50 +02:00
|
|
|
|
package akka.remote
|
2011-09-15 10:20:18 +02:00
|
|
|
|
|
2016-02-29 13:02:24 +01:00
|
|
|
|
import akka.Done
|
2011-09-15 10:20:18 +02:00
|
|
|
|
import akka.actor._
|
2013-03-05 16:19:54 +01:00
|
|
|
|
import akka.dispatch.sysmsg._
|
2016-04-14 10:26:09 +02:00
|
|
|
|
import akka.event.{ EventStream, Logging, LoggingAdapter }
|
2012-08-16 11:31:53 +02:00
|
|
|
|
import akka.event.Logging.Error
|
2015-02-19 15:49:02 +01:00
|
|
|
|
import akka.serialization.{ Serialization, SerializationExtension }
|
2012-12-14 16:09:38 +01:00
|
|
|
|
import akka.pattern.pipe
|
2012-08-16 11:31:53 +02:00
|
|
|
|
import scala.util.control.NonFatal
|
2016-04-14 10:26:09 +02:00
|
|
|
|
import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }
|
2013-01-27 12:56:35 +01:00
|
|
|
|
import scala.util.control.Exception.Catcher
|
2015-02-19 15:49:02 +01:00
|
|
|
|
import scala.concurrent.Future
|
2013-04-15 09:26:51 +02:00
|
|
|
|
import akka.ConfigurationException
|
2013-04-26 12:18:01 +02:00
|
|
|
|
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
2016-05-09 07:31:41 +02:00
|
|
|
|
import akka.remote.artery.ArteryTransport
|
2016-06-05 15:40:06 +02:00
|
|
|
|
import akka.util.OptionVal
|
2016-06-29 17:09:33 +02:00
|
|
|
|
import akka.remote.artery.OutboundEnvelope
|
2012-12-14 16:09:38 +01:00
|
|
|
|
|
2013-02-08 13:13:52 +01:00
|
|
|
|
/**
|
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
|
*/
|
|
|
|
|
|
private[akka] object RemoteActorRefProvider {
|
2014-03-07 13:20:01 +01:00
|
|
|
|
private final case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
|
2013-08-23 14:39:21 +02:00
|
|
|
|
extends NoSerializationVerificationNeeded
|
2012-12-14 16:09:38 +01:00
|
|
|
|
|
|
|
|
|
|
sealed trait TerminatorState
|
|
|
|
|
|
case object Uninitialized extends TerminatorState
|
|
|
|
|
|
case object Idle extends TerminatorState
|
|
|
|
|
|
case object WaitDaemonShutdown extends TerminatorState
|
|
|
|
|
|
case object WaitTransportShutdown extends TerminatorState
|
|
|
|
|
|
case object Finished extends TerminatorState
|
|
|
|
|
|
|
2013-04-26 12:18:01 +02:00
|
|
|
|
private class RemotingTerminator(systemGuardian: ActorRef) extends Actor with FSM[TerminatorState, Option[Internals]]
|
|
|
|
|
|
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
2012-12-14 16:09:38 +01:00
|
|
|
|
import context.dispatcher
|
|
|
|
|
|
|
|
|
|
|
|
startWith(Uninitialized, None)
|
|
|
|
|
|
|
|
|
|
|
|
when(Uninitialized) {
|
|
|
|
|
|
case Event(i: Internals, _) ⇒
|
2012-12-18 12:54:17 +01:00
|
|
|
|
systemGuardian ! RegisterTerminationHook
|
2012-12-14 16:09:38 +01:00
|
|
|
|
goto(Idle) using Some(i)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
when(Idle) {
|
|
|
|
|
|
case Event(TerminationHook, Some(internals)) ⇒
|
|
|
|
|
|
log.info("Shutting down remote daemon.")
|
|
|
|
|
|
internals.remoteDaemon ! TerminationHook
|
|
|
|
|
|
goto(WaitDaemonShutdown)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: state timeout
|
|
|
|
|
|
when(WaitDaemonShutdown) {
|
|
|
|
|
|
case Event(TerminationHookDone, Some(internals)) ⇒
|
2012-12-18 12:54:17 +01:00
|
|
|
|
log.info("Remote daemon shut down; proceeding with flushing remote transports.")
|
2012-12-14 16:09:38 +01:00
|
|
|
|
internals.transport.shutdown() pipeTo self
|
|
|
|
|
|
goto(WaitTransportShutdown)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
when(WaitTransportShutdown) {
|
2016-02-29 13:02:24 +01:00
|
|
|
|
case Event(Done, _) ⇒
|
2012-12-14 16:09:38 +01:00
|
|
|
|
log.info("Remoting shut down.")
|
2012-12-18 12:54:17 +01:00
|
|
|
|
systemGuardian ! TerminationHookDone
|
2012-12-14 16:09:38 +01:00
|
|
|
|
stop()
|
2016-02-29 13:02:24 +01:00
|
|
|
|
|
|
|
|
|
|
case Event(Status.Failure(ex), _) ⇒
|
|
|
|
|
|
log.error(ex, "Remoting shut down with error")
|
|
|
|
|
|
systemGuardian ! TerminationHookDone
|
|
|
|
|
|
stop()
|
2012-12-14 16:09:38 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
2013-01-28 14:38:33 +01:00
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
|
* Remoting wraps messages destined to a remote host in a remoting specific envelope: EndpointManager.Send
|
|
|
|
|
|
* As these wrapped messages might arrive to the dead letters of an EndpointWriter, they need to be unwrapped
|
|
|
|
|
|
* and handled as dead letters to the original (remote) destination. Without this special case, DeathWatch related
|
|
|
|
|
|
* functionality breaks, like the special handling of Watch messages arriving to dead letters.
|
|
|
|
|
|
*/
|
2016-06-02 14:06:57 +02:00
|
|
|
|
private class RemoteDeadLetterActorRef(
|
|
|
|
|
|
_provider: ActorRefProvider,
|
|
|
|
|
|
_path: ActorPath,
|
|
|
|
|
|
_eventStream: EventStream) extends DeadLetterActorRef(_provider, _path, _eventStream) {
|
2013-03-27 17:47:56 +01:00
|
|
|
|
import EndpointManager.Send
|
2013-01-28 14:38:33 +01:00
|
|
|
|
|
|
|
|
|
|
override def !(message: Any)(implicit sender: ActorRef): Unit = message match {
|
2013-03-27 17:47:56 +01:00
|
|
|
|
case Send(m, senderOption, _, seqOpt) ⇒
|
|
|
|
|
|
// else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved
|
|
|
|
|
|
// the dead letter status
|
|
|
|
|
|
if (seqOpt.isEmpty) super.!(m)(senderOption.orNull)
|
|
|
|
|
|
case DeadLetter(Send(m, senderOption, recipient, seqOpt), _, _) ⇒
|
|
|
|
|
|
// else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved
|
|
|
|
|
|
// the dead letter status
|
|
|
|
|
|
if (seqOpt.isEmpty) super.!(m)(senderOption.orNull)
|
2016-06-29 17:09:33 +02:00
|
|
|
|
case env: OutboundEnvelope ⇒
|
|
|
|
|
|
super.!(env.message)(env.sender.orNull)
|
|
|
|
|
|
case DeadLetter(env: OutboundEnvelope, _, _) ⇒
|
|
|
|
|
|
super.!(env.message)(env.sender.orNull)
|
2013-03-27 17:47:56 +01:00
|
|
|
|
case _ ⇒ super.!(message)(sender)
|
2013-01-28 14:38:33 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@throws(classOf[java.io.ObjectStreamException])
|
|
|
|
|
|
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
|
|
|
|
|
|
}
|
2012-12-14 16:09:38 +01:00
|
|
|
|
}
|
2011-09-19 14:43:28 +02:00
|
|
|
|
|
2011-09-15 10:20:18 +02:00
|
|
|
|
/**
|
2013-02-08 13:13:52 +01:00
|
|
|
|
* INTERNAL API
|
|
|
|
|
|
* Depending on this class is not supported, only the [[akka.actor.ActorRefProvider]] interface is supported.
|
2012-11-26 17:42:25 +01:00
|
|
|
|
*
|
2013-02-08 13:13:52 +01:00
|
|
|
|
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
|
2012-11-26 17:42:25 +01:00
|
|
|
|
*
|
2011-09-15 10:20:18 +02:00
|
|
|
|
*/
|
2013-02-08 13:13:52 +01:00
|
|
|
|
private[akka] class RemoteActorRefProvider(
|
2016-06-02 14:06:57 +02:00
|
|
|
|
val systemName: String,
|
|
|
|
|
|
val settings: ActorSystem.Settings,
|
|
|
|
|
|
val eventStream: EventStream,
|
2012-02-10 11:36:23 +01:00
|
|
|
|
val dynamicAccess: DynamicAccess) extends ActorRefProvider {
|
2012-12-14 16:09:38 +01:00
|
|
|
|
import RemoteActorRefProvider._
|
2011-09-15 10:20:18 +02:00
|
|
|
|
|
2013-01-17 16:19:31 +01:00
|
|
|
|
val remoteSettings: RemoteSettings = new RemoteSettings(settings.config)
|
2011-12-08 14:44:05 +01:00
|
|
|
|
|
2012-11-15 12:48:13 +01:00
|
|
|
|
override val deployer: Deployer = createDeployer
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* Factory method to make it possible to override deployer in subclass
|
|
|
|
|
|
* Creates a new instance every time
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
|
2012-01-27 12:14:28 +01:00
|
|
|
|
|
2013-03-30 01:03:17 +01:00
|
|
|
|
private val local = new LocalActorRefProvider(systemName, settings, eventStream, dynamicAccess, deployer,
|
2013-01-31 09:24:12 +01:00
|
|
|
|
Some(deadLettersPath ⇒ new RemoteDeadLetterActorRef(this, deadLettersPath, eventStream)))
|
2012-01-27 12:14:28 +01:00
|
|
|
|
|
|
|
|
|
|
@volatile
|
|
|
|
|
|
private var _log = local.log
|
|
|
|
|
|
def log: LoggingAdapter = _log
|
|
|
|
|
|
|
2012-11-22 14:40:54 +01:00
|
|
|
|
override def rootPath: ActorPath = local.rootPath
|
2013-01-31 09:24:12 +01:00
|
|
|
|
override def deadLetters: InternalActorRef = local.deadLetters
|
2012-01-27 12:14:28 +01:00
|
|
|
|
|
|
|
|
|
|
// these are only available after init()
|
2012-05-24 11:44:39 +02:00
|
|
|
|
override def rootGuardian: InternalActorRef = local.rootGuardian
|
2012-06-19 14:52:02 +02:00
|
|
|
|
override def guardian: LocalActorRef = local.guardian
|
|
|
|
|
|
override def systemGuardian: LocalActorRef = local.systemGuardian
|
2014-08-25 15:12:27 +02:00
|
|
|
|
override def terminationFuture: Future[Terminated] = local.terminationFuture
|
2012-05-24 11:44:39 +02:00
|
|
|
|
override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)
|
|
|
|
|
|
override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path)
|
|
|
|
|
|
override def tempPath(): ActorPath = local.tempPath()
|
|
|
|
|
|
override def tempContainer: VirtualPathContainer = local.tempContainer
|
2012-01-18 11:52:35 +01:00
|
|
|
|
|
2013-04-15 09:26:51 +02:00
|
|
|
|
@volatile private var _internals: Internals = _
|
2012-01-20 14:29:50 +01:00
|
|
|
|
|
2012-12-14 16:09:38 +01:00
|
|
|
|
def transport: RemoteTransport = _internals.transport
|
|
|
|
|
|
def serialization: Serialization = _internals.serialization
|
|
|
|
|
|
def remoteDaemon: InternalActorRef = _internals.remoteDaemon
|
2012-01-20 14:29:50 +01:00
|
|
|
|
|
2012-12-14 16:09:38 +01:00
|
|
|
|
// This actor ensures the ordering of shutdown between remoteDaemon and the transport
|
2013-04-15 09:26:51 +02:00
|
|
|
|
@volatile private var remotingTerminator: ActorRef = _
|
|
|
|
|
|
|
2015-02-19 15:49:02 +01:00
|
|
|
|
@volatile private var _remoteWatcher: ActorRef = _
|
|
|
|
|
|
private[akka] def remoteWatcher = _remoteWatcher
|
|
|
|
|
|
|
2013-04-15 09:26:51 +02:00
|
|
|
|
@volatile private var remoteDeploymentWatcher: ActorRef = _
|
2012-01-20 14:29:50 +01:00
|
|
|
|
|
2012-05-24 11:44:39 +02:00
|
|
|
|
def init(system: ActorSystemImpl): Unit = {
|
2011-12-08 14:44:05 +01:00
|
|
|
|
local.init(system)
|
2012-01-20 14:29:50 +01:00
|
|
|
|
|
2013-08-19 12:06:07 +02:00
|
|
|
|
remotingTerminator = system.systemActorOf(
|
|
|
|
|
|
remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)),
|
|
|
|
|
|
"remoting-terminator")
|
2012-12-14 16:09:38 +01:00
|
|
|
|
|
|
|
|
|
|
val internals = Internals(
|
|
|
|
|
|
remoteDaemon = {
|
2016-06-02 14:06:57 +02:00
|
|
|
|
val d = new RemoteSystemDaemon(
|
|
|
|
|
|
system,
|
|
|
|
|
|
local.rootPath / "remote",
|
|
|
|
|
|
rootGuardian,
|
|
|
|
|
|
remotingTerminator,
|
|
|
|
|
|
log,
|
|
|
|
|
|
untrustedMode = remoteSettings.UntrustedMode)
|
|
|
|
|
|
local.registerExtraNames(Map(("remote", d)))
|
|
|
|
|
|
d
|
|
|
|
|
|
},
|
2012-12-14 16:09:38 +01:00
|
|
|
|
serialization = SerializationExtension(system),
|
2016-05-09 07:31:41 +02:00
|
|
|
|
transport = if (remoteSettings.EnableArtery) new ArteryTransport(system, this) else new Remoting(system, this))
|
2012-12-14 16:09:38 +01:00
|
|
|
|
|
|
|
|
|
|
_internals = internals
|
2012-12-18 12:54:17 +01:00
|
|
|
|
remotingTerminator ! internals
|
2012-01-27 12:14:28 +01:00
|
|
|
|
|
2014-11-17 17:17:52 -06:00
|
|
|
|
_log = Logging(eventStream, getClass.getName)
|
2012-01-27 12:14:28 +01:00
|
|
|
|
|
|
|
|
|
|
// this enables reception of remote requests
|
2012-12-14 16:09:38 +01:00
|
|
|
|
transport.start()
|
2012-01-20 14:29:50 +01:00
|
|
|
|
|
2015-02-19 15:49:02 +01:00
|
|
|
|
_remoteWatcher = createRemoteWatcher(system)
|
2013-04-15 09:26:51 +02:00
|
|
|
|
remoteDeploymentWatcher = createRemoteDeploymentWatcher(system)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = {
|
|
|
|
|
|
import remoteSettings._
|
|
|
|
|
|
val failureDetector = createRemoteWatcherFailureDetector(system)
|
2013-08-19 12:06:07 +02:00
|
|
|
|
system.systemActorOf(
|
|
|
|
|
|
configureDispatcher(
|
|
|
|
|
|
RemoteWatcher.props(
|
|
|
|
|
|
failureDetector,
|
|
|
|
|
|
heartbeatInterval = WatchHeartBeatInterval,
|
|
|
|
|
|
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
|
|
|
|
|
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter)),
|
2013-05-29 16:13:10 +02:00
|
|
|
|
"remote-watcher")
|
2013-04-15 09:26:51 +02:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = {
|
2013-05-17 14:16:26 +02:00
|
|
|
|
def createFailureDetector(): FailureDetector =
|
|
|
|
|
|
FailureDetectorLoader.load(remoteSettings.WatchFailureDetectorImplementationClass, remoteSettings.WatchFailureDetectorConfig, system)
|
2013-04-15 09:26:51 +02:00
|
|
|
|
|
|
|
|
|
|
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
|
2011-11-16 17:18:36 +01:00
|
|
|
|
}
|
2011-09-19 14:43:28 +02:00
|
|
|
|
|
2013-04-15 09:26:51 +02:00
|
|
|
|
protected def createRemoteDeploymentWatcher(system: ActorSystemImpl): ActorRef =
|
2016-06-23 11:58:54 +02:00
|
|
|
|
system.systemActorOf(remoteSettings.configureDispatcher(Props[RemoteDeploymentWatcher]()), "remote-deployment-watcher")
|
2013-04-15 09:26:51 +02:00
|
|
|
|
|
2012-01-31 21:19:28 +01:00
|
|
|
|
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
|
2013-05-27 20:15:24 +02:00
|
|
|
|
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef =
|
2012-06-13 17:57:56 +02:00
|
|
|
|
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async)
|
2011-10-18 14:21:48 +02:00
|
|
|
|
else {
|
2011-12-10 20:32:23 +01:00
|
|
|
|
|
2013-05-27 20:15:24 +02:00
|
|
|
|
if (!system.dispatchers.hasDispatcher(props.dispatcher))
|
|
|
|
|
|
throw new ConfigurationException(s"Dispatcher [${props.dispatcher}] not configured for path $path")
|
|
|
|
|
|
|
2011-12-10 20:32:23 +01:00
|
|
|
|
/*
|
|
|
|
|
|
* This needs to deal with “mangled” paths, which are created by remote
|
|
|
|
|
|
* deployment, also in this method. The scheme is the following:
|
2011-12-14 22:46:43 +01:00
|
|
|
|
*
|
|
|
|
|
|
* Whenever a remote deployment is found, create a path on that remote
|
2011-12-10 20:32:23 +01:00
|
|
|
|
* address below “remote”, including the current system’s identification
|
|
|
|
|
|
* as “sys@host:port” (typically; it will use whatever the remote
|
|
|
|
|
|
* transport uses). This means that on a path up an actor tree each node
|
2012-09-12 11:18:42 +02:00
|
|
|
|
* change introduces one layer or “remote/scheme/sys@host:port/” within the URI.
|
2011-12-14 22:46:43 +01:00
|
|
|
|
*
|
2011-12-10 20:32:23 +01:00
|
|
|
|
* Example:
|
2011-12-14 22:46:43 +01:00
|
|
|
|
*
|
2014-01-21 15:14:27 +01:00
|
|
|
|
* akka.tcp://sys@home:1234/remote/akka/sys@remote:6667/remote/akka/sys@other:3333/user/a/b/c
|
2011-12-14 22:46:43 +01:00
|
|
|
|
*
|
2014-01-21 15:14:27 +01:00
|
|
|
|
* means that the logical parent originates from “akka.tcp://sys@other:3333” with
|
|
|
|
|
|
* one child (may be “a” or “b”) being deployed on “akka.tcp://sys@remote:6667” and
|
|
|
|
|
|
* finally either “b” or “c” being created on “akka.tcp://sys@home:1234”, where
|
2011-12-14 22:46:43 +01:00
|
|
|
|
* this whole thing actually resides. Thus, the logical path is
|
|
|
|
|
|
* “/user/a/b/c” and the physical path contains all remote placement
|
2011-12-10 20:32:23 +01:00
|
|
|
|
* information.
|
2011-12-14 22:46:43 +01:00
|
|
|
|
*
|
2011-12-10 20:32:23 +01:00
|
|
|
|
* Deployments are always looked up using the logical path, which is the
|
|
|
|
|
|
* purpose of the lookupRemotes internal method.
|
|
|
|
|
|
*/
|
2011-10-18 11:26:35 +02:00
|
|
|
|
|
2011-12-09 18:07:42 +01:00
|
|
|
|
@scala.annotation.tailrec
|
2011-12-12 23:31:15 +01:00
|
|
|
|
def lookupRemotes(p: Iterable[String]): Option[Deploy] = {
|
2011-12-09 18:07:42 +01:00
|
|
|
|
p.headOption match {
|
|
|
|
|
|
case None ⇒ None
|
2012-09-12 11:18:42 +02:00
|
|
|
|
case Some("remote") ⇒ lookupRemotes(p.drop(3))
|
2012-04-27 12:48:22 +02:00
|
|
|
|
case Some("user") ⇒ deployer.lookup(p.drop(1))
|
2011-12-09 18:15:14 +01:00
|
|
|
|
case Some(_) ⇒ None
|
2011-12-09 18:07:42 +01:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2011-12-09 18:15:14 +01:00
|
|
|
|
val elems = path.elements
|
2012-01-31 21:19:28 +01:00
|
|
|
|
val lookup =
|
|
|
|
|
|
if (lookupDeploy)
|
|
|
|
|
|
elems.head match {
|
2016-03-21 14:33:07 +01:00
|
|
|
|
case "user" | "system" ⇒ deployer.lookup(elems.drop(1))
|
|
|
|
|
|
case "remote" ⇒ lookupRemotes(elems)
|
|
|
|
|
|
case _ ⇒ None
|
2012-01-31 21:19:28 +01:00
|
|
|
|
}
|
|
|
|
|
|
else None
|
|
|
|
|
|
|
|
|
|
|
|
val deployment = {
|
2012-02-03 09:43:23 +01:00
|
|
|
|
deploy.toList ::: lookup.toList match {
|
2012-01-31 21:19:28 +01:00
|
|
|
|
case Nil ⇒ Nil
|
2012-02-03 09:43:23 +01:00
|
|
|
|
case l ⇒ List(l reduce ((a, b) ⇒ b withFallback a))
|
2012-01-31 21:19:28 +01:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2012-02-03 09:43:23 +01:00
|
|
|
|
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match {
|
2013-04-18 13:35:36 +02:00
|
|
|
|
case d @ Deploy(_, _, _, RemoteScope(addr), _, _) ⇒
|
2012-12-07 16:03:04 +01:00
|
|
|
|
if (hasAddress(addr)) {
|
2012-06-13 17:57:56 +02:00
|
|
|
|
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
|
2013-05-29 16:13:10 +02:00
|
|
|
|
} else if (props.deploy.scope == LocalScope) {
|
2013-06-02 16:23:54 +02:00
|
|
|
|
throw new ConfigurationException(s"configuration requested remote deployment for local-only Props at [$path]")
|
2013-05-29 16:13:10 +02:00
|
|
|
|
} else try {
|
2013-06-01 21:58:34 +02:00
|
|
|
|
try {
|
|
|
|
|
|
// for consistency we check configuration of dispatcher and mailbox locally
|
|
|
|
|
|
val dispatcher = system.dispatchers.lookup(props.dispatcher)
|
|
|
|
|
|
system.mailboxes.getMailboxType(props, dispatcher.configurator.config)
|
|
|
|
|
|
} catch {
|
2013-06-02 16:23:54 +02:00
|
|
|
|
case NonFatal(e) ⇒ throw new ConfigurationException(
|
2013-06-01 21:58:34 +02:00
|
|
|
|
s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]", e)
|
|
|
|
|
|
}
|
2013-05-29 16:13:10 +02:00
|
|
|
|
val localAddress = transport.localAddressForRemote(addr)
|
|
|
|
|
|
val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
|
|
|
|
|
|
withUid(path.uid)
|
|
|
|
|
|
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
|
|
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒ throw new IllegalArgumentException(s"remote deployment failed for [$path]", e)
|
2011-12-09 00:02:27 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
2013-09-19 08:00:05 +02:00
|
|
|
|
case _ ⇒
|
|
|
|
|
|
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
2011-09-27 16:52:33 +02:00
|
|
|
|
}
|
2011-09-15 10:20:18 +02:00
|
|
|
|
}
|
|
|
|
|
|
|
2013-03-26 18:17:50 +01:00
|
|
|
|
@deprecated("use actorSelection instead of actorFor", "2.2")
|
2015-05-08 10:25:39 +02:00
|
|
|
|
override private[akka] def actorFor(path: ActorPath): InternalActorRef = {
|
2012-12-07 16:03:04 +01:00
|
|
|
|
if (hasAddress(path.address)) actorFor(rootGuardian, path.elements)
|
2012-11-23 10:15:19 +01:00
|
|
|
|
else try {
|
2013-03-06 15:10:59 +01:00
|
|
|
|
new RemoteActorRef(transport, transport.localAddressForRemote(path.address),
|
2012-11-23 10:15:19 +01:00
|
|
|
|
path, Nobody, props = None, deploy = None)
|
|
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒
|
2013-03-26 18:17:50 +01:00
|
|
|
|
log.error(e, "Error while looking up address [{}]", path.address)
|
2012-11-23 10:15:19 +01:00
|
|
|
|
new EmptyLocalActorRef(this, path, eventStream)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2011-12-07 16:29:12 +01:00
|
|
|
|
|
2013-03-26 18:17:50 +01:00
|
|
|
|
@deprecated("use actorSelection instead of actorFor", "2.2")
|
2015-05-08 10:25:39 +02:00
|
|
|
|
override private[akka] def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
2012-01-20 14:29:50 +01:00
|
|
|
|
case ActorPathExtractor(address, elems) ⇒
|
2012-12-07 16:03:04 +01:00
|
|
|
|
if (hasAddress(address)) actorFor(rootGuardian, elems)
|
2013-03-26 11:25:09 +01:00
|
|
|
|
else {
|
|
|
|
|
|
val rootPath = RootActorPath(address) / elems
|
|
|
|
|
|
try {
|
|
|
|
|
|
new RemoteActorRef(transport, transport.localAddressForRemote(address),
|
|
|
|
|
|
rootPath, Nobody, props = None, deploy = None)
|
|
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒
|
2013-03-26 18:17:50 +01:00
|
|
|
|
log.error(e, "Error while looking up address [{}]", rootPath.address)
|
2013-03-26 11:25:09 +01:00
|
|
|
|
new EmptyLocalActorRef(this, rootPath, eventStream)
|
|
|
|
|
|
}
|
2013-03-25 08:42:48 +01:00
|
|
|
|
}
|
2011-12-07 16:29:12 +01:00
|
|
|
|
case _ ⇒ local.actorFor(ref, path)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2013-03-26 18:17:50 +01:00
|
|
|
|
@deprecated("use actorSelection instead of actorFor", "2.2")
|
2015-05-08 10:25:39 +02:00
|
|
|
|
override private[akka] def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef =
|
2013-03-26 18:17:50 +01:00
|
|
|
|
local.actorFor(ref, path)
|
|
|
|
|
|
|
2016-05-27 16:45:48 +02:00
|
|
|
|
def rootGuardianAt(address: Address): ActorRef = {
|
2013-03-26 18:17:50 +01:00
|
|
|
|
if (hasAddress(address)) rootGuardian
|
2016-05-27 16:45:48 +02:00
|
|
|
|
else try {
|
|
|
|
|
|
new RemoteActorRef(transport, transport.localAddressForRemote(address),
|
|
|
|
|
|
RootActorPath(address), Nobody, props = None, deploy = None)
|
|
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒
|
|
|
|
|
|
log.error(e, "No root guardian at [{}]", address)
|
|
|
|
|
|
new EmptyLocalActorRef(this, RootActorPath(address), eventStream)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2013-03-26 18:17:50 +01:00
|
|
|
|
|
2013-02-08 13:13:52 +01:00
|
|
|
|
/**
|
2012-11-21 15:58:01 +01:00
|
|
|
|
* INTERNAL API
|
2013-03-26 18:17:50 +01:00
|
|
|
|
* Called in deserialization of incoming remote messages where the correct local address is known.
|
2012-11-21 15:58:01 +01:00
|
|
|
|
*/
|
2013-03-26 18:17:50 +01:00
|
|
|
|
private[akka] def resolveActorRefWithLocalAddress(path: String, localAddress: Address): InternalActorRef = {
|
2013-03-13 16:01:57 +01:00
|
|
|
|
path match {
|
|
|
|
|
|
case ActorPathExtractor(address, elems) ⇒
|
2016-06-23 11:58:54 +02:00
|
|
|
|
if (hasAddress(address))
|
|
|
|
|
|
local.resolveActorRef(rootGuardian, elems)
|
2016-05-27 16:45:48 +02:00
|
|
|
|
else try {
|
2016-06-23 11:58:54 +02:00
|
|
|
|
new RemoteActorRef(transport, localAddress, RootActorPath(address) / elems, Nobody, props = None, deploy = None)
|
2016-05-27 16:45:48 +02:00
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒
|
|
|
|
|
|
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
|
|
|
|
|
|
new EmptyLocalActorRef(this, RootActorPath(address) / elems, eventStream)
|
|
|
|
|
|
}
|
2013-03-13 16:01:57 +01:00
|
|
|
|
case _ ⇒
|
2013-03-26 18:17:50 +01:00
|
|
|
|
log.debug("resolve of unknown path [{}] failed", path)
|
|
|
|
|
|
deadLetters
|
2013-03-13 16:01:57 +01:00
|
|
|
|
}
|
2012-11-21 15:58:01 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
2013-03-26 18:17:50 +01:00
|
|
|
|
def resolveActorRef(path: String): ActorRef = path match {
|
|
|
|
|
|
case ActorPathExtractor(address, elems) ⇒
|
|
|
|
|
|
if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
|
|
|
|
|
|
else {
|
|
|
|
|
|
val rootPath = RootActorPath(address) / elems
|
|
|
|
|
|
try {
|
|
|
|
|
|
new RemoteActorRef(transport, transport.localAddressForRemote(address),
|
|
|
|
|
|
rootPath, Nobody, props = None, deploy = None)
|
|
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒
|
2016-05-27 16:45:48 +02:00
|
|
|
|
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
|
2013-03-26 18:17:50 +01:00
|
|
|
|
new EmptyLocalActorRef(this, rootPath, eventStream)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
case _ ⇒
|
|
|
|
|
|
log.debug("resolve of unknown path [{}] failed", path)
|
|
|
|
|
|
deadLetters
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
def resolveActorRef(path: ActorPath): ActorRef = {
|
|
|
|
|
|
if (hasAddress(path.address)) local.resolveActorRef(rootGuardian, path.elements)
|
|
|
|
|
|
else try {
|
|
|
|
|
|
new RemoteActorRef(transport, transport.localAddressForRemote(path.address),
|
|
|
|
|
|
path, Nobody, props = None, deploy = None)
|
|
|
|
|
|
} catch {
|
|
|
|
|
|
case NonFatal(e) ⇒
|
2016-05-27 16:45:48 +02:00
|
|
|
|
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
|
2013-03-26 18:17:50 +01:00
|
|
|
|
new EmptyLocalActorRef(this, path, eventStream)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2011-09-15 10:20:18 +02:00
|
|
|
|
|
2011-09-19 14:43:28 +02:00
|
|
|
|
/**
|
|
|
|
|
|
* Using (checking out) actor on a specific node.
|
|
|
|
|
|
*/
|
2013-03-26 18:17:50 +01:00
|
|
|
|
def useActorOnNode(ref: ActorRef, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = {
|
|
|
|
|
|
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, ref.path)
|
2011-09-19 14:43:28 +02:00
|
|
|
|
|
2011-12-09 00:02:27 +01:00
|
|
|
|
// we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor
|
2013-03-26 18:17:50 +01:00
|
|
|
|
// actorSelection can't be used here because then it is not guaranteed that the actor is created
|
|
|
|
|
|
// before someone can send messages to it
|
|
|
|
|
|
resolveActorRef(RootActorPath(ref.path.address) / "remote") !
|
|
|
|
|
|
DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor)
|
2013-04-15 09:26:51 +02:00
|
|
|
|
|
|
|
|
|
|
remoteDeploymentWatcher ! RemoteDeploymentWatcher.WatchRemote(ref, supervisor)
|
2011-09-15 10:20:18 +02:00
|
|
|
|
}
|
2012-02-02 09:40:17 +01:00
|
|
|
|
|
|
|
|
|
|
def getExternalAddressFor(addr: Address): Option[Address] = {
|
|
|
|
|
|
addr match {
|
2012-12-14 13:45:55 +01:00
|
|
|
|
case _ if hasAddress(addr) ⇒ Some(local.rootPath.address)
|
|
|
|
|
|
case Address(_, _, Some(_), Some(_)) ⇒ try Some(transport.localAddressForRemote(addr)) catch { case NonFatal(_) ⇒ None }
|
|
|
|
|
|
case _ ⇒ None
|
2012-02-02 09:40:17 +01:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2012-10-26 11:42:18 +02:00
|
|
|
|
|
2012-12-18 16:08:06 +01:00
|
|
|
|
def getDefaultAddress: Address = transport.defaultAddress
|
2012-11-22 14:40:54 +01:00
|
|
|
|
|
2012-12-07 16:03:04 +01:00
|
|
|
|
private def hasAddress(address: Address): Boolean =
|
2012-11-21 11:44:39 +01:00
|
|
|
|
address == local.rootPath.address || address == rootPath.address || transport.addresses(address)
|
2012-10-26 11:42:18 +02:00
|
|
|
|
|
2013-04-18 17:35:43 +02:00
|
|
|
|
/**
|
|
|
|
|
|
* Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses.
|
2016-04-14 10:26:09 +02:00
|
|
|
|
*
|
2013-04-18 17:35:43 +02:00
|
|
|
|
* @param address Address of the remote system to be quarantined
|
2014-02-13 11:27:40 +01:00
|
|
|
|
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
|
|
|
|
|
|
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
|
2013-04-18 17:35:43 +02:00
|
|
|
|
*/
|
2014-02-13 11:27:40 +01:00
|
|
|
|
def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid)
|
2013-04-15 09:26:51 +02:00
|
|
|
|
|
2011-09-15 10:20:18 +02:00
|
|
|
|
}
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
2012-05-24 11:44:39 +02:00
|
|
|
|
private[akka] trait RemoteRef extends ActorRefScope {
|
2012-05-03 21:14:47 +02:00
|
|
|
|
final def isLocal = false
|
2011-12-29 16:27:32 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
2016-05-20 12:40:56 +02:00
|
|
|
|
/**
|
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
|
*/
|
|
|
|
|
|
private[remote] sealed abstract class LargeMessageDestinationFlag
|
|
|
|
|
|
private[remote] case object RegularDestination extends LargeMessageDestinationFlag
|
|
|
|
|
|
private[remote] case object LargeDestination extends LargeMessageDestinationFlag
|
|
|
|
|
|
|
2011-10-13 17:42:26 +02:00
|
|
|
|
/**
|
2013-02-08 13:13:52 +01:00
|
|
|
|
* INTERNAL API
|
2011-10-13 17:42:26 +02:00
|
|
|
|
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
|
|
|
|
|
|
* This reference is network-aware (remembers its origin) and immutable.
|
|
|
|
|
|
*/
|
2011-12-07 16:29:12 +01:00
|
|
|
|
private[akka] class RemoteActorRef private[akka] (
|
2016-06-02 14:06:57 +02:00
|
|
|
|
remote: RemoteTransport,
|
2012-09-12 11:18:42 +02:00
|
|
|
|
val localAddressToUse: Address,
|
2016-06-02 14:06:57 +02:00
|
|
|
|
val path: ActorPath,
|
|
|
|
|
|
val getParent: InternalActorRef,
|
|
|
|
|
|
props: Option[Props],
|
|
|
|
|
|
deploy: Option[Deploy])
|
2011-12-29 16:27:32 +01:00
|
|
|
|
extends InternalActorRef with RemoteRef {
|
2011-11-08 11:56:46 +01:00
|
|
|
|
|
2016-05-27 16:45:48 +02:00
|
|
|
|
remote match {
|
|
|
|
|
|
case t: ArteryTransport ⇒
|
|
|
|
|
|
// detect mistakes such as using "akka.tcp" with Artery
|
|
|
|
|
|
if (path.address.protocol != t.localAddress.address.protocol)
|
|
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
|
|
s"Wrong protocol of [${path}], expected [${t.localAddress.address.protocol}]")
|
|
|
|
|
|
case _ ⇒
|
|
|
|
|
|
}
|
|
|
|
|
|
@volatile private[remote] var cachedAssociation: artery.Association = null
|
2016-04-14 10:26:09 +02:00
|
|
|
|
|
2016-05-20 12:40:56 +02:00
|
|
|
|
// used by artery to direct messages to a separate stream for large messages
|
2016-05-27 16:45:48 +02:00
|
|
|
|
@volatile private[remote] var cachedLargeMessageDestinationFlag: LargeMessageDestinationFlag = null
|
2016-05-20 12:40:56 +02:00
|
|
|
|
|
2011-12-09 00:02:27 +01:00
|
|
|
|
def getChild(name: Iterator[String]): InternalActorRef = {
|
2011-12-09 18:07:42 +01:00
|
|
|
|
val s = name.toStream
|
|
|
|
|
|
s.headOption match {
|
|
|
|
|
|
case None ⇒ this
|
|
|
|
|
|
case Some("..") ⇒ getParent getChild name
|
2013-03-06 15:10:59 +01:00
|
|
|
|
case _ ⇒ new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None)
|
2011-12-09 18:07:42 +01:00
|
|
|
|
}
|
2011-12-09 00:02:27 +01:00
|
|
|
|
}
|
2011-12-03 11:06:38 +01:00
|
|
|
|
|
2015-05-08 10:37:41 +02:00
|
|
|
|
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
|
|
|
|
|
|
override private[akka] def isTerminated: Boolean = false
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
2016-06-08 12:40:40 +02:00
|
|
|
|
private def handleException(message: Any, sender: ActorRef): Catcher[Unit] = {
|
2013-01-27 12:56:35 +01:00
|
|
|
|
case e: InterruptedException ⇒
|
|
|
|
|
|
remote.system.eventStream.publish(Error(e, path.toString, getClass, "interrupted during message send"))
|
2016-06-08 12:40:40 +02:00
|
|
|
|
remote.system.deadLetters.tell(message, sender)
|
2013-01-27 12:56:35 +01:00
|
|
|
|
Thread.currentThread.interrupt()
|
|
|
|
|
|
case NonFatal(e) ⇒
|
|
|
|
|
|
remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send"))
|
2016-06-08 12:40:40 +02:00
|
|
|
|
remote.system.deadLetters.tell(message, sender)
|
2013-01-27 12:56:35 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
2015-02-19 15:49:02 +01:00
|
|
|
|
/**
|
|
|
|
|
|
* Determine if a watch/unwatch message must be handled by the remoteWatcher actor, or sent to this remote ref
|
|
|
|
|
|
*/
|
|
|
|
|
|
def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef) =
|
|
|
|
|
|
if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) {
|
|
|
|
|
|
provider.log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path)
|
|
|
|
|
|
false // Not managed by the remote watcher, so not reliable to communication failure or remote system crash
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted
|
|
|
|
|
|
// If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers
|
|
|
|
|
|
watcher != provider.remoteWatcher && watchee == this
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2013-04-15 09:26:51 +02:00
|
|
|
|
def sendSystemMessage(message: SystemMessage): Unit =
|
|
|
|
|
|
try {
|
2015-02-19 15:49:02 +01:00
|
|
|
|
//send to remote, unless watch message is intercepted by the remoteWatcher
|
|
|
|
|
|
message match {
|
|
|
|
|
|
case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher) ⇒
|
|
|
|
|
|
provider.remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher)
|
|
|
|
|
|
//Unwatch has a different signature, need to pattern match arguments against InternalActorRef
|
|
|
|
|
|
case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) ⇒
|
|
|
|
|
|
provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher)
|
2016-06-05 15:40:06 +02:00
|
|
|
|
case _ ⇒ remote.send(message, OptionVal.None, this)
|
2015-02-19 15:49:02 +01:00
|
|
|
|
}
|
2016-06-08 12:40:40 +02:00
|
|
|
|
} catch handleException(message, Actor.noSender)
|
2011-10-18 15:39:26 +02:00
|
|
|
|
|
2013-02-20 11:42:29 +01:00
|
|
|
|
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
|
|
|
|
|
|
if (message == null) throw new InvalidMessageException("Message is null")
|
2016-06-08 12:40:40 +02:00
|
|
|
|
try remote.send(message, OptionVal(sender), this) catch handleException(message, sender)
|
2013-02-20 11:42:29 +01:00
|
|
|
|
}
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
2013-03-06 15:10:59 +01:00
|
|
|
|
override def provider: RemoteActorRefProvider = remote.provider
|
|
|
|
|
|
|
|
|
|
|
|
def start(): Unit =
|
2013-03-26 18:17:50 +01:00
|
|
|
|
if (props.isDefined && deploy.isDefined) remote.provider.useActorOnNode(this, props.get, deploy.get, getParent)
|
2012-10-10 14:19:15 +02:00
|
|
|
|
|
2011-12-08 14:44:05 +01:00
|
|
|
|
def suspend(): Unit = sendSystemMessage(Suspend())
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
2012-08-08 14:13:52 +02:00
|
|
|
|
def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure))
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
2011-12-08 14:44:05 +01:00
|
|
|
|
def stop(): Unit = sendSystemMessage(Terminate())
|
|
|
|
|
|
|
|
|
|
|
|
def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause))
|
2011-10-13 17:42:26 +02:00
|
|
|
|
|
|
|
|
|
|
@throws(classOf[java.io.ObjectStreamException])
|
2013-03-25 08:42:48 +01:00
|
|
|
|
private def writeReplace(): AnyRef = SerializedActorRef(this)
|
2012-10-15 13:33:39 +02:00
|
|
|
|
}
|