diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index c2bc63457a..3fbe5913b2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -182,6 +182,14 @@ class RemoteClientException private[akka] ( class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) +/** + * The remote transport is responsible for sending and receiving messages. + * Each transport has an address, which it should provide in + * Serialization.currentTransportAddress (thread-local) while serializing + * actor references (which might also be part of messages). This address must + * be available (i.e. fully initialized) by the time the first message is + * received or when the start() method returns, whatever happens first. + */ abstract class RemoteTransport { /** * Shuts down the remoting diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 28dbdb3df6..4179beeb3d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,6 +4,8 @@ package akka.remote.netty +import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.Executors @@ -21,7 +23,7 @@ import akka.actor.{ Address, ActorSystemImpl, ActorRef } import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol -import akka.remote.{ RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef } +import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef } /** * Provides the implementation of the Netty remote support @@ -55,11 +57,24 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor case ex ⇒ shutdown(); throw ex } - val address = Address("akka", remoteSettings.systemName, Some(settings.Hostname), Some(settings.Port)) + // the address is set in start() or from the RemoteServerHandler, whichever comes first + private val _address = new AtomicReference[Address] + private[akka] def setAddressFromChannel(ch: Channel) = { + val addr = ch.getLocalAddress match { + case sa: InetSocketAddress ⇒ sa + case x ⇒ throw new RemoteTransportException("unknown local address type " + x.getClass, null) + } + _address.compareAndSet(null, Address("akka", remoteSettings.systemName, Some(settings.Hostname), Some(addr.getPort))) + } + + def address = _address.get val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")") - def start(): Unit = server.start() + def start(): Unit = { + server.start() + setAddressFromChannel(server.channel) + } def shutdown(): Unit = { clientsLock.writeLock().lock() diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 749e01d63f..9ebeb8f3e8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -20,6 +20,7 @@ import akka.actor.Address import java.net.InetAddress import akka.actor.ActorSystemImpl import org.jboss.netty.channel.ChannelLocal +import org.jboss.netty.channel.ChannelEvent class NettyRemoteServer(val netty: NettyRemoteTransport) { @@ -45,8 +46,12 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) { bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("reuseAddress", true) + @volatile + private[akka] var channel: Channel = _ + def start(): Unit = { - openChannels.add(bootstrap.bind(new InetSocketAddress(ip, settings.Port))) + channel = bootstrap.bind(new InetSocketAddress(ip, settings.Port)) + openChannels.add(channel) netty.notifyListeners(RemoteServerStarted(netty)) } @@ -132,6 +137,16 @@ class RemoteServerHandler( import netty.settings + private var addressToSet = true + + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { + if (addressToSet) { + netty.setAddressFromChannel(event.getChannel) + addressToSet = false + } + super.handleUpstream(ctx, event) + } + /** * ChannelOpen overridden to store open channels for a clean postStop of a node. * If a channel is closed before, it is automatically removed from the open channels group. diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 4e31720b2b..123e2d53dc 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -38,14 +38,7 @@ class NettySettings(config: Config, val systemName: String) { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } - val Port = getInt("port") match { - case 0 ⇒ - try { - val s = new java.net.ServerSocket(0) - try s.getLocalPort finally s.close() - } catch { case e ⇒ throw new ConfigurationException("Unable to obtain random port", e) } - case other ⇒ other - } + val Port = getInt("port") val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 942ca0cd8d..392cee216b 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -18,7 +18,7 @@ akka { } remote.netty { hostname = "127.0.0.1" - port = 2665 + port = 0 } } """)) with ImplicitSender with DefaultTimeout with DeathWatchSpec { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index ac3833130c..26c28860f7 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -22,13 +22,13 @@ akka { actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty { hostname = localhost - port = 12345 + port = 0 } actor.deployment { /blub { router = round-robin nr-of-instances = 2 - target.nodes = ["akka://remote_sys@localhost:12346"] + target.nodes = ["akka://remote_sys@localhost:12347"] } /elastic-blub { router = round-robin @@ -36,7 +36,7 @@ akka { lower-bound = 2 upper-bound = 3 } - target.nodes = ["akka://remote_sys@localhost:12346"] + target.nodes = ["akka://remote_sys@localhost:12347"] } } } @@ -44,7 +44,7 @@ akka { import RemoteRouterSpec._ - val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config) + val conf = ConfigFactory.parseString("akka.remote.netty.port=12347").withFallback(system.settings.config) val other = ActorSystem("remote_sys", conf) override def atTermination() { @@ -56,26 +56,26 @@ akka { "deploy its children on remote host driven by configuration" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)), "blub") router ! "" - expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c1" + expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" router ! "" - expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2" + expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" } "deploy its children on remote host driven by programatic definition" in { val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2), - Seq("akka://remote_sys@localhost:12346"))), "blub2") + Seq("akka://remote_sys@localhost:12347"))), "blub2") router ! "" - expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c1" + expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" router ! "" - expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c2" + expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" } "deploy dynamic resizable number of children on remote host driven by configuration" in { val router = system.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub") router ! "" - expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c1" + expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" router ! "" - expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c2" + expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" } }