=rem #15007 add ability to bind to a different address than the remoting waits messages from

This can be used to traverse NATs with the following configuration:

akka.remote.netty.tcp {
  ...
  hostname = my-external-address.lt
  bind-hostname = 192.168.1.100
}

Use Akka BoundAddressesExtension to get bound addresses
This commit is contained in:
Martynas Mickevicius 2014-09-15 18:30:12 +03:00
parent 60ab0fb3d0
commit 47556a0ebf
13 changed files with 328 additions and 24 deletions

View file

@ -0,0 +1,32 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
/**
* Extension provides access to bound addresses.
*/
object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): BoundAddressesExtension = super.get(system)
override def lookup = BoundAddressesExtension
override def createExtension(system: ExtendedActorSystem): BoundAddressesExtension =
new BoundAddressesExtension(system)
}
class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension {
/**
* Returns a mapping from a protocol to a set of bound addresses.
*/
def boundAddresses: Map[String, Set[Address]] = system.provider
.asInstanceOf[RemoteActorRefProvider].transport
.asInstanceOf[Remoting].boundAddresses
}

View file

@ -229,6 +229,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
// Not used anywhere only to keep compatibility with RemoteTransport interface
protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode
private[akka] def boundAddresses: Map[String, Set[Address]] = {
transportMapping.map {
case (scheme, transports)
scheme -> transports.map { case (transport, _) transport.boundAddress }
}
}
}
/**

View file

@ -89,6 +89,8 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor
} yield (augmentScheme(listenAddress), upstreamListenerPromise)
}
override def boundAddress: Address = wrappedTransport.boundAddress
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
// Prepare a future, and pass its promise to the manager
val statusPromise: Promise[AssociationHandle] = Promise()

View file

@ -111,6 +111,7 @@ class TestTransport(
(_) registry.logActivity(ShutdownAttempt(localAddress)))
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(())
override def boundAddress = localAddress
override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress)
override def shutdown(): Future[Boolean] = shutdownBehavior(())

View file

@ -103,6 +103,12 @@ trait Transport {
*/
def listen: Future[(Address, Promise[AssociationEventListener])]
/**
* @return
* The address this Transport is listening to.
*/
def boundAddress: Address
/**
* Asynchronously opens a logical duplex link between two Transport Entities over a network. It could be backed by a
* real transport-layer connection (TCP), more lightweight connections provided over datagram protocols (UDP with

View file

@ -124,9 +124,20 @@ class NettyTransportSettings(config: Config) {
case value value
}
val BindHostname: String = getString("bind-hostname") match {
case "" Hostname
case value value
}
@deprecated("WARNING: This should only be used by professionals.", "2.0")
val PortSelector: Int = getInt("port")
@deprecated("WARNING: This should only be used by professionals.", "2.4")
val BindPortSelector: Int = getString("bind-port") match {
case "" PortSelector
case value value.toInt
}
val SslSettings: Option[SSLSettings] = if (EnableSsl) Some(new SSLSettings(config.getConfig("security"))) else None
val ServerSocketWorkerPoolSize: Int = computeWPS(config.getConfig("server-socket-worker-pool"))
@ -159,7 +170,7 @@ private[netty] trait CommonHandlers extends NettyHelpers {
final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)(
op: (AssociationHandle Any)): Unit = {
import transport._
NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname)) match {
NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), None) match {
case Some(localAddress)
val handle = createHandle(channel, localAddress, remoteAddress)
handle.readHandlerPromise.future.onSuccess {
@ -188,7 +199,7 @@ private[netty] abstract class ServerHandler(protected final val transport: Netty
associationListenerFuture.onSuccess {
case listener: AssociationEventListener
val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier,
transport.system.name, hostName = None).getOrElse(
transport.system.name, hostName = None, port = None).getOrElse(
throw new NettyTransportException(s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]"))
init(channel, remoteSocketAddress, remoteAddress, msg) { listener notify InboundAssociation(_) }
}
@ -227,9 +238,9 @@ private[transport] object NettyTransport {
val uniqueIdCounter = new AtomicInteger(0)
def addressFromSocketAddress(addr: SocketAddress, schemeIdentifier: String, systemName: String,
hostName: Option[String]): Option[Address] = addr match {
hostName: Option[String], port: Option[Int]): Option[Address] = addr match {
case sa: InetSocketAddress Some(Address(schemeIdentifier, systemName,
hostName.getOrElse(sa.getAddress.getHostAddress), sa.getPort)) // perhaps use getHostString in jdk 1.7
hostName.getOrElse(sa.getAddress.getHostAddress), port.getOrElse(sa.getPort))) // perhaps use getHostString in jdk 1.7
case _ None
}
}
@ -254,6 +265,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
private final val isDatagram = TransportMode == Udp
@volatile private var localAddress: Address = _
@volatile private var boundTo: Address = _
@volatile private var serverChannel: Channel = _
private val log = Logging(system, this.getClass)
@ -386,7 +398,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
for {
address addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector))
address addressToSocketAddress(Address("", "", settings.BindHostname, settings.BindPortSelector))
} yield {
try {
val newServerChannel = inboundBootstrap match {
@ -400,13 +412,18 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
serverChannel = newServerChannel
addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname)) match {
case Some(address)
localAddress = address
associationListenerPromise.future.onSuccess { case listener newServerChannel.setReadable(true) }
(address, associationListenerPromise)
case None throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
}
addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname),
if (settings.PortSelector == 0) None else Some(settings.PortSelector)) match {
case Some(address)
addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, None, None) match {
case Some(address) boundTo = address
case None throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
}
localAddress = address
associationListenerPromise.future.onSuccess { case listener newServerChannel.setReadable(true) }
(address, associationListenerPromise)
case None throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]")
}
} catch {
case NonFatal(e) {
log.error("failed to bind to {}, shutting down Netty transport", address)
@ -417,6 +434,8 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
}
}
override def boundAddress = boundTo
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound"))
else {