diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index c41408a1de..86b6c2ec65 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -196,12 +196,6 @@ trait ActorRef extends */ @volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None - /** - * This lock ensures thread safety in the dispatching: only one message can - * be dispatched at once on the actor. - */ - protected[akka] val dispatcherLock = new ReentrantLock - /** * This is a reference to the message currently being processed by the actor */ diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index bbe4b53cde..0e9acf62e1 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -81,72 +81,70 @@ class ExecutorBasedEventDrivenDispatcher( init def dispatch(invocation: MessageInvocation) = { - getMailbox(invocation.receiver) enqueue invocation - dispatch(invocation.receiver) + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + dispatch(mbox) } /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with Runnable] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false) - - def dispatch(receiver: ActorRef): Unit = if (active) { - - executor.execute(new Runnable() { - def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - val lock = receiver.dispatcherLock - val mailbox = getMailbox(receiver) - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - if (lock.tryLock) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - try { - finishedBeforeMailboxEmpty = processMailbox(receiver,mailbox) - } finally { - lock.unlock - if (finishedBeforeMailboxEmpty) dispatch(receiver) - } + override def createMailbox(actorRef: ActorRef): AnyRef = new DefaultMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,false) with Runnable { + def run = { + var lockAcquiredOnce = false + var finishedBeforeMailboxEmpty = false + // this do-while loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock + do { + if (dispatcherLock.tryLock()) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + lockAcquiredOnce = true + try { + finishedBeforeMailboxEmpty = processMailbox() + } finally { + dispatcherLock.unlock() + if (finishedBeforeMailboxEmpty) + dispatch(this) } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) - } - }) - } else { - log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver) - } - + } + } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !this.isEmpty)) + } /** - * Process the messages in the mailbox of the given actor. + * Process the messages in the mailbox * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - def processMailbox(receiver: ActorRef,mailbox: MessageQueue): Boolean = { - val throttle = throughput > 0 - var processedMessages = 0 - var nextMessage = mailbox.dequeue - if (nextMessage ne null) { - do { - nextMessage.invoke + def processMailbox(): Boolean = { + val throttle = throughput > 0 + var processedMessages = 0 + var nextMessage = this.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke - if(throttle) { //Will be JIT:Ed away when false - processedMessages += 1 - if (processedMessages >= throughput) //If we're throttled, break out - return !mailbox.isEmpty + if(throttle) { //Will be JIT:Ed away when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !this.isEmpty + } + nextMessage = this.dequeue } - nextMessage = mailbox.dequeue + while (nextMessage ne null) } - while (nextMessage ne null) - } - false + false + } + } + + def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) { + executor.execute(mailbox) + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) } def start = if (!active) { diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 9b1097213e..c97c16c238 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -56,21 +56,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def dispatch(invocation: MessageInvocation) = if (active) { - getMailbox(invocation.receiver).add(invocation) - executor.execute(new Runnable() { - def run = { - if (!tryProcessMailbox(invocation.receiver)) { - // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox - // to another actor and then process his mailbox in stead. - findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) ) - } - } - }) + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + executor execute mbox } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") /** @@ -79,22 +72,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * * @return true if the mailbox was processed, false otherwise */ - private def tryProcessMailbox(receiver: ActorRef): Boolean = { + private def tryProcessMailbox(mailbox: MessageQueue): Boolean = { var lockAcquiredOnce = false - val lock = receiver.dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { - if (lock.tryLock) { + if (mailbox.dispatcherLock.tryLock) { lockAcquiredOnce = true try { - processMailbox(receiver) + processMailbox(mailbox) } finally { - lock.unlock + mailbox.dispatcherLock.unlock } } - } while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty)) + } while ((lockAcquiredOnce && !mailbox.isEmpty)) lockAcquiredOnce } @@ -102,12 +94,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Process the messages in the mailbox of the given actor. */ - private def processMailbox(receiver: ActorRef) = { - val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.poll - while (messageInvocation != null) { + private def processMailbox(mailbox: MessageQueue) = { + var messageInvocation = mailbox.dequeue + while (messageInvocation ne null) { messageInvocation.invoke - messageInvocation = mailbox.poll + messageInvocation = mailbox.dequeue } } @@ -145,11 +136,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. */ private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { - if (thief.dispatcherLock.tryLock) { + val mailbox = getMailbox(thief) + if (mailbox.dispatcherLock.tryLock) { try { - while(donateMessage(receiver, thief)) processMailbox(thief) + while(donateMessage(receiver, thief)) processMailbox(mailbox) } finally { - thief.dispatcherLock.unlock + mailbox.dispatcherLock.unlock } } } @@ -191,18 +183,45 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } protected override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation] - else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) + if (mailboxCapacity <= 0) { + new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { + def enqueue(handle: MessageInvocation): Unit = this.add(handle) + def dequeue: MessageInvocation = this.poll() + + def run = { + if (!tryProcessMailbox(this)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) + } + } + } + } + else { + new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) with MessageQueue with Runnable { + def enqueue(handle: MessageInvocation): Unit = this.add(handle) + + def dequeue: MessageInvocation = this.poll() + + def run = { + if (!tryProcessMailbox(this)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) + } + } + } + } } override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) - pooledActors.add(actorRef) + pooledActors add actorRef super.register(actorRef) } override def unregister(actorRef: ActorRef) = { - pooledActors.remove(actorRef) + pooledActors remove actorRef super.unregister(actorRef) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 383c58905a..015ae9422b 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce import org.multiverse.commitbarriers.CountDownCommitBarrier import se.scalablesolutions.akka.AkkaException -import se.scalablesolutions.akka.util.{Duration, HashCode, Logging} import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue +import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} /** * @author Jonas Bonér @@ -63,6 +63,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m * @author Jonas Bonér */ trait MessageQueue { + val dispatcherLock = new SimpleLock def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int @@ -84,40 +85,28 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD */ def newMailbox(bounds: Int = capacity, pushTime: Option[Duration] = pushTimeOut, - blockDequeue: Boolean = blockingDequeue) : MessageQueue = { - if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue - new LinkedTransferQueue[MessageInvocation] with MessageQueue { - def enqueue(handle: MessageInvocation): Unit = this add handle - def dequeue(): MessageInvocation = { - if(blockDequeue) this.take() - else this.poll() - } - } - } - else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue - val time = pushTime.get - new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue { - def enqueue(handle: MessageInvocation) { - if (!this.offer(handle,time.length,time.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) - } + blockDequeue: Boolean = blockingDequeue) : MessageQueue = new DefaultMessageQueue(bounds,pushTime,blockDequeue) +} - def dequeue(): MessageInvocation = { - if (blockDequeue) this.take() - else this.poll() - } +class DefaultMessageQueue(override val capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends BoundableTransferQueue[MessageInvocation](capacity) with MessageQueue { + def enqueue(handle: MessageInvocation) { + if(bounded) { + if (pushTimeOut.isDefined) { + if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) } - } - else { //BOUNDED: Blocking enqueue and optionally blocking dequeue - new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue { - def enqueue(handle: MessageInvocation): Unit = this put handle - def dequeue(): MessageInvocation = { - if(blockDequeue) this.take() - else this.poll() - } + else { + this.put(handle) } + } else { + this.add(handle) } } + + def dequeue(): MessageInvocation = { + if (blockDequeue) this.take() + else this.poll() + } } /** @@ -156,14 +145,4 @@ trait MessageDispatcher extends Logging { * Creates and returns a mailbox for the given actor */ protected def createMailbox(actorRef: ActorRef): AnyRef = null -} - -/** - * @author Jonas Bonér - */ -trait MessageDemultiplexer { - def select - def wakeUp - def acquireSelectedInvocations: List[MessageInvocation] - def releaseSelectedInvocations -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala index 2ba88f25c3..8c75d6a42b 100644 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ b/akka-actor/src/main/scala/dispatch/Queues.scala @@ -9,120 +9,160 @@ import java.util.concurrent.{TimeUnit, Semaphore} import java.util.Iterator import se.scalablesolutions.akka.util.Logger -class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { - require(capacity > 0) +class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { + val bounded = (capacity > 0) - protected val guard = new Semaphore(capacity) + protected lazy val guard = new Semaphore(capacity) override def take(): E = { - val e = super.take - if (e ne null) guard.release - e + if (!bounded) { + super.take + } else { + val e = super.take + if (e ne null) guard.release + e + } } override def poll(): E = { - val e = super.poll - if (e ne null) guard.release - e + if (!bounded) { + super.poll + } else { + val e = super.poll + if (e ne null) guard.release + e + } } override def poll(timeout: Long, unit: TimeUnit): E = { - val e = super.poll(timeout,unit) - if (e ne null) guard.release - e + if (!bounded) { + super.poll(timeout,unit) + } else { + val e = super.poll(timeout,unit) + if (e ne null) guard.release + e + } } - override def remainingCapacity = guard.availablePermits + override def remainingCapacity: Int = { + if (!bounded) super.remainingCapacity + else guard.availablePermits + } override def remove(o: AnyRef): Boolean = { - if (super.remove(o)) { - guard.release - true + if (!bounded) { + super.remove(o) } else { - false + if (super.remove(o)) { + guard.release + true + } else false } } override def offer(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.offer(e) + } else { + if (guard.tryAcquire) { + val result = try { + super.offer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.offer(e,timeout,unit) + } else { + if (guard.tryAcquire(timeout,unit)) { + val result = try { + super.offer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def add(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.add(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.add(e) + } else { + if (guard.tryAcquire) { + val result = try { + super.add(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def put(e :E): Unit = { - guard.acquire - try { + if (!bounded) { super.put(e) - } catch { - case e => guard.release; throw e + } else { + guard.acquire + try { + super.put(e) + } catch { + case e => guard.release; throw e + } } } override def tryTransfer(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.tryTransfer(e) + } else { + if (guard.tryAcquire) { + val result = try { + super.tryTransfer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.tryTransfer(e,timeout,unit) + } else { + if (guard.tryAcquire(timeout,unit)) { + val result = try { + super.tryTransfer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def transfer(e: E): Unit = { - if (guard.tryAcquire) { - try { - super.transfer(e) - } catch { - case e => guard.release; throw e + if (!bounded) { + super.transfer(e) + } else { + if (guard.tryAcquire) { + try { + super.transfer(e) + } catch { + case e => guard.release; throw e + } } } } @@ -134,7 +174,8 @@ class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransfe def next = it.next def remove { it.remove - guard.release //Assume remove worked if no exception was thrown + if (bounded) + guard.release //Assume remove worked if no exception was thrown } } } diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 885e11def7..ee7f4f0efc 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} +import java.util.concurrent.atomic.AtomicBoolean /** * @author Jonas Bonér @@ -58,3 +59,50 @@ class ReadWriteGuard { } } +/** + * A very simple lock that uses CCAS (Compare Compare-And-Swap) + * Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods + */ +class SimpleLock { + val acquired = new AtomicBoolean(false) + + def ifPossible(perform: () => Unit): Boolean = { + if (tryLock()) { + try { + perform + } finally { + unlock() + } + true + } else false + } + + def ifPossibleYield[T](perform: () => T): Option[T] = { + if (tryLock()) { + try { + Some(perform()) + } finally { + unlock() + } + } else None + } + + def ifPossibleApply[T,R](value: T)(function: (T) => R): Option[R] = { + if (tryLock()) { + try { + Some(function(value)) + } finally { + unlock() + } + } else None + } + + def tryLock() = { + if (acquired.get) false + else acquired.compareAndSet(false,true) + } + + def unlock() { + acquired.set(false) + } +} \ No newline at end of file