fix issues discussed in the pull request

This commit is contained in:
Johannes Rudolph 2013-01-17 14:45:50 +01:00
parent e11c3fe6bb
commit 18aecef4bd
4 changed files with 131 additions and 73 deletions

View file

@ -48,15 +48,20 @@ akka {
# the worker-dispatcher
batch-accept-limit = 10
# The size of the thread-local direct buffers used to read or write
# The number of bytes per thread-local direct buffer used to read or write
# network data from the kernel. Those buffer directly add to the footprint
# of the threads from the dispatcher tcp connection actors are using.
# of all threads from the dispatcher which TCP connection actors are using.
direct-buffer-size = 524288
# The duration a connection actor waits for a `Register` message from
# its commander before aborting the connection.
register-timeout = 5s
# Enable fine grained logging of what goes on inside the implementation.
# Be aware that this may log more than once per message sent to the actors
# of the tcp implementation.
trace-logging = off
# Fully qualified config path which holds the dispatcher configuration
# to be used for running the select() calls in the selectors
selector-dispatcher = "akka.io.pinned-dispatcher"

View file

@ -152,11 +152,21 @@ object Tcp extends ExtensionKey[TcpExt] {
case object ConfirmedClose extends CloseCommand
case object Abort extends CloseCommand
case class Write(data: ByteString, ack: AnyRef) extends Command
case object NoAck
/**
* Write data to the TCP connection. If no ack is needed use the special
* `NoAck` object.
*/
case class Write(data: ByteString, ack: AnyRef) extends Command {
require(ack ne null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = ack ne NoAck
}
object Write {
val Empty: Write = Write(ByteString.empty, null)
val Empty: Write = Write(ByteString.empty, NoAck)
def apply(data: ByteString): Write =
if (data.isEmpty) Empty else Write(data, null)
if (data.isEmpty) Empty else Write(data, NoAck)
}
case object StopReading extends Command
@ -178,7 +188,7 @@ object Tcp extends ExtensionKey[TcpExt] {
case object Aborted extends ConnectionClosed
case object ConfirmedClosed extends ConnectionClosed
case object PeerClosed extends ConnectionClosed
case class ErrorClose(cause: Throwable) extends ConnectionClosed
case class ErrorClose(cause: String) extends ConnectionClosed
/// INTERNAL
case class RegisterOutgoingConnection(channel: SocketChannel)
@ -216,6 +226,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
val SelectorDispatcher = getString("selector-dispatcher")
val WorkerDispatcher = getString("worker-dispatcher")
val ManagementDispatcher = getString("management-dispatcher")
val TraceLogging = getBoolean("trace-logging")
require(NrOfSelectors > 0, "nr-of-selectors must be > 0")
require(MaxChannels >= 0, "max-channels must be >= 0")

View file

@ -13,26 +13,35 @@ import scala.concurrent.duration._
import akka.actor._
import akka.util.ByteString
import Tcp._
import annotation.tailrec
/**
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
*/
abstract class TcpConnection(val selector: ActorRef,
val channel: SocketChannel) extends Actor with ThreadLocalDirectBuffer with ActorLogging {
val tcp = Tcp(context.system)
channel.configureBlocking(false)
var pendingWrite: Write = Write.Empty // a write "queue" of size 1 for holding one unfinished write command
var pendingWriteCommander: ActorRef = null
// Needed to send the ConnectionClosed message in the postStop handler.
// First element is the handler, second the particular close message.
var closedMessage: (ActorRef, ConnectionClosed) = null
def writePending = pendingWrite ne Write.Empty
def registerTimeout = Tcp(context.system).Settings.RegisterTimeout
def registerTimeout = tcp.Settings.RegisterTimeout
def traceLoggingEnabled = tcp.Settings.TraceLogging
// STATES
/** connection established, waiting for registration from user handler */
def waitingForRegistration(commander: ActorRef): Receive = {
case Register(handler)
log.debug("{} registered as connection handler", handler)
if (traceLoggingEnabled) log.debug("{} registered as connection handler", handler)
selector ! ReadInterest
context.setReceiveTimeout(Duration.Undefined)
@ -44,8 +53,8 @@ abstract class TcpConnection(val selector: ActorRef,
handleClose(commander, closeResponse(cmd))
case ReceiveTimeout
// TODO: just shutting down, as we do here, presents a race condition to the user
// Should we introduce a dedicated `Registered` event message to notify the user of successful registration?
// after sending `Register` user should watch this actor to make sure
// it didn't die because of the timeout
log.warning("Configured registration timeout of {} expired, stopping", registerTimeout)
context.stop(self)
}
@ -57,11 +66,18 @@ abstract class TcpConnection(val selector: ActorRef,
case ChannelReadable doRead(handler)
case write: Write if writePending
log.debug("Dropping write because queue is full")
handler ! CommandFailed(write)
if (traceLoggingEnabled) log.debug("Dropping write because queue is full")
sender ! CommandFailed(write)
case write: Write doWrite(handler, write)
case ChannelWritable doWrite(handler, pendingWrite)
case write: Write if write.data.isEmpty
if (write.wantsAck)
sender ! write.ack
case write: Write
pendingWriteCommander = sender
pendingWrite = write
doWrite(handler)
case ChannelWritable doWrite(handler)
case cmd: CloseCommand handleClose(handler, closeResponse(cmd))
}
@ -73,7 +89,7 @@ abstract class TcpConnection(val selector: ActorRef,
case ChannelReadable doRead(handler)
case ChannelWritable
doWrite(handler, pendingWrite)
doWrite(handler)
if (!writePending) // writing is now finished
handleClose(handler, closedEvent)
@ -111,17 +127,17 @@ abstract class TcpConnection(val selector: ActorRef,
buffer.flip()
if (readBytes > 0) {
log.debug("Read {} bytes", readBytes)
handler ! Received(ByteString(buffer).take(readBytes))
if (traceLoggingEnabled) log.debug("Read {} bytes", readBytes)
handler ! Received(ByteString(buffer))
if (readBytes == buffer.capacity())
// directly try reading more because we exhausted our buffer
self ! ChannelReadable
else selector ! ReadInterest
} else if (readBytes == 0) {
log.debug("Read nothing. Registering read interest with selector")
if (traceLoggingEnabled) log.debug("Read nothing. Registering read interest with selector")
selector ! ReadInterest
} else if (readBytes == -1) {
log.debug("Read returned end-of-stream")
if (traceLoggingEnabled) log.debug("Read returned end-of-stream")
doCloseConnection(handler, closeReason)
} else throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
@ -130,7 +146,8 @@ abstract class TcpConnection(val selector: ActorRef,
}
}
def doWrite(handler: ActorRef, write: Write): Unit = {
def doWrite(handler: ActorRef): Unit = {
val write = pendingWrite
val data = write.data
val buffer = directBuffer()
@ -138,13 +155,15 @@ abstract class TcpConnection(val selector: ActorRef,
buffer.flip()
try {
log.debug("Trying to write to channel")
val writtenBytes = channel.write(buffer)
log.debug("Wrote {} bytes", writtenBytes)
if (traceLoggingEnabled) log.debug("Wrote {} bytes", writtenBytes)
pendingWrite = consume(write, writtenBytes)
if (writePending) selector ! WriteInterest // still data to write
else if (write.ack != null) handler ! write.ack // everything written
else if (write.wantsAck) {
pendingWriteCommander ! write.ack
pendingWriteCommander = null
} // everything written
} catch {
case e: IOException handleError(handler, e)
}
@ -156,20 +175,20 @@ abstract class TcpConnection(val selector: ActorRef,
def handleClose(handler: ActorRef, closedEvent: ConnectionClosed): Unit =
if (closedEvent == Aborted) { // close instantly
log.debug("Got Abort command. RESETing connection.")
if (traceLoggingEnabled) log.debug("Got Abort command. RESETing connection.")
doCloseConnection(handler, closedEvent)
} else if (writePending) { // finish writing first
log.debug("Got Close command but write is still pending.")
if (traceLoggingEnabled) log.debug("Got Close command but write is still pending.")
context.become(closingWithPendingWrite(handler, closedEvent))
} else if (closedEvent == ConfirmedClosed) { // shutdown output and wait for confirmation
log.debug("Got ConfirmedClose command, sending FIN.")
if (traceLoggingEnabled) log.debug("Got ConfirmedClose command, sending FIN.")
channel.socket.shutdownOutput()
context.become(closing(handler))
} else { // close now
log.debug("Got Close command, closing connection.")
if (traceLoggingEnabled) log.debug("Got Close command, closing connection.")
doCloseConnection(handler, closedEvent)
}
@ -177,7 +196,8 @@ abstract class TcpConnection(val selector: ActorRef,
if (closedEvent == Aborted) abort()
else channel.close()
handler ! closedEvent
closedMessage = (handler, closedEvent)
context.stop(self)
}
@ -189,10 +209,18 @@ abstract class TcpConnection(val selector: ActorRef,
}
def handleError(handler: ActorRef, exception: IOException): Unit = {
exception.setStackTrace(Array.empty)
handler ! ErrorClose(exception)
closedMessage = (handler, ErrorClose(extractMsg(exception)))
throw exception
}
@tailrec private[this] def extractMsg(t: Throwable): String =
if (t == null) "unknown"
else {
t.getMessage match {
case null | "" extractMsg(t.getCause)
case msg msg
}
}
def abort(): Unit = {
try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST
@ -200,26 +228,34 @@ abstract class TcpConnection(val selector: ActorRef,
case NonFatal(e)
// setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574
// (also affected: OS/X Java 1.6.0_37)
log.debug("setSoLinger(true, 0) failed with {}", e)
if (traceLoggingEnabled) log.debug("setSoLinger(true, 0) failed with {}", e)
}
channel.close()
}
override def postStop(): Unit =
override def postStop(): Unit = {
if (closedMessage != null) {
val msg = closedMessage._2
closedMessage._1 ! msg
if (writePending)
pendingWriteCommander ! msg
}
if (channel.isOpen)
abort()
}
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
/** Returns a new write with `numBytes` removed from the front */
def consume(write: Write, numBytes: Int): Write =
write match {
case Write.Empty if numBytes == 0 write
numBytes match {
case 0 write
case x if x == write.data.length Write.Empty
case _
numBytes match {
case 0 write
case x if x == write.data.length Write.Empty
case _
require(numBytes > 0 && numBytes < write.data.length)
write.copy(data = write.data.drop(numBytes))
}
require(numBytes > 0 && numBytes < write.data.length)
write.copy(data = write.data.drop(numBytes))
}
}

View file

@ -53,29 +53,32 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII")))
// emulate selector behavior
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgPF(remaining) {
case Received(data) if data.decodeString("ASCII") == "testdata"
}
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") must be("testdata")
// have two packets in flight before the selector notices
serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII")))
serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII")))
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgPF(remaining) {
case Received(data) if data.decodeString("ASCII") == "testdata2testdata3"
}
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") must be("testdata2testdata3")
}
"write data to network (and acknowledge)" in withEstablishedConnection() { setup
import setup._
serverSideChannel.configureBlocking(false)
object Ack
val writer = TestProbe()
// directly acknowledge an empty write
writer.send(connectionActor, Write(ByteString.empty, Ack))
writer.expectMsg(Ack)
val write = Write(ByteString("testdata"), Ack)
val buffer = ByteBuffer.allocate(100)
serverSideChannel.read(buffer) must be(0)
// emulate selector behavior
connectionHandler.send(connectionActor, write)
connectionHandler.expectMsg(Ack)
writer.send(connectionActor, write)
// make sure the writer gets the ack
writer.expectMsg(Ack)
serverSideChannel.read(buffer) must be(8)
buffer.flip()
ByteString(buffer).take(8).decodeString("ASCII") must be("testdata")
@ -90,6 +93,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
//serverSideChannel.configureBlocking(false)
clientSideChannel.socket.setSendBufferSize(1024)
val writer = TestProbe()
// producing backpressure by sending much more than currently fits into
// our send buffer
val firstWrite = writeCmd(Ack1)
@ -97,14 +102,18 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
// try to write the buffer but since the SO_SNDBUF is too small
// it will have to keep the rest of the piece and send it
// when possible
connectionHandler.send(connectionActor, firstWrite)
writer.send(connectionActor, firstWrite)
selector.expectMsg(WriteInterest)
// send another write which should fail immediately
// because we don't store more than one piece in flight
val secondWrite = writeCmd(Ack2)
connectionHandler.send(connectionActor, secondWrite)
connectionHandler.expectMsg(CommandFailed(secondWrite))
writer.send(connectionActor, secondWrite)
writer.expectMsg(CommandFailed(secondWrite))
// reject even empty writes
writer.send(connectionActor, Write.Empty)
writer.expectMsg(CommandFailed(Write.Empty))
// there will be immediately more space in the send buffer because
// some data will have been sent by now, so we assume we can write
@ -113,8 +122,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
// both buffers should now be filled so no more writing
// is possible
setup.pullFromServerSide(TestSize)
connectionHandler.expectMsg(Ack1)
pullFromServerSide(TestSize)
writer.expectMsg(Ack1)
}
"respect StopReading and ResumeReading" in withEstablishedConnection() { setup
@ -141,10 +150,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
connectionHandler.send(connectionActor, writeCmd(Ack))
connectionHandler.send(connectionActor, Close)
setup.pullFromServerSide(TestSize)
pullFromServerSide(TestSize)
connectionHandler.expectMsg(Ack)
connectionHandler.expectMsg(Closed)
connectionActor.isTerminated must be(true)
assertThisConnectionActorTerminated()
val buffer = ByteBuffer.allocate(1)
serverSideChannel.read(buffer) must be(-1)
@ -177,7 +186,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
connectionHandler.send(connectionActor, ConfirmedClose)
connectionHandler.expectNoMsg(100.millis)
setup.pullFromServerSide(TestSize)
pullFromServerSide(TestSize)
connectionHandler.expectMsg(Ack)
selector.send(connectionActor, ChannelReadable)
@ -207,9 +216,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
abortClose(serverSideChannel)
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgPF(remaining) {
case ErrorClose(exc: IOException) exc.getMessage must be("Connection reset by peer")
}
connectionHandler.expectMsgType[ErrorClose].cause must be("Connection reset by peer")
// wait a while
connectionHandler.expectNoMsg(200.millis)
@ -218,11 +225,13 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"report when peer closed the connection when trying to write" in withEstablishedConnection() { setup
import setup._
val writer = TestProbe()
abortClose(serverSideChannel)
connectionHandler.send(connectionActor, Write(ByteString("testdata")))
connectionHandler.expectMsgPF(remaining) {
case ErrorClose(_: IOException) // ok
}
writer.send(connectionActor, Write(ByteString("testdata")))
// bother writer and handler should get the message
writer.expectMsgType[ErrorClose]
connectionHandler.expectMsgType[ErrorClose]
assertThisConnectionActorTerminated()
}
@ -234,9 +243,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
localServer.close()
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsgPF() {
case ErrorClose(e) e.getMessage must be("Connection reset by peer")
}
userHandler.expectMsgType[ErrorClose].cause must be("Connection reset by peer")
assertActorTerminated(connectionActor)
}
@ -252,9 +259,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
key.isConnectable must be(true)
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsgPF() {
case ErrorClose(e) e.getMessage must be("Connection refused")
}
userHandler.expectMsgType[ErrorClose].cause must be("Connection refused")
assertActorTerminated(connectionActor)
}
@ -419,7 +424,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
channel.close()
}
def assertActorTerminated(connectionActor: TestActorRef[TcpOutgoingConnection]): Unit = {
watch(connectionActor)
expectMsgType[Terminated].actor must be(connectionActor)
val watcher = TestProbe()
watcher.watch(connectionActor)
watcher.expectMsgType[Terminated].actor must be(connectionActor)
}
}