diff --git a/akka-io/src/main/resources/reference.conf b/akka-io/src/main/resources/reference.conf index 6104ac96c5..c48a246f5d 100644 --- a/akka-io/src/main/resources/reference.conf +++ b/akka-io/src/main/resources/reference.conf @@ -48,15 +48,20 @@ akka { # the worker-dispatcher batch-accept-limit = 10 - # The size of the thread-local direct buffers used to read or write + # The number of bytes per thread-local direct buffer used to read or write # network data from the kernel. Those buffer directly add to the footprint - # of the threads from the dispatcher tcp connection actors are using. + # of all threads from the dispatcher which TCP connection actors are using. direct-buffer-size = 524288 # The duration a connection actor waits for a `Register` message from # its commander before aborting the connection. register-timeout = 5s + # Enable fine grained logging of what goes on inside the implementation. + # Be aware that this may log more than once per message sent to the actors + # of the tcp implementation. + trace-logging = off + # Fully qualified config path which holds the dispatcher configuration # to be used for running the select() calls in the selectors selector-dispatcher = "akka.io.pinned-dispatcher" diff --git a/akka-io/src/main/scala/akka/io/Tcp.scala b/akka-io/src/main/scala/akka/io/Tcp.scala index c3f49a762e..af2c30d0e2 100644 --- a/akka-io/src/main/scala/akka/io/Tcp.scala +++ b/akka-io/src/main/scala/akka/io/Tcp.scala @@ -152,11 +152,21 @@ object Tcp extends ExtensionKey[TcpExt] { case object ConfirmedClose extends CloseCommand case object Abort extends CloseCommand - case class Write(data: ByteString, ack: AnyRef) extends Command + case object NoAck + + /** + * Write data to the TCP connection. If no ack is needed use the special + * `NoAck` object. + */ + case class Write(data: ByteString, ack: AnyRef) extends Command { + require(ack ne null, "ack must be non-null. Use NoAck if you don't want acks.") + + def wantsAck: Boolean = ack ne NoAck + } object Write { - val Empty: Write = Write(ByteString.empty, null) + val Empty: Write = Write(ByteString.empty, NoAck) def apply(data: ByteString): Write = - if (data.isEmpty) Empty else Write(data, null) + if (data.isEmpty) Empty else Write(data, NoAck) } case object StopReading extends Command @@ -178,7 +188,7 @@ object Tcp extends ExtensionKey[TcpExt] { case object Aborted extends ConnectionClosed case object ConfirmedClosed extends ConnectionClosed case object PeerClosed extends ConnectionClosed - case class ErrorClose(cause: Throwable) extends ConnectionClosed + case class ErrorClose(cause: String) extends ConnectionClosed /// INTERNAL case class RegisterOutgoingConnection(channel: SocketChannel) @@ -216,6 +226,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val SelectorDispatcher = getString("selector-dispatcher") val WorkerDispatcher = getString("worker-dispatcher") val ManagementDispatcher = getString("management-dispatcher") + val TraceLogging = getBoolean("trace-logging") require(NrOfSelectors > 0, "nr-of-selectors must be > 0") require(MaxChannels >= 0, "max-channels must be >= 0") diff --git a/akka-io/src/main/scala/akka/io/TcpConnection.scala b/akka-io/src/main/scala/akka/io/TcpConnection.scala index 707e1664de..f7589dd6d3 100644 --- a/akka-io/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpConnection.scala @@ -13,26 +13,35 @@ import scala.concurrent.duration._ import akka.actor._ import akka.util.ByteString import Tcp._ +import annotation.tailrec /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. */ abstract class TcpConnection(val selector: ActorRef, val channel: SocketChannel) extends Actor with ThreadLocalDirectBuffer with ActorLogging { + val tcp = Tcp(context.system) channel.configureBlocking(false) var pendingWrite: Write = Write.Empty // a write "queue" of size 1 for holding one unfinished write command + var pendingWriteCommander: ActorRef = null + + // Needed to send the ConnectionClosed message in the postStop handler. + // First element is the handler, second the particular close message. + var closedMessage: (ActorRef, ConnectionClosed) = null + def writePending = pendingWrite ne Write.Empty - def registerTimeout = Tcp(context.system).Settings.RegisterTimeout + def registerTimeout = tcp.Settings.RegisterTimeout + def traceLoggingEnabled = tcp.Settings.TraceLogging // STATES /** connection established, waiting for registration from user handler */ def waitingForRegistration(commander: ActorRef): Receive = { case Register(handler) ⇒ - log.debug("{} registered as connection handler", handler) + if (traceLoggingEnabled) log.debug("{} registered as connection handler", handler) selector ! ReadInterest context.setReceiveTimeout(Duration.Undefined) @@ -44,8 +53,8 @@ abstract class TcpConnection(val selector: ActorRef, handleClose(commander, closeResponse(cmd)) case ReceiveTimeout ⇒ - // TODO: just shutting down, as we do here, presents a race condition to the user - // Should we introduce a dedicated `Registered` event message to notify the user of successful registration? + // after sending `Register` user should watch this actor to make sure + // it didn't die because of the timeout log.warning("Configured registration timeout of {} expired, stopping", registerTimeout) context.stop(self) } @@ -57,11 +66,18 @@ abstract class TcpConnection(val selector: ActorRef, case ChannelReadable ⇒ doRead(handler) case write: Write if writePending ⇒ - log.debug("Dropping write because queue is full") - handler ! CommandFailed(write) + if (traceLoggingEnabled) log.debug("Dropping write because queue is full") + sender ! CommandFailed(write) - case write: Write ⇒ doWrite(handler, write) - case ChannelWritable ⇒ doWrite(handler, pendingWrite) + case write: Write if write.data.isEmpty ⇒ + if (write.wantsAck) + sender ! write.ack + + case write: Write ⇒ + pendingWriteCommander = sender + pendingWrite = write + doWrite(handler) + case ChannelWritable ⇒ doWrite(handler) case cmd: CloseCommand ⇒ handleClose(handler, closeResponse(cmd)) } @@ -73,7 +89,7 @@ abstract class TcpConnection(val selector: ActorRef, case ChannelReadable ⇒ doRead(handler) case ChannelWritable ⇒ - doWrite(handler, pendingWrite) + doWrite(handler) if (!writePending) // writing is now finished handleClose(handler, closedEvent) @@ -111,17 +127,17 @@ abstract class TcpConnection(val selector: ActorRef, buffer.flip() if (readBytes > 0) { - log.debug("Read {} bytes", readBytes) - handler ! Received(ByteString(buffer).take(readBytes)) + if (traceLoggingEnabled) log.debug("Read {} bytes", readBytes) + handler ! Received(ByteString(buffer)) if (readBytes == buffer.capacity()) // directly try reading more because we exhausted our buffer self ! ChannelReadable else selector ! ReadInterest } else if (readBytes == 0) { - log.debug("Read nothing. Registering read interest with selector") + if (traceLoggingEnabled) log.debug("Read nothing. Registering read interest with selector") selector ! ReadInterest } else if (readBytes == -1) { - log.debug("Read returned end-of-stream") + if (traceLoggingEnabled) log.debug("Read returned end-of-stream") doCloseConnection(handler, closeReason) } else throw new IllegalStateException("Unexpected value returned from read: " + readBytes) @@ -130,7 +146,8 @@ abstract class TcpConnection(val selector: ActorRef, } } - def doWrite(handler: ActorRef, write: Write): Unit = { + def doWrite(handler: ActorRef): Unit = { + val write = pendingWrite val data = write.data val buffer = directBuffer() @@ -138,13 +155,15 @@ abstract class TcpConnection(val selector: ActorRef, buffer.flip() try { - log.debug("Trying to write to channel") val writtenBytes = channel.write(buffer) - log.debug("Wrote {} bytes", writtenBytes) + if (traceLoggingEnabled) log.debug("Wrote {} bytes", writtenBytes) pendingWrite = consume(write, writtenBytes) if (writePending) selector ! WriteInterest // still data to write - else if (write.ack != null) handler ! write.ack // everything written + else if (write.wantsAck) { + pendingWriteCommander ! write.ack + pendingWriteCommander = null + } // everything written } catch { case e: IOException ⇒ handleError(handler, e) } @@ -156,20 +175,20 @@ abstract class TcpConnection(val selector: ActorRef, def handleClose(handler: ActorRef, closedEvent: ConnectionClosed): Unit = if (closedEvent == Aborted) { // close instantly - log.debug("Got Abort command. RESETing connection.") + if (traceLoggingEnabled) log.debug("Got Abort command. RESETing connection.") doCloseConnection(handler, closedEvent) } else if (writePending) { // finish writing first - log.debug("Got Close command but write is still pending.") + if (traceLoggingEnabled) log.debug("Got Close command but write is still pending.") context.become(closingWithPendingWrite(handler, closedEvent)) } else if (closedEvent == ConfirmedClosed) { // shutdown output and wait for confirmation - log.debug("Got ConfirmedClose command, sending FIN.") + if (traceLoggingEnabled) log.debug("Got ConfirmedClose command, sending FIN.") channel.socket.shutdownOutput() context.become(closing(handler)) } else { // close now - log.debug("Got Close command, closing connection.") + if (traceLoggingEnabled) log.debug("Got Close command, closing connection.") doCloseConnection(handler, closedEvent) } @@ -177,7 +196,8 @@ abstract class TcpConnection(val selector: ActorRef, if (closedEvent == Aborted) abort() else channel.close() - handler ! closedEvent + closedMessage = (handler, closedEvent) + context.stop(self) } @@ -189,10 +209,18 @@ abstract class TcpConnection(val selector: ActorRef, } def handleError(handler: ActorRef, exception: IOException): Unit = { - exception.setStackTrace(Array.empty) - handler ! ErrorClose(exception) + closedMessage = (handler, ErrorClose(extractMsg(exception))) + throw exception } + @tailrec private[this] def extractMsg(t: Throwable): String = + if (t == null) "unknown" + else { + t.getMessage match { + case null | "" ⇒ extractMsg(t.getCause) + case msg ⇒ msg + } + } def abort(): Unit = { try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST @@ -200,26 +228,34 @@ abstract class TcpConnection(val selector: ActorRef, case NonFatal(e) ⇒ // setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574 // (also affected: OS/X Java 1.6.0_37) - log.debug("setSoLinger(true, 0) failed with {}", e) + if (traceLoggingEnabled) log.debug("setSoLinger(true, 0) failed with {}", e) } channel.close() } - override def postStop(): Unit = + override def postStop(): Unit = { + if (closedMessage != null) { + val msg = closedMessage._2 + closedMessage._1 ! msg + + if (writePending) + pendingWriteCommander ! msg + } + if (channel.isOpen) abort() + } + + override def postRestart(reason: Throwable): Unit = + throw new IllegalStateException("Restarting not supported for connection actors.") /** Returns a new write with `numBytes` removed from the front */ def consume(write: Write, numBytes: Int): Write = - write match { - case Write.Empty if numBytes == 0 ⇒ write + numBytes match { + case 0 ⇒ write + case x if x == write.data.length ⇒ Write.Empty case _ ⇒ - numBytes match { - case 0 ⇒ write - case x if x == write.data.length ⇒ Write.Empty - case _ ⇒ - require(numBytes > 0 && numBytes < write.data.length) - write.copy(data = write.data.drop(numBytes)) - } + require(numBytes > 0 && numBytes < write.data.length) + write.copy(data = write.data.drop(numBytes)) } } diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 934cdf3fd8..6dd5810e17 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -53,29 +53,32 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII"))) // emulate selector behavior selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsgPF(remaining) { - case Received(data) if data.decodeString("ASCII") == "testdata" ⇒ - } + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") must be("testdata") // have two packets in flight before the selector notices serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII"))) serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII"))) selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsgPF(remaining) { - case Received(data) if data.decodeString("ASCII") == "testdata2testdata3" ⇒ - } + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") must be("testdata2testdata3") } "write data to network (and acknowledge)" in withEstablishedConnection() { setup ⇒ import setup._ serverSideChannel.configureBlocking(false) + object Ack + val writer = TestProbe() + + // directly acknowledge an empty write + writer.send(connectionActor, Write(ByteString.empty, Ack)) + writer.expectMsg(Ack) + val write = Write(ByteString("testdata"), Ack) val buffer = ByteBuffer.allocate(100) serverSideChannel.read(buffer) must be(0) - // emulate selector behavior - connectionHandler.send(connectionActor, write) - connectionHandler.expectMsg(Ack) + writer.send(connectionActor, write) + // make sure the writer gets the ack + writer.expectMsg(Ack) serverSideChannel.read(buffer) must be(8) buffer.flip() ByteString(buffer).take(8).decodeString("ASCII") must be("testdata") @@ -90,6 +93,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") //serverSideChannel.configureBlocking(false) clientSideChannel.socket.setSendBufferSize(1024) + val writer = TestProbe() + // producing backpressure by sending much more than currently fits into // our send buffer val firstWrite = writeCmd(Ack1) @@ -97,14 +102,18 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // try to write the buffer but since the SO_SNDBUF is too small // it will have to keep the rest of the piece and send it // when possible - connectionHandler.send(connectionActor, firstWrite) + writer.send(connectionActor, firstWrite) selector.expectMsg(WriteInterest) // send another write which should fail immediately // because we don't store more than one piece in flight val secondWrite = writeCmd(Ack2) - connectionHandler.send(connectionActor, secondWrite) - connectionHandler.expectMsg(CommandFailed(secondWrite)) + writer.send(connectionActor, secondWrite) + writer.expectMsg(CommandFailed(secondWrite)) + + // reject even empty writes + writer.send(connectionActor, Write.Empty) + writer.expectMsg(CommandFailed(Write.Empty)) // there will be immediately more space in the send buffer because // some data will have been sent by now, so we assume we can write @@ -113,8 +122,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // both buffers should now be filled so no more writing // is possible - setup.pullFromServerSide(TestSize) - connectionHandler.expectMsg(Ack1) + pullFromServerSide(TestSize) + writer.expectMsg(Ack1) } "respect StopReading and ResumeReading" in withEstablishedConnection() { setup ⇒ @@ -141,10 +150,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") connectionHandler.send(connectionActor, writeCmd(Ack)) connectionHandler.send(connectionActor, Close) - setup.pullFromServerSide(TestSize) + pullFromServerSide(TestSize) connectionHandler.expectMsg(Ack) connectionHandler.expectMsg(Closed) - connectionActor.isTerminated must be(true) + assertThisConnectionActorTerminated() val buffer = ByteBuffer.allocate(1) serverSideChannel.read(buffer) must be(-1) @@ -177,7 +186,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") connectionHandler.send(connectionActor, ConfirmedClose) connectionHandler.expectNoMsg(100.millis) - setup.pullFromServerSide(TestSize) + pullFromServerSide(TestSize) connectionHandler.expectMsg(Ack) selector.send(connectionActor, ChannelReadable) @@ -207,9 +216,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") abortClose(serverSideChannel) selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsgPF(remaining) { - case ErrorClose(exc: IOException) ⇒ exc.getMessage must be("Connection reset by peer") - } + connectionHandler.expectMsgType[ErrorClose].cause must be("Connection reset by peer") // wait a while connectionHandler.expectNoMsg(200.millis) @@ -218,11 +225,13 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "report when peer closed the connection when trying to write" in withEstablishedConnection() { setup ⇒ import setup._ + val writer = TestProbe() + abortClose(serverSideChannel) - connectionHandler.send(connectionActor, Write(ByteString("testdata"))) - connectionHandler.expectMsgPF(remaining) { - case ErrorClose(_: IOException) ⇒ // ok - } + writer.send(connectionActor, Write(ByteString("testdata"))) + // bother writer and handler should get the message + writer.expectMsgType[ErrorClose] + connectionHandler.expectMsgType[ErrorClose] assertThisConnectionActorTerminated() } @@ -234,9 +243,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") localServer.close() selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsgPF() { - case ErrorClose(e) ⇒ e.getMessage must be("Connection reset by peer") - } + userHandler.expectMsgType[ErrorClose].cause must be("Connection reset by peer") assertActorTerminated(connectionActor) } @@ -252,9 +259,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") key.isConnectable must be(true) selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsgPF() { - case ErrorClose(e) ⇒ e.getMessage must be("Connection refused") - } + userHandler.expectMsgType[ErrorClose].cause must be("Connection refused") assertActorTerminated(connectionActor) } @@ -419,7 +424,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") channel.close() } def assertActorTerminated(connectionActor: TestActorRef[TcpOutgoingConnection]): Unit = { - watch(connectionActor) - expectMsgType[Terminated].actor must be(connectionActor) + val watcher = TestProbe() + watcher.watch(connectionActor) + watcher.expectMsgType[Terminated].actor must be(connectionActor) } }