diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index c41408a1de..7777051ac1 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -196,19 +196,10 @@ trait ActorRef extends */ @volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None - /** - * This lock ensures thread safety in the dispatching: only one message can - * be dispatched at once on the actor. - */ - protected[akka] val dispatcherLock = new ReentrantLock - /** * This is a reference to the message currently being processed by the actor */ - protected[akka] var _currentMessage: Option[MessageInvocation] = None - - protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } - protected[akka] def currentMessage = guard.withGuard { _currentMessage } + @volatile protected[akka] var currentMessage: MessageInvocation = null /** * Comparison only takes uuid into account. @@ -978,12 +969,12 @@ class LocalActorRef private[akka]( protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) - if (isRemotingEnabled && remoteAddress.isDefined) { + if (remoteAddress.isDefined && isRemotingEnabled) { RemoteClientModule.send[Any]( message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor) } else { val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) - invocation.send + dispatcher dispatch invocation } } @@ -994,7 +985,7 @@ class LocalActorRef private[akka]( senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { joinTransaction(message) - if (isRemotingEnabled && remoteAddress.isDefined) { + if (remoteAddress.isDefined && isRemotingEnabled) { val future = RemoteClientModule.send[T]( message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor) if (future.isDefined) future.get @@ -1004,7 +995,7 @@ class LocalActorRef private[akka]( else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get) - invocation.send + dispatcher dispatch invocation future } } @@ -1016,7 +1007,7 @@ class LocalActorRef private[akka]( if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { - currentMessage = Option(messageHandle) + currentMessage = messageHandle try { dispatch(messageHandle) } catch { @@ -1024,7 +1015,7 @@ class LocalActorRef private[akka]( Actor.log.error(e, "Could not invoke actor [%s]", this) throw e } finally { - currentMessage = None //TODO: Don't reset this, we might want to resend the message + currentMessage = null //TODO: Don't reset this, we might want to resend the message } } } @@ -1188,7 +1179,7 @@ class LocalActorRef private[akka]( } private def dispatch[T](messageHandle: MessageInvocation) = { - Actor.log.trace("Invoking actor with message:\n" + messageHandle) + Actor.log.trace("Invoking actor with message: %s\n",messageHandle) val message = messageHandle.message //serializeMessage(messageHandle.message) var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = @@ -1535,10 +1526,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent from another Actor, else None. */ def sender: Option[ActorRef] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.sender + if (msg eq null) None + else msg.sender } /** @@ -1546,10 +1536,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ def senderFuture(): Option[CompletableFuture[Any]] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.senderFuture + if (msg eq null) None + else msg.senderFuture } diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index f3a479e6fd..51bbfd3477 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -125,7 +125,7 @@ object ActorRegistry extends ListenerManagement { actorsByUUID.put(actor.uuid, actor) // notify listeners - foreachListener(_ ! ActorRegistered(actor)) + notifyListeners(ActorRegistered(actor)) } /** @@ -137,7 +137,7 @@ object ActorRegistry extends ListenerManagement { actorsById.remove(actor.id,actor) // notify listeners - foreachListener(_ ! ActorUnregistered(actor)) + notifyListeners(ActorUnregistered(actor)) } /** diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index dbbfe3442a..6cabdec5e5 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} import java.util.Queue -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} +import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -64,7 +64,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} */ class ExecutorBasedEventDrivenDispatcher( _name: String, - throughput: Int = Dispatchers.THROUGHPUT, + val throughput: Int = Dispatchers.THROUGHPUT, mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG, config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { @@ -80,71 +80,84 @@ class ExecutorBasedEventDrivenDispatcher( val name = "akka:event-driven:dispatcher:" + _name init + /** + * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox + */ + trait ExecutableMailbox extends Runnable { self: MessageQueue => + final def run = { + + val reschedule = try { + processMailbox() + } finally { + dispatcherLock.unlock() + } + + if (reschedule || !self.isEmpty) + registerForExecution(self) + } + + /** + * Process the messages in the mailbox + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + final def processMailbox(): Boolean = { + val throttle = throughput > 0 + var processedMessages = 0 + var nextMessage = self.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke + + if(throttle) { //Will be elided when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !self.isEmpty + } + nextMessage = self.dequeue + } + while (nextMessage ne null) + } + + false + } + } + def dispatch(invocation: MessageInvocation) = { - getMailbox(invocation.receiver) enqueue invocation - dispatch(invocation.receiver) + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + registerForExecution(mbox) + } + + protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { + if (mailbox.dispatcherLock.tryLock()) { + try { + executor execute mailbox + } catch { + case e: RejectedExecutionException => + mailbox.dispatcherLock.unlock() + throw e + } + } + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) } /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false) - - def dispatch(receiver: ActorRef): Unit = if (active) { - - executor.execute(new Runnable() { - def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - val lock = receiver.dispatcherLock - val mailbox = getMailbox(receiver) - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - if (lock.tryLock) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - try { - finishedBeforeMailboxEmpty = processMailbox(receiver) - } finally { - lock.unlock - if (finishedBeforeMailboxEmpty) dispatch(receiver) - } - } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) - } - }) - } else { - log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver) + override def createMailbox(actorRef: ActorRef): AnyRef = { + if (mailboxCapacity > 0) + new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with ExecutableMailbox + else + new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox } - /** - * Process the messages in the mailbox of the given actor. - * - * @return true if the processing finished before the mailbox was empty, due to the throughput constraint - */ - def processMailbox(receiver: ActorRef): Boolean = { - var processedMessages = 0 - val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.dequeue - while (messageInvocation != null) { - messageInvocation.invoke - processedMessages += 1 - // check if we simply continue with other messages, or reached the throughput limit - if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue - else { - messageInvocation = null - return !mailbox.isEmpty - } - } - false - } - def start = if (!active) { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) active = true @@ -157,8 +170,10 @@ class ExecutorBasedEventDrivenDispatcher( uuids.clear } - def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( + def ensureNotActive(): Unit = if (active) { + throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") + } override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 9b1097213e..10afb1bfb6 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -56,21 +56,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def dispatch(invocation: MessageInvocation) = if (active) { - getMailbox(invocation.receiver).add(invocation) - executor.execute(new Runnable() { - def run = { - if (!tryProcessMailbox(invocation.receiver)) { - // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox - // to another actor and then process his mailbox in stead. - findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) ) - } - } - }) + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + executor execute mbox } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") /** @@ -79,22 +72,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * * @return true if the mailbox was processed, false otherwise */ - private def tryProcessMailbox(receiver: ActorRef): Boolean = { + private def tryProcessMailbox(mailbox: MessageQueue): Boolean = { var lockAcquiredOnce = false - val lock = receiver.dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { - if (lock.tryLock) { + if (mailbox.dispatcherLock.tryLock) { lockAcquiredOnce = true try { - processMailbox(receiver) + processMailbox(mailbox) } finally { - lock.unlock + mailbox.dispatcherLock.unlock } } - } while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty)) + } while ((lockAcquiredOnce && !mailbox.isEmpty)) lockAcquiredOnce } @@ -102,12 +94,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Process the messages in the mailbox of the given actor. */ - private def processMailbox(receiver: ActorRef) = { - val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.poll - while (messageInvocation != null) { + private def processMailbox(mailbox: MessageQueue) = { + var messageInvocation = mailbox.dequeue + while (messageInvocation ne null) { messageInvocation.invoke - messageInvocation = mailbox.poll + messageInvocation = mailbox.dequeue } } @@ -145,11 +136,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. */ private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { - if (thief.dispatcherLock.tryLock) { + val mailbox = getMailbox(thief) + if (mailbox.dispatcherLock.tryLock) { try { - while(donateMessage(receiver, thief)) processMailbox(thief) + while(donateMessage(receiver, thief)) processMailbox(mailbox) } finally { - thief.dispatcherLock.unlock + mailbox.dispatcherLock.unlock } } } @@ -191,18 +183,44 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } protected override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation] - else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) + if (mailboxCapacity <= 0) { + new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { + def enqueue(handle: MessageInvocation): Unit = this.add(handle) + def dequeue: MessageInvocation = this.poll() + + def run = { + if (!tryProcessMailbox(this)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) + } + } + } + } + else { + new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) with MessageQueue with Runnable { + def enqueue(handle: MessageInvocation): Unit = this.add(handle) + def dequeue: MessageInvocation = this.poll() + + def run = { + if (!tryProcessMailbox(this)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) + } + } + } + } } override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) - pooledActors.add(actorRef) + pooledActors add actorRef super.register(actorRef) } override def unregister(actorRef: ActorRef) = { - pooledActors.remove(actorRef) + pooledActors remove actorRef super.unregister(actorRef) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 383c58905a..25a02f2603 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce import org.multiverse.commitbarriers.CountDownCommitBarrier import se.scalablesolutions.akka.AkkaException -import se.scalablesolutions.akka.util.{Duration, HashCode, Logging} import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue +import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} /** * @author Jonas Bonér @@ -30,8 +30,6 @@ final class MessageInvocation(val receiver: ActorRef, "Don't call 'self ! message' in the Actor's constructor (e.g. body of the class).") } - def send = receiver.dispatcher.dispatch(this) - override def hashCode(): Int = synchronized { var result = HashCode.SEED result = HashCode.hash(result, receiver.actor) @@ -63,6 +61,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m * @author Jonas Bonér */ trait MessageQueue { + val dispatcherLock = new SimpleLock def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int @@ -84,40 +83,36 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD */ def newMailbox(bounds: Int = capacity, pushTime: Option[Duration] = pushTimeOut, - blockDequeue: Boolean = blockingDequeue) : MessageQueue = { - if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue - new LinkedTransferQueue[MessageInvocation] with MessageQueue { - def enqueue(handle: MessageInvocation): Unit = this add handle - def dequeue(): MessageInvocation = { - if(blockDequeue) this.take() - else this.poll() - } - } - } - else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue - val time = pushTime.get - new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue { - def enqueue(handle: MessageInvocation) { - if (!this.offer(handle,time.length,time.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) - } + blockDequeue: Boolean = blockingDequeue) : MessageQueue = + if (capacity > 0) new DefaultBoundedMessageQueue(bounds,pushTime,blockDequeue) + else new DefaultUnboundedMessageQueue(blockDequeue) +} - def dequeue(): MessageInvocation = { - if (blockDequeue) this.take() - else this.poll() - } - } +class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { + final def enqueue(handle: MessageInvocation) { + this add handle + } + + final def dequeue(): MessageInvocation = + if (blockDequeue) this.take() + else this.poll() +} + +class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue { + final def enqueue(handle: MessageInvocation) { + if (pushTimeOut.isDefined) { + if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) } - else { //BOUNDED: Blocking enqueue and optionally blocking dequeue - new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue { - def enqueue(handle: MessageInvocation): Unit = this put handle - def dequeue(): MessageInvocation = { - if(blockDequeue) this.take() - else this.poll() - } - } + else { + this put handle } } + + final def dequeue(): MessageInvocation = + if (blockDequeue) this.take() + else this.poll() + } /** @@ -139,7 +134,7 @@ trait MessageDispatcher extends Logging { } def unregister(actorRef: ActorRef) = { uuids remove actorRef.uuid - //actorRef.mailbox = null //FIXME should we null out the mailbox here? + actorRef.mailbox = null if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } @@ -156,14 +151,4 @@ trait MessageDispatcher extends Logging { * Creates and returns a mailbox for the given actor */ protected def createMailbox(actorRef: ActorRef): AnyRef = null -} - -/** - * @author Jonas Bonér - */ -trait MessageDemultiplexer { - def select - def wakeUp - def acquireSelectedInvocations: List[MessageInvocation] - def releaseSelectedInvocations -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala deleted file mode 100644 index 2ba88f25c3..0000000000 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import concurrent.forkjoin.LinkedTransferQueue -import java.util.concurrent.{TimeUnit, Semaphore} -import java.util.Iterator -import se.scalablesolutions.akka.util.Logger - -class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { - require(capacity > 0) - - protected val guard = new Semaphore(capacity) - - override def take(): E = { - val e = super.take - if (e ne null) guard.release - e - } - - override def poll(): E = { - val e = super.poll - if (e ne null) guard.release - e - } - - override def poll(timeout: Long, unit: TimeUnit): E = { - val e = super.poll(timeout,unit) - if (e ne null) guard.release - e - } - - override def remainingCapacity = guard.availablePermits - - override def remove(o: AnyRef): Boolean = { - if (super.remove(o)) { - guard.release - true - } else { - false - } - } - - override def offer(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false - } - - override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false - } - - override def add(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.add(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false - } - - override def put(e :E): Unit = { - guard.acquire - try { - super.put(e) - } catch { - case e => guard.release; throw e - } - } - - override def tryTransfer(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false - } - - override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false - } - - override def transfer(e: E): Unit = { - if (guard.tryAcquire) { - try { - super.transfer(e) - } catch { - case e => guard.release; throw e - } - } - } - - override def iterator: Iterator[E] = { - val it = super.iterator - new Iterator[E] { - def hasNext = it.hasNext - def next = it.next - def remove { - it.remove - guard.release //Assume remove worked if no exception was thrown - } - } - } -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/util/ListenerManagement.scala b/akka-actor/src/main/scala/util/ListenerManagement.scala index 0e17058380..7ad0f451f1 100644 --- a/akka-actor/src/main/scala/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/util/ListenerManagement.scala @@ -40,6 +40,23 @@ trait ListenerManagement extends Logging { if (manageLifeCycleOfListeners) listener.stop } + /* + * Returns whether there are any listeners currently + */ + def hasListeners: Boolean = !listeners.isEmpty + + protected def notifyListeners(message: => Any) { + if (hasListeners) { + val msg = message + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + if (listener.isRunning) listener ! msg + else log.warning("Can't notify [%s] since it is not running.", listener) + } + } + } + /** * Execute f with each listener as argument. */ diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 885e11def7..3d1261e468 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} +import java.util.concurrent.atomic.AtomicBoolean /** * @author Jonas Bonér @@ -58,3 +59,56 @@ class ReadWriteGuard { } } +/** + * A very simple lock that uses CCAS (Compare Compare-And-Swap) + * Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods + */ +class SimpleLock { + val acquired = new AtomicBoolean(false) + + def ifPossible(perform: () => Unit): Boolean = { + if (tryLock()) { + try { + perform + } finally { + unlock() + } + true + } else false + } + + def ifPossibleYield[T](perform: () => T): Option[T] = { + if (tryLock()) { + try { + Some(perform()) + } finally { + unlock() + } + } else None + } + + def ifPossibleApply[T,R](value: T)(function: (T) => R): Option[R] = { + if (tryLock()) { + try { + Some(function(value)) + } finally { + unlock() + } + } else None + } + + def tryLock() = { + if (acquired.get) false + else acquired.compareAndSet(false,true) + } + + def tryUnlock() = { + acquired.compareAndSet(true,false) + } + + def locked = acquired.get + + def unlock() { + acquired.set(false) + } +} \ No newline at end of file diff --git a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index cde57a0544..3285e450c6 100644 --- a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -5,11 +5,10 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test -import se.scalablesolutions.akka.dispatch.Dispatchers - import java.util.concurrent.{TimeUnit, CountDownLatch} import se.scalablesolutions.akka.actor.{IllegalActorStateException, Actor} import Actor._ +import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers} object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") @@ -18,7 +17,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { self.dispatcher = delayableActorDispatcher - var invocationCount = 0 + @volatile var invocationCount = 0 self.id = name def receive = { @@ -61,10 +60,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with val slow = actorOf(new DelayableActor("slow", 50, finishedCounter)).start val fast = actorOf(new DelayableActor("fast", 10, finishedCounter)).start + var sentToFast = 0 + for (i <- 1 to 100) { // send most work to slow actor - if (i % 20 == 0) + if (i % 20 == 0) { fast ! i + sentToFast += 1 + } else slow ! i } @@ -72,13 +75,18 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with // now send some messages to actors to keep the dispatcher dispatching messages for (i <- 1 to 10) { Thread.sleep(150) - if (i % 2 == 0) + if (i % 2 == 0) { fast ! i + sentToFast += 1 + } else slow ! i } finishedCounter.await(5, TimeUnit.SECONDS) + fast.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) + slow.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) + fast.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.actor.asInstanceOf[DelayableActor].invocationCount) slow.stop diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index f61a5d63a1..264081259f 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -220,10 +220,10 @@ class RemoteClient private[akka] ( val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - foreachListener(_ ! RemoteClientError(connection.getCause, this)) + notifyListeners(RemoteClientError(connection.getCause, this)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } - foreachListener(_ ! RemoteClientStarted(this)) + notifyListeners(RemoteClientStarted(this)) isRunning = true } } @@ -232,7 +232,7 @@ class RemoteClient private[akka] ( log.info("Shutting down %s", name) if (isRunning) { isRunning = false - foreachListener(_ ! RemoteClientShutdown(this)) + notifyListeners(RemoteClientShutdown(this)) timer.stop timer = null openChannels.close.awaitUninterruptibly @@ -250,7 +250,7 @@ class RemoteClient private[akka] ( @deprecated("Use removeListener instead") def deregisterListener(actorRef: ActorRef) = removeListener(actorRef) - override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected override def manageLifeCycleOfListeners = false @@ -287,7 +287,7 @@ class RemoteClient private[akka] ( } else { val exception = new RemoteClientException( "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - foreachListener(l => l ! RemoteClientError(exception, this)) + notifyListeners(RemoteClientError(exception, this)) throw exception } @@ -403,12 +403,12 @@ class RemoteClientHandler( futures.remove(reply.getId) } else { val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.foreachListener(_ ! RemoteClientError(exception, client)) + client.notifyListeners(RemoteClientError(exception, client)) throw exception } } catch { case e: Exception => - client.foreachListener(_ ! RemoteClientError(e, client)) + client.notifyListeners(RemoteClientError(e, client)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -423,7 +423,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client)) + client.notifyListeners(RemoteClientError(client.connection.getCause, client)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -433,7 +433,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.foreachListener(_ ! RemoteClientConnected(client)) + client.notifyListeners(RemoteClientConnected(client)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -450,12 +450,12 @@ class RemoteClientHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.foreachListener(_ ! RemoteClientDisconnected(client)) + client.notifyListeners(RemoteClientDisconnected(client)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.foreachListener(_ ! RemoteClientError(event.getCause, client)) + client.notifyListeners(RemoteClientError(event.getCause, client)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index b10d8e5825..27b5a50bfc 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) - foreachListener(_ ! RemoteServerStarted(this)) + notifyListeners(RemoteServerStarted(this)) } } catch { case e => log.error(e, "Could not start up remote server") - foreachListener(_ ! RemoteServerError(e, this)) + notifyListeners(RemoteServerError(e, this)) } this } @@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) - foreachListener(_ ! RemoteServerShutdown(this)) + notifyListeners(RemoteServerShutdown(this)) } catch { case e: java.nio.channels.ClosedChannelException => {} case e => log.warning("Could not close remote server channel in a graceful way") @@ -323,7 +323,7 @@ class RemoteServer extends Logging with ListenerManagement { protected override def manageLifeCycleOfListeners = false - protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { RemoteServer.actorsFor(address).actors @@ -413,18 +413,18 @@ class RemoteServerHandler( def operationComplete(future: ChannelFuture): Unit = { if (future.isSuccess) { openChannels.add(future.getChannel) - server.foreachListener(_ ! RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server)) } else future.getChannel.close } }) } else { - server.foreachListener(_ ! RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server)) } } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { log.debug("Remote client disconnected from [%s]", server.name) - server.foreachListener(_ ! RemoteServerClientDisconnected(server)) + server.notifyListeners(RemoteServerClientDisconnected(server)) } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -446,7 +446,7 @@ class RemoteServerHandler( override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close - server.foreachListener(_ ! RemoteServerError(event.getCause, server)) + server.notifyListeners(RemoteServerError(event.getCause, server)) } private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { @@ -491,7 +491,7 @@ class RemoteServerHandler( } catch { case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) } } } @@ -523,10 +523,10 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => channel.write(createErrorReplyMessage(e, request, false)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) } } @@ -559,7 +559,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote actor instance") - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) throw e } } else actorRefOrNull @@ -590,7 +590,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote typed actor instance") - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) throw e } } else typedActorOrNull diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index b27f5b4b4d..52bd0e6cb6 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -674,7 +674,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { def register(proxy: AnyRef, init: AspectInit) = { val res = initializations.put(proxy, init) - foreachListener(_ ! AspectInitRegistered(proxy, init)) + notifyListeners(AspectInitRegistered(proxy, init)) res } @@ -683,7 +683,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { */ def unregister(proxy: AnyRef): AspectInit = { val init = initializations.remove(proxy) - foreachListener(_ ! AspectInitUnregistered(proxy, init)) + notifyListeners(AspectInitUnregistered(proxy, init)) init.actorRef.stop init }