diff --git a/akka-io/src/main/scala/akka/io/TcpConnection.scala b/akka-io/src/main/scala/akka/io/TcpConnection.scala index e19a91e93b..25db734d79 100644 --- a/akka-io/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpConnection.scala @@ -38,7 +38,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, def waitingForRegistration(commander: ActorRef): Receive = { case Register(handler) ⇒ if (TraceLogging) log.debug("{} registered as connection handler", handler) - selector ! ReadInterest + doRead(handler, None) // immediately try reading context.setReceiveTimeout(Duration.Undefined) context.watch(handler) // sign death pact diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index abf0024664..8ca5d7cc70 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -59,6 +59,24 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") expectReceivedString("testdata2testdata3") } + "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒ + import unregisteredSetup._ + + localServer.configureBlocking(true) + val serverSideChannel = localServer.accept() + serverSideChannel must not be (null) + serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) + serverSideChannel.configureBlocking(false) + + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + + // we unrealistically register the selector here so that we can observe + // the ordering between Received and ReadInterest + userHandler.send(connectionActor, Register(selector.ref)) + selector.expectMsgType[Received].data.decodeString("ASCII") must be("immediatedata") + selector.expectMsg(ReadInterest) + } "write data to network (and acknowledge)" in withEstablishedConnection() { setup ⇒ import setup._