Removing the AtomicReference from Dispatcher and restructured the code a bit

This commit is contained in:
Viktor Klang 2012-05-20 19:03:20 +02:00
parent 1a3329baa2
commit 4a2227fc95
2 changed files with 22 additions and 13 deletions

View file

@ -87,7 +87,7 @@ class BalancingDispatcher(
@tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit = @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit =
if (messageQueue.hasMessages if (messageQueue.hasMessages
&& i.hasNext && i.hasNext
&& (executorService.get().executor match { && (executorService match {
case lm: LoadMetrics lm.atFullThrottle == false case lm: LoadMetrics lm.atFullThrottle == false
case other true case other true
}) })

View file

@ -33,11 +33,15 @@ class Dispatcher(
val shutdownTimeout: Duration) val shutdownTimeout: Duration)
extends MessageDispatcher(_prerequisites) { extends MessageDispatcher(_prerequisites) {
protected val executorServiceFactory: ExecutorServiceFactory = private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {
executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) lazy val executor: ExecutorService = factory.createExecutorService
def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory)
}
protected val executorService = new AtomicReference[ExecutorServiceDelegate]( @volatile private var executorServiceDelegate: LazyExecutorServiceDelegate =
new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory))
protected final def executorService: ExecutorService = executorServiceDelegate
/** /**
* INTERNAL USE ONLY * INTERNAL USE ONLY
@ -62,11 +66,11 @@ class Dispatcher(
*/ */
protected[akka] def executeTask(invocation: TaskInvocation) { protected[akka] def executeTask(invocation: TaskInvocation) {
try { try {
executorService.get() execute invocation executorService execute invocation
} catch { } catch {
case e: RejectedExecutionException case e: RejectedExecutionException
try { try {
executorService.get() execute invocation executorService execute invocation
} catch { } catch {
case e2: RejectedExecutionException case e2: RejectedExecutionException
prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!")) prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!"))
@ -83,10 +87,15 @@ class Dispatcher(
/** /**
* INTERNAL USE ONLY * INTERNAL USE ONLY
*/ */
protected[akka] def shutdown: Unit = protected[akka] def shutdown: Unit = {
Option(executorService.getAndSet(new ExecutorServiceDelegate { val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy
lazy val executor = executorServiceFactory.createExecutorService val es = synchronized { // FIXME getAndSet using ARFU or Unsafe
})) foreach { _.shutdown() } val service = executorServiceDelegate
executorServiceDelegate = newDelegate // just a quick getAndSet
service
}
es.shutdown()
}
/** /**
* Returns if it was registered * Returns if it was registered
@ -97,12 +106,12 @@ class Dispatcher(
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) { if (mbox.setAsScheduled()) {
try { try {
executorService.get() execute mbox executorService execute mbox
true true
} catch { } catch {
case e: RejectedExecutionException case e: RejectedExecutionException
try { try {
executorService.get() execute mbox executorService execute mbox
true true
} catch { //Retry once } catch { //Retry once
case e: RejectedExecutionException case e: RejectedExecutionException