diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 50b9684177..943110a98d 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -5,8 +5,7 @@ package akka.remote.testkit import language.implicitConversions import language.postfixOps - -import java.net.InetSocketAddress +import java.net.{ InetAddress, InetSocketAddress } import java.util.concurrent.TimeoutException import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } import scala.concurrent.{ Await, Awaitable } @@ -137,9 +136,15 @@ object MultiNodeSpec { * {{{ * -Dmultinode.host=host.example.com * }}} + * + * InetAddress.getLocalHost.getHostAddress is used if empty or "localhost" + * is defined as system property "multinode.host". */ - val selfName: String = Option(System.getProperty("multinode.host")) getOrElse - (throw new IllegalStateException("need system property multinode.host to be set")) + val selfName: String = Option(System.getProperty("multinode.host")) match { + case None ⇒ throw new IllegalStateException("need system property multinode.host to be set") + case Some("") ⇒ InetAddress.getLocalHost.getHostAddress + case Some(host) ⇒ host + } require(selfName != "", "multinode.host must not be empty") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index 361a471b95..545119be08 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -90,8 +90,8 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with ST } val (min, max) = - if(isNode(master))(0 seconds, 500 millis) - else (0.3 seconds, 2 seconds) + if (isNode(master)) (0 seconds, 500 millis) + else (0.3 seconds, 3 seconds) within(min, max) { expectMsg(500 millis, 10) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index bc0c9e7c0e..cf6d8c313d 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -155,7 +155,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val inMode = getInboundMode(naked) wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked)) wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor - handleTable ::= nakedAddress(naked) -> wrappedHandle + handleTable ::= naked -> wrappedHandle statusPromise.success(wrappedHandle) case SetThrottle(address, direction, mode) ⇒ val naked = nakedAddress(address) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 0a3c1a65e4..9a4362d837 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -135,10 +135,11 @@ trait CommonHandlers extends NettyHelpers { msg: ChannelBuffer, remoteSocketAddress: InetSocketAddress): Unit - final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer)(op: (AssociationHandle ⇒ Any)): Unit = { + final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)( + op: (AssociationHandle ⇒ Any)): Unit = { import transport._ - (addressFromSocketAddress(channel.getLocalAddress), addressFromSocketAddress(remoteSocketAddress)) match { - case (Some(localAddress), Some(remoteAddress)) ⇒ + NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname)) match { + case Some(localAddress) ⇒ val handle = createHandle(channel, localAddress, remoteAddress) handle.readHandlerPromise.future.onSuccess { case listener: HandleEventListener ⇒ @@ -161,19 +162,23 @@ abstract class ServerHandler(protected final val transport: NettyTransport, final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { channel.setReadable(false) associationListenerFuture.onSuccess { - case listener: AssociationEventListener ⇒ init(channel, remoteSocketAddress, msg) { listener notify InboundAssociation(_) } + case listener: AssociationEventListener ⇒ + val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier, + transport.system.name, hostName = None).getOrElse( + throw new NettyTransportException(s"Unknown remote address type [${remoteSocketAddress.getClass.getName}]")) + init(channel, remoteSocketAddress, remoteAddress, msg) { listener notify InboundAssociation(_) } } } } -abstract class ClientHandler(protected final val transport: NettyTransport) +abstract class ClientHandler(protected final val transport: NettyTransport, remoteAddress: Address) extends NettyClientHelpers with CommonHandlers { final protected val statusPromise = Promise[AssociationHandle]() def statusFuture = statusPromise.future final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { - init(channel, remoteSocketAddress, msg)(statusPromise.success) + init(channel, remoteSocketAddress, remoteAddress, msg)(statusPromise.success) } } @@ -184,10 +189,17 @@ private[transport] object NettyTransport { def gracefulClose(channel: Channel): Unit = channel.disconnect().addListener(ChannelFutureListener.CLOSE) val uniqueIdCounter = new AtomicInteger(0) + + def addressFromSocketAddress(addr: SocketAddress, schemeIdentifier: String, systemName: String, + hostName: Option[String]): Option[Address] = addr match { + case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName, + hostName.getOrElse(sa.getAddress.getHostAddress), sa.getPort)) // perhaps use getHostString in jdk 1.7 + case _ ⇒ None + } } // FIXME: Split into separate UDP and TCP classes -class NettyTransport(private val settings: NettyTransportSettings, private val system: ExtendedActorSystem) extends Transport { +class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedActorSystem) extends Transport { def this(system: ExtendedActorSystem, conf: Config) = this(new NettyTransportSettings(conf), system) @@ -271,13 +283,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s } } - private val clientPipelineFactory: ChannelPipelineFactory = + private def clientPipelineFactory(remoteAddress: Address): ChannelPipelineFactory = new ChannelPipelineFactory { override def getPipeline: ChannelPipeline = { val pipeline = newPipeline if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true)) - val handler = if (isDatagram) new UdpClientHandler(NettyTransport.this) - else new TcpClientHandler(NettyTransport.this) + val handler = if (isDatagram) new UdpClientHandler(NettyTransport.this, remoteAddress) + else new TcpClientHandler(NettyTransport.this, remoteAddress) pipeline.addLast("clienthandler", handler) pipeline } @@ -303,21 +315,14 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s case Udp ⇒ setupBootstrap(new ConnectionlessBootstrap(serverChannelFactory), serverPipelineFactory) } - private def outboundBootstrap: ClientBootstrap = { - val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory) + private def outboundBootstrap(remoteAddress: Address): ClientBootstrap = { + val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory(remoteAddress)) bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) bootstrap } override def isResponsibleFor(address: Address): Boolean = true //TODO: Add configurable subnet filtering - def addressFromSocketAddress(addr: SocketAddress, - systemName: Option[String] = None, - hostName: Option[String] = None): Option[Address] = addr match { - case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName.getOrElse(""), hostName.getOrElse(sa.getHostName), sa.getPort)) - case _ ⇒ None - } - // TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960 def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = Future { new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get) } @@ -337,12 +342,12 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s serverChannel = newServerChannel - addressFromSocketAddress(newServerChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match { + addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname)) match { case Some(address) ⇒ localAddress = address associationListenerPromise.future.onSuccess { case listener ⇒ newServerChannel.setReadable(true) } (address, associationListenerPromise) - case None ⇒ throw new NettyTransportException(s"Unknown local address type ${newServerChannel.getLocalAddress.getClass}") + case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") } } } @@ -350,7 +355,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s override def associate(remoteAddress: Address): Future[AssociationHandle] = { if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound")) else { - val bootstrap: ClientBootstrap = outboundBootstrap + val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress) (for { socketAddress ← addressToSocketAddress(remoteAddress) @@ -372,7 +377,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s case listener ⇒ udpConnectionTable.put(addr, listener) } handle - case unknown ⇒ throw new NettyTransportException(s"Unknown remote address type ${unknown.getClass}") + case unknown ⇒ throw new NettyTransportException(s"Unknown remote address type [${unknown.getClass.getName}]") } } else diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 4cded02003..879564f57c 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -53,8 +53,8 @@ private[remote] class TcpServerHandler(_transport: NettyTransport, _associationL } -private[remote] class TcpClientHandler(_transport: NettyTransport) - extends ClientHandler(_transport) with TcpHandlers { +private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress: Address) + extends ClientHandler(_transport, remoteAddress) with TcpHandlers { override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = initOutbound(e.getChannel, e.getChannel.getRemoteAddress, null) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index b00724e430..5630079749 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -53,7 +53,8 @@ private[remote] class UdpServerHandler(_transport: NettyTransport, _associationL initInbound(channel, remoteSocketAddress, msg) } -private[remote] class UdpClientHandler(_transport: NettyTransport) extends ClientHandler(_transport) with UdpHandlers { +private[remote] class UdpClientHandler(_transport: NettyTransport, remoteAddress: Address) + extends ClientHandler(_transport, remoteAddress) with UdpHandlers { override def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = initOutbound(channel, remoteSocketAddress, msg)