Revert "=act #17216 fix DirectByteBufferPoll locking"
This reverts commit 4528ef8c45.
This commit is contained in:
parent
0c659cd923
commit
baa37f330c
1 changed files with 29 additions and 29 deletions
|
|
@ -4,9 +4,9 @@
|
|||
|
||||
package akka.io
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.TimeUnit
|
||||
import annotation.tailrec
|
||||
|
||||
trait BufferPool {
|
||||
def acquire(): ByteBuffer
|
||||
|
|
@ -27,9 +27,9 @@ trait BufferPool {
|
|||
* benefit to wrapping in-heap JVM data when writing with NIO.
|
||||
*/
|
||||
private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries: Int) extends BufferPool {
|
||||
private[this] val lock = new ReentrantLock
|
||||
private[this] val pool = new Array[ByteBuffer](maxPoolEntries)
|
||||
private[this] var buffersInPool = 0
|
||||
private[this] val locked = new AtomicBoolean(false)
|
||||
private[this] val pool: Array[ByteBuffer] = new Array[ByteBuffer](maxPoolEntries)
|
||||
private[this] var buffersInPool: Int = 0
|
||||
|
||||
def acquire(): ByteBuffer =
|
||||
takeBufferFromPool()
|
||||
|
|
@ -40,32 +40,32 @@ private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries:
|
|||
private def allocate(size: Int): ByteBuffer =
|
||||
ByteBuffer.allocateDirect(size)
|
||||
|
||||
private final def takeBufferFromPool(): ByteBuffer = {
|
||||
var buffer: ByteBuffer = null
|
||||
|
||||
if (lock.tryLock(1, TimeUnit.MILLISECONDS))
|
||||
try
|
||||
if (buffersInPool > 0) {
|
||||
@tailrec
|
||||
private final def takeBufferFromPool(): ByteBuffer =
|
||||
if (locked.compareAndSet(false, true)) {
|
||||
val buffer =
|
||||
try if (buffersInPool > 0) {
|
||||
buffersInPool -= 1
|
||||
buffer = pool(buffersInPool)
|
||||
}
|
||||
finally lock.unlock()
|
||||
pool(buffersInPool)
|
||||
} else null
|
||||
finally locked.set(false)
|
||||
|
||||
// allocate new and clear outside the lock
|
||||
if (buffer == null)
|
||||
allocate(defaultBufferSize)
|
||||
else {
|
||||
buffer.clear()
|
||||
buffer
|
||||
}
|
||||
}
|
||||
// allocate new and clear outside the lock
|
||||
if (buffer == null)
|
||||
allocate(defaultBufferSize)
|
||||
else {
|
||||
buffer.clear()
|
||||
buffer
|
||||
}
|
||||
} else takeBufferFromPool() // spin while locked
|
||||
|
||||
@tailrec
|
||||
private final def offerBufferToPool(buf: ByteBuffer): Unit =
|
||||
if (lock.tryLock(1, TimeUnit.MILLISECONDS))
|
||||
try
|
||||
if (buffersInPool < maxPoolEntries) {
|
||||
pool(buffersInPool) = buf
|
||||
buffersInPool += 1
|
||||
} // else let the buffer be gc'd
|
||||
finally lock.unlock()
|
||||
if (locked.compareAndSet(false, true))
|
||||
try if (buffersInPool < maxPoolEntries) {
|
||||
pool(buffersInPool) = buf
|
||||
buffersInPool += 1
|
||||
} // else let the buffer be gc'd
|
||||
finally locked.set(false)
|
||||
else offerBufferToPool(buf) // spin while locked
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue