From 1a67e937c77e3c73a3b4b7158e004d07b2efd193 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 12 Sep 2013 11:28:45 +0200 Subject: [PATCH] =act #3602 report `Connected` only when connection attempt was successful Before this change, a client connection was always instantly reported as `Connected`, even if the endpoint would never respond at all. The reason is a weird behavior of OP_CONNECT and SocketChannel in the JDK (observed in Linux): - a channel is always connectable before the connection is attempted (`channel.connect`). Selecting for OP_CONNECT before the `connect` call will instantly report connectable - even worse: after OP_CONNECT was reported true, also `finishConnect` will always return true, even if the connection wasn't yet established. That's probably the case because `finishConnect` is internally implemented depending on previous epoll results (on Linux). --- .../src/test/scala/akka/io/TcpConnectionSpec.scala | 4 +++- .../src/test/scala/akka/io/TcpIntegrationSpec.scala | 11 ++++++++++- .../main/scala/akka/io/TcpOutgoingConnection.scala | 6 ++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 9560548293..41f3fd7b0b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -138,6 +138,7 @@ class TcpConnectionSpec extends AkkaSpec(""" serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) serverSideChannel.configureBlocking(false) + interestCallReceiver.expectMsg(OP_CONNECT) selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) @@ -762,7 +763,7 @@ class TcpConnectionSpec extends AkkaSpec(""" lazy val clientSideChannel = connectionActor.underlyingActor.channel override def run(body: ⇒ Unit): Unit = super.run { - registerCallReceiver.expectMsg(Registration(clientSideChannel, OP_CONNECT)) + registerCallReceiver.expectMsg(Registration(clientSideChannel, 0)) registerCallReceiver.sender must be(connectionActor) body } @@ -784,6 +785,7 @@ class TcpConnectionSpec extends AkkaSpec(""" serverSideChannel.configureBlocking(false) serverSideChannel must not be (null) + interestCallReceiver.expectMsg(OP_CONNECT) selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index c177a0dff4..aa049bdbd3 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -4,10 +4,12 @@ package akka.io -import akka.testkit.AkkaSpec +import akka.testkit.{ TestProbe, AkkaSpec } import akka.util.ByteString import akka.TestUtils._ +import concurrent.duration._ import Tcp._ +import java.net.InetSocketAddress class TcpIntegrationSpec extends AkkaSpec(""" akka.loglevel = INFO @@ -71,6 +73,13 @@ class TcpIntegrationSpec extends AkkaSpec(""" override def bindOptions = List(SO.SendBufferSize(1024)) override def connectOptions = List(SO.ReceiveBufferSize(1024)) } + "don't report Connected when endpoint isn't responding" in { + val connectCommander = TestProbe() + // a "random" endpoint hopefully unavailable + val endpoint = new InetSocketAddress("10.226.182.48", 23825) + connectCommander.send(IO(Tcp), Connect(endpoint)) + connectCommander.expectNoMsg(1.second) + } } } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 6ac141b3cc..3d917819a9 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -33,7 +33,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, localAddress.foreach(channel.socket.bind) options.foreach(_.beforeConnect(channel.socket)) - channelRegistry.register(channel, SelectionKey.OP_CONNECT) + channelRegistry.register(channel, 0) timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage)) @@ -54,8 +54,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, reportConnectFailure { if (channel.connect(remoteAddress)) completeConnect(registration, commander, options) - else + else { + registration.enableInterest(SelectionKey.OP_CONNECT) context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries)) + } } }