Adding first support for config dispatchers

This commit is contained in:
Viktor Klang 2010-08-12 16:41:54 +02:00
parent 9421d1cbdf
commit f0ac45f374
4 changed files with 90 additions and 3 deletions

View file

@ -165,7 +165,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
@volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
@volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
/**
* Holds the hot swapped partial function.

View file

@ -5,7 +5,11 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
import net.lag.configgy.ConfigMap
import se.scalablesolutions.akka.util.UUID
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
/**
* Scala API. Dispatcher factory.
@ -39,9 +43,12 @@ import se.scalablesolutions.akka.config.Config.config
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers {
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val defaultGlobalDispatcher = fromConfig("akka.actor.default-dispatcher",globalExecutorBasedEventDrivenDispatcher)
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
@ -103,4 +110,65 @@ object Dispatchers {
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
def fromConfig(identifier: String, defaultDispatcher: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = {
config.getConfigMap(identifier).map(from).getOrElse(defaultDispatcher)
}
def from(map: ConfigMap): MessageDispatcher = {
lazy val name = map.getString("name",UUID.newUuid.toString)
val dispatcher = map.getString("type").map({
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,map.getInt("throughput",THROUGHPUT))
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
case "Hawt" => newHawtDispatcher(map.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case "Default" => defaultGlobalDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown)
}).get
if(dispatcher.isInstanceOf[ThreadPoolBuilder]) {
val configurable = dispatcher.asInstanceOf[ThreadPoolBuilder]
map.getInt("keep-alive-ms").foreach(configurable.setKeepAliveTimeInMillis(_))
map.getInt("core-pool-size").foreach(configurable.setCorePoolSize(_))
map.getInt("max-pool-size").foreach(configurable.setMaxPoolSize(_))
map.getString("rejection-policy").map(_ match {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(configurable.setRejectionPolicy(_))
}
log.info("Dispatchers.from: %s:%s for %s",dispatcher.getClass.getName,dispatcher,map.getString("type").getOrElse("<null>"))
dispatcher
}
}

View file

@ -6,11 +6,21 @@ package se.scalablesolutions.akka.comet
import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.Dispatchers
class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
name = classOf[AkkaBroadcaster].getName
val caster = actor { case f : Function0[_] => f() }
val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
//FIXME should be supervised
val caster = actorOf(new Actor {
self.dispatcher = broadcasterDispatcher
def receive = {
case f : Function0[_] => f()
}
})
override def destroy {
super.destroy

View file

@ -21,6 +21,9 @@ akka {
timeout = 5 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
default-dispatcher {
type = "Default"
}
}
stm {
@ -47,6 +50,12 @@ akka {
"sample.rest.java",
"sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
comet-dispatcher {
type = "Hawt"
aggregate = on
}
#maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity
#IF you are using a KerberosAuthenticationActor
# kerberos {