Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
775099a3a8
7 changed files with 26 additions and 2 deletions
|
|
@ -25,6 +25,8 @@ class ConfigSpec extends WordSpec with MustMatchers {
|
||||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0))
|
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0))
|
||||||
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0))
|
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0))
|
||||||
getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1))
|
getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1))
|
||||||
|
getInt("akka.actor.default-dispatcher.task-queue-size") must equal(Some(-1))
|
||||||
|
getString("akka.actor.default-dispatcher.task-queue-type") must equal(Some("linked"))
|
||||||
getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true))
|
getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true))
|
||||||
getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs"))
|
getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs"))
|
||||||
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1))
|
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1))
|
||||||
|
|
|
||||||
|
|
@ -60,3 +60,8 @@ trait BootableActorLoaderService extends Bootable {
|
||||||
Actor.registry.local.shutdownAll
|
Actor.registry.local.shutdownAll
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API for the default JAX-RS/Mist Initializer
|
||||||
|
*/
|
||||||
|
class DefaultBootableActorLoaderService extends BootableActorLoaderService
|
||||||
|
|
|
||||||
|
|
@ -281,12 +281,21 @@ abstract class MessageDispatcherConfigurator {
|
||||||
conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)),
|
conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)),
|
||||||
conf_?(config getInt "executor-bounds")(bounds ⇒ _.setExecutorBounds(bounds)),
|
conf_?(config getInt "executor-bounds")(bounds ⇒ _.setExecutorBounds(bounds)),
|
||||||
conf_?(config getBool "allow-core-timeout")(allow ⇒ _.setAllowCoreThreadTimeout(allow)),
|
conf_?(config getBool "allow-core-timeout")(allow ⇒ _.setAllowCoreThreadTimeout(allow)),
|
||||||
|
conf_?(config getInt "task-queue-size" flatMap {
|
||||||
|
case size if size > 0 ⇒
|
||||||
|
config getString "task-queue-type" map {
|
||||||
|
case "array" ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
|
||||||
|
case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size)
|
||||||
|
case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
|
||||||
|
}
|
||||||
|
case _ ⇒ None
|
||||||
|
})(queueFactory ⇒ _.setQueueFactory(queueFactory)),
|
||||||
conf_?(config getString "rejection-policy" map {
|
conf_?(config getString "rejection-policy" map {
|
||||||
case "abort" ⇒ new AbortPolicy()
|
case "abort" ⇒ new AbortPolicy()
|
||||||
case "caller-runs" ⇒ new CallerRunsPolicy()
|
case "caller-runs" ⇒ new CallerRunsPolicy()
|
||||||
case "discard-oldest" ⇒ new DiscardOldestPolicy()
|
case "discard-oldest" ⇒ new DiscardOldestPolicy()
|
||||||
case "discard" ⇒ new DiscardPolicy()
|
case "discard" ⇒ new DiscardPolicy()
|
||||||
case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy [abort|caller-runs|discard-oldest|discard]!" format x)
|
||||||
})(policy ⇒ _.setRejectionPolicy(policy)))
|
})(policy ⇒ _.setRejectionPolicy(policy)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -144,6 +144,9 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
|
||||||
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
|
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
|
||||||
this.copy(config = config.copy(allowCorePoolTimeout = allow))
|
this.copy(config = config.copy(allowCorePoolTimeout = allow))
|
||||||
|
|
||||||
|
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
|
||||||
|
this.copy(config = config.copy(queueFactory = newQueueFactory))
|
||||||
|
|
||||||
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c))
|
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
|
||||||
clusteredRef ! "Hello"
|
clusteredRef ! "Hello"
|
||||||
}
|
}
|
||||||
|
|
||||||
g node.shutdown()
|
node.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,9 @@ If you deploy Akka in a JEE container, don't forget to create an Akka initializa
|
||||||
// loader.boot(true, new BootableActorLoaderService {}) // If you don't need akka-remote
|
// loader.boot(true, new BootableActorLoaderService {}) // If you don't need akka-remote
|
||||||
}
|
}
|
||||||
|
|
||||||
|
For Java users, it's currently only possible to use BootableActorLoaderService, but you'll need to use: akka.actor.DefaultBootableActorLoaderService
|
||||||
|
|
||||||
|
|
||||||
Then you just declare it in your web.xml:
|
Then you just declare it in your web.xml:
|
||||||
|
|
||||||
.. code-block:: xml
|
.. code-block:: xml
|
||||||
|
|
|
||||||
|
|
@ -89,6 +89,8 @@ akka {
|
||||||
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
|
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
|
||||||
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
|
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
|
||||||
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||||
|
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
|
||||||
|
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
||||||
allow-core-timeout = on # Allow core threads to time out
|
allow-core-timeout = on # Allow core threads to time out
|
||||||
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
||||||
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue