io: remove SelectionKey lookup and reduce IO-layer-internal message volume

This commit is contained in:
Mathias 2013-05-06 17:01:01 +02:00 committed by Björn Antonsson
parent bc66aa97c6
commit 0e0a95ff95
15 changed files with 392 additions and 370 deletions

View file

@ -7,7 +7,7 @@ package akka.io
import java.io.{ File, IOException }
import java.net.{ URLClassLoader, ConnectException, InetSocketAddress, SocketException }
import java.nio.ByteBuffer
import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel }
import java.nio.channels._
import java.nio.channels.spi.SelectorProvider
import java.nio.channels.SelectionKey._
import scala.annotation.tailrec
@ -18,7 +18,7 @@ import org.scalatest.matchers._
import akka.io.Tcp._
import akka.io.SelectionHandler._
import akka.io.Inet.SocketOption
import akka.actor.{ PoisonPill, Terminated, DeathPactException }
import akka.actor._
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
import akka.util.{ Helpers, ByteString }
import akka.TestUtils._
@ -134,11 +134,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
// we unrealistically register the selector here so that we can observe
// the ordering between Received and ReadInterest
userHandler.send(connectionActor, Register(selector.ref))
selector.expectMsgType[Received].data.decodeString("ASCII") must be("immediatedata")
selector.expectMsg(ReadInterest)
userHandler.send(connectionActor, Register(userHandler.ref))
userHandler.expectMsgType[Received].data.decodeString("ASCII") must be("immediatedata")
interestCallReceiver.expectMsg(OP_READ)
}
}
@ -246,7 +244,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
// it will have to keep the rest of the piece and send it
// when possible
writer.send(connectionActor, firstWrite)
selector.expectMsg(WriteInterest)
interestCallReceiver.expectMsg(OP_WRITE)
// send another write which should fail immediately
// because we don't store more than one piece in flight
@ -275,9 +273,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
connectionHandler.send(connectionActor, SuspendReading)
// the selector interprets StopReading to deregister interest for reading
selector.expectMsg(DisableReadInterest)
interestCallReceiver.expectMsg(-OP_READ)
connectionHandler.send(connectionActor, ResumeReading)
selector.expectMsg(ReadInterest)
interestCallReceiver.expectMsg(OP_READ)
}
}
@ -706,11 +704,14 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
serverSideChannel
}
abstract class LocalServerTest {
abstract class LocalServerTest extends ChannelRegistry {
val localServerChannel = ServerSocketChannel.open()
val userHandler = TestProbe()
val selector = TestProbe()
var registerCallReceiver = TestProbe()
var interestCallReceiver = TestProbe()
def run(body: Unit): Unit = {
try {
setServerSocketOptions()
@ -720,16 +721,21 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
} finally localServerChannel.close()
}
def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit =
registerCallReceiver.ref.tell(channel -> initialOps, channelActor)
def setServerSocketOptions() = ()
def createConnectionActor(serverAddress: InetSocketAddress = serverAddress,
options: immutable.Seq[SocketOption] = Nil): TestActorRef[TcpOutgoingConnection] = {
val ref = TestActorRef(
new TcpOutgoingConnection(Tcp(system), userHandler.ref, Connect(serverAddress, options = options)) {
new TcpOutgoingConnection(Tcp(system), this, userHandler.ref, Connect(serverAddress, options = options)) {
override def postRestart(reason: Throwable): Unit = context.stop(self) // ensure we never restart
override def selector = LocalServerTest.this.selector.ref
})
ref ! ChannelRegistered
ref ! new ChannelRegistration {
def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op
def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op
}
ref
}
}
@ -745,7 +751,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
lazy val clientSideChannel = connectionActor.underlyingActor.channel
override def run(body: Unit): Unit = super.run {
selector.expectMsg(RegisterChannel(clientSideChannel, OP_CONNECT))
registerCallReceiver.expectMsg(clientSideChannel -> OP_CONNECT)
registerCallReceiver.sender must be(connectionActor)
body
}
}
@ -770,7 +777,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting))
selector.expectMsg(ReadInterest)
interestCallReceiver.expectMsg(OP_READ)
clientSelectionKey // trigger initialization
serverSelectionKey // trigger initialization
@ -781,7 +788,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
}
}
val TestSize = 10000
final val TestSize = 10000 // compile-time constant
def writeCmd(ack: AnyRef) =
Write(ByteString(Array.fill[Byte](TestSize)(0)), ack)
@ -822,12 +829,12 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
if (remainingTries <= 0)
throw new AssertionError("Pulling took too many loops, remaining data: " + remaining)
else if (remaining > 0) {
if (selector.msgAvailable) {
selector.expectMsg(WriteInterest)
clientSelectionKey.interestOps(SelectionKey.OP_WRITE)
if (interestCallReceiver.msgAvailable) {
interestCallReceiver.expectMsg(OP_WRITE)
clientSelectionKey.interestOps(OP_WRITE)
}
serverSelectionKey.interestOps(SelectionKey.OP_READ)
serverSelectionKey.interestOps(OP_READ)
nioSelector.select(10)
if (nioSelector.selectedKeys().contains(clientSelectionKey)) {
clientSelectionKey.interestOps(0)