diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 4168499dd8..a9a7397aee 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -35,28 +35,30 @@ private[io] class TcpListener(val selectorRouter: ActorRef, val bind: Bind) extends Actor with ActorLogging { import TcpListener._ import tcp.Settings._ - import bind._ - context.watch(handler) // sign death pact - val channel = { - val serverSocketChannel = ServerSocketChannel.open - serverSocketChannel.configureBlocking(false) - val socket = serverSocketChannel.socket - options.foreach(_.beforeServerSocketBind(socket)) + context.watch(bind.handler) // sign death pact + + val channel = ServerSocketChannel.open + channel.configureBlocking(false) + + val localAddress = try { - socket.bind(localAddress, backlog) - require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress], - s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]") + val socket = channel.socket + bind.options.foreach(_.beforeServerSocketBind(socket)) + socket.bind(bind.localAddress, bind.backlog) + val ret = socket.getLocalSocketAddress match { + case isa: InetSocketAddress ⇒ isa + case x ⇒ throw new IllegalArgumentException(s"bound to unknown SocketAddress [$x]") + } + context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) + log.debug("Successfully bound to {}", ret) + ret } catch { case NonFatal(e) ⇒ bindCommander ! bind.failureMessage - log.error(e, "Bind failed for TCP channel on endpoint [{}]", localAddress) + log.error(e, "Bind failed for TCP channel on endpoint [{}]", bind.localAddress) context.stop(self) } - serverSocketChannel - } - context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) - log.debug("Successfully bound to {}", localAddress) override def supervisorStrategy = IO.connectionSupervisorStrategy @@ -96,7 +98,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef, if (socketChannel != null) { log.debug("New connection accepted") socketChannel.configureBlocking(false) - selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(classOf[TcpIncomingConnection], socketChannel, tcp, handler, options)) + selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(classOf[TcpIncomingConnection], socketChannel, tcp, bind.handler, bind.options)) acceptAllPending(limit - 1) } else context.parent ! AcceptInterest } diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 9b50fb0fd9..c521db40a6 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -40,7 +40,7 @@ object Udp extends ExtensionKey[UdpExt] { } case class Bind(handler: ActorRef, - endpoint: InetSocketAddress, + localAddress: InetSocketAddress, options: immutable.Traversable[SocketOption] = Nil) extends Command case object Unbind extends Command diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index 570d0af209..aab2608323 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -22,32 +22,34 @@ private[io] class UdpListener(val udp: UdpExt, val bind: Bind) extends Actor with ActorLogging with WithUdpSend { - import bind._ import udp.bufferPool import udp.settings._ def selector: ActorRef = context.parent - context.watch(handler) // sign death pact - val channel = { - val datagramChannel = DatagramChannel.open - datagramChannel.configureBlocking(false) - val socket = datagramChannel.socket - options.foreach(_.beforeDatagramBind(socket)) + context.watch(bind.handler) // sign death pact + + val channel = DatagramChannel.open + channel.configureBlocking(false) + + val localAddress = try { - socket.bind(endpoint) - require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress], - s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]") + val socket = channel.socket + bind.options.foreach(_.beforeDatagramBind(socket)) + socket.bind(bind.localAddress) + val ret = socket.getLocalSocketAddress match { + case isa: InetSocketAddress ⇒ isa + case x ⇒ throw new IllegalArgumentException(s"bound to unknown SocketAddress [$x]") + } + context.parent ! RegisterChannel(channel, OP_READ) + log.debug("Successfully bound to [{}]", ret) + ret } catch { case NonFatal(e) ⇒ bindCommander ! CommandFailed(bind) - log.error(e, "Failed to bind UDP channel to endpoint [{}]", endpoint) + log.error(e, "Failed to bind UDP channel to endpoint [{}]", bind.localAddress) context.stop(self) } - datagramChannel - } - context.parent ! RegisterChannel(channel, OP_READ) - log.debug("Successfully bound to [{}]", endpoint) def receive: Receive = { case ChannelRegistered ⇒ @@ -58,14 +60,14 @@ private[io] class UdpListener(val udp: UdpExt, def readHandlers: Receive = { case StopReading ⇒ selector ! DisableReadInterest case ResumeReading ⇒ selector ! ReadInterest - case ChannelReadable ⇒ doReceive(handler) + case ChannelReadable ⇒ doReceive(bind.handler) case Unbind ⇒ - log.debug("Unbinding endpoint [{}]", endpoint) + log.debug("Unbinding endpoint [{}]", bind.localAddress) try { channel.close() sender ! Unbound - log.debug("Unbound endpoint [{}], stopping listener", endpoint) + log.debug("Unbound endpoint [{}], stopping listener", bind.localAddress) } finally context.stop(self) }