diff --git a/akka-io/src/main/resources/reference.conf b/akka-io/src/main/resources/reference.conf index c48a246f5d..3595cd12f5 100644 --- a/akka-io/src/main/resources/reference.conf +++ b/akka-io/src/main/resources/reference.conf @@ -48,10 +48,13 @@ akka { # the worker-dispatcher batch-accept-limit = 10 - # The number of bytes per thread-local direct buffer used to read or write - # network data from the kernel. Those buffer directly add to the footprint - # of all threads from the dispatcher which TCP connection actors are using. - direct-buffer-size = 524288 + # The number of bytes per direct buffer in the pool used to read or write + # network data from the kernel. + direct-buffer-size = 131072 + + # The maximal number of direct buffers kept in the direct buffer pool for + # reuse. + max-direct-buffer-pool-size = 1000 # The duration a connection actor waits for a `Register` message from # its commander before aborting the connection. diff --git a/akka-io/src/main/scala/akka/io/DirectByteBufferPool.scala b/akka-io/src/main/scala/akka/io/DirectByteBufferPool.scala new file mode 100644 index 0000000000..4fd558a162 --- /dev/null +++ b/akka-io/src/main/scala/akka/io/DirectByteBufferPool.scala @@ -0,0 +1,67 @@ +package akka.io + +import java.util.concurrent.atomic.AtomicInteger +import java.nio.ByteBuffer +import annotation.tailrec + +trait WithBufferPool { + def tcp: TcpExt + + def acquireBuffer(): ByteBuffer = + tcp.bufferPool.acquire() + + def acquireBuffer(size: Int): ByteBuffer = + tcp.bufferPool.acquire(size) + + def releaseBuffer(buffer: ByteBuffer) = + tcp.bufferPool.release(buffer) +} + +/** + * A buffer pool which keeps direct buffers of a specified default size. + * If a buffer bigger than the default size is requested it is created + * but will not be pooled on release. + * + * This implementation is very loosely based on the one from Netty. + */ +class DirectByteBufferPool(bufferSize: Int, maxPoolSize: Int) { + private val Unlocked = 0 + private val Locked = 1 + + private[this] val state = new AtomicInteger(Unlocked) + @volatile private[this] var pool: List[ByteBuffer] = Nil + @volatile private[this] var poolSize: Int = 0 + + private[this] def allocate(size: Int): ByteBuffer = + ByteBuffer.allocateDirect(size) + + def acquire(size: Int = bufferSize): ByteBuffer = { + if (poolSize == 0 || size > bufferSize) allocate(size) + else takeBufferFromPool() + } + + def release(buf: ByteBuffer): Unit = + if (buf.capacity() <= bufferSize && poolSize < maxPoolSize) + addBufferToPool(buf) + + @tailrec + final def takeBufferFromPool(): ByteBuffer = + if (state.compareAndSet(Unlocked, Locked)) + try pool match { + case Nil ⇒ allocate(bufferSize) // we have no more buffer available, so create a new one + case buf :: tail ⇒ + pool = tail + poolSize -= 1 + buf + } finally state.set(Unlocked) + else takeBufferFromPool() // spin while locked + + @tailrec + final def addBufferToPool(buf: ByteBuffer): Unit = + if (state.compareAndSet(Unlocked, Locked)) { + buf.clear() // ensure that we never have dirty buffers in the pool + pool = buf :: pool + poolSize += 1 + state.set(Unlocked) + } else addBufferToPool(buf) // spin while locked +} diff --git a/akka-io/src/main/scala/akka/io/Tcp.scala b/akka-io/src/main/scala/akka/io/Tcp.scala index af2c30d0e2..4107ee3e82 100644 --- a/akka-io/src/main/scala/akka/io/Tcp.scala +++ b/akka-io/src/main/scala/akka/io/Tcp.scala @@ -220,6 +220,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val SelectorAssociationRetries = getInt("selector-association-retries") val BatchAcceptLimit = getInt("batch-accept-limit") val DirectBufferSize = getInt("direct-buffer-size") + val MaxDirectBufferPoolSize = getInt("max-direct-buffer-pool-size") val RegisterTimeout = if (getString("register-timeout") == "infinite") Duration.Undefined else Duration(getMilliseconds("register-timeout"), MILLISECONDS) @@ -240,4 +241,5 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf( Props.empty.withDispatcher(Settings.ManagementDispatcher), "IO-TCP") + val bufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize) } diff --git a/akka-io/src/main/scala/akka/io/TcpConnection.scala b/akka-io/src/main/scala/akka/io/TcpConnection.scala index f7589dd6d3..03069f0d01 100644 --- a/akka-io/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpConnection.scala @@ -14,24 +14,24 @@ import akka.actor._ import akka.util.ByteString import Tcp._ import annotation.tailrec +import java.nio.ByteBuffer /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. */ abstract class TcpConnection(val selector: ActorRef, - val channel: SocketChannel) extends Actor with ThreadLocalDirectBuffer with ActorLogging { + val channel: SocketChannel) extends Actor with ActorLogging with WithBufferPool { val tcp = Tcp(context.system) channel.configureBlocking(false) - var pendingWrite: Write = Write.Empty // a write "queue" of size 1 for holding one unfinished write command - var pendingWriteCommander: ActorRef = null + var pendingWrite: PendingWrite = null // Needed to send the ConnectionClosed message in the postStop handler. // First element is the handler, second the particular close message. var closedMessage: (ActorRef, ConnectionClosed) = null - def writePending = pendingWrite ne Write.Empty + def writePending = pendingWrite ne null def registerTimeout = tcp.Settings.RegisterTimeout def traceLoggingEnabled = tcp.Settings.TraceLogging @@ -74,8 +74,8 @@ abstract class TcpConnection(val selector: ActorRef, sender ! write.ack case write: Write ⇒ - pendingWriteCommander = sender - pendingWrite = write + pendingWrite = createWrite(write) + doWrite(handler) case ChannelWritable ⇒ doWrite(handler) @@ -119,16 +119,17 @@ abstract class TcpConnection(val selector: ActorRef, } def doRead(handler: ActorRef): Unit = { - val buffer = directBuffer() + val buffer = acquireBuffer() try { - log.debug("Trying to read from channel") val readBytes = channel.read(buffer) buffer.flip() if (readBytes > 0) { if (traceLoggingEnabled) log.debug("Read {} bytes", readBytes) handler ! Received(ByteString(buffer)) + releaseBuffer(buffer) + if (readBytes == buffer.capacity()) // directly try reading more because we exhausted our buffer self ! ChannelReadable @@ -147,23 +148,16 @@ abstract class TcpConnection(val selector: ActorRef, } def doWrite(handler: ActorRef): Unit = { - val write = pendingWrite - val data = write.data - - val buffer = directBuffer() - data.copyToBuffer(buffer) - buffer.flip() - try { - val writtenBytes = channel.write(buffer) - if (traceLoggingEnabled) log.debug("Wrote {} bytes", writtenBytes) - pendingWrite = consume(write, writtenBytes) + val writtenBytes = channel.write(pendingWrite.buffer) + if (traceLoggingEnabled) log.debug("Wrote {} bytes to channel", writtenBytes) - if (writePending) selector ! WriteInterest // still data to write - else if (write.wantsAck) { - pendingWriteCommander ! write.ack - pendingWriteCommander = null - } // everything written + if (pendingWrite.hasData) selector ! WriteInterest // still data to write + else if (pendingWrite.wantsAck) { // everything written + pendingWrite.commander ! pendingWrite.ack + releaseBuffer(pendingWrite.buffer) + pendingWrite = null + } } catch { case e: IOException ⇒ handleError(handler, e) } @@ -239,7 +233,7 @@ abstract class TcpConnection(val selector: ActorRef, closedMessage._1 ! msg if (writePending) - pendingWriteCommander ! msg + pendingWrite.commander ! msg } if (channel.isOpen) @@ -249,13 +243,15 @@ abstract class TcpConnection(val selector: ActorRef, override def postRestart(reason: Throwable): Unit = throw new IllegalStateException("Restarting not supported for connection actors.") - /** Returns a new write with `numBytes` removed from the front */ - def consume(write: Write, numBytes: Int): Write = - numBytes match { - case 0 ⇒ write - case x if x == write.data.length ⇒ Write.Empty - case _ ⇒ - require(numBytes > 0 && numBytes < write.data.length) - write.copy(data = write.data.drop(numBytes)) - } + private[TcpConnection] case class PendingWrite(commander: ActorRef, ack: AnyRef, buffer: ByteBuffer) { + def hasData = buffer.remaining() > 0 + def wantsAck = ack ne NoAck + } + def createWrite(write: Write): PendingWrite = { + val buffer = acquireBuffer(write.data.length) + write.data.copyToBuffer(buffer) + buffer.flip() + + PendingWrite(sender, write.ack, buffer) + } } diff --git a/akka-io/src/main/scala/akka/io/ThreadLocalDirectBuffer.scala b/akka-io/src/main/scala/akka/io/ThreadLocalDirectBuffer.scala deleted file mode 100644 index ab0b38510c..0000000000 --- a/akka-io/src/main/scala/akka/io/ThreadLocalDirectBuffer.scala +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.io - -import java.nio.ByteBuffer -import akka.actor.Actor - -/** - * Allows an actor to get a thread local direct buffer of a size defined in the - * configuration of the actor system. An underlying assumption is that all of - * the threads which call `getDirectBuffer` are owned by the actor system. - */ -trait ThreadLocalDirectBuffer { _: Actor ⇒ - def directBuffer(): ByteBuffer = { - val result = ThreadLocalDirectBuffer.threadLocalBuffer.get() - if (result == null) { - val size = Tcp(context.system).Settings.DirectBufferSize - val newBuffer = ByteBuffer.allocateDirect(size) - ThreadLocalDirectBuffer.threadLocalBuffer.set(newBuffer) - newBuffer - } else { - result.clear() - result - } - } -} - -object ThreadLocalDirectBuffer { - private val threadLocalBuffer = new ThreadLocal[ByteBuffer] -}