From 30603d194a65b7cd37c2f7d5bb1f39da7e709bea Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 23 Sep 2016 12:30:54 +0200 Subject: [PATCH] log-received-messages and log-sent-messages, #21506 (#21508) * log-received-messages and log-sent-messages, #21506 * also duplicate the trusted settings to artery section --- .../akka/remote/artery/CodecBenchmark.scala | 4 +- akka-remote/src/main/resources/reference.conf | 21 +++++++- .../akka/remote/artery/ArterySettings.scala | 7 +++ .../akka/remote/artery/ArteryTransport.scala | 8 ++- .../scala/akka/remote/artery/Codecs.scala | 16 +++++- .../akka/remote/artery/InboundEnvelope.scala | 7 +-- .../remote/artery/MessageDispatcher.scala | 50 +++++++++++-------- .../akka/remote/artery/UntrustedSpec.scala | 4 +- 8 files changed, 83 insertions(+), 34 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 0e6927c705..f0903d4925 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -140,7 +140,7 @@ class CodecBenchmark { val N = 100000 val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) @@ -197,7 +197,7 @@ class CodecBenchmark { val N = 100000 val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false)) val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) val provider = RARP(system).provider diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index ef2116885f..447efb3c8b 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -89,7 +89,7 @@ akka { artery { - ### FIXME: Temporary switch for the PoC + # Enable the new remoting with this flag enabled = off # Canonical address is the address other clients should connect to. @@ -150,6 +150,25 @@ akka { # If set to a nonempty string artery will use the given dispatcher for # its internal actors otherwise the default dispatcher is used. use-dispatcher = "akka.remote.default-remote-dispatcher" + + # Enable untrusted mode, which discards inbound system messages, PossiblyHarmful and + # ActorSelection messages. E.g. remote watch and remote deployment will not work. + # ActorSelection messages can be enabled for specific paths with the trusted-selection-paths + untrusted-mode = off + + # When 'untrusted-mode=on' inbound actor selections are by default discarded. + # Actors with paths defined in this white list are granted permission to receive actor + # selections messages. + # E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"] + trusted-selection-paths = [] + + # If this is "on", all inbound remote messages will be logged at DEBUG level, + # if off then they are not logged + log-received-messages = off + + # If this is "on", all outbound remote messages will be logged at DEBUG level, + # if off then they are not logged + log-sent-messages = off advanced { # For enabling testing features, such as blackhole in akka-remote-testkit. diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index dcb54fa34d..3e68421f0b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import akka.japi.Util.immutableSeq import akka.ConfigurationException import akka.event.Logging import akka.event.Logging.LogLevel @@ -55,6 +56,12 @@ private[akka] final class ArterySettings private (config: Config) { } val Dispatcher = getString("use-dispatcher") + val UntrustedMode: Boolean = getBoolean("untrusted-mode") + val TrustedSelectionPaths: Set[String] = immutableSeq(getStringList("trusted-selection-paths")).toSet + + val LogReceive: Boolean = getBoolean("log-received-messages") + val LogSend: Boolean = getBoolean("log-sent-messages") + object Advanced { val config = getConfig("advanced") import config._ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 38845a4e3d..0d50d89305 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -959,14 +959,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R new InboundCompressionsImpl(system, inboundContext, settings.Advanced.Compression) def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = - Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool)) + Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool, settings.LogSend)) def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, createFlightRecorderEventSink())) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ - messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender) + val originAddress = m.association match { + case OptionVal.Some(a) ⇒ OptionVal.Some(a.remoteAddress) + case OptionVal.None ⇒ OptionVal.None + } + messageDispatcher.dispatch(m.recipient.get, m.message, m.sender, originAddress) m match { case r: ReusableInboundEnvelope ⇒ inboundEnvelopePool.release(r) case _ ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 3e6ae64836..f2f90d51db 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -47,7 +47,8 @@ private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ExtendedActorSystem, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], - bufferPool: EnvelopeBufferPool) + bufferPool: EnvelopeBufferPool, + debugLogSend: Boolean) extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.ChangeOutboundCompression] { import Encoder._ @@ -89,6 +90,12 @@ private[remote] class Encoder( override protected def logSource = classOf[Encoder] + private var debugLogSendEnabled = false + + override def preStart(): Unit = { + debugLogSendEnabled = debugLogSend && log.isDebugEnabled + } + override def onPush(): Unit = { val outboundEnvelope = grab(in) val envelope = bufferPool.acquire() @@ -120,6 +127,13 @@ private[remote] class Encoder( } finally Serialization.currentTransportInformation.value = oldValue envelope.byteBuffer.flip() + + if (debugLogSendEnabled) + log.debug( + "sending remote message [{}] to [{}] from [{}]", + Logging.messageClassName(outboundEnvelope.message), + outboundEnvelope.recipient.getOrElse(""), outboundEnvelope.sender.getOrElse("")) + push(out, envelope) } catch { diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala index b9d17362ae..8caae6576b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala @@ -33,7 +33,6 @@ private[remote] object InboundEnvelope { */ private[remote] trait InboundEnvelope { def recipient: OptionVal[InternalActorRef] - def recipientAddress: Address def sender: OptionVal[ActorRef] def originUid: Long def association: OptionVal[OutboundContext] @@ -67,7 +66,6 @@ private[remote] object ReusableInboundEnvelope { */ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { private var _recipient: OptionVal[InternalActorRef] = OptionVal.None - private var _recipientAddress: Address = null private var _sender: OptionVal[ActorRef] = OptionVal.None private var _originUid: Long = 0L private var _association: OptionVal[OutboundContext] = OptionVal.None @@ -78,7 +76,6 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { private var _envelopeBuffer: EnvelopeBuffer = null override def recipient: OptionVal[InternalActorRef] = _recipient - override def recipientAddress: Address = _recipientAddress override def sender: OptionVal[ActorRef] = _sender override def originUid: Long = _originUid override def association: OptionVal[OutboundContext] = _association @@ -107,7 +104,6 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { def clear(): Unit = { _recipient = OptionVal.None - _recipientAddress = null _message = null _sender = OptionVal.None _originUid = 0L @@ -124,7 +120,6 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { envelopeBuffer: EnvelopeBuffer, association: OptionVal[OutboundContext]): InboundEnvelope = { _recipient = recipient - _recipientAddress = recipientAddress _sender = sender _originUid = originUid _serializer = serializer @@ -136,5 +131,5 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { } override def toString: String = - s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)" + s"InboundEnvelope($recipient, $message, $sender, $originUid, $association)" } diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala index 4a61c4b191..7eed45c201 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -17,6 +17,7 @@ import akka.event.Logging import akka.remote.RemoteActorRefProvider import akka.remote.RemoteRef import akka.util.OptionVal +import akka.event.LoggingReceive /** * INTERNAL API @@ -27,58 +28,67 @@ private[akka] class MessageDispatcher( private val remoteDaemon = provider.remoteDaemon private val log = Logging(system, getClass.getName) + private val debugLogEnabled = log.isDebugEnabled def dispatch( - recipient: InternalActorRef, - recipientAddress: Address, - message: AnyRef, - senderOption: OptionVal[ActorRef]): Unit = { + recipient: InternalActorRef, + message: AnyRef, + senderOption: OptionVal[ActorRef], + originAddress: OptionVal[Address]): Unit = { - import provider.remoteSettings._ + import provider.remoteSettings.Artery._ + import Logging.messageClassName val sender: ActorRef = senderOption.getOrElse(system.deadLetters) val originalReceiver = recipient.path - def msgLog = s"RemoteMessage: [$message] to [$recipient]<+[$originalReceiver] from [$sender()]" - recipient match { case `remoteDaemon` ⇒ - if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") - else { - if (LogReceive) log.debug("received daemon message {}", msgLog) + if (UntrustedMode) { + if (debugLogEnabled) log.debug( + "dropping daemon message [{}] in untrusted mode", + messageClassName(message)) + } else { + if (LogReceive && debugLogEnabled) log.debug( + "received daemon message [{}] from [{}]", + messageClassName(message), senderOption.getOrElse(originAddress.getOrElse(""))) remoteDaemon ! message } case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒ - if (LogReceive) log.debug("received local message {}", msgLog) + if (LogReceive && debugLogEnabled) log.debug( + "received message [{}] to [{}] from [{}]", + messageClassName(message), recipient, senderOption.getOrElse("")) message match { case sel: ActorSelectionMessage ⇒ if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) || - sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian)) - log.debug( + sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian)) { + if (debugLogEnabled) log.debug( "operating in UntrustedMode, dropping inbound actor selection to [{}], " + "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration", sel.elements.mkString("/", "/", "")) - else + } else // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor ActorSelection.deliverSelection(l, sender, sel) case msg: PossiblyHarmful if UntrustedMode ⇒ - log.debug( - "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", - Logging.messageClassName(msg)) + if (debugLogEnabled) log.debug( + "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}] to [{}] from [{}]", + messageClassName(msg), recipient, senderOption.getOrElse(originAddress.getOrElse(""))) case msg: SystemMessage ⇒ l.sendSystemMessage(msg) case msg ⇒ l.!(msg)(sender) } case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒ - if (LogReceive) log.debug("received remote-destined message {}", msgLog) + if (LogReceive && debugLogEnabled) log.debug( + "received remote-destined message [{}] to [{}] from [{}]", + messageClassName(message), recipient, senderOption.getOrElse(originAddress.getOrElse(""))) // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) r.!(message)(sender) case r ⇒ log.error( - "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]", - Logging.messageClassName(message), r, recipientAddress, provider.transport.addresses.mkString(", ")) + "dropping message [{}] for unknown recipient [{}] from [{}]", + messageClassName(message), r, senderOption.getOrElse(originAddress.getOrElse(""))) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala index 2ed521e292..2d901f492e 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala @@ -63,8 +63,8 @@ object UntrustedSpec { class UntrustedSpec extends AkkaSpec(""" akka.actor.provider = remote - akka.remote.untrusted-mode = on - akka.remote.trusted-selection-paths = ["/user/receptionist", ] + akka.remote.artery.untrusted-mode = on + akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ] akka.remote.artery.enabled = on akka.remote.artery.canonical.hostname = localhost akka.remote.artery.canonical.port = 0