From 727c7de58df502ee8073ca10856196d0a338c5f1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 15 Nov 2011 10:25:05 +0100 Subject: [PATCH] Removing bounded executors since they have probably never been used, also, removing possibility to specify own RejectedExecutionHandler since Akka needs to know what to do there anyway. Implementing a sane version of CallerRuns --- .../akka/actor/dispatch/DispatchersSpec.scala | 2 - .../test/scala/akka/config/ConfigSpec.scala | 2 - .../TellThroughputPerformanceSpec.scala | 1 - .../akka/dispatch/AbstractDispatcher.scala | 10 +-- .../scala/akka/dispatch/Dispatchers.scala | 2 - .../akka/dispatch/ThreadPoolBuilder.scala | 83 +++++-------------- .../akka/spring/DispatcherFactoryBean.scala | 3 +- config/akka-reference.conf | 3 - 8 files changed, 21 insertions(+), 85 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 3e8336be51..795c0b9335 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -18,9 +18,7 @@ class DispatchersSpec extends AkkaSpec { val keepalivems = "keep-alive-time" val corepoolsizefactor = "core-pool-size-factor" val maxpoolsizefactor = "max-pool-size-factor" - val executorbounds = "executor-bounds" val allowcoretimeout = "allow-core-timeout" - val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard val throughput = "throughput" // Throughput for Dispatcher def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index a0066c208b..39e3374700 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -24,11 +24,9 @@ class ConfigSpec extends AkkaSpec(ActorSystem("ConfigSpec", Configuration.fromFi getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60)) getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0)) getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0)) - getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1)) getInt("akka.actor.default-dispatcher.task-queue-size") must equal(Some(-1)) getString("akka.actor.default-dispatcher.task-queue-type") must equal(Some("linked")) getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true)) - getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("abort")) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1)) getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(Some(10)) getLong("akka.actor.dispatcher-shutdown-timeout") must equal(Some(1)) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index bf0a4dc0eb..c2d7c8160a 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -15,7 +15,6 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(app, name, 5, 0, UnboundedMailbox(), config, 60000), ThreadPoolConfig(app)) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - .setFlowHandler(Left(new AbortPolicy)) .setCorePoolSize(maxClients) .build diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index eafdbeb1dc..2ac2b85df5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -354,7 +354,6 @@ abstract class MessageDispatcherConfigurator(val app: ActorSystem) { conf_?(config getInt "keep-alive-time")(time ⇒ _.setKeepAliveTime(Duration(time, app.AkkaConfig.DefaultTimeUnit))), conf_?(config getDouble "core-pool-size-factor")(factor ⇒ _.setCorePoolSizeFromFactor(factor)), conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)), - conf_?(config getInt "executor-bounds")(bounds ⇒ _.setExecutorBounds(bounds)), conf_?(config getBool "allow-core-timeout")(allow ⇒ _.setAllowCoreThreadTimeout(allow)), conf_?(config getInt "task-queue-size" flatMap { case size if size > 0 ⇒ @@ -364,13 +363,6 @@ abstract class MessageDispatcherConfigurator(val app: ActorSystem) { case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x) } case _ ⇒ None - })(queueFactory ⇒ _.setQueueFactory(queueFactory)), - conf_?(config getString "rejection-policy" map { - case "abort" ⇒ new AbortPolicy() - case "caller-runs" ⇒ new CallerRunsPolicy() - case "discard-oldest" ⇒ new DiscardOldestPolicy() - case "discard" ⇒ new DiscardPolicy() - case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy [abort|caller-runs|discard-oldest|discard]!" format x) - })(policy ⇒ _.setRejectionPolicy(policy))) + })(queueFactory ⇒ _.setQueueFactory(queueFactory))) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 65eef3821d..a91af81cb1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -169,9 +169,7 @@ class Dispatchers(val app: ActorSystem) { * keep-alive-time = 60 # Keep alive time for threads in akka.time-unit * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) - * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded * allow-core-timeout = on # Allow core threads to time out - * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard * throughput = 5 # Throughput for Dispatcher * } * ex: from(config.getConfigMap(identifier).get) diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 6cb927a5dd..11fc885446 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -5,26 +5,21 @@ package akka.dispatch import java.util.Collection -import java.util.concurrent._ -import atomic.{ AtomicLong, AtomicInteger } +import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger } import akka.util.Duration import akka.event.Logging.{ Warning, Error } import akka.actor.ActorSystem -import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy } +import java.util.concurrent._ object ThreadPoolConfig { type Bounds = Int - type FlowHandler = Either[RejectedExecutionHandler, Bounds] + type FlowHandler = Either[SaneRejectedExecutionHandler, Bounds] type QueueFactory = () ⇒ BlockingQueue[Runnable] val defaultAllowCoreThreadTimeout: Boolean = false val defaultCorePoolSize: Int = 16 val defaultMaxPoolSize: Int = 128 val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS) - def defaultFlowHandler: FlowHandler = flowHandler(new AbortPolicy) - - def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler) - def flowHandler(bounds: Int): FlowHandler = Right(bounds) def fixedPoolSize(size: Int): Int = size def scaledPoolSize(multiplier: Double): Int = @@ -73,20 +68,14 @@ case class ThreadPoolConfig(app: ActorSystem, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, - flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler, queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) extends ExecutorServiceFactoryProvider { final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { val threadFactory = new MonitorableThreadFactory(name) - def createExecutorService: ExecutorService = flowHandler match { - case Left(rejectHandler) ⇒ - val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler) - service.allowCoreThreadTimeOut(allowCorePoolTimeout) - service - case Right(bounds) ⇒ - val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory) - service.allowCoreThreadTimeOut(allowCorePoolTimeout) - new BoundedExecutorDecorator(app, service, bounds) + def createExecutorService: ExecutorService = { + val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, new SaneRejectedExecutionHandler) + service.allowCoreThreadTimeOut(allowCorePoolTimeout) + service } } } @@ -106,26 +95,23 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi import ThreadPoolConfig._ def build = dispatcherFactory(config) - def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) - def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory)) + this.copy(config = config.copy(queueFactory = newQueueFactory)) def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder = withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue)) def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler)) + this.copy(config = config.copy(queueFactory = linkedBlockingQueue())) def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler)) + this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity))) def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler)) + this.copy(config = config.copy(queueFactory = synchronousQueue(fair))) def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler)) + this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair))) def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(corePoolSize = size)) @@ -139,21 +125,12 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = setMaxPoolSize(scaledPoolSize(multiplier)) - def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = flowHandler(bounds))) - def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder = setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS)) def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(threadTimeout = time)) - def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder = - setFlowHandler(flowHandler(policy)) - - def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = newFlowHandler)) - def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(allowCorePoolTimeout = allow)) @@ -207,35 +184,6 @@ class MonitorableThread(runnable: Runnable, name: String) } } -/** - * @author Jonas Bonér - */ -class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { - protected val semaphore = new Semaphore(bound) - - override def execute(command: Runnable) = { - semaphore.acquire - try { - executor.execute(new Runnable() { - def run = { - try { - command.run - } finally { - semaphore.release - } - } - }) - } catch { - case e: RejectedExecutionException ⇒ - app.eventStream.publish(Warning(this, e.toString)) - semaphore.release - case e: Throwable ⇒ - app.eventStream.publish(Error(e, this, e.getMessage)) - throw e - } - } -} - /** * As the name says */ @@ -269,3 +217,10 @@ trait ExecutorServiceDelegate extends ExecutorService { def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) } + +class SaneRejectedExecutionHandler extends RejectedExecutionHandler { + def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = { + if (threadPoolExecutor.isShutdown) throw new RejectedExecutionException("Shutdown") + else runnable.run() + } +} diff --git a/akka-spring/src/main/scala/akka/spring/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/akka/spring/DispatcherFactoryBean.scala index 47e0b0d54d..2f53ce0b5a 100644 --- a/akka-spring/src/main/scala/akka/spring/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/akka/spring/DispatcherFactoryBean.scala @@ -34,7 +34,7 @@ object DispatcherFactoryBean { val maxPoolSize = if (threadPool.maxPoolSize > -1) Some(threadPool.maxPoolSize) else None val keepAlive = if (threadPool.keepAlive > -1) Some(threadPool.keepAlive) else None val executorBounds = if (threadPool.bound > -1) Some(threadPool.bound) else None - val flowHandler = threadPool.rejectionPolicy match { + val flowHandler = threadPool.rejectionPolicy match { //REMOVE THIS FROM THE CONFIG case null | "" ⇒ None case "abort-policy" ⇒ Some(new AbortPolicy()) case "caller-runs-policy" ⇒ Some(new CallerRunsPolicy()) @@ -63,7 +63,6 @@ object DispatcherFactoryBean { conf_?(corePoolSize)(count ⇒ _.setCorePoolSize(count)), conf_?(maxPoolSize)(count ⇒ _.setMaxPoolSize(count)), conf_?(executorBounds)(bounds ⇒ _.setExecutorBounds(bounds)), - conf_?(flowHandler)(policy ⇒ _.setRejectionPolicy(policy))) } else ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 6abc7cc91c..695bdb04cf 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -32,7 +32,6 @@ akka { task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out - rejection-policy = "abort" # abort, caller-runs, discard-oldest, discard throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) @@ -124,11 +123,9 @@ akka { keep-alive-time = 60 # Keep alive time for threads core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) - executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out - rejection-policy = "abort" # abort, caller-runs, discard-oldest, discard throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)