Merge pull request #15610 from 2m/wip-bind-hostname
=rem #15007 add ability to bind to a different address than the remoting...
This commit is contained in:
commit
0d599ed859
13 changed files with 328 additions and 24 deletions
|
|
@ -111,7 +111,7 @@ Observe all the parts you need here:
|
||||||
* ``system`` is the remote system’s name (must match exactly, case-sensitive!)
|
* ``system`` is the remote system’s name (must match exactly, case-sensitive!)
|
||||||
|
|
||||||
* ``host`` is the remote system’s IP address or DNS name, and it must match that
|
* ``host`` is the remote system’s IP address or DNS name, and it must match that
|
||||||
system’s configuration (i.e. `akka.remote.netty.hostname`)
|
system’s configuration (i.e. `akka.remote.netty.tcp.hostname`)
|
||||||
|
|
||||||
* ``1234`` is the port number on which the remote system is listening for
|
* ``1234`` is the port number on which the remote system is listening for
|
||||||
connections and receiving messages
|
connections and receiving messages
|
||||||
|
|
@ -131,6 +131,14 @@ The most common reason is that the local system’s name (i.e. the
|
||||||
system’s network location, e.g. because ``host`` was configured to be ``0.0.0.0``,
|
system’s network location, e.g. because ``host`` was configured to be ``0.0.0.0``,
|
||||||
``localhost`` or a NAT’ed IP address.
|
``localhost`` or a NAT’ed IP address.
|
||||||
|
|
||||||
|
If you are running an ActorSystem under a NAT or inside a docker container, make sure to
|
||||||
|
set `akka.remote.netty.tcp.hostname` and `akka.remote.netty.tcp.port` to the address
|
||||||
|
it is reachable at from other ActorSystems. If you need to bind your network interface
|
||||||
|
to a different address - use `akka.remote.netty.tcp.bind-hostname` and
|
||||||
|
`akka.remote.netty.tcp.bind-port` settings. Also make sure your network is configured
|
||||||
|
to translate from the address your ActorSystem is reachable at to the address your
|
||||||
|
ActorSystem network interface is bound to.
|
||||||
|
|
||||||
How reliable is the message delivery?
|
How reliable is the message delivery?
|
||||||
-------------------------------------
|
-------------------------------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,8 @@ be set to a specific :class:`Deploy` instance; this has the same effect as
|
||||||
putting an equivalent deployment into the configuration file (if both are
|
putting an equivalent deployment into the configuration file (if both are
|
||||||
given, configuration file wins).
|
given, configuration file wins).
|
||||||
|
|
||||||
|
.. _symmetric-communication:
|
||||||
|
|
||||||
Peer-to-Peer vs. Client-Server
|
Peer-to-Peer vs. Client-Server
|
||||||
------------------------------
|
------------------------------
|
||||||
|
|
||||||
|
|
@ -65,12 +67,27 @@ design decisions:
|
||||||
is no system that only accepts connections, and there is no system that only initiates connections.
|
is no system that only accepts connections, and there is no system that only initiates connections.
|
||||||
|
|
||||||
The consequence of these decisions is that it is not possible to safely create
|
The consequence of these decisions is that it is not possible to safely create
|
||||||
pure client-server setups with predefined roles (violates assumption 2) and
|
pure client-server setups with predefined roles (violates assumption 2).
|
||||||
using setups involving Network Address Translation or Load Balancers (violates
|
|
||||||
assumption 1).
|
|
||||||
|
|
||||||
For client-server setups it is better to use HTTP or Akka I/O.
|
For client-server setups it is better to use HTTP or Akka I/O.
|
||||||
|
|
||||||
|
Using setups involving Network Address Translation, Load Balancers or Docker
|
||||||
|
containers violates assumption 1, unless additional steps are taken in the
|
||||||
|
network configuration to allow symmetric communication between involved systems.
|
||||||
|
In such situations Akka can be configured to bind to a different network
|
||||||
|
address than the one used for establishing connections between Akka nodes::
|
||||||
|
|
||||||
|
akka {
|
||||||
|
remote {
|
||||||
|
netty.tcp {
|
||||||
|
hostname = my.domain.com # external (logical) hostname
|
||||||
|
port = 8000 # external (logical) port
|
||||||
|
|
||||||
|
bind-hostname = local.address # internal (bind) hostname
|
||||||
|
bind-port = 2552 # internal (bind) port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Marking Points for Scaling Up with Routers
|
Marking Points for Scaling Up with Routers
|
||||||
------------------------------------------
|
------------------------------------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,8 +10,10 @@ For an introduction of remoting capabilities of Akka please see :ref:`remoting`.
|
||||||
|
|
||||||
As explained in that chapter Akka remoting is designed for communication in a
|
As explained in that chapter Akka remoting is designed for communication in a
|
||||||
peer-to-peer fashion and it has limitations for client-server setups. In
|
peer-to-peer fashion and it has limitations for client-server setups. In
|
||||||
particular Akka Remoting does not work with Network Address Translation and
|
particular Akka Remoting does not work transparently with Network Address Translation,
|
||||||
Load Balancers, among others.
|
Load Balancers, or in Docker containers. For symmetric communication in these situations
|
||||||
|
network and/or Akka configuration will have to be changed as described in
|
||||||
|
:ref:`symmetric-communication`.
|
||||||
|
|
||||||
Preparing your ActorSystem for Remoting
|
Preparing your ActorSystem for Remoting
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
|
||||||
|
|
@ -4,16 +4,16 @@
|
||||||
Remoting
|
Remoting
|
||||||
##########
|
##########
|
||||||
|
|
||||||
|
|
||||||
For an introduction of remoting capabilities of Akka please see :ref:`remoting`.
|
For an introduction of remoting capabilities of Akka please see :ref:`remoting`.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
||||||
As explained in that chapter Akka remoting is designed for communication in a
|
As explained in that chapter Akka remoting is designed for communication in a
|
||||||
peer-to-peer fashion and it has limitations for client-server setups. In
|
peer-to-peer fashion and it has limitations for client-server setups. In
|
||||||
particular Akka Remoting does not work with Network Address Translation and
|
particular Akka Remoting does not work transparently with Network Address Translation,
|
||||||
Load Balancers, among others.
|
Load Balancers, or in Docker containers. For symmetric communication in these situations
|
||||||
|
network and/or Akka configuration will have to be changed as described in
|
||||||
|
:ref:`symmetric-communication`.
|
||||||
|
|
||||||
Preparing your ActorSystem for Remoting
|
Preparing your ActorSystem for Remoting
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
|
||||||
|
|
@ -312,10 +312,49 @@ akka {
|
||||||
# This port needs to be unique for each actor system on the same machine.
|
# This port needs to be unique for each actor system on the same machine.
|
||||||
port = 2552
|
port = 2552
|
||||||
|
|
||||||
# The hostname or ip to bind the remoting to,
|
# The hostname or ip clients should connect to.
|
||||||
# InetAddress.getLocalHost.getHostAddress is used if empty
|
# InetAddress.getLocalHost.getHostAddress is used if empty
|
||||||
hostname = ""
|
hostname = ""
|
||||||
|
|
||||||
|
# Use this setting to bind a network interface to a different port
|
||||||
|
# than remoting protocol expects messages at. This may be used
|
||||||
|
# when running akka nodes in a separated networks (under NATs or docker containers).
|
||||||
|
# Use 0 if you want a random available port. Examples:
|
||||||
|
#
|
||||||
|
# akka.remote.netty.tcp.port = 2552
|
||||||
|
# akka.remote.netty.tcp.bind-port = 2553
|
||||||
|
# Network interface will be bound to the 2553 port, but remoting protocol will
|
||||||
|
# expect messages sent to port 2552.
|
||||||
|
#
|
||||||
|
# akka.remote.netty.tcp.port = 0
|
||||||
|
# akka.remote.netty.tcp.bind-port = 0
|
||||||
|
# Network interface will be bound to a random port, and remoting protocol will
|
||||||
|
# expect messages sent to the bound port.
|
||||||
|
#
|
||||||
|
# akka.remote.netty.tcp.port = 2552
|
||||||
|
# akka.remote.netty.tcp.bind-port = 0
|
||||||
|
# Network interface will be bound to a random port, but remoting protocol will
|
||||||
|
# expect messages sent to port 2552.
|
||||||
|
#
|
||||||
|
# akka.remote.netty.tcp.port = 0
|
||||||
|
# akka.remote.netty.tcp.bind-port = 2553
|
||||||
|
# Network interface will be bound to the 2553 port, and remoting protocol will
|
||||||
|
# expect messages sent to the bound port.
|
||||||
|
#
|
||||||
|
# akka.remote.netty.tcp.port = 2552
|
||||||
|
# akka.remote.netty.tcp.bind-port = ""
|
||||||
|
# Network interface will be bound to the 2552 port, and remoting protocol will
|
||||||
|
# expect messages sent to the bound port.
|
||||||
|
#
|
||||||
|
# akka.remote.netty.tcp.port if empty
|
||||||
|
bind-port = ""
|
||||||
|
|
||||||
|
# Use this setting to bind a network interface to a different hostname or ip
|
||||||
|
# than remoting protocol expects messages at.
|
||||||
|
# Use "0.0.0.0" to bind to all interfaces.
|
||||||
|
# akka.remote.netty.tcp.hostname if empty
|
||||||
|
bind-hostname = ""
|
||||||
|
|
||||||
# Enables SSL support on this transport
|
# Enables SSL support on this transport
|
||||||
enable-ssl = false
|
enable-ssl = false
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.remote
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.actor.Address
|
||||||
|
import akka.actor.ExtendedActorSystem
|
||||||
|
import akka.actor.Extension
|
||||||
|
import akka.actor.ExtensionId
|
||||||
|
import akka.actor.ExtensionIdProvider
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extension provides access to bound addresses.
|
||||||
|
*/
|
||||||
|
object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with ExtensionIdProvider {
|
||||||
|
override def get(system: ActorSystem): BoundAddressesExtension = super.get(system)
|
||||||
|
|
||||||
|
override def lookup = BoundAddressesExtension
|
||||||
|
|
||||||
|
override def createExtension(system: ExtendedActorSystem): BoundAddressesExtension =
|
||||||
|
new BoundAddressesExtension(system)
|
||||||
|
}
|
||||||
|
|
||||||
|
class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension {
|
||||||
|
/**
|
||||||
|
* Returns a mapping from a protocol to a set of bound addresses.
|
||||||
|
*/
|
||||||
|
def boundAddresses: Map[String, Set[Address]] = system.provider
|
||||||
|
.asInstanceOf[RemoteActorRefProvider].transport
|
||||||
|
.asInstanceOf[Remoting].boundAddresses
|
||||||
|
}
|
||||||
|
|
@ -229,6 +229,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
||||||
// Not used anywhere only to keep compatibility with RemoteTransport interface
|
// Not used anywhere only to keep compatibility with RemoteTransport interface
|
||||||
protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode
|
protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode
|
||||||
|
|
||||||
|
private[akka] def boundAddresses: Map[String, Set[Address]] = {
|
||||||
|
transportMapping.map {
|
||||||
|
case (scheme, transports) ⇒
|
||||||
|
scheme -> transports.map { case (transport, _) ⇒ transport.boundAddress }
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -89,6 +89,8 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor
|
||||||
} yield (augmentScheme(listenAddress), upstreamListenerPromise)
|
} yield (augmentScheme(listenAddress), upstreamListenerPromise)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def boundAddress: Address = wrappedTransport.boundAddress
|
||||||
|
|
||||||
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
|
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
|
||||||
// Prepare a future, and pass its promise to the manager
|
// Prepare a future, and pass its promise to the manager
|
||||||
val statusPromise: Promise[AssociationHandle] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,7 @@ class TestTransport(
|
||||||
(_) ⇒ registry.logActivity(ShutdownAttempt(localAddress)))
|
(_) ⇒ registry.logActivity(ShutdownAttempt(localAddress)))
|
||||||
|
|
||||||
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(())
|
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(())
|
||||||
|
override def boundAddress = localAddress
|
||||||
override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress)
|
override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress)
|
||||||
override def shutdown(): Future[Boolean] = shutdownBehavior(())
|
override def shutdown(): Future[Boolean] = shutdownBehavior(())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -103,6 +103,12 @@ trait Transport {
|
||||||
*/
|
*/
|
||||||
def listen: Future[(Address, Promise[AssociationEventListener])]
|
def listen: Future[(Address, Promise[AssociationEventListener])]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return
|
||||||
|
* The address this Transport is listening to.
|
||||||
|
*/
|
||||||
|
def boundAddress: Address
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asynchronously opens a logical duplex link between two Transport Entities over a network. It could be backed by a
|
* Asynchronously opens a logical duplex link between two Transport Entities over a network. It could be backed by a
|
||||||
* real transport-layer connection (TCP), more lightweight connections provided over datagram protocols (UDP with
|
* real transport-layer connection (TCP), more lightweight connections provided over datagram protocols (UDP with
|
||||||
|
|
|
||||||
|
|
@ -124,9 +124,20 @@ class NettyTransportSettings(config: Config) {
|
||||||
case value ⇒ value
|
case value ⇒ value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val BindHostname: String = getString("bind-hostname") match {
|
||||||
|
case "" ⇒ Hostname
|
||||||
|
case value ⇒ value
|
||||||
|
}
|
||||||
|
|
||||||
@deprecated("WARNING: This should only be used by professionals.", "2.0")
|
@deprecated("WARNING: This should only be used by professionals.", "2.0")
|
||||||
val PortSelector: Int = getInt("port")
|
val PortSelector: Int = getInt("port")
|
||||||
|
|
||||||
|
@deprecated("WARNING: This should only be used by professionals.", "2.4")
|
||||||
|
val BindPortSelector: Int = getString("bind-port") match {
|
||||||
|
case "" ⇒ PortSelector
|
||||||
|
case value ⇒ value.toInt
|
||||||
|
}
|
||||||
|
|
||||||
val SslSettings: Option[SSLSettings] = if (EnableSsl) Some(new SSLSettings(config.getConfig("security"))) else None
|
val SslSettings: Option[SSLSettings] = if (EnableSsl) Some(new SSLSettings(config.getConfig("security"))) else None
|
||||||
|
|
||||||
val ServerSocketWorkerPoolSize: Int = computeWPS(config.getConfig("server-socket-worker-pool"))
|
val ServerSocketWorkerPoolSize: Int = computeWPS(config.getConfig("server-socket-worker-pool"))
|
||||||
|
|
@ -159,7 +170,7 @@ private[netty] trait CommonHandlers extends NettyHelpers {
|
||||||
final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)(
|
final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)(
|
||||||
op: (AssociationHandle ⇒ Any)): Unit = {
|
op: (AssociationHandle ⇒ Any)): Unit = {
|
||||||
import transport._
|
import transport._
|
||||||
NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname)) match {
|
NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), None) match {
|
||||||
case Some(localAddress) ⇒
|
case Some(localAddress) ⇒
|
||||||
val handle = createHandle(channel, localAddress, remoteAddress)
|
val handle = createHandle(channel, localAddress, remoteAddress)
|
||||||
handle.readHandlerPromise.future.onSuccess {
|
handle.readHandlerPromise.future.onSuccess {
|
||||||
|
|
@ -188,7 +199,7 @@ private[netty] abstract class ServerHandler(protected final val transport: Netty
|
||||||
associationListenerFuture.onSuccess {
|
associationListenerFuture.onSuccess {
|
||||||
case listener: AssociationEventListener ⇒
|
case listener: AssociationEventListener ⇒
|
||||||
val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier,
|
val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier,
|
||||||
transport.system.name, hostName = None).getOrElse(
|
transport.system.name, hostName = None, port = None).getOrElse(
|
||||||
throw new NettyTransportException(s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]"))
|
throw new NettyTransportException(s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]"))
|
||||||
init(channel, remoteSocketAddress, remoteAddress, msg) { listener notify InboundAssociation(_) }
|
init(channel, remoteSocketAddress, remoteAddress, msg) { listener notify InboundAssociation(_) }
|
||||||
}
|
}
|
||||||
|
|
@ -227,9 +238,9 @@ private[transport] object NettyTransport {
|
||||||
val uniqueIdCounter = new AtomicInteger(0)
|
val uniqueIdCounter = new AtomicInteger(0)
|
||||||
|
|
||||||
def addressFromSocketAddress(addr: SocketAddress, schemeIdentifier: String, systemName: String,
|
def addressFromSocketAddress(addr: SocketAddress, schemeIdentifier: String, systemName: String,
|
||||||
hostName: Option[String]): Option[Address] = addr match {
|
hostName: Option[String], port: Option[Int]): Option[Address] = addr match {
|
||||||
case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName,
|
case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName,
|
||||||
hostName.getOrElse(sa.getAddress.getHostAddress), sa.getPort)) // perhaps use getHostString in jdk 1.7
|
hostName.getOrElse(sa.getAddress.getHostAddress), port.getOrElse(sa.getPort))) // perhaps use getHostString in jdk 1.7
|
||||||
case _ ⇒ None
|
case _ ⇒ None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -254,6 +265,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
||||||
private final val isDatagram = TransportMode == Udp
|
private final val isDatagram = TransportMode == Udp
|
||||||
|
|
||||||
@volatile private var localAddress: Address = _
|
@volatile private var localAddress: Address = _
|
||||||
|
@volatile private var boundTo: Address = _
|
||||||
@volatile private var serverChannel: Channel = _
|
@volatile private var serverChannel: Channel = _
|
||||||
|
|
||||||
private val log = Logging(system, this.getClass)
|
private val log = Logging(system, this.getClass)
|
||||||
|
|
@ -386,7 +398,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
||||||
|
|
||||||
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
|
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
|
||||||
for {
|
for {
|
||||||
address ← addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector))
|
address ← addressToSocketAddress(Address("", "", settings.BindHostname, settings.BindPortSelector))
|
||||||
} yield {
|
} yield {
|
||||||
try {
|
try {
|
||||||
val newServerChannel = inboundBootstrap match {
|
val newServerChannel = inboundBootstrap match {
|
||||||
|
|
@ -400,13 +412,18 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
||||||
|
|
||||||
serverChannel = newServerChannel
|
serverChannel = newServerChannel
|
||||||
|
|
||||||
addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname)) match {
|
addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname),
|
||||||
case Some(address) ⇒
|
if (settings.PortSelector == 0) None else Some(settings.PortSelector)) match {
|
||||||
localAddress = address
|
case Some(address) ⇒
|
||||||
associationListenerPromise.future.onSuccess { case listener ⇒ newServerChannel.setReadable(true) }
|
addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, None, None) match {
|
||||||
(address, associationListenerPromise)
|
case Some(address) ⇒ boundTo = address
|
||||||
case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
|
case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
|
||||||
}
|
}
|
||||||
|
localAddress = address
|
||||||
|
associationListenerPromise.future.onSuccess { case listener ⇒ newServerChannel.setReadable(true) }
|
||||||
|
(address, associationListenerPromise)
|
||||||
|
case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒ {
|
case NonFatal(e) ⇒ {
|
||||||
log.error("failed to bind to {}, shutting down Netty transport", address)
|
log.error("failed to bind to {}, shutting down Netty transport", address)
|
||||||
|
|
@ -417,6 +434,8 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def boundAddress = boundTo
|
||||||
|
|
||||||
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
|
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
|
||||||
if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound"))
|
if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound"))
|
||||||
else {
|
else {
|
||||||
|
|
|
||||||
|
|
@ -91,6 +91,8 @@ class RemoteConfigSpec extends AkkaSpec(
|
||||||
TcpKeepalive should be(true)
|
TcpKeepalive should be(true)
|
||||||
TcpReuseAddr should be(!Helpers.isWindows)
|
TcpReuseAddr should be(!Helpers.isWindows)
|
||||||
c.getString("hostname") should be("")
|
c.getString("hostname") should be("")
|
||||||
|
c.getString("bind-hostname") should be("")
|
||||||
|
c.getString("bind-port") should be("")
|
||||||
ServerSocketWorkerPoolSize should be(2)
|
ServerSocketWorkerPoolSize should be(2)
|
||||||
ClientSocketWorkerPoolSize should be(2)
|
ClientSocketWorkerPoolSize should be(2)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,170 @@
|
||||||
|
package akka.remote.transport.netty
|
||||||
|
|
||||||
|
import java.net.{ InetAddress, InetSocketAddress }
|
||||||
|
|
||||||
|
import akka.TestUtils
|
||||||
|
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
|
||||||
|
import akka.remote.BoundAddressesExtension
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import org.scalatest.{ Matchers, WordSpec }
|
||||||
|
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration.Duration
|
||||||
|
|
||||||
|
object NettyTransportSpec {
|
||||||
|
val commonConfig = ConfigFactory.parseString("""
|
||||||
|
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
""")
|
||||||
|
|
||||||
|
def getInternal()(implicit sys: ActorSystem) =
|
||||||
|
BoundAddressesExtension(sys).boundAddresses.values.flatten
|
||||||
|
|
||||||
|
def getExternal()(implicit sys: ActorSystem) =
|
||||||
|
sys.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
|
|
||||||
|
implicit class RichInetSocketAddress(address: InetSocketAddress) {
|
||||||
|
def toAkkaAddress(protocol: String)(implicit system: ActorSystem) =
|
||||||
|
Address(protocol, system.name, address.getAddress.getHostAddress, address.getPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
implicit class RichAkkaAddress(address: Address) {
|
||||||
|
def withProtocol(protocol: String)(implicit system: ActorSystem) =
|
||||||
|
address.copy(protocol = protocol)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class NettyTransportSpec extends WordSpec with Matchers with BindBehaviour {
|
||||||
|
import akka.remote.transport.netty.NettyTransportSpec._
|
||||||
|
|
||||||
|
"NettyTransport" should {
|
||||||
|
behave like theOneWhoKnowsTheDifferenceBetweenBoundAndRemotingAddress("tcp")
|
||||||
|
behave like theOneWhoKnowsTheDifferenceBetweenBoundAndRemotingAddress("udp")
|
||||||
|
|
||||||
|
"bind to a random port" in {
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote.netty.tcp {
|
||||||
|
port = 0
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getInternal should contain(getExternal.withProtocol("tcp"))
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
"bind to a random port but remoting accepts from a specified port" in {
|
||||||
|
val address = TestUtils.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false)
|
||||||
|
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote.netty.tcp {
|
||||||
|
port = ${address.getPort}
|
||||||
|
bind-port = 0
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getExternal should be(address.toAkkaAddress("akka.tcp"))
|
||||||
|
getInternal should not contain (address.toAkkaAddress("tcp"))
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
"bind to a specified port and remoting accepts from a bound port" in {
|
||||||
|
val address = TestUtils.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false)
|
||||||
|
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote.netty.tcp {
|
||||||
|
port = 0
|
||||||
|
bind-port = ${address.getPort}
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getExternal should be(address.toAkkaAddress("akka.tcp"))
|
||||||
|
getInternal should contain(address.toAkkaAddress("tcp"))
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
"bind to multiple transports" in {
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote {
|
||||||
|
netty.tcp.port = 0
|
||||||
|
netty.udp.port = 0
|
||||||
|
enabled-transports = ["akka.remote.netty.tcp", "akka.remote.netty.udp"]
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getInternal should contain(getExternal.withProtocol("tcp"))
|
||||||
|
getInternal.size should be(2)
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
"bind to all interfaces" in {
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote {
|
||||||
|
netty.tcp.bind-hostname = "0.0.0.0"
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getInternal.flatMap(_.port) should contain(getExternal.port.get)
|
||||||
|
getInternal.map(_.host.get should include regex "0.0.0.0".r) // regexp dot is intentional to match IPv4 and 6 addresses
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait BindBehaviour { this: WordSpec with Matchers ⇒
|
||||||
|
import akka.remote.transport.netty.NettyTransportSpec._
|
||||||
|
|
||||||
|
def theOneWhoKnowsTheDifferenceBetweenBoundAndRemotingAddress(proto: String) = {
|
||||||
|
s"bind to default $proto address" in {
|
||||||
|
val address = TestUtils.temporaryServerAddress(udp = proto == "udp")
|
||||||
|
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote {
|
||||||
|
netty.$proto {
|
||||||
|
hostname = ${address.getAddress.getHostAddress}
|
||||||
|
port = ${address.getPort}
|
||||||
|
}
|
||||||
|
enabled-transports = ["akka.remote.netty.$proto"]
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getExternal should be(address.toAkkaAddress(s"akka.$proto"))
|
||||||
|
getInternal should contain(address.toAkkaAddress(proto))
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
s"bind to specified $proto address" in {
|
||||||
|
val address = TestUtils.temporaryServerAddress(address = "127.0.0.1", udp = proto == "udp")
|
||||||
|
val bindAddress = TestUtils.temporaryServerAddress(address = "127.0.1.1", udp = proto == "udp")
|
||||||
|
|
||||||
|
val bindConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote {
|
||||||
|
netty.$proto {
|
||||||
|
hostname = ${address.getAddress.getHostAddress}
|
||||||
|
port = ${address.getPort}
|
||||||
|
|
||||||
|
bind-hostname = ${bindAddress.getAddress.getHostAddress}
|
||||||
|
bind-port = ${bindAddress.getPort}
|
||||||
|
}
|
||||||
|
enabled-transports = ["akka.remote.netty.$proto"]
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
implicit val sys = ActorSystem("sys", bindConfig.withFallback(commonConfig))
|
||||||
|
|
||||||
|
getExternal should be(address.toAkkaAddress(s"akka.$proto"))
|
||||||
|
getInternal should contain(bindAddress.toAkkaAddress(proto))
|
||||||
|
|
||||||
|
Await.result(sys.terminate(), Duration.Inf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue