diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index cb1114fb12..7f12b6d4e5 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -123,8 +123,8 @@ trait Conductor { this: TestConductorExt ⇒ throttle(node, target, direction, 0f) private def requireTestConductorTranport(): Unit = { - if (transport.provider.remoteSettings.EnableArtery) { - if (!transport.provider.remoteSettings.TestMode) + if (transport.provider.remoteSettings.Artery.Enabled) { + if (!transport.provider.remoteSettings.Artery.Advanced.TestMode) throw new ConfigurationException("To use this feature you must activate the test mode " + "by specifying `testTransport(on = true)` in your MultiNodeConfig.") } else { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 8dad9e0c6e..335d3bedbe 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.concurrent.duration._ import akka.actor._ import akka.remote.RemoteActorRefProvider -import akka.remote.artery.compress.CompressionSettings import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -117,7 +116,7 @@ object MaxThroughputSpec extends MultiNodeConfig { val compressionEnabled = RARP(context.system).provider.transport.isInstanceOf[ArteryTransport] && - RARP(context.system).provider.remoteSettings.ArteryCompressionSettings.enabled + RARP(context.system).provider.remoteSettings.Artery.Enabled def receive = { case Run ⇒ @@ -174,7 +173,7 @@ object MaxThroughputSpec extends MultiNodeConfig { f"throughput ${throughput * testSettings.senderReceiverPairs}%,.0f msg/s, " + f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s (payload), " + f"${throughput * totalSize(context.system) * testSettings.senderReceiverPairs}%,.0f bytes/s (total" + - (if (CompressionSettings(context.system).enabled) ",compression" else "") + "), " + + (if (RARP(context.system).provider.remoteSettings.Artery.Advanced.Compression.Enabled) ",compression" else "") + "), " + s"dropped ${totalMessages - totalReceived}, " + s"max round-trip $maxRoundTripMillis ms, " + s"burst size $burstSize, " + @@ -217,7 +216,7 @@ object MaxThroughputSpec extends MultiNodeConfig { payloadSize: Int, senderReceiverPairs: Int) { // data based on measurement - def totalSize(system: ActorSystem) = payloadSize + (if (CompressionSettings(system).enabled) 38 else 110) + def totalSize(system: ActorSystem) = payloadSize + (if (RARP(system).provider.remoteSettings.Artery.Advanced.Compression.Enabled) 38 else 110) } class TestSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with ByteBufferSerializer { diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index d40cc4047f..bf0fbd141c 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -32,19 +32,19 @@ akka { # "scala.Some" = akka-misc # "scala.None$" = akka-misc "akka.remote.DaemonMsgCreate" = daemon-create - + # Since akka.protobuf.Message does not extend Serializable but # GeneratedMessage does, need to use the more specific one here in order # to avoid ambiguity. "akka.protobuf.GeneratedMessage" = proto - + # Since com.google.protobuf.Message does not extend Serializable but # GeneratedMessage does, need to use the more specific one here in order # to avoid ambiguity. # This com.google.protobuf serialization binding is only used if the class can be loaded, # i.e. com.google.protobuf dependency has been added in the application project. "com.google.protobuf.GeneratedMessage" = proto - + } serialization-identifiers { @@ -87,12 +87,12 @@ akka { artery { enabled = off port = 20200 - + # The hostname or ip clients should connect to. - # InetAddress.getLocalHost.getHostAddress is used if empty or + # InetAddress.getLocalHost.getHostAddress is used if empty or # "" is specified. - # InetAddress.getLocalHost.getHostName is used if - # "" is specified. + # InetAddress.getLocalHost.getHostName is used if + # "" is specified. hostname = "" # Actor paths to use the large message stream for when a message @@ -108,36 +108,99 @@ akka { # stream, to pass such messages through the large message stream the selections # but must be resolved to ActorRefs first. large-message-destinations = [] - + + # Sets the log granularity level at which Akka logs artery events. This setting + # can take the values OFF, ERROR, WARNING, INFO or DEBUG. Please note that the effective + # logging level is still determined by the global logging level of the actor system: + # for example debug level artery events will be only logged if the system + # is running with debug level logging. + # Failures to deserialize received messages also fall under this flag. + log-lifecycle-events = DEBUG + + # If set to a nonempty string artery will use the given dispatcher for + # its internal actors otherwise the default dispatcher is used. + use-dispatcher = "akka.remote.default-remote-dispatcher" + advanced { # For enabling testing features, such as blackhole in akka-remote-testkit. test-mode = off - + # Settings for the materializer that is used for the remote streams. materializer = ${akka.stream.materializer} materializer { dispatcher = "akka.remote.default-remote-dispatcher" } - + # Controls whether to start the Aeron media driver in the same JVM or use external - # process. Set to 'off' when using external media driver, and then also set the + # process. Set to 'off' when using external media driver, and then also set the # 'aeron-dir'. embedded-media-driver = on - + # Directory used by the Aeron media driver. It's mandatory to define the 'aeron-dir' # if using external media driver, i.e. when 'embedded-media-driver = off'. # Embedded media driver will use a this directory, or a temporary directory if this # property is not defined (empty). aeron-dir = "" - + + # Whether to delete aeron embeded driver directory upon driver stop. + delete-aeron-dir = yes + # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. # The tradeoff is that to have low latency more CPU time must be used to be # able to react quickly on incoming messages or send as fast as possible after - # backoff backpressure. + # backoff backpressure. # Level 1 strongly prefer low CPU consumption over low latency. - # Level 10 strongly prefer low latency over low CPU consumption. + # Level 10 strongly prefer low latency over low CPU consumption. idle-cpu-level = 5 - + + # This setting defines the maximum number of unacknowledged system messages + # allowed for a remote system. If this limit is reached the remote system is + # declared to be dead and its UID marked as tainted. + system-message-buffer-size = 20000 + + # unacknowledged system messages are re-delivered with this interval + system-message-resend-interval = 1 second + + # incomplete handshake attempt is retried with this interval + handshake-retry-interval = 1 second + + # handshake requests are performed periodically with this interval, + # also after the handshake has been completed to be able to establish + # a new session with a restarted destination system + inject-handshake-interval = 1 second + + # messages that are not accepted by Aeron are dropped after retrying for this period + give-up-send-after = 60 seconds + + # during ActorSystem termination the remoting will wait this long for + # an acknowledgment by the destination system that flushing of outstanding + # remote messages has been completed + shutdown-flush-timeout = 1 second + + # Timeout after which the inbound stream is going to be restarted. + inbound-restart-timeout = 5 seconds + + # Max number of restarts for the inbound stream. + inbound-max-restarts = 5 + + # Timeout after which outbout stream is going to be restarted for every association. + outbound-restart-timeout = 5 seconds + + # Max number of restars of the outbound stream for every association. + outbound-max-restarts = 5 + + # Timeout after which aeron driver has not had keepalive messages + # from a client before it considers the client dead. + client-liveness-timeout = 20 seconds + + # Timeout for each the INACTIVE and LINGER stages an aeron image + # will be retained for when it is no longer referenced. + image-liveness-timeout = 20 seconds + + # Timeout after which the aeron driver is considered dead + # if it does not update its C'n'C timestamp. + driver-timeout = 20 seconds + flight-recorder { // FIXME it should be enabled by default, but there is some concurrency issue that crashes the JVM enabled = off @@ -145,43 +208,34 @@ akka { # compression of common strings in remoting messages, like actor destinations, serializers etc compression { - # possibility to disable compression by setting this to off - enabled = on - - # unlocks additional very verbose debug logging of compression events (on DEBUG log level) - debug = off - + actor-refs { - enabled = off # TODO possibly remove on/off option once we have battle proven it? - # Max number of compressed actor-refs - # Note that compression tables are "rolling" (i.e. a new table replaces the old + # Note that compression tables are "rolling" (i.e. a new table replaces the old # compression table once in a while), and this setting is only about the total number # of compressions within a single such table. # Must be a positive natural number. max = 256 - + # interval between new table compression advertisements. # this means the time during which we collect heavy-hitter data and then turn it into a compression table. - advertisement-interval = "1 minute" # TODO find good number as default, for benchmarks trigger immediately + advertisement-interval = 1 minute # TODO find good number as default, for benchmarks trigger immediately } manifests { - enabled = off # TODO possibly remove on/off option once we have battle proven it? - # Max number of compressed manifests - # Note that compression tables are "rolling" (i.e. a new table replaces the old + # Note that compression tables are "rolling" (i.e. a new table replaces the old # compression table once in a while), and this setting is only about the total number # of compressions within a single such table. # Must be a positive natural number. max = 256 - + # interval between new table compression advertisements. # this means the time during which we collect heavy-hitter data and then turn it into a compression table. - advertisement-interval = "1 minute" # TODO find good number as default, for benchmarks trigger immediately + advertisement-interval = 1 minute # TODO find good number as default, for benchmarks trigger immediately } } } - + } ### General settings @@ -211,7 +265,7 @@ akka { # Acknowledgment timeout of management commands sent to the transport stack. command-ack-timeout = 30 s - + # The timeout for outbound associations to perform the handshake. # If the transport is akka.remote.netty.tcp or akka.remote.netty.ssl # the configured connection-timeout for the transport will be used instead. @@ -230,11 +284,11 @@ akka { # system messages to be send by clients, e.g. messages like 'Create', # 'Suspend', 'Resume', 'Terminate', 'Supervise', 'Link' etc. untrusted-mode = off - + # When 'untrusted-mode=on' inbound actor selections are by default discarded. # Actors with paths defined in this white list are granted permission to receive actor - # selections messages. - # E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"] + # selections messages. + # E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"] trusted-selection-paths = [] # Should the remote server require that its peers share the same @@ -272,7 +326,7 @@ akka { # a value in bytes, such as 1000b. Note that for all messages larger than this # limit there will be extra performance and scalability cost. log-frame-size-exceeding = off - + # Log warning if the number of messages in the backoff buffer in the endpoint # writer exceeds this limit. It can be disabled by setting the value to off. log-buffer-size-exceeding = 50000 @@ -281,7 +335,7 @@ akka { # Settings for the failure detector to monitor connections. # For TCP it is not important to have fast failure detection, since - # most connection failures are captured by TCP itself. + # most connection failures are captured by TCP itself. # The default DeadlineFailureDetector will trigger if there are no heartbeats within # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds # with the default settings. @@ -409,7 +463,7 @@ akka { # Messages that were negatively acknowledged are always immediately # resent. resend-interval = 2 s - + # Maximum number of unacknowledged system messages that will be resent # each 'resend-interval'. If you watch many (> 1000) remote actors you can # increase this value to for example 600, but a too large limit (e.g. 10000) @@ -649,7 +703,7 @@ akka { # "AES256CounterSecureRNG" # # The following are deprecated in Akka 2.4. They use one of 3 possible - # seed sources, depending on availability: /dev/random, random.org and + # seed sources, depending on availability: /dev/random, random.org and # SecureRandom (provided by Java) # "AES128CounterInetRNG" # "AES256CounterInetRNG" (Install JCE Unlimited Strength Jurisdiction @@ -679,7 +733,7 @@ akka { } throughput = 10 } - + backoff-remote-dispatcher { type = Dispatcher executor = "fork-join-executor" @@ -689,8 +743,5 @@ akka { parallelism-max = 2 } } - - } - } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 607ef745ba..8f1735cdda 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -187,7 +187,7 @@ private[akka] class RemoteActorRefProvider( d }, serialization = SerializationExtension(system), - transport = if (remoteSettings.EnableArtery) new ArteryTransport(system, this) else new Remoting(system, this)) + transport = if (remoteSettings.Artery.Enabled) new ArteryTransport(system, this) else new Remoting(system, this)) _internals = internals remotingTerminator ! internals diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index a4b11f41bb..1ffd06ecca 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -3,7 +3,6 @@ */ package akka.remote -import akka.remote.artery.compress.CompressionSettings import com.typesafe.config.Config import scala.concurrent.duration._ import akka.util.Timeout @@ -14,29 +13,13 @@ import akka.actor.Props import akka.event.Logging import akka.event.Logging.LogLevel import akka.ConfigurationException -import java.net.InetAddress +import akka.remote.artery.ArterySettings final class RemoteSettings(val config: Config) { import config._ import scala.collection.JavaConverters._ - val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") - val ArteryPort: Int = getInt("akka.remote.artery.port") - val ArteryHostname: String = getString("akka.remote.artery.hostname") match { - case "" | "" ⇒ InetAddress.getLocalHost.getHostAddress - case "" ⇒ InetAddress.getLocalHost.getHostName - case other ⇒ other - } - val EmbeddedMediaDriver = getBoolean("akka.remote.artery.advanced.embedded-media-driver") - val AeronDirectoryName = getString("akka.remote.artery.advanced.aeron-dir") requiring (dir ⇒ - EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver") - val TestMode: Boolean = getBoolean("akka.remote.artery.advanced.test-mode") - val IdleCpuLevel: Int = getInt("akka.remote.artery.advanced.idle-cpu-level").requiring(level ⇒ - 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") - - val FlightRecorderEnabled: Boolean = getBoolean("akka.remote.artery.advanced.flight-recorder.enabled") - - val ArteryCompressionSettings = CompressionSettings(getConfig("akka.remote.artery.advanced.compression")) + val Artery = ArterySettings(getConfig("akka.remote.artery")) val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala new file mode 100644 index 0000000000..c066790e79 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.ConfigurationException +import akka.event.Logging +import akka.event.Logging.LogLevel +import akka.stream.ActorMaterializerSettings +import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase } +import akka.util.WildcardIndex +import akka.NotUsed +import com.typesafe.config.Config +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import java.net.InetAddress +import java.util.concurrent.TimeUnit + +/** INTERNAL API */ +private[akka] final class ArterySettings private (config: Config) { + import config._ + import ArterySettings._ + + val Enabled: Boolean = getBoolean("enabled") + val Port: Int = getInt("port") + val Hostname: String = getString("hostname") match { + case "" | "" ⇒ InetAddress.getLocalHost.getHostAddress + case "" ⇒ InetAddress.getLocalHost.getHostName + case other ⇒ other + } + val LargeMessageDestinations = + config.getStringList("large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry) ⇒ + val segments = entry.split('/').tail + tree.insert(segments, NotUsed) + } + val LifecycleEventsLogLevel: LogLevel = Logging.levelFor(toRootLowerCase(getString("log-lifecycle-events"))) match { + case Some(level) ⇒ level + case None ⇒ throw new ConfigurationException("Logging level must be one of (off, debug, info, warning, error)") + } + val Dispatcher = getString("use-dispatcher") + + object Advanced { + val config = getConfig("advanced") + import config._ + + val TestMode: Boolean = getBoolean("test-mode") + val MaterializerSettings = ActorMaterializerSettings(config.getConfig("materializer")) + val EmbeddedMediaDriver = getBoolean("embedded-media-driver") + val AeronDirectoryName = getString("aeron-dir") requiring (dir ⇒ + EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver") + val DeleteAeronDirectory = getBoolean("delete-aeron-dir") + val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level ⇒ + 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") + val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring( + _ > 0, "system-message-buffer-size must be more than zero") + val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒ + interval > 0.seconds, "system-message-resend-interval must be more than zero") + val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval ⇒ + interval > 0.seconds, "handshake-retry-interval must be more than zero") + val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ + interval > 0.seconds, "inject-handshake-interval must be more than zero") + val GiveUpSendAfter = config.getMillisDuration("give-up-send-after") + val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout") + val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout") + val InboundMaxRestarts = getInt("inbound-max-restarts") + val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout") + val OutboundMaxRestarts = getInt("outbound-max-restarts") + val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout") + val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout") + val DriverTimeout = config.getMillisDuration("driver-timeout") + val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled") + val Compression = new Compression(getConfig("compression")) + } +} + +/** INTERNAL API */ +private[akka] object ArterySettings { + def apply(config: Config) = new ArterySettings(config) + + /** INTERNAL API */ + private[akka] final class Compression private[ArterySettings] (config: Config) { + import config._ + + // Compile time constants + final val Enabled = true + final val Debug = false // unlocks additional very verbose debug logging of compression events (on DEBUG log level) + + object ActorRefs { + val config = getConfig("actor-refs") + import config._ + + val AdvertisementInterval = config.getMillisDuration("advertisement-interval") + val Max = getInt("max") + } + object Manifests { + val config = getConfig("manifests") + import config._ + + val AdvertisementInterval = config.getMillisDuration("advertisement-interval") + val Max = getInt("max") + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 3c4c10d5ec..a07a1e12b6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -33,7 +33,6 @@ import akka.remote.AddressUidExtension import akka.remote.EventPublisher import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider -import akka.remote.RemoteSettings import akka.remote.RemoteTransport import akka.remote.RemotingLifecycleEvent import akka.remote.ThisActorSystemQuarantinedEvent @@ -125,9 +124,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -225,7 +224,7 @@ private[akka] trait OutboundContext { */ private[remote] object FlushOnShutdown { def props(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]): Props = { + inboundContext: InboundContext, associations: Set[Association]): Props = { require(associations.nonEmpty) Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) } @@ -237,7 +236,7 @@ private[remote] object FlushOnShutdown { * INTERNAL API */ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]) extends Actor { + inboundContext: InboundContext, associations: Set[Association]) extends Actor { var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) @@ -288,7 +287,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def addresses: Set[Address] = _addresses override def localAddressForRemote(remote: Address): Address = defaultAddress override val log: LoggingAdapter = Logging(system, getClass.getName) - val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel) + val eventPublisher = new EventPublisher(system, log, settings.LifecycleEventsLogLevel) private val codec: AkkaPduCodec = AkkaPduProtobufCodec private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") @@ -296,27 +295,16 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList - // FIXME config - private val systemMessageResendInterval: FiniteDuration = 1.second - private val handshakeRetryInterval: FiniteDuration = 1.second private val handshakeTimeout: FiniteDuration = system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring( _ > Duration.Zero, "handshake-timeout must be > 0") - private val injectHandshakeInterval: FiniteDuration = 1.second - private val giveUpSendAfter: FiniteDuration = 60.seconds - private val shutdownFlushTimeout = 1.second - private val remoteDispatcher = system.dispatchers.lookup(remoteSettings.Dispatcher) + private val remoteDispatcher = system.dispatchers.lookup(settings.Dispatcher) - private val largeMessageDestinations = - system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry) ⇒ - val segments = entry.split('/').tail - tree.insert(segments, NotUsed) - } // TODO use WildcardIndex.isEmpty when merged from master val largeMessageChannelEnabled = - !largeMessageDestinations.wildcardTree.isEmpty || !largeMessageDestinations.doubleWildcardTree.isEmpty + !settings.LargeMessageDestinations.wildcardTree.isEmpty || !settings.LargeMessageDestinations.doubleWildcardTree.isEmpty private val priorityMessageDestinations = WildcardIndex[NotUsed]() @@ -334,11 +322,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val ordinaryStreamId = 3 private val largeStreamId = 4 - private val taskRunner = new TaskRunner(system, remoteSettings.IdleCpuLevel) + private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel) - private val restartTimeout: FiniteDuration = 5.seconds // FIXME config - private val maxRestarts = 5 // FIXME config - private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) + private val restartCounter = new RestartCounter(settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout) private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) private val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers) @@ -373,11 +359,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R materializer, remoteAddress, controlSubject, - largeMessageDestinations, + settings.LargeMessageDestinations, priorityMessageDestinations, outboundEnvelopePool)) - def remoteSettings: RemoteSettings = provider.remoteSettings + def settings = provider.remoteSettings.Artery override def start(): Unit = { startMediaDriver() @@ -389,22 +375,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData) val port = - if (remoteSettings.ArteryPort == 0) ArteryTransport.autoSelectPort(remoteSettings.ArteryHostname) - else remoteSettings.ArteryPort + if (settings.Port == 0) ArteryTransport.autoSelectPort(settings.Hostname) + else settings.Port // TODO: Configure materializer properly // TODO: Have a supervisor actor _localAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, remoteSettings.ArteryHostname, port), + Address(ArteryTransport.ProtocolName, system.name, settings.Hostname, port), AddressUidExtension(system).longAddressUid) _addresses = Set(_localAddress.address) // TODO: This probably needs to be a global value instead of an event as events might rotate out of the log topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString().getBytes("US-ASCII")) - val materializerSettings = ActorMaterializerSettings( - remoteSettings.config.getConfig("akka.remote.artery.advanced.materializer")) - materializer = ActorMaterializer.systemMaterializer(materializerSettings, "remote", system) + materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system) messageDispatcher = new MessageDispatcher(system, provider) topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData) @@ -420,16 +404,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def startMediaDriver(): Unit = { - if (remoteSettings.EmbeddedMediaDriver) { + if (settings.Advanced.EmbeddedMediaDriver) { val driverContext = new MediaDriver.Context - if (remoteSettings.AeronDirectoryName.nonEmpty) - driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName) - // FIXME settings from config - driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20)) - driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20)) - driverContext.driverTimeoutMs(SECONDS.toNanos(20)) + if (settings.Advanced.AeronDirectoryName.nonEmpty) + driverContext.aeronDirectoryName(settings.Advanced.AeronDirectoryName) + driverContext.clientLivenessTimeoutNs(settings.Advanced.ClientLivenessTimeout.toNanos) + driverContext.imageLivenessTimeoutNs(settings.Advanced.ImageLivenessTimeoutNs.toNanos) + driverContext.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis) - val idleCpuLevel = remoteSettings.IdleCpuLevel + val idleCpuLevel = settings.Advanced.IdleCpuLevel if (idleCpuLevel == 10) { driverContext .threadingMode(ThreadingMode.DEDICATED) @@ -461,7 +444,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def aeronDir: String = mediaDriver match { case Some(driver) ⇒ driver.aeronDirectoryName - case None ⇒ remoteSettings.AeronDirectoryName + case None ⇒ settings.Advanced.AeronDirectoryName } private def stopMediaDriver(): Unit = { @@ -469,8 +452,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // this is only for embedded media driver driver.close() try { - // FIXME it should also be configurable to not delete dir - IoUtil.delete(new File(driver.aeronDirectoryName), false) + if (settings.Advanced.DeleteAeronDirectory) { + IoUtil.delete(new File(driver.aeronDirectoryName), false) + } } catch { case NonFatal(e) ⇒ log.warning( @@ -542,7 +526,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def runInboundControlStream(compression: InboundCompressions): Unit = { val (ctrl, completed) = - if (remoteSettings.TestMode) { + if (settings.Advanced.TestMode) { val (mgmt, (ctrl, completed)) = aeronSource(controlStreamId, envelopePool) .via(inboundFlow(compression)) @@ -617,7 +601,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def runInboundOrdinaryMessagesStream(compression: InboundCompressions): Unit = { val completed = - if (remoteSettings.TestMode) { + if (settings.Advanced.TestMode) { val (mgmt, c) = aeronSource(ordinaryStreamId, envelopePool) .via(inboundFlow(compression)) .viaMat(inboundTestFlow)(Keep.right) @@ -639,7 +623,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val disableCompression = NoInboundCompressions // no compression on large message stream for now val completed = - if (remoteSettings.TestMode) { + if (settings.Advanced.TestMode) { val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool) .via(inboundLargeFlow(disableCompression)) .viaMat(inboundTestFlow)(Keep.right) @@ -668,7 +652,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R restart() } else { log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}", - streamName, maxRestarts, restartTimeout.toSeconds, cause.getMessage) + streamName, settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout.toSeconds, cause.getMessage) system.terminate() } } @@ -681,8 +665,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (allAssociations.isEmpty) Future.successful(Done) else { val flushingPromise = Promise[Done]() - system.systemActorOf(FlushOnShutdown.props(flushingPromise, shutdownFlushTimeout, - this, allAssociations).withDispatcher(remoteSettings.Dispatcher), "remoteFlushOnShutdown") + system.systemActorOf(FlushOnShutdown.props(flushingPromise, settings.Advanced.ShutdownFlushTimeout, + this, allAssociations).withDispatcher(settings.Dispatcher), "remoteFlushOnShutdown") flushingPromise.future } implicit val ec = remoteDispatcher @@ -774,14 +758,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .mapMaterializedValue { case (_, d) ⇒ d } private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { + bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, - handshakeRetryInterval, injectHandshakeInterval)) + settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) .viaMat(createEncoder(bufferPool))(Keep.right) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.both) + envelopePool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))(Keep.both) } /** @@ -791,9 +775,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def outboundControlPart1(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, SharedKillSwitch] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, - handshakeRetryInterval, injectHandshakeInterval)) - .via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval, - remoteSettings.SysMsgBufferSize)) + settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) + .via(new SystemMessageDelivery(outboundContext, system.deadLetters, settings.Advanced.SystemMessageResendInterval, + settings.Advanced.SysMsgBufferSize)) // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } @@ -807,7 +791,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = - if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) + if (settings.Advanced.Compression.Enabled) new InboundCompressionsImpl(system, inboundContext, settings.Advanced.Compression) else NoInboundCompressions def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = @@ -864,7 +848,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def initializeFlightRecorder(): Option[(FileChannel, File, FlightRecorder)] = { - if (remoteSettings.FlightRecorderEnabled) { + if (settings.Advanced.FlightRecorderEnabled) { // TODO: Figure out where to put it, currently using temporary files val afrFile = File.createTempFile("artery", ".afr") afrFile.deleteOnExit() @@ -920,4 +904,3 @@ private[remote] object ArteryTransport { } } - diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 82dc5bf01f..55db97720e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -70,15 +70,13 @@ private[remote] class Association( import Association._ private val log = Logging(transport.system, getClass.getName) - private val controlQueueSize = transport.remoteSettings.SysMsgBufferSize + private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize // FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue // such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption private val queueSize = 3072 private val largeQueueSize = 256 - private val restartTimeout: FiniteDuration = 5.seconds // FIXME config - private val maxRestarts = 5 // FIXME config - private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) + private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout) // We start with the raw wrapped queue and then it is replaced with the materialized value of // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to @@ -339,7 +337,7 @@ private[remote] class Association( controlQueue = wrapper // use new underlying queue immediately for restarts val (queueValue, (control, completed)) = - if (transport.remoteSettings.TestMode) { + if (transport.settings.Advanced.TestMode) { val ((queueValue, mgmt), (control, completed)) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .via(transport.outboundControlPart1(this)) @@ -382,7 +380,7 @@ private[remote] class Association( queue = wrapper // use new underlying queue immediately for restarts val (queueValue, (changeCompression, completed)) = - if (transport.remoteSettings.TestMode) { + if (transport.settings.Advanced.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outbound(this))(Keep.both) @@ -408,7 +406,7 @@ private[remote] class Association( largeQueue = wrapper // use new underlying queue immediately for restarts val (queueValue, completed) = - if (transport.remoteSettings.TestMode) { + if (transport.settings.Advanced.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outboundLarge(this))(Keep.both) @@ -442,7 +440,7 @@ private[remote] class Association( restart(cause) } else { log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", - streamName, remoteAddress, maxRestarts, restartTimeout.toSeconds) + streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds) transport.system.terminate() } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala deleted file mode 100644 index 82a9c7e752..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.artery.compress - -import java.util.concurrent.TimeUnit - -import akka.actor.ActorSystem -import com.typesafe.config.Config - -import scala.concurrent.duration._ - -/** INTERNAL API */ -private[akka] class CompressionSettings(_config: Config) { - val enabled = _config.getBoolean("enabled") - - val debug = _config.getBoolean("debug") - - object actorRefs { - private val c = _config.getConfig("actor-refs") - - val advertisementInterval = c.getDuration("advertisement-interval", TimeUnit.MILLISECONDS).millis - val max = c.getInt("max") - } - object manifests { - private val c = _config.getConfig("manifests") - - val advertisementInterval = c.getDuration("advertisement-interval", TimeUnit.MILLISECONDS).millis - val max = c.getInt("max") - } -} - -/** INTERNAL API */ -private[akka] object CompressionSettings { // TODO make it an extension - def apply(config: Config): CompressionSettings = new CompressionSettings(config) - def apply(system: ActorSystem): CompressionSettings = - new CompressionSettings( - system.settings.config.getConfig("akka.remote.artery.advanced.compression")) -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 5be9354a32..7b09b10264 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -11,7 +11,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } import akka.actor.{ ActorRef, ActorSystem, Address } import akka.event.{ Logging, NoLogging } -import akka.remote.artery.{ InboundContext, OutboundContext } +import akka.remote.artery.{ ArterySettings, InboundContext, OutboundContext } import akka.util.{ OptionVal, PrettyDuration } import org.agrona.collections.Long2ObjectHashMap @@ -38,15 +38,14 @@ private[remote] trait InboundCompressions { */ private[remote] final class InboundCompressionsImpl( system: ActorSystem, - inboundContext: InboundContext) extends InboundCompressions { - - private val settings = CompressionSettings(system) + inboundContext: InboundContext, + settings: ArterySettings.Compression) extends InboundCompressions { // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { override def apply(originUid: Long): InboundActorRefCompression = { - val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) + val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max) new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters) } } @@ -56,7 +55,7 @@ private[remote] final class InboundCompressionsImpl( private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { override def apply(originUid: Long): InboundManifestCompression = { - val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) + val manifestHitters = new TopHeavyHitters[String](settings.Manifests.Max) new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters) } } @@ -106,7 +105,7 @@ private[remote] final class InboundCompressionsImpl( */ private[remote] final class InboundActorRefCompression( system: ActorSystem, - settings: CompressionSettings, + settings: ArterySettings.Compression, originUid: Long, inboundContext: InboundContext, heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters) { @@ -123,7 +122,7 @@ private[remote] final class InboundActorRefCompression( else super.decompress(tableVersion, idx) scheduleNextTableAdvertisement() - override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval + override protected def tableAdvertisementInterval = settings.ActorRefs.AdvertisementInterval override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[ActorRef]): Unit = { log.debug(s"Advertise ActorRef compression [$table], from [${inboundContext.localAddress}] to [${outboundContext.remoteAddress}]") @@ -133,13 +132,13 @@ private[remote] final class InboundActorRefCompression( final class InboundManifestCompression( system: ActorSystem, - settings: CompressionSettings, + settings: ArterySettings.Compression, originUid: Long, inboundContext: InboundContext, heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters) { scheduleNextTableAdvertisement() - override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval + override protected def tableAdvertisementInterval = settings.Manifests.AdvertisementInterval override lazy val log = NoLogging @@ -183,7 +182,7 @@ private[remote] object InboundCompression { */ private[remote] abstract class InboundCompression[T >: Null]( val system: ActorSystem, - val settings: CompressionSettings, + val settings: ArterySettings.Compression, originUid: Long, inboundContext: InboundContext, val heavyHitters: TopHeavyHitters[T]) {