diff --git a/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala b/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala index 34ef40ffb7..eb0e583f2d 100644 --- a/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala +++ b/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala @@ -4,9 +4,9 @@ package akka.io +import java.util.concurrent.atomic.AtomicBoolean import java.nio.ByteBuffer -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit +import annotation.tailrec trait BufferPool { def acquire(): ByteBuffer @@ -27,9 +27,9 @@ trait BufferPool { * benefit to wrapping in-heap JVM data when writing with NIO. */ private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries: Int) extends BufferPool { - private[this] val lock = new ReentrantLock - private[this] val pool = new Array[ByteBuffer](maxPoolEntries) - private[this] var buffersInPool = 0 + private[this] val locked = new AtomicBoolean(false) + private[this] val pool: Array[ByteBuffer] = new Array[ByteBuffer](maxPoolEntries) + private[this] var buffersInPool: Int = 0 def acquire(): ByteBuffer = takeBufferFromPool() @@ -40,32 +40,32 @@ private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries: private def allocate(size: Int): ByteBuffer = ByteBuffer.allocateDirect(size) - private final def takeBufferFromPool(): ByteBuffer = { - var buffer: ByteBuffer = null - - if (lock.tryLock(1, TimeUnit.MILLISECONDS)) - try - if (buffersInPool > 0) { + @tailrec + private final def takeBufferFromPool(): ByteBuffer = + if (locked.compareAndSet(false, true)) { + val buffer = + try if (buffersInPool > 0) { buffersInPool -= 1 - buffer = pool(buffersInPool) - } - finally lock.unlock() + pool(buffersInPool) + } else null + finally locked.set(false) - // allocate new and clear outside the lock - if (buffer == null) - allocate(defaultBufferSize) - else { - buffer.clear() - buffer - } - } + // allocate new and clear outside the lock + if (buffer == null) + allocate(defaultBufferSize) + else { + buffer.clear() + buffer + } + } else takeBufferFromPool() // spin while locked + @tailrec private final def offerBufferToPool(buf: ByteBuffer): Unit = - if (lock.tryLock(1, TimeUnit.MILLISECONDS)) - try - if (buffersInPool < maxPoolEntries) { - pool(buffersInPool) = buf - buffersInPool += 1 - } // else let the buffer be gc'd - finally lock.unlock() + if (locked.compareAndSet(false, true)) + try if (buffersInPool < maxPoolEntries) { + pool(buffersInPool) = buf + buffersInPool += 1 + } // else let the buffer be gc'd + finally locked.set(false) + else offerBufferToPool(buf) // spin while locked }