diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala
index 870b74edd2..92a7800128 100644
--- a/akka-actor/src/main/scala/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/actor/ActorRef.scala
@@ -964,7 +964,7 @@ class LocalActorRef private[akka] (
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
} else {
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
- dispatcher dispatch invocation
+ dispatcher dispatchMessage invocation
}
}
@@ -985,7 +985,7 @@ class LocalActorRef private[akka] (
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(
this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
- dispatcher dispatch invocation
+ dispatcher dispatchMessage invocation
future
}
}
diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 2b0a7cbfe0..45b1dfa886 100644
--- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -91,11 +91,10 @@ class ExecutorBasedEventDrivenDispatcher(
val name = "akka:event-driven:dispatcher:" + _name
val mailboxType = Some(_mailboxType)
- private[akka] val active = new Switch(false)
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
- def dispatch(invocation: MessageInvocation) = {
+ protected def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
registerForExecution(mbox)
@@ -132,11 +131,9 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
- protected def start: Unit = active switchOn {
- log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
- }
+ protected def start= log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
- protected def shutdown: Unit = active switchOff {
+ protected def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)
diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index a6e40d2f50..f357ff4ab3 100644
--- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -44,7 +44,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
val mailboxType = Some(_mailboxType)
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
- private val active = new Switch(false)
/** Type of the actors registered in this dispatcher. */
@volatile private var actorType: Option[Class[_]] = None
@@ -55,8 +54,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0
-
-
/**
* @return the mailbox associated with the actor
*/
@@ -64,11 +61,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
- def dispatch(invocation: MessageInvocation) = if (active.isOn) {
+ protected def dispatch(invocation: MessageInvocation) {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
executorService.get() execute mbox
- } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
+ }
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
@@ -170,11 +167,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
} else false
}
- protected def start = active switchOn {
- log.debug("Starting up %s",toString)
- }
+ protected def start = log.debug("Starting up %s",toString)
- protected def shutdown: Unit = active switchOff {
+ protected def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)
diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala
index a5bc315204..50ae2da19e 100644
--- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala
@@ -142,18 +142,14 @@ object HawtDispatcher {
class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher {
import HawtDispatcher._
- private val active = new Switch(false)
-
val mailboxType: Option[MailboxType] = None
- def start = active switchOn { retainNonDaemon }
+ protected def start { retainNonDaemon }
- def shutdown = active switchOff { releaseNonDaemon }
+ protected def shutdown { releaseNonDaemon }
- def dispatch(invocation: MessageInvocation) = if (active.isOn) {
+ protected def dispatch(invocation: MessageInvocation){
mailbox(invocation.receiver).dispatch(invocation)
- } else {
- log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver)
}
// hawtdispatch does not have a way to get queue sizes, getting an accurate
diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
index 1e18f057cf..95923ebfe5 100644
--- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
@@ -4,12 +4,11 @@
package se.scalablesolutions.akka.dispatch
-import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInitializationException}
-
import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.concurrent._
-import se.scalablesolutions.akka.util. {ReentrantGuard, Logging, HashCode}
+import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode}
+import se.scalablesolutions.akka.actor._
/**
* @author Jonas Bonér
@@ -56,20 +55,34 @@ final class MessageInvocation(val receiver: ActorRef,
* @author Jonas Bonér
*/
trait MessageDispatcher extends MailboxFactory with Logging {
-
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val guard = new ReentrantGuard
+ protected val active = new Switch(false)
+ /**
+ * Attaches the specified actorRef to this dispatcher
+ */
final def attach(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef)
}
+ /**
+ * Detaches the specified actorRef from this dispatcher
+ */
final def detach(actorRef: ActorRef): Unit = guard withGuard {
unregister(actorRef)
}
+ private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = if (active.isOn) {
+ dispatch(invocation)
+ } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
+
protected def register(actorRef: ActorRef) {
- if (uuids.isEmpty()) start
+ if (uuids.isEmpty()) {
+ active.switchOn {
+ start
+ }
+ }
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
}
@@ -77,28 +90,52 @@ trait MessageDispatcher extends MailboxFactory with Logging {
protected def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
actorRef.mailbox = null
- if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero
+ if (uuids.isEmpty){
+ active switchOff {
+ shutdown // shut down in the dispatcher's references is zero
+ }
+ }
}
}
+ /**
+ * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
+ */
def stopAllLinkedActors {
val i = uuids.iterator
while(i.hasNext()) {
val uuid = i.next()
ActorRegistry.actorFor(uuid) match {
case Some(actor) => actor.stop
- case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid)
+ case None =>
+ log.error("stopAllLinkedActors couldn't find linked actor: " + uuid)
}
}
- if(uuids.isEmpty) shutdown
}
-
+
+ /**
+ * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
+ */
def suspend(actorRef: ActorRef): Unit
+
+ /*
+ * After the call to this method, the dispatcher must begin any new message processing for the specified reference
+ */
def resume(actorRef: ActorRef): Unit
- def dispatch(invocation: MessageInvocation): Unit
+ /**
+ * Will be called when the dispatcher is to queue an invocation for execution
+ */
+ protected def dispatch(invocation: MessageInvocation): Unit
+ /**
+ * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
+ */
protected def start: Unit
+
+ /**
+ * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
+ */
protected def shutdown: Unit
/**
diff --git a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala
index d1f663e9f4..d596ecfac1 100644
--- a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala
+++ b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala
@@ -28,8 +28,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
val x, y, z = new DataFlowVariable[Int]
thread {
z << x() + y()
- latch.countDown
result.set(z())
+ latch.countDown
}
thread { x << 40 }
thread { y << 2 }