=act #3602 report Connected only when connection attempt was successful

Before this change, a client connection was always instantly reported as
`Connected`, even if the endpoint would never respond at all.

The reason is a weird behavior of OP_CONNECT and SocketChannel in the
JDK (observed in Linux):
 - a channel is always connectable before the connection is attempted
   (`channel.connect`). Selecting for OP_CONNECT before the `connect`
   call will instantly report connectable
 - even worse: after OP_CONNECT was reported true, also `finishConnect`
   will always return true, even if the connection wasn't yet established.
   That's probably the case because `finishConnect` is internally implemented
   depending on previous epoll results (on Linux).
This commit is contained in:
Johannes Rudolph 2013-09-12 11:28:45 +02:00
parent 9456a081b7
commit 1a67e937c7
3 changed files with 17 additions and 4 deletions

View file

@ -138,6 +138,7 @@ class TcpConnectionSpec extends AkkaSpec("""
serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII")))
serverSideChannel.configureBlocking(false)
interestCallReceiver.expectMsg(OP_CONNECT)
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
@ -762,7 +763,7 @@ class TcpConnectionSpec extends AkkaSpec("""
lazy val clientSideChannel = connectionActor.underlyingActor.channel
override def run(body: Unit): Unit = super.run {
registerCallReceiver.expectMsg(Registration(clientSideChannel, OP_CONNECT))
registerCallReceiver.expectMsg(Registration(clientSideChannel, 0))
registerCallReceiver.sender must be(connectionActor)
body
}
@ -784,6 +785,7 @@ class TcpConnectionSpec extends AkkaSpec("""
serverSideChannel.configureBlocking(false)
serverSideChannel must not be (null)
interestCallReceiver.expectMsg(OP_CONNECT)
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))

View file

@ -4,10 +4,12 @@
package akka.io
import akka.testkit.AkkaSpec
import akka.testkit.{ TestProbe, AkkaSpec }
import akka.util.ByteString
import akka.TestUtils._
import concurrent.duration._
import Tcp._
import java.net.InetSocketAddress
class TcpIntegrationSpec extends AkkaSpec("""
akka.loglevel = INFO
@ -71,6 +73,13 @@ class TcpIntegrationSpec extends AkkaSpec("""
override def bindOptions = List(SO.SendBufferSize(1024))
override def connectOptions = List(SO.ReceiveBufferSize(1024))
}
"don't report Connected when endpoint isn't responding" in {
val connectCommander = TestProbe()
// a "random" endpoint hopefully unavailable
val endpoint = new InetSocketAddress("10.226.182.48", 23825)
connectCommander.send(IO(Tcp), Connect(endpoint))
connectCommander.expectNoMsg(1.second)
}
}
}

View file

@ -33,7 +33,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
localAddress.foreach(channel.socket.bind)
options.foreach(_.beforeConnect(channel.socket))
channelRegistry.register(channel, SelectionKey.OP_CONNECT)
channelRegistry.register(channel, 0)
timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied
private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage))
@ -54,10 +54,12 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
reportConnectFailure {
if (channel.connect(remoteAddress))
completeConnect(registration, commander, options)
else
else {
registration.enableInterest(SelectionKey.OP_CONNECT)
context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries))
}
}
}
def connecting(registration: ChannelRegistration, commander: ActorRef,
options: immutable.Traversable[SocketOption], remainingFinishConnectRetries: Int): Receive = {