Merge pull request #1729 from jrudolph/wip-3602-fix-connected

=act #3602 report `Connected` only when connection attempt was successful
This commit is contained in:
Björn Antonsson 2013-10-16 03:54:38 -07:00
commit f6179da523
3 changed files with 17 additions and 4 deletions

View file

@ -139,6 +139,7 @@ class TcpConnectionSpec extends AkkaSpec("""
serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII")))
serverSideChannel.configureBlocking(false) serverSideChannel.configureBlocking(false)
interestCallReceiver.expectMsg(OP_CONNECT)
selector.send(connectionActor, ChannelConnectable) selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
@ -799,7 +800,7 @@ class TcpConnectionSpec extends AkkaSpec("""
lazy val clientSideChannel = connectionActor.underlyingActor.channel lazy val clientSideChannel = connectionActor.underlyingActor.channel
override def run(body: Unit): Unit = super.run { override def run(body: Unit): Unit = super.run {
registerCallReceiver.expectMsg(Registration(clientSideChannel, OP_CONNECT)) registerCallReceiver.expectMsg(Registration(clientSideChannel, 0))
registerCallReceiver.sender must be(connectionActor) registerCallReceiver.sender must be(connectionActor)
body body
} }
@ -821,6 +822,7 @@ class TcpConnectionSpec extends AkkaSpec("""
serverSideChannel.configureBlocking(false) serverSideChannel.configureBlocking(false)
serverSideChannel must not be (null) serverSideChannel must not be (null)
interestCallReceiver.expectMsg(OP_CONNECT)
selector.send(connectionActor, ChannelConnectable) selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))

View file

@ -4,10 +4,12 @@
package akka.io package akka.io
import akka.testkit.AkkaSpec import akka.testkit.{ TestProbe, AkkaSpec }
import akka.util.ByteString import akka.util.ByteString
import akka.TestUtils._ import akka.TestUtils._
import concurrent.duration._
import Tcp._ import Tcp._
import java.net.InetSocketAddress
class TcpIntegrationSpec extends AkkaSpec(""" class TcpIntegrationSpec extends AkkaSpec("""
akka.loglevel = INFO akka.loglevel = INFO
@ -71,6 +73,13 @@ class TcpIntegrationSpec extends AkkaSpec("""
override def bindOptions = List(SO.SendBufferSize(1024)) override def bindOptions = List(SO.SendBufferSize(1024))
override def connectOptions = List(SO.ReceiveBufferSize(1024)) override def connectOptions = List(SO.ReceiveBufferSize(1024))
} }
"don't report Connected when endpoint isn't responding" in {
val connectCommander = TestProbe()
// a "random" endpoint hopefully unavailable
val endpoint = new InetSocketAddress("10.226.182.48", 23825)
connectCommander.send(IO(Tcp), Connect(endpoint))
connectCommander.expectNoMsg(1.second)
}
} }
} }

View file

@ -32,7 +32,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
localAddress.foreach(channel.socket.bind) localAddress.foreach(channel.socket.bind)
options.foreach(_.beforeConnect(channel.socket)) options.foreach(_.beforeConnect(channel.socket))
channelRegistry.register(channel, SelectionKey.OP_CONNECT) channelRegistry.register(channel, 0)
timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied
private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage)) private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage))
@ -53,10 +53,12 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
reportConnectFailure { reportConnectFailure {
if (channel.connect(remoteAddress)) if (channel.connect(remoteAddress))
completeConnect(registration, commander, options) completeConnect(registration, commander, options)
else else {
registration.enableInterest(SelectionKey.OP_CONNECT)
context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries)) context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries))
} }
} }
}
def connecting(registration: ChannelRegistration, commander: ActorRef, def connecting(registration: ChannelRegistration, commander: ActorRef,
options: immutable.Traversable[SocketOption], remainingFinishConnectRetries: Int): Receive = { options: immutable.Traversable[SocketOption], remainingFinishConnectRetries: Int): Receive = {