2013-01-17 14:31:35 +01:00
|
|
|
package akka.io
|
|
|
|
|
|
2013-01-21 14:45:19 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean
|
2013-01-17 14:31:35 +01:00
|
|
|
import java.nio.ByteBuffer
|
|
|
|
|
import annotation.tailrec
|
|
|
|
|
|
|
|
|
|
/**
 * Mixin that exposes buffer acquire/release helpers backed by the buffer pool
 * of a [[TcpExt]].
 */
trait WithBufferPool {

  /** The TCP extension whose `bufferPool` serves the helper methods below. */
  def tcp: TcpExt

  /**
   * Obtains a buffer by delegating to `tcp.bufferPool.acquire()`.
   *
   * @return the buffer handed out by the pool
   */
  def acquireBuffer(): ByteBuffer = {
    val bufferSource = tcp.bufferPool
    bufferSource.acquire()
  }

  /**
   * Gives `buffer` back by delegating to `tcp.bufferPool.release(buffer)`.
   *
   * @param buffer the buffer to hand back to the pool
   */
  def releaseBuffer(buffer: ByteBuffer) = {
    val bufferSink = tcp.bufferPool
    bufferSink.release(buffer)
  }
}
|
|
|
|
|
|
2013-01-21 14:45:19 +01:00
|
|
|
/**
 * Contract for a component that hands out [[java.nio.ByteBuffer]] instances and
 * accepts them back. What happens to a released buffer (reuse, discard, ...) is
 * left to the implementation.
 */
trait BufferPool {

  /**
   * Obtains a buffer from this pool.
   *
   * @return a buffer for the caller to use
   */
  def acquire(): ByteBuffer

  /**
   * Hands a buffer back to this pool.
   *
   * @param buf the buffer being given back
   */
  def release(buf: ByteBuffer): Unit
}
|
|
|
|
|
|
2013-01-17 14:31:35 +01:00
|
|
|
/**
 * A buffer pool which keeps a free list of direct buffers of a specified default
 * size in a simple fixed size stack.
 *
 * If the stack is full, a buffer offered back is not kept; it is dropped and
 * left to be reclaimed by normal garbage collection.
 *
 * All access to the stack happens while holding a spin lock implemented with an
 * `AtomicBoolean`; callers that fail to take the lock retry until they succeed.
 * Released buffers are stored as given: neither their capacity nor whether they
 * are direct is checked.
 *
 * @param defaultBufferSize capacity, in bytes, of every buffer this pool allocates itself
 * @param maxPoolEntries    maximum number of released buffers retained for reuse
 */
private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries: Int) extends BufferPool {
  // Spin-lock flag: true while some thread is reading or writing the stack below.
  private[this] val locked = new AtomicBoolean(false)
  // Fixed-size stack of idle buffers; only indices [0, buffersInPool) are occupied.
  private[this] val pool: Array[ByteBuffer] = new Array[ByteBuffer](maxPoolEntries)
  // Number of idle buffers currently on the stack; only touched while `locked` is held.
  private[this] var buffersInPool: Int = 0

  /**
   * Returns a pooled buffer (cleared) if one is available, otherwise a newly
   * allocated direct buffer of `defaultBufferSize` bytes.
   */
  def acquire(): ByteBuffer =
    takeBufferFromPool()

  /**
   * Offers `buf` back to the pool. It is retained only if the stack has a free
   * slot; otherwise it is dropped.
   *
   * @param buf the buffer to hand back
   */
  def release(buf: ByteBuffer): Unit =
    offerBufferToPool(buf)

  /** Allocates a fresh direct buffer of `size` bytes. */
  private def allocate(size: Int): ByteBuffer =
    ByteBuffer.allocateDirect(size)

  /**
   * Pops the top buffer off the stack under the spin lock, retrying while the
   * lock is held by another thread.
   */
  @tailrec
  private final def takeBufferFromPool(): ByteBuffer =
    if (locked.compareAndSet(false, true)) {
      val buffer =
        try if (buffersInPool > 0) {
          buffersInPool -= 1
          val taken = pool(buffersInPool)
          // Clear the vacated slot so the stack does not keep a handed-out buffer
          // reachable if its user drops it without releasing it.
          pool(buffersInPool) = null
          taken
        } else null
        finally locked.set(false)

      // allocate new and clear outside the lock
      if (buffer == null)
        allocate(defaultBufferSize)
      else {
        buffer.clear()
        buffer
      }
    } else takeBufferFromPool() // spin while locked

  /**
   * Pushes `buf` onto the stack under the spin lock if there is room, retrying
   * while the lock is held by another thread.
   */
  @tailrec
  private final def offerBufferToPool(buf: ByteBuffer): Unit =
    if (locked.compareAndSet(false, true))
      try if (buffersInPool < maxPoolEntries) {
        pool(buffersInPool) = buf
        buffersInPool += 1
      } // else let the buffer be gc'd
      finally locked.set(false)
    else offerBufferToPool(buf) // spin while locked
}
|