diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index d4700b9ba0..40d7ad8544 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -165,7 +165,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. */ - @volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + @volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher /** * Holds the hot swapped partial function. diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 0d2da205c6..06fd4e9699 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -5,7 +5,11 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config.config +import net.lag.configgy.ConfigMap +import se.scalablesolutions.akka.util.UUID +import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} /** * Scala API. Dispatcher factory. @@ -39,9 +43,12 @@ import se.scalablesolutions.akka.config.Config.config * * @author Jonas Bonér */ -object Dispatchers { +object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + + val defaultGlobalDispatcher = fromConfig("akka.actor.default-dispatcher",globalExecutorBasedEventDrivenDispatcher) + object globalHawtDispatcher extends HawtDispatcher object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { @@ -103,4 +110,65 @@ object Dispatchers { * E.g. each actor consumes its own thread. */ def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) + + def fromConfig(identifier: String, defaultDispatcher: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = { + config.getConfigMap(identifier).map(from).getOrElse(defaultDispatcher) + } + + def from(map: ConfigMap): MessageDispatcher = { + lazy val name = map.getString("name",UUID.newUuid.toString) + + val dispatcher = map.getString("type").map({ + case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) + + case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name) + + case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,map.getInt("throughput",THROUGHPUT)) + + case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name) + + case "Hawt" => newHawtDispatcher(map.getBool("aggregate").getOrElse(true)) + + case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher + + case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher + + case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher + + case "GlobalHawt" => globalHawtDispatcher + + case "Default" => defaultGlobalDispatcher + + case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown) + + }).get + + if(dispatcher.isInstanceOf[ThreadPoolBuilder]) { + val configurable = dispatcher.asInstanceOf[ThreadPoolBuilder] + + map.getInt("keep-alive-ms").foreach(configurable.setKeepAliveTimeInMillis(_)) + + map.getInt("core-pool-size").foreach(configurable.setCorePoolSize(_)) + + map.getInt("max-pool-size").foreach(configurable.setMaxPoolSize(_)) + + map.getString("rejection-policy").map(_ match { + + case "abort" => new AbortPolicy() + + case "caller-runs" => new CallerRunsPolicy() + + case "discard-oldest" => new DiscardOldestPolicy() + + case "discard" => new DiscardPolicy() + + case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + + }).foreach(configurable.setRejectionPolicy(_)) + } + + log.info("Dispatchers.from: %s:%s for %s",dispatcher.getClass.getName,dispatcher,map.getString("type").getOrElse("")) + + dispatcher + } } diff --git a/akka-http/src/main/scala/AkkaBroadcaster.scala b/akka-http/src/main/scala/AkkaBroadcaster.scala index 57d8ccd549..ac15c18da0 100644 --- a/akka-http/src/main/scala/AkkaBroadcaster.scala +++ b/akka-http/src/main/scala/AkkaBroadcaster.scala @@ -6,11 +6,21 @@ package se.scalablesolutions.akka.comet import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource} import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.dispatch.Dispatchers class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster { name = classOf[AkkaBroadcaster].getName - val caster = actor { case f : Function0[_] => f() } + val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher") + + //FIXME should be supervised + val caster = actorOf(new Actor { + self.dispatcher = broadcasterDispatcher + def receive = { + case f : Function0[_] => f() + } + }) override def destroy { super.destroy diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 8c45c38830..7e9c1b13bb 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -21,6 +21,9 @@ akka { timeout = 5 # default timeout for future based invocations serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher + default-dispatcher { + type = "Default" + } } stm { @@ -47,6 +50,12 @@ akka { "sample.rest.java", "sample.security"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) + + comet-dispatcher { + type = "Hawt" + aggregate = on + } + #maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity #IF you are using a KerberosAuthenticationActor # kerberos {