Massive refactoring of EBEDD and WorkStealer and basically everything...
This commit is contained in:
parent
5fdaad467f
commit
158ea29bfd
6 changed files with 280 additions and 201 deletions
|
|
@ -196,12 +196,6 @@ trait ActorRef extends
|
|||
*/
|
||||
@volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None
|
||||
|
||||
/**
|
||||
* This lock ensures thread safety in the dispatching: only one message can
|
||||
* be dispatched at once on the actor.
|
||||
*/
|
||||
protected[akka] val dispatcherLock = new ReentrantLock
|
||||
|
||||
/**
|
||||
* This is a reference to the message currently being processed by the actor
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -81,72 +81,70 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
init
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = {
|
||||
getMailbox(invocation.receiver) enqueue invocation
|
||||
dispatch(invocation.receiver)
|
||||
val mbox = getMailbox(invocation.receiver)
|
||||
mbox enqueue invocation
|
||||
dispatch(mbox)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the mailbox associated with the actor
|
||||
*/
|
||||
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue]
|
||||
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with Runnable]
|
||||
|
||||
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
|
||||
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false)
|
||||
|
||||
def dispatch(receiver: ActorRef): Unit = if (active) {
|
||||
|
||||
executor.execute(new Runnable() {
|
||||
def run = {
|
||||
var lockAcquiredOnce = false
|
||||
var finishedBeforeMailboxEmpty = false
|
||||
val lock = receiver.dispatcherLock
|
||||
val mailbox = getMailbox(receiver)
|
||||
// this do-while loop is required to prevent missing new messages between the end of the inner while
|
||||
// loop and releasing the lock
|
||||
do {
|
||||
if (lock.tryLock) {
|
||||
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
|
||||
lockAcquiredOnce = true
|
||||
try {
|
||||
finishedBeforeMailboxEmpty = processMailbox(receiver,mailbox)
|
||||
} finally {
|
||||
lock.unlock
|
||||
if (finishedBeforeMailboxEmpty) dispatch(receiver)
|
||||
}
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = new DefaultMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,false) with Runnable {
|
||||
def run = {
|
||||
var lockAcquiredOnce = false
|
||||
var finishedBeforeMailboxEmpty = false
|
||||
// this do-while loop is required to prevent missing new messages between the end of the inner while
|
||||
// loop and releasing the lock
|
||||
do {
|
||||
if (dispatcherLock.tryLock()) {
|
||||
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
|
||||
lockAcquiredOnce = true
|
||||
try {
|
||||
finishedBeforeMailboxEmpty = processMailbox()
|
||||
} finally {
|
||||
dispatcherLock.unlock()
|
||||
if (finishedBeforeMailboxEmpty)
|
||||
dispatch(this)
|
||||
}
|
||||
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
|
||||
}
|
||||
})
|
||||
} else {
|
||||
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver)
|
||||
}
|
||||
|
||||
}
|
||||
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !this.isEmpty))
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the messages in the mailbox of the given actor.
|
||||
* Process the messages in the mailbox
|
||||
*
|
||||
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
|
||||
*/
|
||||
def processMailbox(receiver: ActorRef,mailbox: MessageQueue): Boolean = {
|
||||
val throttle = throughput > 0
|
||||
var processedMessages = 0
|
||||
var nextMessage = mailbox.dequeue
|
||||
if (nextMessage ne null) {
|
||||
do {
|
||||
nextMessage.invoke
|
||||
def processMailbox(): Boolean = {
|
||||
val throttle = throughput > 0
|
||||
var processedMessages = 0
|
||||
var nextMessage = this.dequeue
|
||||
if (nextMessage ne null) {
|
||||
do {
|
||||
nextMessage.invoke
|
||||
|
||||
if(throttle) { //Will be JIT:Ed away when false
|
||||
processedMessages += 1
|
||||
if (processedMessages >= throughput) //If we're throttled, break out
|
||||
return !mailbox.isEmpty
|
||||
if(throttle) { //Will be JIT:Ed away when false
|
||||
processedMessages += 1
|
||||
if (processedMessages >= throughput) //If we're throttled, break out
|
||||
return !this.isEmpty
|
||||
}
|
||||
nextMessage = this.dequeue
|
||||
}
|
||||
nextMessage = mailbox.dequeue
|
||||
while (nextMessage ne null)
|
||||
}
|
||||
while (nextMessage ne null)
|
||||
}
|
||||
|
||||
false
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) {
|
||||
executor.execute(mailbox)
|
||||
} else {
|
||||
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
|
||||
}
|
||||
|
||||
def start = if (!active) {
|
||||
|
|
|
|||
|
|
@ -56,21 +56,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
/**
|
||||
* @return the mailbox associated with the actor
|
||||
*/
|
||||
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]]
|
||||
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable]
|
||||
|
||||
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = if (active) {
|
||||
getMailbox(invocation.receiver).add(invocation)
|
||||
executor.execute(new Runnable() {
|
||||
def run = {
|
||||
if (!tryProcessMailbox(invocation.receiver)) {
|
||||
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
|
||||
// to another actor and then process his mailbox in stead.
|
||||
findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) )
|
||||
}
|
||||
}
|
||||
})
|
||||
val mbox = getMailbox(invocation.receiver)
|
||||
mbox enqueue invocation
|
||||
executor execute mbox
|
||||
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
|
||||
|
||||
/**
|
||||
|
|
@ -79,22 +72,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
*
|
||||
* @return true if the mailbox was processed, false otherwise
|
||||
*/
|
||||
private def tryProcessMailbox(receiver: ActorRef): Boolean = {
|
||||
private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
|
||||
var lockAcquiredOnce = false
|
||||
val lock = receiver.dispatcherLock
|
||||
|
||||
// this do-wile loop is required to prevent missing new messages between the end of processing
|
||||
// the mailbox and releasing the lock
|
||||
do {
|
||||
if (lock.tryLock) {
|
||||
if (mailbox.dispatcherLock.tryLock) {
|
||||
lockAcquiredOnce = true
|
||||
try {
|
||||
processMailbox(receiver)
|
||||
processMailbox(mailbox)
|
||||
} finally {
|
||||
lock.unlock
|
||||
mailbox.dispatcherLock.unlock
|
||||
}
|
||||
}
|
||||
} while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty))
|
||||
} while ((lockAcquiredOnce && !mailbox.isEmpty))
|
||||
|
||||
lockAcquiredOnce
|
||||
}
|
||||
|
|
@ -102,12 +94,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
/**
|
||||
* Process the messages in the mailbox of the given actor.
|
||||
*/
|
||||
private def processMailbox(receiver: ActorRef) = {
|
||||
val mailbox = getMailbox(receiver)
|
||||
var messageInvocation = mailbox.poll
|
||||
while (messageInvocation != null) {
|
||||
private def processMailbox(mailbox: MessageQueue) = {
|
||||
var messageInvocation = mailbox.dequeue
|
||||
while (messageInvocation ne null) {
|
||||
messageInvocation.invoke
|
||||
messageInvocation = mailbox.poll
|
||||
messageInvocation = mailbox.dequeue
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -145,11 +136,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
* the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
|
||||
*/
|
||||
private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
|
||||
if (thief.dispatcherLock.tryLock) {
|
||||
val mailbox = getMailbox(thief)
|
||||
if (mailbox.dispatcherLock.tryLock) {
|
||||
try {
|
||||
while(donateMessage(receiver, thief)) processMailbox(thief)
|
||||
while(donateMessage(receiver, thief)) processMailbox(mailbox)
|
||||
} finally {
|
||||
thief.dispatcherLock.unlock
|
||||
mailbox.dispatcherLock.unlock
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -191,18 +183,45 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
}
|
||||
|
||||
protected override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]
|
||||
else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
|
||||
if (mailboxCapacity <= 0) {
|
||||
new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable {
|
||||
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
|
||||
def dequeue: MessageInvocation = this.poll()
|
||||
|
||||
def run = {
|
||||
if (!tryProcessMailbox(this)) {
|
||||
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
|
||||
// to another actor and then process his mailbox in stead.
|
||||
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) with MessageQueue with Runnable {
|
||||
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
|
||||
|
||||
def dequeue: MessageInvocation = this.poll()
|
||||
|
||||
def run = {
|
||||
if (!tryProcessMailbox(this)) {
|
||||
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
|
||||
// to another actor and then process his mailbox in stead.
|
||||
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
verifyActorsAreOfSameType(actorRef)
|
||||
pooledActors.add(actorRef)
|
||||
pooledActors add actorRef
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
||||
override def unregister(actorRef: ActorRef) = {
|
||||
pooledActors.remove(actorRef)
|
||||
pooledActors remove actorRef
|
||||
super.unregister(actorRef)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce
|
|||
|
||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
import se.scalablesolutions.akka.AkkaException
|
||||
import se.scalablesolutions.akka.util.{Duration, HashCode, Logging}
|
||||
import java.util.{Queue, List}
|
||||
import java.util.concurrent._
|
||||
import concurrent.forkjoin.LinkedTransferQueue
|
||||
import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -63,6 +63,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait MessageQueue {
|
||||
val dispatcherLock = new SimpleLock
|
||||
def enqueue(handle: MessageInvocation)
|
||||
def dequeue(): MessageInvocation
|
||||
def size: Int
|
||||
|
|
@ -84,40 +85,28 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD
|
|||
*/
|
||||
def newMailbox(bounds: Int = capacity,
|
||||
pushTime: Option[Duration] = pushTimeOut,
|
||||
blockDequeue: Boolean = blockingDequeue) : MessageQueue = {
|
||||
if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue
|
||||
new LinkedTransferQueue[MessageInvocation] with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation): Unit = this add handle
|
||||
def dequeue(): MessageInvocation = {
|
||||
if(blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue
|
||||
val time = pushTime.get
|
||||
new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation) {
|
||||
if (!this.offer(handle,time.length,time.unit))
|
||||
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString)
|
||||
}
|
||||
blockDequeue: Boolean = blockingDequeue) : MessageQueue = new DefaultMessageQueue(bounds,pushTime,blockDequeue)
|
||||
}
|
||||
|
||||
def dequeue(): MessageInvocation = {
|
||||
if (blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
class DefaultMessageQueue(override val capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends BoundableTransferQueue[MessageInvocation](capacity) with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation) {
|
||||
if(bounded) {
|
||||
if (pushTimeOut.isDefined) {
|
||||
if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit))
|
||||
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString)
|
||||
}
|
||||
}
|
||||
else { //BOUNDED: Blocking enqueue and optionally blocking dequeue
|
||||
new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation): Unit = this put handle
|
||||
def dequeue(): MessageInvocation = {
|
||||
if(blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
else {
|
||||
this.put(handle)
|
||||
}
|
||||
} else {
|
||||
this.add(handle)
|
||||
}
|
||||
}
|
||||
|
||||
def dequeue(): MessageInvocation = {
|
||||
if (blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -156,14 +145,4 @@ trait MessageDispatcher extends Logging {
|
|||
* Creates and returns a mailbox for the given actor
|
||||
*/
|
||||
protected def createMailbox(actorRef: ActorRef): AnyRef = null
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait MessageDemultiplexer {
|
||||
def select
|
||||
def wakeUp
|
||||
def acquireSelectedInvocations: List[MessageInvocation]
|
||||
def releaseSelectedInvocations
|
||||
}
|
||||
}
|
||||
|
|
@ -9,120 +9,160 @@ import java.util.concurrent.{TimeUnit, Semaphore}
|
|||
import java.util.Iterator
|
||||
import se.scalablesolutions.akka.util.Logger
|
||||
|
||||
class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
|
||||
require(capacity > 0)
|
||||
class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
|
||||
val bounded = (capacity > 0)
|
||||
|
||||
protected val guard = new Semaphore(capacity)
|
||||
protected lazy val guard = new Semaphore(capacity)
|
||||
|
||||
override def take(): E = {
|
||||
val e = super.take
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
if (!bounded) {
|
||||
super.take
|
||||
} else {
|
||||
val e = super.take
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
}
|
||||
}
|
||||
|
||||
override def poll(): E = {
|
||||
val e = super.poll
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
if (!bounded) {
|
||||
super.poll
|
||||
} else {
|
||||
val e = super.poll
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
}
|
||||
}
|
||||
|
||||
override def poll(timeout: Long, unit: TimeUnit): E = {
|
||||
val e = super.poll(timeout,unit)
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
if (!bounded) {
|
||||
super.poll(timeout,unit)
|
||||
} else {
|
||||
val e = super.poll(timeout,unit)
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
}
|
||||
}
|
||||
|
||||
override def remainingCapacity = guard.availablePermits
|
||||
override def remainingCapacity: Int = {
|
||||
if (!bounded) super.remainingCapacity
|
||||
else guard.availablePermits
|
||||
}
|
||||
|
||||
override def remove(o: AnyRef): Boolean = {
|
||||
if (super.remove(o)) {
|
||||
guard.release
|
||||
true
|
||||
if (!bounded) {
|
||||
super.remove(o)
|
||||
} else {
|
||||
false
|
||||
if (super.remove(o)) {
|
||||
guard.release
|
||||
true
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
override def offer(e: E): Boolean = {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.offer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
if (!bounded) {
|
||||
super.offer(e)
|
||||
} else {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.offer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
|
||||
if (guard.tryAcquire(timeout,unit)) {
|
||||
val result = try {
|
||||
super.offer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
if (!bounded) {
|
||||
super.offer(e,timeout,unit)
|
||||
} else {
|
||||
if (guard.tryAcquire(timeout,unit)) {
|
||||
val result = try {
|
||||
super.offer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
override def add(e: E): Boolean = {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.add(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
if (!bounded) {
|
||||
super.add(e)
|
||||
} else {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.add(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
override def put(e :E): Unit = {
|
||||
guard.acquire
|
||||
try {
|
||||
if (!bounded) {
|
||||
super.put(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
} else {
|
||||
guard.acquire
|
||||
try {
|
||||
super.put(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def tryTransfer(e: E): Boolean = {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
if (!bounded) {
|
||||
super.tryTransfer(e)
|
||||
} else {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
|
||||
if (guard.tryAcquire(timeout,unit)) {
|
||||
val result = try {
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
if (!bounded) {
|
||||
super.tryTransfer(e,timeout,unit)
|
||||
} else {
|
||||
if (guard.tryAcquire(timeout,unit)) {
|
||||
val result = try {
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
override def transfer(e: E): Unit = {
|
||||
if (guard.tryAcquire) {
|
||||
try {
|
||||
super.transfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
if (!bounded) {
|
||||
super.transfer(e)
|
||||
} else {
|
||||
if (guard.tryAcquire) {
|
||||
try {
|
||||
super.transfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -134,7 +174,8 @@ class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransfe
|
|||
def next = it.next
|
||||
def remove {
|
||||
it.remove
|
||||
guard.release //Assume remove worked if no exception was thrown
|
||||
if (bounded)
|
||||
guard.release //Assume remove worked if no exception was thrown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package se.scalablesolutions.akka.util
|
||||
|
||||
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -58,3 +59,50 @@ class ReadWriteGuard {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A very simple lock that uses CCAS (Compare Compare-And-Swap)
|
||||
* Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods
|
||||
*/
|
||||
class SimpleLock {
|
||||
val acquired = new AtomicBoolean(false)
|
||||
|
||||
def ifPossible(perform: () => Unit): Boolean = {
|
||||
if (tryLock()) {
|
||||
try {
|
||||
perform
|
||||
} finally {
|
||||
unlock()
|
||||
}
|
||||
true
|
||||
} else false
|
||||
}
|
||||
|
||||
def ifPossibleYield[T](perform: () => T): Option[T] = {
|
||||
if (tryLock()) {
|
||||
try {
|
||||
Some(perform())
|
||||
} finally {
|
||||
unlock()
|
||||
}
|
||||
} else None
|
||||
}
|
||||
|
||||
def ifPossibleApply[T,R](value: T)(function: (T) => R): Option[R] = {
|
||||
if (tryLock()) {
|
||||
try {
|
||||
Some(function(value))
|
||||
} finally {
|
||||
unlock()
|
||||
}
|
||||
} else None
|
||||
}
|
||||
|
||||
def tryLock() = {
|
||||
if (acquired.get) false
|
||||
else acquired.compareAndSet(false,true)
|
||||
}
|
||||
|
||||
def unlock() {
|
||||
acquired.set(false)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue