From 990b933d8febaf01f61c383930e657ce69f814a9 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 24 Oct 2010 16:01:00 +0200 Subject: [PATCH] Moved active flag into MessageDispatcher and let it handle the callbacks, also fixed race in DataFlowSpec --- .../src/main/scala/actor/ActorRef.scala | 4 +- .../ExecutorBasedEventDrivenDispatcher.scala | 9 +-- ...sedEventDrivenWorkStealingDispatcher.scala | 13 ++--- .../main/scala/dispatch/HawtDispatcher.scala | 10 +--- .../main/scala/dispatch/MessageHandling.scala | 57 +++++++++++++++---- .../test/scala/dataflow/DataFlowSpec.scala | 2 +- 6 files changed, 60 insertions(+), 35 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 870b74edd2..92a7800128 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -964,7 +964,7 @@ class LocalActorRef private[akka] ( message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor) } else { val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) - dispatcher dispatch invocation + dispatcher dispatchMessage invocation } } @@ -985,7 +985,7 @@ class LocalActorRef private[akka] ( else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get) - dispatcher dispatch invocation + dispatcher dispatchMessage invocation future } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 2b0a7cbfe0..45b1dfa886 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -91,11 +91,10 @@ class ExecutorBasedEventDrivenDispatcher( val name = "akka:event-driven:dispatcher:" + _name val mailboxType = Some(_mailboxType) - private[akka] val active = new Switch(false) private[akka] val threadFactory = new MonitorableThreadFactory(name) private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) - def dispatch(invocation: MessageInvocation) = { + protected def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation registerForExecution(mbox) @@ -132,11 +131,9 @@ class ExecutorBasedEventDrivenDispatcher( case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } - protected def start: Unit = active switchOn { - log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) - } + protected def start= log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) - protected def shutdown: Unit = active switchOff { + protected def shutdown { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index a6e40d2f50..f357ff4ab3 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -44,7 +44,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( val mailboxType = Some(_mailboxType) val name = "akka:event-driven-work-stealing:dispatcher:" + _name - private val active = new Switch(false) /** Type of the actors registered in this dispatcher. */ @volatile private var actorType: Option[Class[_]] = None @@ -55,8 +54,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** The index in the pooled actors list which was last used to steal work */ @volatile private var lastThiefIndex = 0 - - /** * @return the mailbox associated with the actor */ @@ -64,11 +61,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - def dispatch(invocation: MessageInvocation) = if (active.isOn) { + protected def dispatch(invocation: MessageInvocation) { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation executorService.get() execute mbox - } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") + } /** * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by @@ -170,11 +167,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } else false } - protected def start = active switchOn { - log.debug("Starting up %s",toString) - } + protected def start = log.debug("Starting up %s",toString) - protected def shutdown: Unit = active switchOff { + protected def shutdown { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index a5bc315204..50ae2da19e 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -142,18 +142,14 @@ object HawtDispatcher { class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher { import HawtDispatcher._ - private val active = new Switch(false) - val mailboxType: Option[MailboxType] = None - def start = active switchOn { retainNonDaemon } + protected def start { retainNonDaemon } - def shutdown = active switchOff { releaseNonDaemon } + protected def shutdown { releaseNonDaemon } - def dispatch(invocation: MessageInvocation) = if (active.isOn) { + protected def dispatch(invocation: MessageInvocation){ mailbox(invocation.receiver).dispatch(invocation) - } else { - log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver) } // hawtdispatch does not have a way to get queue sizes, getting an accurate diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 1e18f057cf..95923ebfe5 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -4,12 +4,11 @@ package se.scalablesolutions.akka.dispatch -import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInitializationException} - import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.concurrent._ -import se.scalablesolutions.akka.util. {ReentrantGuard, Logging, HashCode} +import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode} +import se.scalablesolutions.akka.actor._ /** * @author Jonas Bonér @@ -56,20 +55,34 @@ final class MessageInvocation(val receiver: ActorRef, * @author Jonas Bonér */ trait MessageDispatcher extends MailboxFactory with Logging { - protected val uuids = new ConcurrentSkipListSet[Uuid] protected val guard = new ReentrantGuard + protected val active = new Switch(false) + /** + * Attaches the specified actorRef to this dispatcher + */ final def attach(actorRef: ActorRef): Unit = guard withGuard { register(actorRef) } + /** + * Detaches the specified actorRef from this dispatcher + */ final def detach(actorRef: ActorRef): Unit = guard withGuard { unregister(actorRef) } + private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = if (active.isOn) { + dispatch(invocation) + } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") + protected def register(actorRef: ActorRef) { - if (uuids.isEmpty()) start + if (uuids.isEmpty()) { + active.switchOn { + start + } + } if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) uuids add actorRef.uuid } @@ -77,28 +90,52 @@ trait MessageDispatcher extends MailboxFactory with Logging { protected def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { actorRef.mailbox = null - if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero + if (uuids.isEmpty){ + active switchOff { + shutdown // shut down in the dispatcher's references is zero + } + } } } + /** + * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors + */ def stopAllLinkedActors { val i = uuids.iterator while(i.hasNext()) { val uuid = i.next() ActorRegistry.actorFor(uuid) match { case Some(actor) => actor.stop - case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid) + case None => + log.error("stopAllLinkedActors couldn't find linked actor: " + uuid) } } - if(uuids.isEmpty) shutdown } - + + /** + * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference + */ def suspend(actorRef: ActorRef): Unit + + /* + * After the call to this method, the dispatcher must begin any new message processing for the specified reference + */ def resume(actorRef: ActorRef): Unit - def dispatch(invocation: MessageInvocation): Unit + /** + * Will be called when the dispatcher is to queue an invocation for execution + */ + protected def dispatch(invocation: MessageInvocation): Unit + /** + * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown + */ protected def start: Unit + + /** + * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached + */ protected def shutdown: Unit /** diff --git a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala index d1f663e9f4..d596ecfac1 100644 --- a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala +++ b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala @@ -28,8 +28,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { val x, y, z = new DataFlowVariable[Int] thread { z << x() + y() - latch.countDown result.set(z()) + latch.countDown } thread { x << 40 } thread { y << 2 }