diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index d1c48565d5..36b727cc1c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -24,7 +24,7 @@ case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends Mul commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" - akka.remoting.retry-latch-closed-for = 3 s + akka.remote.retry-latch-closed-for = 3 s akka.cluster { auto-down = on failure-detector.threshold = 4 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index afe20e1614..c7666ba395 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -25,7 +25,7 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B commonConfig(ConfigFactory.parseString( """ - akka.remoting.log-remote-lifecycle-events = off + akka.remote.log-remote-lifecycle-events = off akka.cluster.publish-stats-interval = 0s akka.loglevel = INFO """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index b2a71a66c9..7aa7b81047 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -29,7 +29,7 @@ object ClusterDeployerSpec { cluster.routees-path = "/user/myservice" } } - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 """, ConfigParseOptions.defaults) class RecipeActor extends Actor { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 1b8b6b29fe..242775fb40 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -27,7 +27,7 @@ object ClusterSpec { } akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 # akka.loglevel = DEBUG """ diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala index cffb41250c..5877d91e7a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala @@ -13,7 +13,7 @@ import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString(""" akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 """)) { val a1 = Address("tcp.akka", "sys", "a1", 2551) diff --git a/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala b/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala index 6284b4ca8e..74964fb823 100644 --- a/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala @@ -20,7 +20,7 @@ object RemoteDeploymentDocSpec { class RemoteDeploymentDocSpec extends AkkaSpec(""" akka.actor.provider = "akka.remote.RemoteActorRefProvider" - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 """) with ImplicitSender { import RemoteDeploymentDocSpec._ diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 1d00e30c51..7e8a900993 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -63,7 +63,7 @@ abstract class MultiNodeConfig { receive = on fsm = on } - akka.remoting.log-remote-lifecycle-events = on + akka.remote.log-remote-lifecycle-events = on """) else ConfigFactory.empty @@ -101,8 +101,8 @@ abstract class MultiNodeConfig { val transportConfig = if (_testTransport) ConfigFactory.parseString( """ - akka.remoting.transports.tcp.applied-adapters = [gremlin, trttl] - akka.remoting.retry-gate-closed-for = 1 s + akka.remote.netty.tcp.applied-adapters = [gremlin, trttl] + akka.remote.retry-gate-closed-for = 1 s """) else ConfigFactory.empty @@ -194,8 +194,8 @@ object MultiNodeSpec { private[testkit] val nodeConfig = mapToConfig(Map( "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", - "akka.remoting.transports.tcp.hostname" -> selfName, - "akka.remoting.transports.tcp.port" -> selfPort)) + "akka.remote.netty.tcp.hostname" -> selfName, + "akka.remote.netty.tcp.port" -> selfPort)) private[testkit] val baseConfig: Config = ConfigFactory.parseString(""" akka { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index d34d4e94f6..85fd42dffb 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -24,7 +24,7 @@ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { } commonConfig(debugConfig(on = false).withFallback( - ConfigFactory.parseString("akka.remoting.log-remote-lifecycle-events = off"))) + ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))) val master = role("master") val slave = role("slave") diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 505ee2a030..eb9e4f4108 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -53,128 +53,67 @@ akka { } } - remoting { + remote { - # FIXME document - failure-detector { - threshold = 7.0 - max-sample-size = 100 - min-std-deviation = 100 ms - acceptable-heartbeat-pause = 3 s - } + ### General settings - # FIXME document + # Timeout after which the startup of the remoting subsystem is considered to be failed. + # Increase this value if your transport drivers (see the enabled-transports section) + # need longer time to be loaded. + startup-timeout = 5 s + + # Timout after which the graceful shutdown of the remoting subsystem is considered to be failed. + # After the timeout the remoting system is forcefully shut down. + # Increase this value if your transport drivers (see the enabled-transports section) + # need longer time to stop properly. + shutdown-timeout = 5 s + + # Before shutting down the drivers, the remoting subsystem attempts to flush all pending + # writes. This setting controls the maximum time the remoting is willing to wait before + # moving on to shut down the drivers. + flush-wait-on-shutdown = 2 s + + # Reuse inbound connections for outbound messages + use-passive-connections = on + + # Dispatcher that the actors responsible to write to a connection will use. + # The mailbox type must be always a DequeBasedMailbox. writer-dispatcher { mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" } - # If this is "on", Akka will log all RemoteLifeCycleEvents at the level - # defined for each, if off then they are not logged. Failures to deserialize - # received messages also fall under this flag. - log-remote-lifecycle-events = off - - # FIXME document - heartbeat-interval = 1 s - - # FIXME document + # If enabled, an inbound connection is only considered to be live after the remote + # system sent an explicit acknowledgement. + # It is recommended to leave this setting on when connectionless transports (e.g. UDP) + # are used. wait-activity-enabled = on - # FIXME document + # Controls the backoff interval after a refused write is reattempted. (Transports may + # refuse writes if their internal buffer is full) backoff-interval = 0.01 s - # FIXME document - secure-cookie = "" - - # FIXME document - require-cookie = off - - # FIXME document - flush-wait-on-shutdown = 2 s - - # FIXME document - shutdown-timeout = 5 s - - # FIXME document - startup-timeout = 5 s - - # FIXME document - retry-gate-closed-for = 0 s - - # FIXME document - retry-window = 3 s - - # FIXME document - maximum-retries-in-window = 5 - - # FIXME document - use-passive-connections = on - - # Acknowledgment timeout for commands + # Acknowledgment timeout of management commands sent to the transport stack. command-ack-timeout = 30 s - adapters { - gremlin = "akka.remote.transport.FailureInjectorProvider" - trttl = "akka.remote.transport.ThrottlerProvider" - } - - enabled-transports = ["tcp"] - - transports.tcp { - transport-class = "akka.remote.transport.netty.NettyTransport" - applied-adapters = [] - - transport-protocol = tcp - port = 2552 - hostname = "localhost" #FIXME Empty string should default to localhost - enable-ssl = false - log-transport-events = true - connection-timeout = 120s - use-dispatcher-for-io = "" - write-buffer-high-water-mark = 0b - write-buffer-low-water-mark = 0b - send-buffer-size = 32000b - receive-buffer-size = 32000b - backlog = 4096 - - server-socket-worker-pool { - pool-size-min = 2 - pool-size-factor = 1.0 - pool-size-max = 8 - } - - client-socket-worker-pool { - pool-size-min = 2 - pool-size-factor = 1.0 - pool-size-max = 8 - } - } - - transports.udp = ${akka.remoting.transports.tcp} - transports.udp { - transport-protocol = udp - } - - transports.ssl = ${akka.remoting.transports.tcp} - transports.ssl = { - enable-ssl = true - } - - } - - remote { - - # Which implementation of akka.remote.RemoteTransport to use - # default is a TCP-based remote transport based on Netty - transport = "akka.remote.Remoting" - + ### Security settings # Enable untrusted mode for full security of server managed actors, prevents # system messages to be send by clients, e.g. messages like 'Create', # 'Suspend', 'Resume', 'Terminate', 'Supervise', 'Link' etc. untrusted-mode = off - # Timeout for ACK of cluster operations, like checking actor out etc. - remote-daemon-ack-timeout = 30s + # Should the remote server require that its peers share the same + # secure-cookie (defined in the 'remote' section)? Secure cookies are passed + # between during the initial handshake. Connections are refused if the initial + # message contains a mismatching cookie or the cookie is missing. + require-cookie = off + + # Generate your own with the script availbale in + # '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using + # 'akka.util.Crypt.generateSecureCookie' + secure-cookie = "" + + ### Logging # If this is "on", Akka will log all inbound messages at DEBUG level, # if off then they are not logged @@ -185,189 +124,134 @@ akka { log-sent-messages = off # If this is "on", Akka will log all RemoteLifeCycleEvents at the level - # defined for each, if off then they are not logged Failures to deserialize + # defined for each, if off then they are not logged. Failures to deserialize # received messages also fall under this flag. - log-remote-lifecycle-events = on + log-remote-lifecycle-events = off - # Each property is annotated with (I) or (O) or (I&O), where I stands for - # “inbound” and O for “outbound” connections. The NettyRemoteTransport always - # starts the server role to allow inbound connections, and it starts active - # client connections whenever sending to a destination which is not yet - # connected; if configured it reuses inbound connections for replies, which - # is called a passive client connection (i.e. from server to client). - netty { + ### Failure detection and recovery - # (O) In case of increased latency / overflow how long should we wait - # (blocking the sender) until we deem the send to be cancelled? - # 0 means "never backoff", any positive number will indicate the time to - # block at most. - backoff-timeout = 0ms + # how often should keep-alive heartbeat messages sent to connections. + heartbeat-interval = 1 s - # (I&O) Generate your own with the script availbale in - # '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using - # 'akka.util.Crypt.generateSecureCookie' - secure-cookie = "" + # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf + # [Hayashibara et al]) used by the remoting subsystem to detect failed connections. + failure-detector { + # defines the failure detector threshold + # A low threshold is prone to generate many wrong suspicions but ensures + # a quick detection in the event of a real crash. Conversely, a high + # threshold generates fewer mistakes but needs more time to detect + # actual crashes + threshold = 7.0 - # (I) Should the remote server require that its peers share the same - # secure-cookie (defined in the 'remote' section)? - require-cookie = off + # Number of the samples of inter-heartbeat arrival times to adaptively + # calculate the failure timeout for connections. + max-sample-size = 100 - # (I) Reuse inbound connections for outbound messages - use-passive-connections = on + # Minimum standard deviation to use for the normal distribution in + # AccrualFailureDetector. Too low standard deviation might result in + # too much sensitivity for sudden, but normal, deviations in heartbeat + # inter arrival times. + min-std-deviation = 100 ms - # (I) EXPERIMENTAL If "" then the specified dispatcher - # will be used to accept inbound connections, and perform IO. If "" then - # dedicated threads will be used. - # - # CAUTION: This might lead to the used dispatcher not shutting down properly! - # - may prevent the JVM from shutting down normally - # - may leak threads when shutting down an ActorSystem - # - use-dispatcher-for-io = "" + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # It is a factor of heartbeat-interval. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 3 s + } - # (I) The hostname or ip to bind the remoting to, - # InetAddress.getLocalHost.getHostAddress is used if empty - hostname = "" + # After failed to establish an outbound connection, the remoting will mark the + # address as failed. This configuration option controls how much time should + # be elapsed before reattempting a new connection. While the address is + # gated, all messages sent to the address are delivered to dead-letters. + # If this setting is 0, the remoting will always immediately reattempt + # to establish a failed outbound connection and will buffer writes until + # it succeeds. + retry-gate-closed-for = 0 s - # (I) The default remote server port clients should connect to. + # If the retry gate function is disabled (see retry-gate-closed-for) the + # remoting subsystem will always attempt to reestablish failed outbound + # connections. The settings below together control the maximum number of + # reattempts in a given time window. The number of reattempts during + # a window of "retry-window" will be maximum "maximum-retries-in-window". + retry-window = 3 s + maximum-retries-in-window = 5 + + ### Transports and adapters + + # List of the transport drivers that will be loaded by the remoting. + # A list of fully qualified config paths must be provided where + # the given configuration path contains a transport-class key + # pointing to an implementation class of the Transport interface. + # If multiple transports are provided, the address of the first + # one will be used as a default address. + enabled-transports = ["akka.remote.netty.tcp"] + + # Transport drivers can be augmented with adapters by adding their + # name to the applied-adapters setting in the configuration of a + # transport. The available adapters should be configured in this + # section by providing a name, and the fully qualified name of + # their corresponding implementation + adapters { + gremlin = "akka.remote.transport.FailureInjectorProvider" + trttl = "akka.remote.transport.ThrottlerProvider" + } + + ### Default configuration for the Netty based transport drivers + + netty.tcp { + transport-class = "akka.remote.transport.netty.NettyTransport" + + # Transport drivers can be augmented with adapters by adding their + # name to the applied-adapters list. The adapters will be applied + # in the order they are provided. + applied-adapters = [] + + transport-protocol = tcp + + # The default remote server port clients should connect to. # Default is 2552 (AKKA), use 0 if you want a random available port # This port needs to be unique for each actor system on the same machine. port = 2552 - # (O) The address of a local network interface (IP Address) to bind to when - # creating outbound connections. Set to "" or "auto" for automatic selection - # of local address. - outbound-local-address = "auto" + # The hostname or ip to bind the remoting to, + # InetAddress.getLocalHost.getHostAddress is used if empty + hostname = "" - # (I&O) Increase this if you want to be able to send messages with large - # payloads - message-frame-size = 1 MiB + # Enables SSL support on this transport + enable-ssl = false - # (O) Sets the connectTimeoutMillis of all outbound connections, + # Sets the connectTimeoutMillis of all outbound connections, # i.e. how long a connect may take until it is timed out connection-timeout = 120s - # (I) Sets the size of the connection backlog - backlog = 4096 + # If set to "" then the specified dispatcher + # will be used to accept inbound connections, and perform IO. If "" then + # dedicated threads will be used. + use-dispatcher-for-io = "" - # (I) Sets the SO_REUSE_ADDR flag, valid values are "on", "off" and "off-for-windows" - # due to the following Windows bug: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4476378 - # "off-for-windows" of course means that it's "on" for all other platforms - reuse-address = off-for-windows - - # (I) Length in akka.time-unit how long core threads will be kept alive if - # idling - execution-pool-keepalive = 60s - - # (I) Size in number of threads of the core pool of the remote execution - # unit. - # A value of 0 will turn this off, which is can lead to deadlocks under - # some configurations! - execution-pool-size = 4 - - # (I) Maximum channel size, 0 for off - max-channel-memory-size = 0b - - # (I) Maximum total size of all channels, 0 for off - max-total-memory-size = 0b - - # (I&O) Sets the high water mark for the in and outbound sockets, + # Sets the high water mark for the in and outbound sockets, # set to 0b for platform default write-buffer-high-water-mark = 0b - # (I&O) Sets the low water mark for the in and outbound sockets, + # Sets the low water mark for the in and outbound sockets, # set to 0b for platform default write-buffer-low-water-mark = 0b - # (I&O) Sets the send buffer size of the Sockets, + # Sets the send buffer size of the Sockets, # set to 0b for platform default - send-buffer-size = 0b + send-buffer-size = 32000b - # (I&O) Sets the receive buffer size of the Sockets, + # Sets the receive buffer size of the Sockets, # set to 0b for platform default - receive-buffer-size = 0b + receive-buffer-size = 32000b - # (O) Time between reconnect attempts for active clients - reconnect-delay = 5s + # Sets the size of the connection backlog + backlog = 4096 - # (O) Client read inactivity period (finest resolution is seconds) - # after which active client connection is shutdown; - # Connection will be re-established in case of new communication requests. - # A value of 0 will turn this feature off - # This value should be left to be 0 when use-passive-connections is off, or if - # no traffic is expected from the server side (i.e. it is a sink). - read-timeout = 0s - - # (O) Write inactivity period (lowest resolution is seconds) - # after which a heartbeat is sent across the wire. - # A value of 0 will turn this feature off - write-timeout = 10s - - # (O) Inactivity period of both reads and writes (lowest resolution is - # seconds) after which active client connection is shutdown; will be - # re-established in case of new communication requests. - # A value of 0 will turn this feature off - all-timeout = 0s - - # (O) Maximum time window that a client should try to reconnect for - reconnection-time-window = 600s - - ssl { - # (I&O) Enable SSL/TLS encryption. - # This must be enabled on both the client and server to work. - enable = off - - # (I) This is the Java Key Store used by the server connection - key-store = "keystore" - - # This password is used for decrypting the key store - key-store-password = "changeme" - - # (O) This is the Java Key Store used by the client connection - trust-store = "truststore" - - # This password is used for decrypting the trust store - trust-store-password = "changeme" - - # (I&O) Protocol to use for SSL encryption, choose from: - # Java 6 & 7: - # 'SSLv3', 'TLSv1' - # Java 7: - # 'TLSv1.1', 'TLSv1.2' - protocol = "TLSv1" - - # Example: ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"] - # You need to install the JCE Unlimited Strength Jurisdiction Policy - # Files to use AES 256. - # More info here: - # http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider - enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"] - - # Using /dev/./urandom is only necessary when using SHA1PRNG on Linux to - # prevent blocking. It is NOT as secure because it reuses the seed. - # '' => defaults to /dev/random or whatever is set in java.security for - # example: securerandom.source=file:/dev/random - # '/dev/./urandom' => NOT '/dev/urandom' as that doesn't work according - # to: http://bugs.sun.com/view_bug.do?bug_id=6202721 - sha1prng-random-source = "" - - # There are three options, in increasing order of security: - # "" or SecureRandom => (default) - # "SHA1PRNG" => Can be slow because of blocking issues on Linux - # "AES128CounterSecureRNG" => fastest startup and based on AES encryption - # algorithm - # "AES256CounterSecureRNG" - # The following use one of 3 possible seed sources, depending on - # availability: /dev/random, random.org and SecureRandom (provided by Java) - # "AES128CounterInetRNG" - # "AES256CounterInetRNG" (Install JCE Unlimited Strength Jurisdiction - # Policy Files first) - # Setting a value here may require you to supply the appropriate cipher - # suite (see enabled-algorithms section above) - random-number-generator = "" - } - - # (I&O) Used to configure the number of I/O worker threads on server sockets + # Used to configure the number of I/O worker threads on server sockets server-socket-worker-pool { # Min number of threads to cap factor-based number to pool-size-min = 2 @@ -382,7 +266,7 @@ akka { pool-size-max = 8 } - # (I&O) Used to configure the number of I/O worker threads on client sockets + # Used to configure the number of I/O worker threads on client sockets client-socket-worker-pool { # Min number of threads to cap factor-based number to pool-size-min = 2 @@ -397,5 +281,17 @@ akka { pool-size-max = 8 } } + + netty.udp = ${akka.remote.netty.tcp} + netty.udp { + transport-protocol = udp + } + + netty.ssl = ${akka.remote.netty.tcp} + netty.ssl = { + enable-ssl = true + } + } + } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 3e33633daa..e678fa9551 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -123,14 +123,14 @@ private[remote] class EndpointWriter( val localAddress: Address, val remoteAddress: Address, val transport: Transport, - val settings: RemotingSettings, + val settings: RemoteSettings, val codec: AkkaPduCodec) extends Actor with Stash with FSM[EndpointWriter.State, Unit] { import EndpointWriter._ import context.dispatcher val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem] - val eventPublisher = new EventPublisher(context.system, log, settings.LogLifecycleEvents) + val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents) var reader: Option[ActorRef] = None var handle: Option[AssociationHandle] = handleOrActive // FIXME: refactor into state data diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index b576ff68a3..d70dc96d2e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -76,7 +76,7 @@ class RemoteActorRefProvider( val dynamicAccess: DynamicAccess) extends ActorRefProvider { import RemoteActorRefProvider._ - val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName) + val remoteSettings: RemoteSettings = new RemoteSettings(settings.config) override val deployer: Deployer = createDeployer @@ -134,19 +134,8 @@ class RemoteActorRefProvider( local.registerExtraNames(Map(("remote", d))) d }, - serialization = SerializationExtension(system), - - transport = { - val fqn = remoteSettings.RemoteTransport - val args = List( - classOf[ExtendedActorSystem] -> system, - classOf[RemoteActorRefProvider] -> this) - - system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({ - case problem ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) - }).get - }) + transport = new Remoting(system, this)) _internals = internals remotingTerminator ! internals diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 3d60667435..92daf70f83 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -4,15 +4,58 @@ package akka.remote import com.typesafe.config.Config -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.util.Timeout +import scala.collection.immutable.Seq +import akka.japi.Util._ -class RemoteSettings(val config: Config, val systemName: String) { +class RemoteSettings(val config: Config) { import config._ - val RemoteTransport: String = getString("akka.remote.transport") + import scala.collection.JavaConverters._ + val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") + val LogSend: Boolean = getBoolean("akka.remote.log-sent-messages") - val RemoteSystemDaemonAckTimeout: Duration = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) + val UntrustedMode: Boolean = getBoolean("akka.remote.untrusted-mode") - val LogRemoteLifeCycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events") + + val LogRemoteLifecycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events") + + val ShutdownTimeout: Timeout = + Duration(getMilliseconds("akka.remote.shutdown-timeout"), MILLISECONDS) + + val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remote.flush-wait-on-shutdown"), MILLISECONDS) + + val StartupTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remote.startup-timeout"), MILLISECONDS)) + + val RetryGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.retry-gate-closed-for"), MILLISECONDS) + + val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections") + + val MaximumRetriesInWindow: Int = getInt("akka.remote.maximum-retries-in-window") + + val RetryWindow: FiniteDuration = Duration(getMilliseconds("akka.remote.retry-window"), MILLISECONDS) + + val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remote.backoff-interval"), MILLISECONDS) + + val CommandAckTimeout: Timeout = + Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS)) + + val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name ⇒ + val transportConfig = transportConfigFor(name) + (transportConfig.getString("transport-class"), + immutableSeq(transportConfig.getStringList("applied-adapters")), + transportConfig) + } + + val Adapters: Map[String, String] = configToMap(getConfig("akka.remote.adapters")) + + private def transportNames: Seq[String] = immutableSeq(getStringList("akka.remote.enabled-transports")) + + private def transportConfigFor(transportName: String): Config = getConfig(transportName) + + private def configToMap(cfg: Config): Map[String, String] = + cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } + } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index f0e4f61043..adad076911 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -23,50 +23,6 @@ import scala.concurrent.{ Promise, Await, Future } import scala.util.control.NonFatal import scala.util.{ Failure, Success } -class RemotingSettings(val config: Config) { - - import config._ - import scala.collection.JavaConverters._ - - val LogLifecycleEvents: Boolean = getBoolean("akka.remoting.log-remote-lifecycle-events") - - val ShutdownTimeout: Timeout = - Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS) - - val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remoting.flush-wait-on-shutdown"), MILLISECONDS) - - val StartupTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS)) - - val RetryGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remoting.retry-gate-closed-for"), MILLISECONDS) - - val UsePassiveConnections: Boolean = getBoolean("akka.remoting.use-passive-connections") - - val MaximumRetriesInWindow: Int = getInt("akka.remoting.maximum-retries-in-window") - - val RetryWindow: FiniteDuration = Duration(getMilliseconds("akka.remoting.retry-window"), MILLISECONDS) - - val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS) - - val CommandAckTimeout: Timeout = - Timeout(Duration(getMilliseconds("akka.remoting.command-ack-timeout"), MILLISECONDS)) - - val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name ⇒ - val transportConfig = transportConfigFor(name) - (transportConfig.getString("transport-class"), - immutableSeq(transportConfig.getStringList("applied-adapters")), - transportConfig) - } - - val Adapters: Map[String, String] = configToMap(getConfig("akka.remoting.adapters")) - - private def transportNames: Seq[String] = immutableSeq(getStringList("akka.remoting.enabled-transports")) - - private def transportConfigFor(transportName: String): Config = getConfig("akka.remoting.transports." + transportName) - - private def configToMap(cfg: Config): Map[String, String] = - cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } -} - private[remote] object AddressUrlEncoder { def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8") } @@ -136,14 +92,14 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc // a lazy val @volatile var defaultAddress: Address = _ - private val settings = new RemotingSettings(provider.remoteSettings.config) + import provider.remoteSettings._ val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[TransportSupervisor], "transports") override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) val log: LoggingAdapter = Logging(system.eventStream, "Remoting") - val eventPublisher = new EventPublisher(system, log, settings.LogLifecycleEvents) + val eventPublisher = new EventPublisher(system, log, LogRemoteLifecycleEvents) private def notifyError(msg: String, cause: Throwable): Unit = eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause))) @@ -152,7 +108,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc import scala.concurrent.ExecutionContext.Implicits.global endpointManager match { case Some(manager) ⇒ - implicit val timeout = settings.ShutdownTimeout + implicit val timeout = ShutdownTimeout val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean] def finalize(): Unit = { @@ -164,7 +120,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case Success(flushSuccessful) ⇒ if (!flushSuccessful) log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " + - "Increase akka.remoting.flush-wait-on-shutdown to a larger value to avoid this.") + "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.") finalize() case Failure(e) ⇒ @@ -193,7 +149,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc manager ! Listen(addressesPromise) val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future, - settings.StartupTimeout.duration) + StartupTimeout.duration) if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null) transportMapping = transports.groupBy { @@ -203,6 +159,8 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc defaultAddress = transports.head._2 addresses = transports.map { _._2 }.toSet + log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]")) + manager ! StartupFinished eventPublisher.notifyListeners(RemotingListenEvent(addresses)) @@ -228,7 +186,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match { case Some(manager) ⇒ import system.dispatcher - implicit val timeout = settings.CommandAckTimeout + implicit val timeout = CommandAckTimeout manager ? ManagementCommand(cmd) map { case ManagementCommandAck(status) ⇒ status } case None ⇒ throw new IllegalStateException("Attempted to send management command but Remoting is not running.") } @@ -237,7 +195,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode // Not used anywhere only to keep compatibility with RemoteTransport interface - protected def logRemoteLifeCycleEvents: Boolean = provider.remoteSettings.LogRemoteLifeCycleEvents + protected def logRemoteLifeCycleEvents: Boolean = LogRemoteLifecycleEvents } @@ -356,11 +314,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends import EndpointManager._ import context.dispatcher - val settings = new RemotingSettings(conf) + val settings = new RemoteSettings(conf) val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem] val endpointId: Iterator[Int] = Iterator from 0 - val eventPublisher = new EventPublisher(context.system, log, settings.LogLifecycleEvents) + val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents) // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections // will be not part of this map! @@ -540,7 +498,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends private def createEndpoint(remoteAddress: Address, localAddress: Address, transport: Transport, - endpointSettings: RemotingSettings, + endpointSettings: RemoteSettings, handleOption: Option[AssociationHandle]): ActorRef = { assert(transportMapping contains localAddress) @@ -552,7 +510,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport, endpointSettings, AkkaPduProtobufCodec)) - .withDispatcher("akka.remoting.writer-dispatcher"), + .withDispatcher("akka.remote.writer-dispatcher"), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index e6fad8dd30..674517fb61 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -10,7 +10,7 @@ import akka.remote.Remoting.RegisterTransportActor import akka.remote.transport.ActorTransportAdapter.ListenUnderlying import akka.remote.transport.ActorTransportAdapter.ListenerRegistered import akka.remote.transport.Transport._ -import akka.remote.{ RARP, RemotingSettings } +import akka.remote.RARP import akka.util.Timeout import scala.collection.immutable import scala.concurrent.duration._ @@ -20,7 +20,7 @@ import scala.util.Success trait TransportAdapterProvider extends ((Transport, ExtendedActorSystem) ⇒ Transport) class TransportAdapters(system: ExtendedActorSystem) extends Extension { - val settings = new RemotingSettings(RARP(system).provider.remoteSettings.config) + val settings = RARP(system).provider.remoteSettings private val adaptersTable: Map[String, TransportAdapterProvider] = for ((name, fqn) ← settings.Adapters) yield { name -> system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty).recover({ diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 921c88a83d..85cb870727 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -30,24 +30,24 @@ private[remote] class AkkaProtocolSettings(config: Config) { import config._ - val FailureDetectorThreshold: Double = getDouble("akka.remoting.failure-detector.threshold") + val FailureDetectorThreshold: Double = getDouble("akka.remote.failure-detector.threshold") - val FailureDetectorMaxSampleSize: Int = getInt("akka.remoting.failure-detector.max-sample-size") + val FailureDetectorMaxSampleSize: Int = getInt("akka.remote.failure-detector.max-sample-size") val FailureDetectorStdDeviation: FiniteDuration = - Duration(getMilliseconds("akka.remoting.failure-detector.min-std-deviation"), MILLISECONDS) + Duration(getMilliseconds("akka.remote.failure-detector.min-std-deviation"), MILLISECONDS) val AcceptableHeartBeatPause: FiniteDuration = - Duration(getMilliseconds("akka.remoting.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) + Duration(getMilliseconds("akka.remote.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) val HeartBeatInterval: FiniteDuration = - Duration(getMilliseconds("akka.remoting.heartbeat-interval"), MILLISECONDS) + Duration(getMilliseconds("akka.remote.heartbeat-interval"), MILLISECONDS) - val WaitActivityEnabled: Boolean = getBoolean("akka.remoting.wait-activity-enabled") + val WaitActivityEnabled: Boolean = getBoolean("akka.remote.wait-activity-enabled") - val RequireCookie: Boolean = getBoolean("akka.remoting.require-cookie") + val RequireCookie: Boolean = getBoolean("akka.remote.require-cookie") - val SecureCookie: String = getString("akka.remoting.secure-cookie") + val SecureCookie: String = getString("akka.remote.secure-cookie") } private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead? diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 42373f09cc..cfdc8b1915 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -8,32 +8,54 @@ import language.postfixOps import akka.testkit.AkkaSpec import akka.actor.ExtendedActorSystem import scala.concurrent.duration._ -import akka.util.Helpers +import akka.remote.transport.AkkaProtocolSettings +import akka.util.{ Timeout, Helpers } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteConfigSpec extends AkkaSpec( """ akka.actor.provider = "akka.remote.RemoteActorRefProvider" - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 """) { // FIXME: These tests are ignored as it tests configuration specific to the old remoting. "Remoting" must { - "be able to parse generic remote config elements" ignore { - val settings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings + "be able to parse generic remote config elements" in { + val settings = RARP(system).provider.remoteSettings import settings._ - RemoteTransport must be("akka.remote.netty.NettyRemoteTransport") + StartupTimeout must be === Timeout(5.seconds) + ShutdownTimeout must be === Timeout(5.seconds) + FlushWait must be === 2.seconds + UsePassiveConnections must be(true) UntrustedMode must be(false) - RemoteSystemDaemonAckTimeout must be(30 seconds) - LogRemoteLifeCycleEvents must be(true) + LogRemoteLifecycleEvents must be(false) + LogReceive must be(false) + LogSend must be(false) + MaximumRetriesInWindow must be === 5 + RetryWindow must be === 3.seconds + BackoffPeriod must be === 10.milliseconds + CommandAckTimeout must be === Timeout(30.seconds) + } - "contain correct configuration values in reference.conf" in { - val c = system.asInstanceOf[ExtendedActorSystem]. - provider.asInstanceOf[RemoteActorRefProvider]. - remoteSettings.config.getConfig("akka.remote.netty") + "be able to parse AkkaProtocol related config elements" in { + val settings = new AkkaProtocolSettings(RARP(system).provider.remoteSettings.config) + import settings._ + + WaitActivityEnabled must be(true) + FailureDetectorThreshold must be === 7 + FailureDetectorMaxSampleSize must be === 100 + FailureDetectorStdDeviation must be === 100.milliseconds + AcceptableHeartBeatPause must be === 3.seconds + HeartBeatInterval must be === 1.seconds + RequireCookie must be(false) + SecureCookie must be === "" + } + + "contain correct configuration values in reference.conf" ignore { + val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp") // server-socket-worker-pool { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 533f8745b7..b62a26ad5a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -16,14 +16,14 @@ akka { /watchers.remote = "tcp.akka://other@localhost:2666" } } - remoting.tcp { + remote.netty.tcp { hostname = "localhost" port = 0 } } """)) with ImplicitSender with DefaultTimeout with DeathWatchSpec { - val other = ActorSystem("other", ConfigFactory.parseString("akka.remoting.transports.tcp.port=2666") + val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.netty.tcp.port=2666") .withFallback(system.settings.config)) override def beforeTermination() { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 369d1232db..609621dfc2 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -18,7 +18,7 @@ object RemoteDeployerSpec { remote = "akka://sys@wallace:2552" } } - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 """, ConfigParseOptions.defaults) class RecipeActor extends Actor { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 9f88ea8b19..01e38566bd 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -21,7 +21,7 @@ object RemoteRouterSpec { class RemoteRouterSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" - remoting.transports.tcp { + remote.netty.tcp { hostname = localhost port = 0 } @@ -61,7 +61,7 @@ akka { import RemoteRouterSpec._ - val conf = ConfigFactory.parseString("""akka.remoting.transports.tcp.port=12347 + val conf = ConfigFactory.parseString("""akka.remote.netty.tcp.port=12347 akka.actor.deployment { /remote-override { router = round-robin diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index c68fdda63d..67bbad5f41 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -49,16 +49,21 @@ object RemotingSpec { actor.provider = "akka.remote.RemoteActorRefProvider" remote.transport = "akka.remote.Remoting" - remoting.retry-latch-closed-for = 1 s - remoting.log-remote-lifecycle-events = on - remoting.enabled-transports = [test, tcp, udp, ssl] + remote.retry-latch-closed-for = 1 s + remote.log-remote-lifecycle-events = on + remote.enabled-transports = [ + "akka.remote.test", + "akka.remote.netty.tcp", + "akka.remote.netty.udp", + "akka.remote.netty.ssl" + ] - remoting.transports.tcp.port = 0 - remoting.transports.udp.port = 0 - remoting.transports.ssl.port = 0 - remoting.transports.ssl.ssl = ${common-ssl-settings} + remote.netty.tcp.port = 0 + remote.netty.udp.port = 0 + remote.netty.ssl.port = 0 + remote.netty.ssl.ssl = ${common-ssl-settings} - remoting.transports.test { + remote.test { transport-class = "akka.remote.transport.TestTransport" applied-adapters = [] registry-key = aX33k0jWKg @@ -73,7 +78,7 @@ object RemotingSpec { /looker/child/grandchild.remote = "test.akka://RemotingSpec@localhost:12345" } } -""".format( + """.format( getClass.getClassLoader.getResource("keystore").getPath, getClass.getClassLoader.getResource("truststore").getPath)) @@ -86,7 +91,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val conf = ConfigFactory.parseString( """ - akka.remoting.transports { + akka.remote { test.local-address = "test://remote-sys@localhost:12346" } """).withFallback(system.settings.config).resolve() diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index c7e2224efc..c157a1894e 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -34,9 +34,9 @@ object Configuration { default-timeout = 10s } - remoting.enabled-transports = ["ssl"] + remote.enabled-transports = ["akka.remote.netty.ssl"] - remoting.transports.ssl { + remote.netty.ssl { hostname = localhost port = %d ssl { @@ -62,7 +62,7 @@ object Configuration { //if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled") val config = ConfigFactory.parseString(conf.format(localPort, trustStore, keyStore, cipher, enabled.mkString(", "))) - val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remoting.transports.ssl.ssl") + val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty.ssl.ssl") val settings = new SSLSettings(fullConfig) val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, @@ -126,7 +126,7 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten lazy val other: ActorSystem = ActorSystem( "remote-sys", - ConfigFactory.parseString("akka.remoting.transports.ssl.port = " + cipherConfig.remotePort).withFallback(system.settings.config)) + ConfigFactory.parseString("akka.remote.netty.ssl.port = " + cipherConfig.remotePort).withFallback(system.settings.config)) override def afterTermination() { if (cipherConfig.runTest) { diff --git a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala index 2bc84a9763..9124aad9c5 100644 --- a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala @@ -25,13 +25,13 @@ import akka.actor.PoisonPill class UntrustedSpec extends AkkaSpec(""" akka.actor.provider = akka.remote.RemoteActorRefProvider akka.remote.untrusted-mode = on -akka.remoting.transports.tcp.port = 0 +akka.remote.netty.tcp.port = 0 akka.loglevel = DEBUG """) with ImplicitSender { val other = ActorSystem("UntrustedSpec-client", ConfigFactory.parseString(""" akka.actor.provider = akka.remote.RemoteActorRefProvider - akka.remoting.transports.tcp.port = 0 + akka.remote.netty.tcp.port = 0 """)) val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head val target1 = other.actorFor(RootActorPath(addr) / "remote") diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index af4614aa6e..b22632a10d 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -33,7 +33,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val conf = ConfigFactory.parseString( """ - akka.remoting { + akka.remote { failure-detector { threshold = 7.0 @@ -203,7 +203,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re remoteAddress, statusPromise, transport, - new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.wait-activity-enabled = off").withFallback(conf)), + new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), codec, failureDetector))) @@ -263,7 +263,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re localAddress, handle, ActorAssociationEventListener(testActor), - new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)), + new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, failureDetector))) @@ -282,7 +282,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re localAddress, handle, ActorAssociationEventListener(testActor), - new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)), + new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, failureDetector))) @@ -314,8 +314,8 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(ConfigFactory.parseString( """ - akka.remoting.require-cookie = on - akka.remoting.wait-activity-enabled = off + akka.remote.require-cookie = on + akka.remote.wait-activity-enabled = off """).withFallback(conf)), codec, failureDetector))) @@ -342,7 +342,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re remoteAddress, statusPromise, transport, - new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.wait-activity-enabled = off").withFallback(conf)), + new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), codec, failureDetector))) @@ -413,7 +413,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re remoteAddress, statusPromise, transport, - new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.wait-activity-enabled = off").withFallback(conf)), + new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), codec, failureDetector))) @@ -449,7 +449,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re remoteAddress, statusPromise, transport, - new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.wait-activity-enabled = off").withFallback(conf)), + new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), codec, failureDetector))) diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index aa6625ecc0..7d78f053a4 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -16,19 +16,19 @@ object AkkaProtocolStressTest { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" - remoting.retry-latch-closed-for = 0 s - remoting.log-remote-lifecycle-events = on + remote.retry-latch-closed-for = 0 s + remote.log-remote-lifecycle-events = on - remoting.failure-detector { + remote.failure-detector { threshold = 1.0 max-sample-size = 2 min-std-deviation = 1 ms acceptable-heartbeat-pause = 0.01 s } - remoting.retry-window = 1 s - remoting.maximum-retries-in-window = 1000 + remote.retry-window = 1 s + remote.maximum-retries-in-window = 1000 - remoting.transports.tcp { + remote.netty.tcp { applied-adapters = ["gremlin"] port = 0 } diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index 134f58ee09..dd38d2e4a4 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -18,11 +18,11 @@ object ThrottlerTransportAdapterSpec { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" - remoting.retry-latch-closed-for = 0 s - remoting.log-remote-lifecycle-events = on + remote.retry-latch-closed-for = 0 s + remote.log-remote-lifecycle-events = on - remoting.transports.tcp.applied-adapters = ["trttl"] - remoting.transports.tcp.port = 0 + remote.netty.tcp.applied-adapters = ["trttl"] + remote.netty.tcp.port = 0 } """)