log-received-messages and log-sent-messages, #21506 (#21508)

* log-received-messages and log-sent-messages, #21506

* also duplicate the trusted settings to artery section
This commit is contained in:
Patrik Nordwall 2016-09-23 12:30:54 +02:00 committed by GitHub
parent 4d8b8578f4
commit 30603d194a
8 changed files with 83 additions and 34 deletions

View file

@ -140,7 +140,7 @@ class CodecBenchmark {
val N = 100000
val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool))
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(msg outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))
@ -197,7 +197,7 @@ class CodecBenchmark {
val N = 100000
val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool))
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false))
val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val provider = RARP(system).provider

View file

@ -89,7 +89,7 @@ akka {
artery {
### FIXME: Temporary switch for the PoC
# Enable the new remoting with this flag
enabled = off
# Canonical address is the address other clients should connect to.
@ -150,6 +150,25 @@ akka {
# If set to a nonempty string artery will use the given dispatcher for
# its internal actors otherwise the default dispatcher is used.
use-dispatcher = "akka.remote.default-remote-dispatcher"
# Enable untrusted mode, which discards inbound system messages, PossiblyHarmful and
# ActorSelection messages. E.g. remote watch and remote deployment will not work.
# ActorSelection messages can be enabled for specific paths with the trusted-selection-paths
untrusted-mode = off
# When 'untrusted-mode=on' inbound actor selections are by default discarded.
# Actors with paths defined in this white list are granted permission to receive actor
# selections messages.
# E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"]
trusted-selection-paths = []
# If this is "on", all inbound remote messages will be logged at DEBUG level,
# if off then they are not logged
log-received-messages = off
# If this is "on", all outbound remote messages will be logged at DEBUG level,
# if off then they are not logged
log-sent-messages = off
advanced {
# For enabling testing features, such as blackhole in akka-remote-testkit.

View file

@ -3,6 +3,7 @@
*/
package akka.remote.artery
import akka.japi.Util.immutableSeq
import akka.ConfigurationException
import akka.event.Logging
import akka.event.Logging.LogLevel
@ -55,6 +56,12 @@ private[akka] final class ArterySettings private (config: Config) {
}
val Dispatcher = getString("use-dispatcher")
val UntrustedMode: Boolean = getBoolean("untrusted-mode")
val TrustedSelectionPaths: Set[String] = immutableSeq(getStringList("trusted-selection-paths")).toSet
val LogReceive: Boolean = getBoolean("log-received-messages")
val LogSend: Boolean = getBoolean("log-sent-messages")
object Advanced {
val config = getConfig("advanced")
import config._

View file

@ -959,14 +959,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
new InboundCompressionsImpl(system, inboundContext, settings.Advanced.Compression)
def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] =
Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool))
Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool, settings.LogSend))
def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] =
Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool,
createFlightRecorderEventSink()))
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender)
val originAddress = m.association match {
case OptionVal.Some(a) OptionVal.Some(a.remoteAddress)
case OptionVal.None OptionVal.None
}
messageDispatcher.dispatch(m.recipient.get, m.message, m.sender, originAddress)
m match {
case r: ReusableInboundEnvelope inboundEnvelopePool.release(r)
case _

View file

@ -47,7 +47,8 @@ private[remote] class Encoder(
uniqueLocalAddress: UniqueAddress,
system: ExtendedActorSystem,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
bufferPool: EnvelopeBufferPool)
bufferPool: EnvelopeBufferPool,
debugLogSend: Boolean)
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.ChangeOutboundCompression] {
import Encoder._
@ -89,6 +90,12 @@ private[remote] class Encoder(
override protected def logSource = classOf[Encoder]
private var debugLogSendEnabled = false
override def preStart(): Unit = {
debugLogSendEnabled = debugLogSend && log.isDebugEnabled
}
override def onPush(): Unit = {
val outboundEnvelope = grab(in)
val envelope = bufferPool.acquire()
@ -120,6 +127,13 @@ private[remote] class Encoder(
} finally Serialization.currentTransportInformation.value = oldValue
envelope.byteBuffer.flip()
if (debugLogSendEnabled)
log.debug(
"sending remote message [{}] to [{}] from [{}]",
Logging.messageClassName(outboundEnvelope.message),
outboundEnvelope.recipient.getOrElse(""), outboundEnvelope.sender.getOrElse(""))
push(out, envelope)
} catch {

View file

@ -33,7 +33,6 @@ private[remote] object InboundEnvelope {
*/
private[remote] trait InboundEnvelope {
def recipient: OptionVal[InternalActorRef]
def recipientAddress: Address
def sender: OptionVal[ActorRef]
def originUid: Long
def association: OptionVal[OutboundContext]
@ -67,7 +66,6 @@ private[remote] object ReusableInboundEnvelope {
*/
private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
private var _recipient: OptionVal[InternalActorRef] = OptionVal.None
private var _recipientAddress: Address = null
private var _sender: OptionVal[ActorRef] = OptionVal.None
private var _originUid: Long = 0L
private var _association: OptionVal[OutboundContext] = OptionVal.None
@ -78,7 +76,6 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
private var _envelopeBuffer: EnvelopeBuffer = null
override def recipient: OptionVal[InternalActorRef] = _recipient
override def recipientAddress: Address = _recipientAddress
override def sender: OptionVal[ActorRef] = _sender
override def originUid: Long = _originUid
override def association: OptionVal[OutboundContext] = _association
@ -107,7 +104,6 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
def clear(): Unit = {
_recipient = OptionVal.None
_recipientAddress = null
_message = null
_sender = OptionVal.None
_originUid = 0L
@ -124,7 +120,6 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
envelopeBuffer: EnvelopeBuffer,
association: OptionVal[OutboundContext]): InboundEnvelope = {
_recipient = recipient
_recipientAddress = recipientAddress
_sender = sender
_originUid = originUid
_serializer = serializer
@ -136,5 +131,5 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
}
override def toString: String =
s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)"
s"InboundEnvelope($recipient, $message, $sender, $originUid, $association)"
}

View file

@ -17,6 +17,7 @@ import akka.event.Logging
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteRef
import akka.util.OptionVal
import akka.event.LoggingReceive
/**
* INTERNAL API
@ -27,58 +28,67 @@ private[akka] class MessageDispatcher(
private val remoteDaemon = provider.remoteDaemon
private val log = Logging(system, getClass.getName)
private val debugLogEnabled = log.isDebugEnabled
def dispatch(
recipient: InternalActorRef,
recipientAddress: Address,
message: AnyRef,
senderOption: OptionVal[ActorRef]): Unit = {
recipient: InternalActorRef,
message: AnyRef,
senderOption: OptionVal[ActorRef],
originAddress: OptionVal[Address]): Unit = {
import provider.remoteSettings._
import provider.remoteSettings.Artery._
import Logging.messageClassName
val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
val originalReceiver = recipient.path
def msgLog = s"RemoteMessage: [$message] to [$recipient]<+[$originalReceiver] from [$sender()]"
recipient match {
case `remoteDaemon`
if (UntrustedMode) log.debug("dropping daemon message in untrusted mode")
else {
if (LogReceive) log.debug("received daemon message {}", msgLog)
if (UntrustedMode) {
if (debugLogEnabled) log.debug(
"dropping daemon message [{}] in untrusted mode",
messageClassName(message))
} else {
if (LogReceive && debugLogEnabled) log.debug(
"received daemon message [{}] from [{}]",
messageClassName(message), senderOption.getOrElse(originAddress.getOrElse("")))
remoteDaemon ! message
}
case l @ (_: LocalRef | _: RepointableRef) if l.isLocal
if (LogReceive) log.debug("received local message {}", msgLog)
if (LogReceive && debugLogEnabled) log.debug(
"received message [{}] to [{}] from [{}]",
messageClassName(message), recipient, senderOption.getOrElse(""))
message match {
case sel: ActorSelectionMessage
if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
log.debug(
sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian)) {
if (debugLogEnabled) log.debug(
"operating in UntrustedMode, dropping inbound actor selection to [{}], " +
"allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
sel.elements.mkString("/", "/", ""))
else
} else
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
ActorSelection.deliverSelection(l, sender, sel)
case msg: PossiblyHarmful if UntrustedMode
log.debug(
"operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]",
Logging.messageClassName(msg))
if (debugLogEnabled) log.debug(
"operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}] to [{}] from [{}]",
messageClassName(msg), recipient, senderOption.getOrElse(originAddress.getOrElse("")))
case msg: SystemMessage l.sendSystemMessage(msg)
case msg l.!(msg)(sender)
}
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode
if (LogReceive) log.debug("received remote-destined message {}", msgLog)
if (LogReceive && debugLogEnabled) log.debug(
"received remote-destined message [{}] to [{}] from [{}]",
messageClassName(message), recipient, senderOption.getOrElse(originAddress.getOrElse("")))
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r.!(message)(sender)
case r log.error(
"dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
Logging.messageClassName(message), r, recipientAddress, provider.transport.addresses.mkString(", "))
"dropping message [{}] for unknown recipient [{}] from [{}]",
messageClassName(message), r, senderOption.getOrElse(originAddress.getOrElse("")))
}
}

View file

@ -63,8 +63,8 @@ object UntrustedSpec {
class UntrustedSpec extends AkkaSpec("""
akka.actor.provider = remote
akka.remote.untrusted-mode = on
akka.remote.trusted-selection-paths = ["/user/receptionist", ]
akka.remote.artery.untrusted-mode = on
akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ]
akka.remote.artery.enabled = on
akka.remote.artery.canonical.hostname = localhost
akka.remote.artery.canonical.port = 0