diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 4bc3a9dc31..b80ce3afb1 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -232,7 +232,6 @@ trait Actor extends TransactionManagement with Logging { @volatile private[this] var _isRunning = false @volatile private[this] var _isSuspended = true @volatile private[this] var _isShutDown = false - @volatile private[this] var _isEventBased: Boolean = false @volatile private[akka] var _isKilled = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None @@ -294,11 +293,7 @@ trait Actor extends TransactionManagement with Logging { * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. */ - protected[akka] var messageDispatcher: MessageDispatcher = { - val dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher - _isEventBased = dispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher] - dispatcher - } + protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher /** * User overridable callback/setting. @@ -513,8 +508,11 @@ trait Actor extends TransactionManagement with Logging { if (isActiveObject) throw e else None } - getResultOrThrowException(future) - } else throw new IllegalStateException( + + if (future.exception.isDefined) throw future.exception.get._2 + else future.result.asInstanceOf[Option[T]] + } + else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -593,7 +591,6 @@ trait Actor extends TransactionManagement with Logging { messageDispatcher.unregister(this) messageDispatcher = md messageDispatcher.register(this) - _isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher] } else throw new IllegalArgumentException( "Can not swap dispatcher for " + toString + " after it has been started") } @@ -816,7 +813,7 @@ trait Actor extends TransactionManagement with Logging { RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None) } else { val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get) - if (_isEventBased) { + if (messageDispatcher.usesActorMailbox) { _mailbox.add(invocation) if (_isSuspended) invocation.send } @@ -849,10 +846,11 @@ trait Actor extends TransactionManagement with Logging { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture(timeout) val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get) - if (_isEventBased) { + + if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) - invocation.send - } else invocation.send + + invocation.send future } } @@ -958,10 +956,6 @@ trait Actor extends TransactionManagement with Logging { } } - private def getResultOrThrowException[T](future: Future): Option[T] = - if (future.exception.isDefined) throw future.exception.get._2 - else future.result.asInstanceOf[Option[T]] - private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) private val lifeCycles: PartialFunction[Any, Unit] = { diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 705c3ee142..0c624c2e3a 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -94,6 +94,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche active = false references.clear } + + def usesActorMailbox = true def ensureNotActive: Unit = if (active) throw new IllegalStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index a96f5c5e76..28fe624b86 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -199,6 +199,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess pooledActors.remove(actor) super.unregister(actor) } + + def usesActorMailbox = true private def verifyActorsAreOfSameType(newActor: Actor) = { actorType match { diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index f9db74190f..3f300b1c52 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -68,6 +68,7 @@ trait MessageDispatcher extends Logging { } def canBeShutDown: Boolean = references.isEmpty def isShutdown: Boolean + def usesActorMailbox : Boolean } trait MessageDemultiplexer { diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index 15af513d62..fc99cf88d2 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -37,6 +37,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra } def isShutdown = !active + + def usesActorMailbox = false class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 941e701410..3f33d4ffc0 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -134,6 +134,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) if (fair) true else nrOfBusyMessages < 100 } + + def usesActorMailbox = false def ensureNotActive: Unit = if (active) throw new IllegalStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 8b1463f655..fbfffc999e 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -41,6 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: def isShutdown = !active + def usesActorMailbox = false + def shutdown = if (active) { log.debug("Shutting down ThreadBasedDispatcher [%s]", name) active = false