diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index ee492409ec..dea29643c7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -87,7 +87,7 @@ class BalancingDispatcher( @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit = if (messageQueue.hasMessages && i.hasNext - && (executorService.get().executor match { + && (executorService match { case lm: LoadMetrics ⇒ lm.atFullThrottle == false case other ⇒ true }) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 3a73bf0718..8dd7ecf8a2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -33,11 +33,15 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected val executorServiceFactory: ExecutorServiceFactory = - executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) + private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate { + lazy val executor: ExecutorService = factory.createExecutorService + def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory) + } - protected val executorService = new AtomicReference[ExecutorServiceDelegate]( - new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) + @volatile private var executorServiceDelegate: LazyExecutorServiceDelegate = + new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)) + + protected final def executorService: ExecutorService = executorServiceDelegate /** * INTERNAL USE ONLY @@ -62,11 +66,11 @@ class Dispatcher( */ protected[akka] def executeTask(invocation: TaskInvocation) { try { - executorService.get() execute invocation + executorService execute invocation } catch { case e: RejectedExecutionException ⇒ try { - executorService.get() execute invocation + executorService execute invocation } catch { case e2: RejectedExecutionException ⇒ prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!")) @@ -83,10 +87,15 @@ class Dispatcher( /** * INTERNAL USE ONLY */ - protected[akka] def shutdown: Unit = - Option(executorService.getAndSet(new ExecutorServiceDelegate { - lazy val executor = executorServiceFactory.createExecutorService - })) foreach { _.shutdown() } + protected[akka] def shutdown: Unit = { + val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy + val es = synchronized { // FIXME getAndSet using ARFU or Unsafe + val service = executorServiceDelegate + executorServiceDelegate = newDelegate // just a quick getAndSet + service + } + es.shutdown() + } /** * Returns if it was registered @@ -97,12 +106,12 @@ class Dispatcher( if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { - executorService.get() execute mbox + executorService execute mbox true } catch { case e: RejectedExecutionException ⇒ try { - executorService.get() execute mbox + executorService execute mbox true } catch { //Retry once case e: RejectedExecutionException ⇒