Disable remote watch and remote deployment outside Cluster (#27126)

This commit is contained in:
Helena Edelson 2019-06-21 05:15:36 -07:00 committed by Arnout Engelen
parent 8757e35b7e
commit 611e32de91
39 changed files with 1022 additions and 179 deletions

View file

@ -4,32 +4,38 @@
package akka.remote
import akka.Done
import akka.actor._
import akka.dispatch.sysmsg._
import akka.event.{ EventStream, Logging, LoggingAdapter }
import akka.event.Logging.Error
import akka.pattern.pipe
import scala.util.control.NonFatal
import scala.util.Failure
import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }
import scala.util.control.Exception.Catcher
import scala.concurrent.Future
import scala.util.Failure
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
import akka.ConfigurationException
import akka.Done
import akka.actor.SystemGuardian.RegisterTerminationHook
import akka.actor.SystemGuardian.TerminationHook
import akka.actor.SystemGuardian.TerminationHookDone
import akka.actor._
import akka.annotation.InternalApi
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.artery.ArteryTransport
import akka.remote.artery.aeron.ArteryAeronUdpTransport
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.UnboundedMessageQueueSemantics
import akka.dispatch.sysmsg._
import akka.event.EventStream
import akka.event.Logging
import akka.event.Logging.Error
import akka.event.LoggingAdapter
import akka.pattern.pipe
import akka.remote.artery.ArterySettings
import akka.remote.artery.ArterySettings.AeronUpd
import akka.util.{ ErrorMessages, OptionVal }
import akka.remote.artery.ArteryTransport
import akka.remote.artery.OutboundEnvelope
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.serialization.ActorRefResolveThreadLocalCache
import akka.remote.artery.aeron.ArteryAeronUdpTransport
import akka.remote.artery.tcp.ArteryTcpTransport
import akka.remote.serialization.ActorRefResolveThreadLocalCache
import akka.serialization.Serialization
import akka.util.ErrorMessages
import akka.util.OptionVal
import akka.util.unused
import com.github.ghik.silencer.silent
/**
@ -152,6 +158,13 @@ private[akka] class RemoteActorRefProvider(
val remoteSettings: RemoteSettings = new RemoteSettings(settings.config)
private[akka] final val hasClusterOrUseUnsafe = settings.HasCluster || remoteSettings.UseUnsafeRemoteFeaturesWithoutCluster
private val warnOnUnsafeRemote =
!settings.HasCluster &&
!remoteSettings.UseUnsafeRemoteFeaturesWithoutCluster &&
remoteSettings.WarnUnsafeWatchWithoutCluster
override val deployer: Deployer = createDeployer
/**
@ -160,7 +173,7 @@ private[akka] class RemoteActorRefProvider(
*/
protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
private val local = new LocalActorRefProvider(
private[akka] val local = new LocalActorRefProvider(
systemName,
settings,
eventStream,
@ -194,10 +207,10 @@ private[akka] class RemoteActorRefProvider(
// This actor ensures the ordering of shutdown between remoteDaemon and the transport
@volatile private var remotingTerminator: ActorRef = _
@volatile private var _remoteWatcher: ActorRef = _
private[akka] def remoteWatcher = _remoteWatcher
@volatile private var _remoteWatcher: Option[ActorRef] = None
private[akka] def remoteWatcher: Option[ActorRef] = _remoteWatcher
@volatile private var remoteDeploymentWatcher: ActorRef = _
@volatile private var remoteDeploymentWatcher: Option[ActorRef] = None
@volatile private var actorRefResolveThreadLocalCache: ActorRefResolveThreadLocalCache = _
@ -239,13 +252,14 @@ private[akka] class RemoteActorRefProvider(
_log = Logging.withMarker(eventStream, getClass.getName)
showDirectUseWarningIfRequired()
warnIfDirectUse()
warnIfUseUnsafeWithoutCluster()
// this enables reception of remote requests
transport.start()
_remoteWatcher = createRemoteWatcher(system)
remoteDeploymentWatcher = createRemoteDeploymentWatcher(system)
_remoteWatcher = createOrNone[ActorRef](createRemoteWatcher(system))
remoteDeploymentWatcher = createOrNone[ActorRef](createRemoteDeploymentWatcher(system))
}
private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = {
@ -280,16 +294,13 @@ private[akka] class RemoteActorRefProvider(
}
}
/** Will call the provided `func` if using Cluster or explicitly enabled unsafe remote features. */
private def createOrNone[T](func: => T): Option[T] = if (hasClusterOrUseUnsafe) Some(func) else None
protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = {
import remoteSettings._
val failureDetector = createRemoteWatcherFailureDetector(system)
system.systemActorOf(
configureDispatcher(
RemoteWatcher.props(
failureDetector,
heartbeatInterval = WatchHeartBeatInterval,
unreachableReaperInterval = WatchUnreachableReaperInterval,
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter)),
configureDispatcher(RemoteWatcher.props(remoteSettings, createRemoteWatcherFailureDetector(system))),
"remote-watcher")
}
@ -309,7 +320,7 @@ private[akka] class RemoteActorRefProvider(
"remote-deployment-watcher")
/** Can be overridden when using RemoteActorRefProvider as a superclass rather than directly */
protected def showDirectUseWarningIfRequired() = {
protected def warnIfDirectUse() = {
if (remoteSettings.WarnAboutDirectUse) {
log.warning(
"Using the 'remote' ActorRefProvider directly, which is a low-level layer. " +
@ -317,6 +328,38 @@ private[akka] class RemoteActorRefProvider(
}
}
// Log on `init` similar to `warnIfDirectUse`.
private[akka] def warnIfUseUnsafeWithoutCluster(): Unit =
if (!settings.HasCluster) {
val msg =
if (remoteSettings.UseUnsafeRemoteFeaturesWithoutCluster)
"`akka.remote.use-unsafe-remote-features-without-cluster` has been enabled."
else
"Using Akka Cluster is recommended if you need remote watch and deploy."
log.warning(s"Cluster not in use - {}", msg)
}
protected def warnOnUnsafe(message: String): Unit =
if (warnOnUnsafeRemote) log.warning(message)
else log.debug(message)
/** Logs if deathwatch message is intentionally dropped. To disable
* warnings set `akka.remote.warn-unsafe-watch-without-cluster` to `off`
* or use Akka Cluster.
*/
private[akka] def warnIfUnsafeDeathwatchWithoutCluster(watchee: ActorRef, watcher: ActorRef, action: String): Unit =
warnOnUnsafe(s"Dropped remote $action: disabled for [$watcher -> $watchee]")
/** If `warnOnUnsafeRemote`, this logs a warning if `actorOf` falls back to `LocalActorRef`
* versus creating a `RemoteActorRef`. Override to log a more granular reason if using
* `RemoteActorRefProvider` as a superclass.
*/
protected def warnIfNotRemoteActorRef(path: ActorPath): Unit =
warnOnUnsafe(s"Remote deploy of [$path] is not allowed, falling back to local.")
/** Override to add any additional checks if using `RemoteActorRefProvider` as a superclass. */
protected def shouldCreateRemoteActorRef(@unused system: ActorSystem, @unused address: Address): Boolean = true
def actorOf(
system: ActorSystemImpl,
props: Props,
@ -383,6 +426,11 @@ private[akka] class RemoteActorRefProvider(
}
}
def warnThenFallback() = {
warnIfNotRemoteActorRef(path)
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
}
(Iterator(props.deploy) ++ deployment.iterator).reduce((a, b) => b.withFallback(a)) match {
case d @ Deploy(_, _, _, RemoteScope(address), _, _) =>
if (hasAddress(address)) {
@ -392,27 +440,29 @@ private[akka] class RemoteActorRefProvider(
s"${ErrorMessages.RemoteDeploymentConfigErrorPrefix} for local-only Props at [$path]")
} else
try {
try {
// for consistency we check configuration of dispatcher and mailbox locally
val dispatcher = system.dispatchers.lookup(props.dispatcher)
system.mailboxes.getMailboxType(props, dispatcher.configurator.config)
} catch {
case NonFatal(e) =>
throw new ConfigurationException(
s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]",
e)
}
val localAddress = transport.localAddressForRemote(address)
val rpath =
(RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements)
.withUid(path.uid)
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
if (hasClusterOrUseUnsafe && shouldCreateRemoteActorRef(system, address)) {
try {
// for consistency we check configuration of dispatcher and mailbox locally
val dispatcher = system.dispatchers.lookup(props.dispatcher)
system.mailboxes.getMailboxType(props, dispatcher.configurator.config)
} catch {
case NonFatal(e) =>
throw new ConfigurationException(
s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]",
e)
}
val localAddress = transport.localAddressForRemote(address)
val rpath =
(RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements)
.withUid(path.uid)
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
} else warnThenFallback()
} catch {
case NonFatal(e) => throw new IllegalArgumentException(s"remote deployment failed for [$path]", e)
}
case _ =>
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
warnThenFallback()
}
}
@ -464,7 +514,7 @@ private[akka] class RemoteActorRefProvider(
}
def resolveActorRef(path: String): ActorRef = {
// using thread local LRU cache, which will call internalRresolveActorRef
// using thread local LRU cache, which will call internalResolveActorRef
// if the value is not cached
actorRefResolveThreadLocalCache match {
case null => internalResolveActorRef(path) // not initialized yet
@ -521,17 +571,20 @@ private[akka] class RemoteActorRefProvider(
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(ref: ActorRef, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = {
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, ref.path)
def useActorOnNode(ref: ActorRef, props: Props, deploy: Deploy, supervisor: ActorRef): Unit =
remoteDeploymentWatcher match {
case Some(watcher) =>
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, ref.path)
// we dont wait for the ACK, because the remote end will process this command before any other message to the new actor
// actorSelection can't be used here because then it is not guaranteed that the actor is created
// before someone can send messages to it
resolveActorRef(RootActorPath(ref.path.address) / "remote") !
DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor)
// we dont wait for the ACK, because the remote end will process this command before any other message to the new actor
// actorSelection can't be used here because then it is not guaranteed that the actor is created
// before someone can send messages to it
resolveActorRef(RootActorPath(ref.path.address) / "remote") !
DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor)
remoteDeploymentWatcher ! RemoteDeploymentWatcher.WatchRemote(ref, supervisor)
}
watcher ! RemoteDeploymentWatcher.WatchRemote(ref, supervisor)
case None => warnIfUseUnsafeWithoutCluster()
}
def getExternalAddressFor(addr: Address): Option[Address] = {
addr match {
@ -601,8 +654,7 @@ private[akka] class RemoteActorRef private[akka] (
case t: ArteryTransport =>
// detect mistakes such as using "akka.tcp" with Artery
if (path.address.protocol != t.localAddress.address.protocol)
throw new IllegalArgumentException(
s"Wrong protocol of [${path}], expected [${t.localAddress.address.protocol}]")
throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]")
case _ =>
}
@volatile private[remote] var cachedAssociation: artery.Association = null
@ -636,10 +688,12 @@ private[akka] class RemoteActorRef private[akka] (
/**
* Determine if a watch/unwatch message must be handled by the remoteWatcher actor, or sent to this remote ref
*/
def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef) = {
def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef): Boolean = {
// If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted
// If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers
watcher != provider.remoteWatcher && watchee == this
val intercept = provider.remoteWatcher.exists(remoteWatcher => watcher != remoteWatcher) && watchee == this
if (intercept) provider.warnIfUnsafeDeathwatchWithoutCluster(watchee, watcher, "remote Watch/Unwatch")
intercept
}
def sendSystemMessage(message: SystemMessage): Unit =
@ -647,10 +701,10 @@ private[akka] class RemoteActorRef private[akka] (
//send to remote, unless watch message is intercepted by the remoteWatcher
message match {
case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher) =>
provider.remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher)
provider.remoteWatcher.foreach(_ ! RemoteWatcher.WatchRemote(watchee, watcher))
//Unwatch has a different signature, need to pattern match arguments against InternalActorRef
case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) =>
provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher)
provider.remoteWatcher.foreach(_ ! RemoteWatcher.UnwatchRemote(watchee, watcher))
case _ => remote.send(message, OptionVal.None, this)
}
} catch handleException(message, Actor.noSender)