#20587 Clean artery configuration (#21279)

* Move artery settings from remoting settings to dedicated class.
* #20587 Move hardcoded settings to configuration file.
* Copy reused settings from remote to the artery
This commit is contained in:
Martynas Mickevičius 2016-09-01 09:07:39 +03:00 committed by Patrik Nordwall
parent b2eaa854e8
commit 292face28a
10 changed files with 263 additions and 186 deletions

View file

@ -123,8 +123,8 @@ trait Conductor { this: TestConductorExt ⇒
throttle(node, target, direction, 0f)
private def requireTestConductorTranport(): Unit = {
if (transport.provider.remoteSettings.EnableArtery) {
if (!transport.provider.remoteSettings.TestMode)
if (transport.provider.remoteSettings.Artery.Enabled) {
if (!transport.provider.remoteSettings.Artery.Advanced.TestMode)
throw new ConfigurationException("To use this feature you must activate the test mode " +
"by specifying `testTransport(on = true)` in your MultiNodeConfig.")
} else {

View file

@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
import akka.actor._
import akka.remote.RemoteActorRefProvider
import akka.remote.artery.compress.CompressionSettings
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
@ -117,7 +116,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
val compressionEnabled =
RARP(context.system).provider.transport.isInstanceOf[ArteryTransport] &&
RARP(context.system).provider.remoteSettings.ArteryCompressionSettings.enabled
RARP(context.system).provider.remoteSettings.Artery.Enabled
def receive = {
case Run
@ -174,7 +173,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
f"throughput ${throughput * testSettings.senderReceiverPairs}%,.0f msg/s, " +
f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s (payload), " +
f"${throughput * totalSize(context.system) * testSettings.senderReceiverPairs}%,.0f bytes/s (total" +
(if (CompressionSettings(context.system).enabled) ",compression" else "") + "), " +
(if (RARP(context.system).provider.remoteSettings.Artery.Advanced.Compression.Enabled) ",compression" else "") + "), " +
s"dropped ${totalMessages - totalReceived}, " +
s"max round-trip $maxRoundTripMillis ms, " +
s"burst size $burstSize, " +
@ -217,7 +216,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
payloadSize: Int,
senderReceiverPairs: Int) {
// data based on measurement
def totalSize(system: ActorSystem) = payloadSize + (if (CompressionSettings(system).enabled) 38 else 110)
def totalSize(system: ActorSystem) = payloadSize + (if (RARP(system).provider.remoteSettings.Artery.Advanced.Compression.Enabled) 38 else 110)
}
class TestSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with ByteBufferSerializer {

View file

@ -32,19 +32,19 @@ akka {
# "scala.Some" = akka-misc
# "scala.None$" = akka-misc
"akka.remote.DaemonMsgCreate" = daemon-create
# Since akka.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
"akka.protobuf.GeneratedMessage" = proto
# Since com.google.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
# This com.google.protobuf serialization binding is only used if the class can be loaded,
# i.e. com.google.protobuf dependency has been added in the application project.
"com.google.protobuf.GeneratedMessage" = proto
}
serialization-identifiers {
@ -87,12 +87,12 @@ akka {
artery {
enabled = off
port = 20200
# The hostname or ip clients should connect to.
# InetAddress.getLocalHost.getHostAddress is used if empty or
# InetAddress.getLocalHost.getHostAddress is used if empty or
# "<getHostAddress>" is specified.
# InetAddress.getLocalHost.getHostName is used if
# "<getHostName>" is specified.
# InetAddress.getLocalHost.getHostName is used if
# "<getHostName>" is specified.
hostname = "<getHostAddress>"
# Actor paths to use the large message stream for when a message
@ -108,36 +108,99 @@ akka {
# stream, to pass such messages through the large message stream the selections
# but must be resolved to ActorRefs first.
large-message-destinations = []
# Sets the log granularity level at which Akka logs artery events. This setting
# can take the values OFF, ERROR, WARNING, INFO or DEBUG. Please note that the effective
# logging level is still determined by the global logging level of the actor system:
# for example debug level artery events will be only logged if the system
# is running with debug level logging.
# Failures to deserialize received messages also fall under this flag.
log-lifecycle-events = DEBUG
# If set to a nonempty string artery will use the given dispatcher for
# its internal actors otherwise the default dispatcher is used.
use-dispatcher = "akka.remote.default-remote-dispatcher"
advanced {
# For enabling testing features, such as blackhole in akka-remote-testkit.
test-mode = off
# Settings for the materializer that is used for the remote streams.
materializer = ${akka.stream.materializer}
materializer {
dispatcher = "akka.remote.default-remote-dispatcher"
}
# Controls whether to start the Aeron media driver in the same JVM or use external
# process. Set to 'off' when using external media driver, and then also set the
# process. Set to 'off' when using external media driver, and then also set the
# 'aeron-dir'.
embedded-media-driver = on
# Directory used by the Aeron media driver. It's mandatory to define the 'aeron-dir'
# if using external media driver, i.e. when 'embedded-media-driver = off'.
# Embedded media driver will use a this directory, or a temporary directory if this
# property is not defined (empty).
aeron-dir = ""
# Whether to delete aeron embeded driver directory upon driver stop.
delete-aeron-dir = yes
# Level of CPU time used, on a scale between 1 and 10, during backoff/idle.
# The tradeoff is that to have low latency more CPU time must be used to be
# able to react quickly on incoming messages or send as fast as possible after
# backoff backpressure.
# backoff backpressure.
# Level 1 strongly prefer low CPU consumption over low latency.
# Level 10 strongly prefer low latency over low CPU consumption.
# Level 10 strongly prefer low latency over low CPU consumption.
idle-cpu-level = 5
# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.
system-message-buffer-size = 20000
# unacknowledged system messages are re-delivered with this interval
system-message-resend-interval = 1 second
# incomplete handshake attempt is retried with this interval
handshake-retry-interval = 1 second
# handshake requests are performed periodically with this interval,
# also after the handshake has been completed to be able to establish
# a new session with a restarted destination system
inject-handshake-interval = 1 second
# messages that are not accepted by Aeron are dropped after retrying for this period
give-up-send-after = 60 seconds
# during ActorSystem termination the remoting will wait this long for
# an acknowledgment by the destination system that flushing of outstanding
# remote messages has been completed
shutdown-flush-timeout = 1 second
# Timeout after which the inbound stream is going to be restarted.
inbound-restart-timeout = 5 seconds
# Max number of restarts for the inbound stream.
inbound-max-restarts = 5
# Timeout after which outbout stream is going to be restarted for every association.
outbound-restart-timeout = 5 seconds
# Max number of restars of the outbound stream for every association.
outbound-max-restarts = 5
# Timeout after which aeron driver has not had keepalive messages
# from a client before it considers the client dead.
client-liveness-timeout = 20 seconds
# Timeout for each the INACTIVE and LINGER stages an aeron image
# will be retained for when it is no longer referenced.
image-liveness-timeout = 20 seconds
# Timeout after which the aeron driver is considered dead
# if it does not update its C'n'C timestamp.
driver-timeout = 20 seconds
flight-recorder {
// FIXME it should be enabled by default, but there is some concurrency issue that crashes the JVM
enabled = off
@ -145,43 +208,34 @@ akka {
# compression of common strings in remoting messages, like actor destinations, serializers etc
compression {
# possibility to disable compression by setting this to off
enabled = on
# unlocks additional very verbose debug logging of compression events (on DEBUG log level)
debug = off
actor-refs {
enabled = off # TODO possibly remove on/off option once we have battle proven it?
# Max number of compressed actor-refs
# Note that compression tables are "rolling" (i.e. a new table replaces the old
# Note that compression tables are "rolling" (i.e. a new table replaces the old
# compression table once in a while), and this setting is only about the total number
# of compressions within a single such table.
# Must be a positive natural number.
max = 256
# interval between new table compression advertisements.
# this means the time during which we collect heavy-hitter data and then turn it into a compression table.
advertisement-interval = "1 minute" # TODO find good number as default, for benchmarks trigger immediately
advertisement-interval = 1 minute # TODO find good number as default, for benchmarks trigger immediately
}
manifests {
enabled = off # TODO possibly remove on/off option once we have battle proven it?
# Max number of compressed manifests
# Note that compression tables are "rolling" (i.e. a new table replaces the old
# Note that compression tables are "rolling" (i.e. a new table replaces the old
# compression table once in a while), and this setting is only about the total number
# of compressions within a single such table.
# Must be a positive natural number.
max = 256
# interval between new table compression advertisements.
# this means the time during which we collect heavy-hitter data and then turn it into a compression table.
advertisement-interval = "1 minute" # TODO find good number as default, for benchmarks trigger immediately
advertisement-interval = 1 minute # TODO find good number as default, for benchmarks trigger immediately
}
}
}
}
### General settings
@ -211,7 +265,7 @@ akka {
# Acknowledgment timeout of management commands sent to the transport stack.
command-ack-timeout = 30 s
# The timeout for outbound associations to perform the handshake.
# If the transport is akka.remote.netty.tcp or akka.remote.netty.ssl
# the configured connection-timeout for the transport will be used instead.
@ -230,11 +284,11 @@ akka {
# system messages to be send by clients, e.g. messages like 'Create',
# 'Suspend', 'Resume', 'Terminate', 'Supervise', 'Link' etc.
untrusted-mode = off
# When 'untrusted-mode=on' inbound actor selections are by default discarded.
# Actors with paths defined in this white list are granted permission to receive actor
# selections messages.
# E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"]
# selections messages.
# E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"]
trusted-selection-paths = []
# Should the remote server require that its peers share the same
@ -272,7 +326,7 @@ akka {
# a value in bytes, such as 1000b. Note that for all messages larger than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = off
# Log warning if the number of messages in the backoff buffer in the endpoint
# writer exceeds this limit. It can be disabled by setting the value to off.
log-buffer-size-exceeding = 50000
@ -281,7 +335,7 @@ akka {
# Settings for the failure detector to monitor connections.
# For TCP it is not important to have fast failure detection, since
# most connection failures are captured by TCP itself.
# most connection failures are captured by TCP itself.
# The default DeadlineFailureDetector will trigger if there are no heartbeats within
# the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds
# with the default settings.
@ -409,7 +463,7 @@ akka {
# Messages that were negatively acknowledged are always immediately
# resent.
resend-interval = 2 s
# Maximum number of unacknowledged system messages that will be resent
# each 'resend-interval'. If you watch many (> 1000) remote actors you can
# increase this value to for example 600, but a too large limit (e.g. 10000)
@ -649,7 +703,7 @@ akka {
# "AES256CounterSecureRNG"
#
# The following are deprecated in Akka 2.4. They use one of 3 possible
# seed sources, depending on availability: /dev/random, random.org and
# seed sources, depending on availability: /dev/random, random.org and
# SecureRandom (provided by Java)
# "AES128CounterInetRNG"
# "AES256CounterInetRNG" (Install JCE Unlimited Strength Jurisdiction
@ -679,7 +733,7 @@ akka {
}
throughput = 10
}
backoff-remote-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
@ -689,8 +743,5 @@ akka {
parallelism-max = 2
}
}
}
}

View file

@ -187,7 +187,7 @@ private[akka] class RemoteActorRefProvider(
d
},
serialization = SerializationExtension(system),
transport = if (remoteSettings.EnableArtery) new ArteryTransport(system, this) else new Remoting(system, this))
transport = if (remoteSettings.Artery.Enabled) new ArteryTransport(system, this) else new Remoting(system, this))
_internals = internals
remotingTerminator ! internals

View file

@ -3,7 +3,6 @@
*/
package akka.remote
import akka.remote.artery.compress.CompressionSettings
import com.typesafe.config.Config
import scala.concurrent.duration._
import akka.util.Timeout
@ -14,29 +13,13 @@ import akka.actor.Props
import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.ConfigurationException
import java.net.InetAddress
import akka.remote.artery.ArterySettings
final class RemoteSettings(val config: Config) {
import config._
import scala.collection.JavaConverters._
val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled")
val ArteryPort: Int = getInt("akka.remote.artery.port")
val ArteryHostname: String = getString("akka.remote.artery.hostname") match {
case "" | "<getHostAddress>" InetAddress.getLocalHost.getHostAddress
case "<getHostName>" InetAddress.getLocalHost.getHostName
case other other
}
val EmbeddedMediaDriver = getBoolean("akka.remote.artery.advanced.embedded-media-driver")
val AeronDirectoryName = getString("akka.remote.artery.advanced.aeron-dir") requiring (dir
EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver")
val TestMode: Boolean = getBoolean("akka.remote.artery.advanced.test-mode")
val IdleCpuLevel: Int = getInt("akka.remote.artery.advanced.idle-cpu-level").requiring(level
1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
val FlightRecorderEnabled: Boolean = getBoolean("akka.remote.artery.advanced.flight-recorder.enabled")
val ArteryCompressionSettings = CompressionSettings(getConfig("akka.remote.artery.advanced.compression"))
val Artery = ArterySettings(getConfig("akka.remote.artery"))
val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages")

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.ConfigurationException
import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.stream.ActorMaterializerSettings
import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
import akka.util.WildcardIndex
import akka.NotUsed
import com.typesafe.config.Config
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import java.net.InetAddress
import java.util.concurrent.TimeUnit
/** INTERNAL API */
private[akka] final class ArterySettings private (config: Config) {
import config._
import ArterySettings._
val Enabled: Boolean = getBoolean("enabled")
val Port: Int = getInt("port")
val Hostname: String = getString("hostname") match {
case "" | "<getHostAddress>" InetAddress.getLocalHost.getHostAddress
case "<getHostName>" InetAddress.getLocalHost.getHostName
case other other
}
val LargeMessageDestinations =
config.getStringList("large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry)
val segments = entry.split('/').tail
tree.insert(segments, NotUsed)
}
val LifecycleEventsLogLevel: LogLevel = Logging.levelFor(toRootLowerCase(getString("log-lifecycle-events"))) match {
case Some(level) level
case None throw new ConfigurationException("Logging level must be one of (off, debug, info, warning, error)")
}
val Dispatcher = getString("use-dispatcher")
object Advanced {
val config = getConfig("advanced")
import config._
val TestMode: Boolean = getBoolean("test-mode")
val MaterializerSettings = ActorMaterializerSettings(config.getConfig("materializer"))
val EmbeddedMediaDriver = getBoolean("embedded-media-driver")
val AeronDirectoryName = getString("aeron-dir") requiring (dir
EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver")
val DeleteAeronDirectory = getBoolean("delete-aeron-dir")
val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level
1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring(
_ > 0, "system-message-buffer-size must be more than zero")
val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval
interval > 0.seconds, "system-message-resend-interval must be more than zero")
val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval
interval > 0.seconds, "handshake-retry-interval must be more than zero")
val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval
interval > 0.seconds, "inject-handshake-interval must be more than zero")
val GiveUpSendAfter = config.getMillisDuration("give-up-send-after")
val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout")
val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout")
val InboundMaxRestarts = getInt("inbound-max-restarts")
val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout")
val OutboundMaxRestarts = getInt("outbound-max-restarts")
val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout")
val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout")
val DriverTimeout = config.getMillisDuration("driver-timeout")
val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled")
val Compression = new Compression(getConfig("compression"))
}
}
/** INTERNAL API */
private[akka] object ArterySettings {
def apply(config: Config) = new ArterySettings(config)
/** INTERNAL API */
private[akka] final class Compression private[ArterySettings] (config: Config) {
import config._
// Compile time constants
final val Enabled = true
final val Debug = false // unlocks additional very verbose debug logging of compression events (on DEBUG log level)
object ActorRefs {
val config = getConfig("actor-refs")
import config._
val AdvertisementInterval = config.getMillisDuration("advertisement-interval")
val Max = getInt("max")
}
object Manifests {
val config = getConfig("manifests")
import config._
val AdvertisementInterval = config.getMillisDuration("advertisement-interval")
val Max = getInt("max")
}
}
}

View file

@ -33,7 +33,6 @@ import akka.remote.AddressUidExtension
import akka.remote.EventPublisher
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteSettings
import akka.remote.RemoteTransport
import akka.remote.RemotingLifecycleEvent
import akka.remote.ThisActorSystemQuarantinedEvent
@ -125,9 +124,9 @@ private[akka] object AssociationState {
* INTERNAL API
*/
private[akka] final class AssociationState(
val incarnation: Int,
val incarnation: Int,
val uniqueRemoteAddressPromise: Promise[UniqueAddress],
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) {
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) {
import AssociationState.QuarantinedTimestamp
@ -225,7 +224,7 @@ private[akka] trait OutboundContext {
*/
private[remote] object FlushOnShutdown {
def props(done: Promise[Done], timeout: FiniteDuration,
inboundContext: InboundContext, associations: Set[Association]): Props = {
inboundContext: InboundContext, associations: Set[Association]): Props = {
require(associations.nonEmpty)
Props(new FlushOnShutdown(done, timeout, inboundContext, associations))
}
@ -237,7 +236,7 @@ private[remote] object FlushOnShutdown {
* INTERNAL API
*/
private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration,
inboundContext: InboundContext, associations: Set[Association]) extends Actor {
inboundContext: InboundContext, associations: Set[Association]) extends Actor {
var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue)
@ -288,7 +287,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def addresses: Set[Address] = _addresses
override def localAddressForRemote(remote: Address): Address = defaultAddress
override val log: LoggingAdapter = Logging(system, getClass.getName)
val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel)
val eventPublisher = new EventPublisher(system, log, settings.LifecycleEventsLogLevel)
private val codec: AkkaPduCodec = AkkaPduProtobufCodec
private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
@ -296,27 +295,16 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList
// FIXME config
private val systemMessageResendInterval: FiniteDuration = 1.second
private val handshakeRetryInterval: FiniteDuration = 1.second
private val handshakeTimeout: FiniteDuration =
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(
_ > Duration.Zero,
"handshake-timeout must be > 0")
private val injectHandshakeInterval: FiniteDuration = 1.second
private val giveUpSendAfter: FiniteDuration = 60.seconds
private val shutdownFlushTimeout = 1.second
private val remoteDispatcher = system.dispatchers.lookup(remoteSettings.Dispatcher)
private val remoteDispatcher = system.dispatchers.lookup(settings.Dispatcher)
private val largeMessageDestinations =
system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry)
val segments = entry.split('/').tail
tree.insert(segments, NotUsed)
}
// TODO use WildcardIndex.isEmpty when merged from master
val largeMessageChannelEnabled =
!largeMessageDestinations.wildcardTree.isEmpty || !largeMessageDestinations.doubleWildcardTree.isEmpty
!settings.LargeMessageDestinations.wildcardTree.isEmpty || !settings.LargeMessageDestinations.doubleWildcardTree.isEmpty
private val priorityMessageDestinations =
WildcardIndex[NotUsed]()
@ -334,11 +322,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val ordinaryStreamId = 3
private val largeStreamId = 4
private val taskRunner = new TaskRunner(system, remoteSettings.IdleCpuLevel)
private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel)
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
private val restartCounter = new RestartCounter(settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout)
private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
private val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers)
@ -373,11 +359,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
materializer,
remoteAddress,
controlSubject,
largeMessageDestinations,
settings.LargeMessageDestinations,
priorityMessageDestinations,
outboundEnvelopePool))
def remoteSettings: RemoteSettings = provider.remoteSettings
def settings = provider.remoteSettings.Artery
override def start(): Unit = {
startMediaDriver()
@ -389,22 +375,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData)
val port =
if (remoteSettings.ArteryPort == 0) ArteryTransport.autoSelectPort(remoteSettings.ArteryHostname)
else remoteSettings.ArteryPort
if (settings.Port == 0) ArteryTransport.autoSelectPort(settings.Hostname)
else settings.Port
// TODO: Configure materializer properly
// TODO: Have a supervisor actor
_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, remoteSettings.ArteryHostname, port),
Address(ArteryTransport.ProtocolName, system.name, settings.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)
// TODO: This probably needs to be a global value instead of an event as events might rotate out of the log
topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString().getBytes("US-ASCII"))
val materializerSettings = ActorMaterializerSettings(
remoteSettings.config.getConfig("akka.remote.artery.advanced.materializer"))
materializer = ActorMaterializer.systemMaterializer(materializerSettings, "remote", system)
materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system)
messageDispatcher = new MessageDispatcher(system, provider)
topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData)
@ -420,16 +404,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
private def startMediaDriver(): Unit = {
if (remoteSettings.EmbeddedMediaDriver) {
if (settings.Advanced.EmbeddedMediaDriver) {
val driverContext = new MediaDriver.Context
if (remoteSettings.AeronDirectoryName.nonEmpty)
driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName)
// FIXME settings from config
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20))
driverContext.driverTimeoutMs(SECONDS.toNanos(20))
if (settings.Advanced.AeronDirectoryName.nonEmpty)
driverContext.aeronDirectoryName(settings.Advanced.AeronDirectoryName)
driverContext.clientLivenessTimeoutNs(settings.Advanced.ClientLivenessTimeout.toNanos)
driverContext.imageLivenessTimeoutNs(settings.Advanced.ImageLivenessTimeoutNs.toNanos)
driverContext.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis)
val idleCpuLevel = remoteSettings.IdleCpuLevel
val idleCpuLevel = settings.Advanced.IdleCpuLevel
if (idleCpuLevel == 10) {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
@ -461,7 +444,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def aeronDir: String = mediaDriver match {
case Some(driver) driver.aeronDirectoryName
case None remoteSettings.AeronDirectoryName
case None settings.Advanced.AeronDirectoryName
}
private def stopMediaDriver(): Unit = {
@ -469,8 +452,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// this is only for embedded media driver
driver.close()
try {
// FIXME it should also be configurable to not delete dir
IoUtil.delete(new File(driver.aeronDirectoryName), false)
if (settings.Advanced.DeleteAeronDirectory) {
IoUtil.delete(new File(driver.aeronDirectoryName), false)
}
} catch {
case NonFatal(e)
log.warning(
@ -542,7 +526,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def runInboundControlStream(compression: InboundCompressions): Unit = {
val (ctrl, completed) =
if (remoteSettings.TestMode) {
if (settings.Advanced.TestMode) {
val (mgmt, (ctrl, completed)) =
aeronSource(controlStreamId, envelopePool)
.via(inboundFlow(compression))
@ -617,7 +601,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def runInboundOrdinaryMessagesStream(compression: InboundCompressions): Unit = {
val completed =
if (remoteSettings.TestMode) {
if (settings.Advanced.TestMode) {
val (mgmt, c) = aeronSource(ordinaryStreamId, envelopePool)
.via(inboundFlow(compression))
.viaMat(inboundTestFlow)(Keep.right)
@ -639,7 +623,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
val disableCompression = NoInboundCompressions // no compression on large message stream for now
val completed =
if (remoteSettings.TestMode) {
if (settings.Advanced.TestMode) {
val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool)
.via(inboundLargeFlow(disableCompression))
.viaMat(inboundTestFlow)(Keep.right)
@ -668,7 +652,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
restart()
} else {
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}",
streamName, maxRestarts, restartTimeout.toSeconds, cause.getMessage)
streamName, settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout.toSeconds, cause.getMessage)
system.terminate()
}
}
@ -681,8 +665,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
if (allAssociations.isEmpty) Future.successful(Done)
else {
val flushingPromise = Promise[Done]()
system.systemActorOf(FlushOnShutdown.props(flushingPromise, shutdownFlushTimeout,
this, allAssociations).withDispatcher(remoteSettings.Dispatcher), "remoteFlushOnShutdown")
system.systemActorOf(FlushOnShutdown.props(flushingPromise, settings.Advanced.ShutdownFlushTimeout,
this, allAssociations).withDispatcher(settings.Dispatcher), "remoteFlushOnShutdown")
flushingPromise.future
}
implicit val ec = remoteDispatcher
@ -774,14 +758,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.mapMaterializedValue { case (_, d) d }
private def createOutboundSink(streamId: Int, outboundContext: OutboundContext,
bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = {
bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = {
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout,
handshakeRetryInterval, injectHandshakeInterval))
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
.viaMat(createEncoder(bufferPool))(Keep.right)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.both)
envelopePool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))(Keep.both)
}
/**
@ -791,9 +775,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outboundControlPart1(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, SharedKillSwitch] = {
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout,
handshakeRetryInterval, injectHandshakeInterval))
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval,
remoteSettings.SysMsgBufferSize))
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, settings.Advanced.SystemMessageResendInterval,
settings.Advanced.SysMsgBufferSize))
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
}
@ -807,7 +791,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions =
if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext)
if (settings.Advanced.Compression.Enabled) new InboundCompressionsImpl(system, inboundContext, settings.Advanced.Compression)
else NoInboundCompressions
def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] =
@ -864,7 +848,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
private def initializeFlightRecorder(): Option[(FileChannel, File, FlightRecorder)] = {
if (remoteSettings.FlightRecorderEnabled) {
if (settings.Advanced.FlightRecorderEnabled) {
// TODO: Figure out where to put it, currently using temporary files
val afrFile = File.createTempFile("artery", ".afr")
afrFile.deleteOnExit()
@ -920,4 +904,3 @@ private[remote] object ArteryTransport {
}
}

View file

@ -70,15 +70,13 @@ private[remote] class Association(
import Association._
private val log = Logging(transport.system, getClass.getName)
private val controlQueueSize = transport.remoteSettings.SysMsgBufferSize
private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize
// FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue
// such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption
private val queueSize = 3072
private val largeQueueSize = 256
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout)
// We start with the raw wrapped queue and then it is replaced with the materialized value of
// the `SendQueue` after materialization. Using same underlying queue. This makes it possible to
@ -339,7 +337,7 @@ private[remote] class Association(
controlQueue = wrapper // use new underlying queue immediately for restarts
val (queueValue, (control, completed)) =
if (transport.remoteSettings.TestMode) {
if (transport.settings.Advanced.TestMode) {
val ((queueValue, mgmt), (control, completed)) =
Source.fromGraph(new SendQueue[OutboundEnvelope])
.via(transport.outboundControlPart1(this))
@ -382,7 +380,7 @@ private[remote] class Association(
queue = wrapper // use new underlying queue immediately for restarts
val (queueValue, (changeCompression, completed)) =
if (transport.remoteSettings.TestMode) {
if (transport.settings.Advanced.TestMode) {
val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outbound(this))(Keep.both)
@ -408,7 +406,7 @@ private[remote] class Association(
largeQueue = wrapper // use new underlying queue immediately for restarts
val (queueValue, completed) =
if (transport.remoteSettings.TestMode) {
if (transport.settings.Advanced.TestMode) {
val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outboundLarge(this))(Keep.both)
@ -442,7 +440,7 @@ private[remote] class Association(
restart(cause)
} else {
log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName, remoteAddress, maxRestarts, restartTimeout.toSeconds)
streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds)
transport.system.terminate()
}
}

View file

@ -1,39 +0,0 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery.compress
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import com.typesafe.config.Config
import scala.concurrent.duration._
/** INTERNAL API */
private[akka] class CompressionSettings(_config: Config) {
val enabled = _config.getBoolean("enabled")
val debug = _config.getBoolean("debug")
object actorRefs {
private val c = _config.getConfig("actor-refs")
val advertisementInterval = c.getDuration("advertisement-interval", TimeUnit.MILLISECONDS).millis
val max = c.getInt("max")
}
object manifests {
private val c = _config.getConfig("manifests")
val advertisementInterval = c.getDuration("advertisement-interval", TimeUnit.MILLISECONDS).millis
val max = c.getInt("max")
}
}
/** INTERNAL API */
private[akka] object CompressionSettings { // TODO make it an extension
def apply(config: Config): CompressionSettings = new CompressionSettings(config)
def apply(system: ActorSystem): CompressionSettings =
new CompressionSettings(
system.settings.config.getConfig("akka.remote.artery.advanced.compression"))
}

View file

@ -11,7 +11,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
import akka.actor.{ ActorRef, ActorSystem, Address }
import akka.event.{ Logging, NoLogging }
import akka.remote.artery.{ InboundContext, OutboundContext }
import akka.remote.artery.{ ArterySettings, InboundContext, OutboundContext }
import akka.util.{ OptionVal, PrettyDuration }
import org.agrona.collections.Long2ObjectHashMap
@ -38,15 +38,14 @@ private[remote] trait InboundCompressions {
*/
private[remote] final class InboundCompressionsImpl(
system: ActorSystem,
inboundContext: InboundContext) extends InboundCompressions {
private val settings = CompressionSettings(system)
inboundContext: InboundContext,
settings: ArterySettings.Compression) extends InboundCompressions {
// FIXME we also must remove the ones that won't be used anymore - when quarantine triggers
private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]()
private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] {
override def apply(originUid: Long): InboundActorRefCompression = {
val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max)
val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max)
new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters)
}
}
@ -56,7 +55,7 @@ private[remote] final class InboundCompressionsImpl(
private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]()
private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] {
override def apply(originUid: Long): InboundManifestCompression = {
val manifestHitters = new TopHeavyHitters[String](settings.manifests.max)
val manifestHitters = new TopHeavyHitters[String](settings.Manifests.Max)
new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters)
}
}
@ -106,7 +105,7 @@ private[remote] final class InboundCompressionsImpl(
*/
private[remote] final class InboundActorRefCompression(
system: ActorSystem,
settings: CompressionSettings,
settings: ArterySettings.Compression,
originUid: Long,
inboundContext: InboundContext,
heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters) {
@ -123,7 +122,7 @@ private[remote] final class InboundActorRefCompression(
else super.decompress(tableVersion, idx)
scheduleNextTableAdvertisement()
override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval
override protected def tableAdvertisementInterval = settings.ActorRefs.AdvertisementInterval
override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[ActorRef]): Unit = {
log.debug(s"Advertise ActorRef compression [$table], from [${inboundContext.localAddress}] to [${outboundContext.remoteAddress}]")
@ -133,13 +132,13 @@ private[remote] final class InboundActorRefCompression(
final class InboundManifestCompression(
system: ActorSystem,
settings: CompressionSettings,
settings: ArterySettings.Compression,
originUid: Long,
inboundContext: InboundContext,
heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters) {
scheduleNextTableAdvertisement()
override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval
override protected def tableAdvertisementInterval = settings.Manifests.AdvertisementInterval
override lazy val log = NoLogging
@ -183,7 +182,7 @@ private[remote] object InboundCompression {
*/
private[remote] abstract class InboundCompression[T >: Null](
val system: ActorSystem,
val settings: CompressionSettings,
val settings: ArterySettings.Compression,
originUid: Long,
inboundContext: InboundContext,
val heavyHitters: TopHeavyHitters[T]) {