diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index a61962e223..600aed6114 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -9,7 +9,7 @@ import Tcp._ import TestUtils._ class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4") - with IntegrationSpecSupport { + with TcpIntegrationSpecSupport { "The TCP transport implementation" should { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index a59f2ccad4..a09dd68cdd 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -573,7 +573,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") commander: ActorRef): TestActorRef[TcpOutgoingConnection] = { TestActorRef( - new TcpOutgoingConnection(Tcp(system), commander, serverAddress, localAddress, options) { + new TcpOutgoingConnection(Tcp(system), commander, Connect(serverAddress, localAddress, options)) { override def postRestart(reason: Throwable) { // ensure we never restart context.stop(self) diff --git a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala similarity index 96% rename from akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala rename to akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index e4d53f5f9b..9f35951ad9 100644 --- a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -11,7 +11,7 @@ import TestUtils._ import akka.testkit.EventFilter import java.io.IOException -class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationSpecSupport { +class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegrationSpecSupport { "The TCP transport implementation" should { diff --git a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala similarity index 97% rename from akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala rename to akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index 692815b96a..4ed3bd9950 100644 --- a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -12,7 +12,7 @@ import akka.io.Inet.SocketOption import Tcp._ import TestUtils._ -trait IntegrationSpecSupport { _: AkkaSpec ⇒ +trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ class TestSetup { val bindHandler = TestProbe() diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index e68cc6536e..b04d07d7d8 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -97,8 +97,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { private class ListenerParent extends Actor { val listener = context.actorOf( - props = Props(new TcpListener(selectorRouter.ref, handler.ref, endpoint, 100, bindCommander.ref, - Tcp(system), Nil)), + props = Props(new TcpListener(selectorRouter.ref, Tcp(system), bindCommander.ref, Bind(handler.ref, endpoint, 100, Nil))), name = "test-listener-" + counter.next()) parent.watch(listener) def receive: Receive = { diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 356319286f..51419fa0a5 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -25,17 +25,15 @@ private[io] object TcpListener { } -private[io] class TcpListener(selectorRouter: ActorRef, - handler: ActorRef, - endpoint: InetSocketAddress, - backlog: Int, - bindCommander: ActorRef, - tcp: TcpExt, - options: immutable.Traversable[SocketOption]) extends Actor with ActorLogging { +private[io] class TcpListener(val selectorRouter: ActorRef, + val tcp: TcpExt, + val bindCommander: ActorRef, + val bind: Bind) extends Actor with ActorLogging { def selector: ActorRef = context.parent import TcpListener._ import tcp.Settings._ + import bind._ context.watch(handler) // sign death pact val channel = { @@ -43,7 +41,13 @@ private[io] class TcpListener(selectorRouter: ActorRef, serverSocketChannel.configureBlocking(false) val socket = serverSocketChannel.socket options.foreach(_.beforeServerSocketBind(socket)) - socket.bind(endpoint, backlog) // will blow up the actor constructor if the bind fails + try socket.bind(endpoint, backlog) + catch { + case NonFatal(e) ⇒ + bindCommander ! CommandFailed(bind) + log.error(e, "Bind failed for TCP channel") + context.stop(self) + } serverSocketChannel } context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index 8761104ba5..032bcfd6bc 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -46,12 +46,12 @@ import akka.io.IO.SelectorBasedManager private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { def receive = workerForCommand { - case Connect(remoteAddress, localAddress, options) ⇒ + case c: Connect ⇒ val commander = sender - Props(new TcpOutgoingConnection(tcp, commander, remoteAddress, localAddress, options)) - case Bind(handler, endpoint, backlog, options) ⇒ + Props(new TcpOutgoingConnection(tcp, commander, c)) + case b: Bind ⇒ val commander = sender - Props(new TcpListener(selectorPool, handler, endpoint, backlog, commander, tcp, options)) + Props(new TcpListener(selectorPool, tcp, commander, b)) } } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 6c9231fa46..39817efe99 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -4,14 +4,13 @@ package akka.io -import java.net.InetSocketAddress +import akka.actor.ActorRef +import akka.io.Inet.SocketOption +import akka.io.SelectionHandler._ +import akka.io.Tcp._ import java.io.IOException import java.nio.channels.{ SelectionKey, SocketChannel } import scala.collection.immutable -import akka.actor.ActorRef -import akka.io.SelectionHandler._ -import akka.io.Inet.SocketOption -import akka.io.Tcp._ /** * An actor handling the connection state machine for an outgoing connection @@ -19,11 +18,11 @@ import akka.io.Tcp._ */ private[io] class TcpOutgoingConnection(_tcp: TcpExt, commander: ActorRef, - remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], - options: immutable.Traversable[SocketOption]) + connect: Connect) extends TcpConnection(TcpOutgoingConnection.newSocketChannel(), _tcp) { + import connect._ + context.watch(commander) // sign death pact localAddress.foreach(channel.socket.bind) diff --git a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala index 8dbe806c7d..a93a21259d 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala @@ -10,9 +10,9 @@ import akka.io.UdpConn.Connect class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { def receive = workerForCommand { - case Connect(handler, localAddress, remoteAddress, options) ⇒ + case c: Connect ⇒ val commander = sender - Props(new UdpConnection(selectorPool, handler, localAddress, remoteAddress, commander, udpConn, options)) + Props(new UdpConnection(udpConn, commander, c)) } } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 11a5a17f71..32e6d09bb6 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -16,17 +16,14 @@ import scala.util.control.NonFatal import java.nio.ByteBuffer import scala.annotation.tailrec -private[io] class UdpConnection(selectorRouter: ActorRef, - handler: ActorRef, - localAddress: Option[InetSocketAddress], - remoteAddress: InetSocketAddress, - bindCommander: ActorRef, - val udpConn: UdpConnExt, - options: immutable.Traversable[SocketOption]) extends Actor with ActorLogging { +private[io] class UdpConnection(val udpConn: UdpConnExt, + val commander: ActorRef, + val connect: Connect) extends Actor with ActorLogging { def selector: ActorRef = context.parent import udpConn._ + import connect._ import udpConn.settings._ var pendingSend: (Send, ActorRef) = null @@ -38,9 +35,15 @@ private[io] class UdpConnection(selectorRouter: ActorRef, datagramChannel.configureBlocking(false) val socket = datagramChannel.socket options.foreach(_.beforeDatagramBind(socket)) - // FIXME: All bind failures have to be reported to the commander in TCP as well - localAddress foreach { socket.bind } // will blow up the actor constructor if the bind fails - datagramChannel.connect(remoteAddress) + try { + localAddress foreach { socket.bind } // will blow up the actor constructor if the bind fails + datagramChannel.connect(remoteAddress) + } catch { + case NonFatal(e) ⇒ + log.error(e, "Failure while connecting UDP channel") + commander ! CommandFailed(connect) + context.stop(self) + } datagramChannel } selector ! RegisterChannel(channel, OP_READ) @@ -48,7 +51,7 @@ private[io] class UdpConnection(selectorRouter: ActorRef, def receive = { case ChannelRegistered ⇒ - bindCommander ! Connected + commander ! Connected selector ! ReadInterest context.become(connected, discardOld = true) } diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index aa8000eb0b..0edccefab3 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -4,29 +4,24 @@ package akka.io import akka.actor.{ ActorLogging, Actor, ActorRef } -import akka.io.UdpFF._ -import akka.io.Inet.SocketOption import akka.io.SelectionHandler._ +import akka.io.UdpFF._ import akka.util.ByteString import java.net.InetSocketAddress +import java.nio.ByteBuffer import java.nio.channels.DatagramChannel import java.nio.channels.SelectionKey._ -import scala.collection.immutable -import scala.util.control.NonFatal -import akka.io.UdpFF.Received -import akka.io.SelectionHandler.RegisterChannel import scala.annotation.tailrec -import java.nio.ByteBuffer +import scala.util.control.NonFatal -private[io] class UdpFFListener(selectorRouter: ActorRef, - handler: ActorRef, - endpoint: InetSocketAddress, - bindCommander: ActorRef, - val udpFF: UdpFFExt, - options: immutable.Traversable[SocketOption]) +private[io] class UdpFFListener(val udpFF: UdpFFExt, + val bindCommander: ActorRef, + val bind: Bind) extends Actor with ActorLogging with WithUdpFFSend { - import udpFF.settings._ + + import bind._ import udpFF.bufferPool + import udpFF.settings._ def selector: ActorRef = context.parent @@ -36,8 +31,13 @@ private[io] class UdpFFListener(selectorRouter: ActorRef, datagramChannel.configureBlocking(false) val socket = datagramChannel.socket options.foreach(_.beforeDatagramBind(socket)) - // FIXME: signal bind failures - socket.bind(endpoint) // will blow up the actor constructor if the bind fails + try socket.bind(endpoint) + catch { + case NonFatal(e) ⇒ + bindCommander ! CommandFailed(bind) + log.error(e, "Failed to bind UDP channel") + context.stop(self) + } datagramChannel } context.parent ! RegisterChannel(channel, OP_READ) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 28c47ef995..8e3e03617f 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -3,7 +3,7 @@ */ package akka.io -import akka.actor.{ ActorRef, Props } +import akka.actor.Props import akka.io.IO.SelectorBasedManager import akka.io.UdpFF._ @@ -45,9 +45,9 @@ import akka.io.UdpFF._ private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) { def receive = workerForCommand { - case Bind(handler, endpoint, options) ⇒ + case b: Bind ⇒ val commander = sender - Props(new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options)) + Props(new UdpFFListener(udpFF, commander, b)) case SimpleSender(options) ⇒ val commander = sender Props(new UdpFFSender(udpFF, options, commander))