diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 11e58842d3..7e3a62817c 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -139,6 +139,7 @@ class TcpConnectionSpec extends AkkaSpec(""" serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) serverSideChannel.configureBlocking(false) + interestCallReceiver.expectMsg(OP_CONNECT) selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) @@ -799,7 +800,7 @@ class TcpConnectionSpec extends AkkaSpec(""" lazy val clientSideChannel = connectionActor.underlyingActor.channel override def run(body: ⇒ Unit): Unit = super.run { - registerCallReceiver.expectMsg(Registration(clientSideChannel, OP_CONNECT)) + registerCallReceiver.expectMsg(Registration(clientSideChannel, 0)) registerCallReceiver.sender must be(connectionActor) body } @@ -821,6 +822,7 @@ class TcpConnectionSpec extends AkkaSpec(""" serverSideChannel.configureBlocking(false) serverSideChannel must not be (null) + interestCallReceiver.expectMsg(OP_CONNECT) selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index c177a0dff4..aa049bdbd3 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -4,10 +4,12 @@ package akka.io -import akka.testkit.AkkaSpec +import akka.testkit.{ TestProbe, AkkaSpec } import akka.util.ByteString import akka.TestUtils._ +import concurrent.duration._ import Tcp._ +import java.net.InetSocketAddress class TcpIntegrationSpec extends AkkaSpec(""" akka.loglevel = INFO @@ -71,6 +73,13 @@ class TcpIntegrationSpec extends AkkaSpec(""" override def bindOptions = List(SO.SendBufferSize(1024)) override def connectOptions = List(SO.ReceiveBufferSize(1024)) } + "don't report Connected when endpoint isn't responding" in { + val connectCommander = TestProbe() + // a "random" endpoint hopefully unavailable + val endpoint = new InetSocketAddress("10.226.182.48", 23825) + connectCommander.send(IO(Tcp), Connect(endpoint)) + connectCommander.expectNoMsg(1.second) + } } } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 6df4c226c3..9da5e23e62 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -32,7 +32,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, localAddress.foreach(channel.socket.bind) options.foreach(_.beforeConnect(channel.socket)) - channelRegistry.register(channel, SelectionKey.OP_CONNECT) + channelRegistry.register(channel, 0) timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage)) @@ -53,8 +53,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, reportConnectFailure { if (channel.connect(remoteAddress)) completeConnect(registration, commander, options) - else + else { + registration.enableInterest(SelectionKey.OP_CONNECT) context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries)) + } } }