Changed Supervisors actor map to hold a list of actors per class entry

This commit is contained in:
Jonas Bonér 2010-03-18 08:37:44 +01:00
parent 3e106a4cd8
commit e9df98c945
6 changed files with 61 additions and 78 deletions

View file

@ -84,11 +84,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
faultHandler = Some(handler)
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
private val actors = new ConcurrentHashMap[String, Actor]
private val actors = new ConcurrentHashMap[String, List[Actor]]
// Cheating, should really go through the dispatcher rather than direct access to a CHM
def getInstance[T](clazz: Class[T]) = actors.get(clazz.getName).asInstanceOf[T]
def getComponentInterfaces: List[Class[_]] = actors.values.toArray.toList.map(_.getClass)
def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]]
def getComponentInterfaces: List[Class[_]] = List.flatten(
actors.values.toArray.toList.asInstanceOf[List[List[Class[_]]]]).map(_.getClass)
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
override def start: Actor = synchronized {
@ -106,7 +109,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
}
def receive = {
case unknown => throw new IllegalArgumentException("Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
case unknown => throw new IllegalArgumentException(
"Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
}
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
@ -114,15 +118,29 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
servers.map(server =>
server match {
case Supervise(actor, lifeCycle, remoteAddress) =>
actors.put(actor.getClass.getName, actor)
val className = actor.getClass.getName
val currentActors = {
val list = actors.get(className)
if (list eq null) List[Actor]()
else list
}
actors.put(className, actor :: currentActors)
actor.lifeCycle = Some(lifeCycle)
startLink(actor)
remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.getId, actor))
remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port))
.actors.put(actor.getId, actor))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start
supervisor.lifeCycle = Some(LifeCycle(Permanent))
actors.put(supervisor.getClass.getName, supervisor)
val className = supervisor.getClass.getName
val currentSupervisors = {
val list = actors.get(className)
if (list eq null) List[Actor]()
else list
}
actors.put(className, supervisor :: currentSupervisors)
link(supervisor)
})
}

View file

@ -8,9 +8,9 @@ import JavaConfig._
import com.google.inject._
import java.util._
//import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
//import org.apache.camel.{Endpoint, Routes}
import org.scala_tools.javautils.Imports._
import java.util.{List=>JList, ArrayList}
/**
* Configurator for the Active Objects. Used to do declarative configuration of supervision.
@ -27,12 +27,20 @@ class ActiveObjectConfigurator {
private val INSTANCE = new ActiveObjectGuiceConfigurator
/**
* Returns the active abject that has been put under supervision for the class specified.
* Returns the a list with all active objects that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return a list with all the active objects for the class
*/
def getInstances[T](clazz: Class[T]): JList[T] = INSTANCE.getInstance(clazz).asJava
/**
* Returns the first item in a list of all active objects that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
*/
def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz)
def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head
def configure(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectConfigurator = {
INSTANCE.configure(
@ -56,13 +64,7 @@ class ActiveObjectConfigurator {
this
}
//def addRoutes(routes: Routes): ActiveObjectConfigurator = {
// INSTANCE.addRoutes(routes)
// this
// }
def getComponentInterfaces: List[Class[_]] = {
def getComponentInterfaces: JList[Class[_]] = {
val al = new ArrayList[Class[_]]
for (c <- INSTANCE.getComponentInterfaces) al.add(c)
al
@ -70,14 +72,8 @@ class ActiveObjectConfigurator {
def getExternalDependency[T](clazz: Class[T]): T = INSTANCE.getExternalDependency(clazz)
//def getRoutingEndpoint(uri: String): Endpoint = INSTANCE.getRoutingEndpoint(uri)
//def getRoutingEndpoints: java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints
//def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints(uri)
// TODO: should this be exposed?
def getGuiceModules: List[Module] = INSTANCE.getGuiceModules
def getGuiceModules: JList[Module] = INSTANCE.getGuiceModules
def reset = INSTANCE.reset

View file

@ -11,22 +11,18 @@ import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher}
import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
//import org.apache.camel.impl.{DefaultCamelContext}
//import org.apache.camel.{CamelContext, Endpoint, Routes}
import scala.collection.mutable.HashMap
import java.net.InetSocketAddress
import java.lang.reflect.Method
/**
* This is an class for internal usage. Instead use the <code>se.scalablesolutions.akka.config.ActiveObjectConfigurator</code> class for creating ActiveObjects.
* This is an class for internal usage. Instead use the <code>se.scalablesolutions.akka.config.ActiveObjectConfigurator</code>
* class for creating ActiveObjects.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { // with CamelConfigurator {
//val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging {
private var injector: Injector = _
private var supervisor: Option[Supervisor] = None
private var restartStrategy: RestartStrategy = _
@ -35,7 +31,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
//private var camelContext = new DefaultCamelContext
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
@ -43,9 +38,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
* Returns the active abject that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
* @return the active objects for the class
*/
override def getInstance[T](clazz: Class[T]): T = synchronized {
override def getInstance[T](clazz: Class[T]): List[T] = synchronized {
log.debug("Retrieving active object [%s]", clazz.getName)
if (injector eq null) throw new IllegalStateException(
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
@ -54,7 +49,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
"Class [" + clazz.getName + "] has not been put under supervision" +
"\n(by passing in the config to the 'configure' and then invoking 'supervise') method"))
injector.injectMembers(targetInstance)
proxy.asInstanceOf[T]
List(proxy.asInstanceOf[T])
}
override def isDefined(clazz: Class[_]): Boolean = synchronized {
@ -70,30 +65,15 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
if (c.intf.isDefined) c.intf.get
else c.target
}
/*
override def getRoutingEndpoint(uri: String): Endpoint = synchronized {
camelContext.getEndpoint(uri)
}
override def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized {
camelContext.getEndpoints
}
override def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized {
camelContext.getEndpoints(uri)
}
*/
override def configure(restartStrategy: RestartStrategy, components: List[Component]):
ActiveObjectConfiguratorBase = synchronized {
this.restartStrategy = restartStrategy
this.components = components.toArray.toList.asInstanceOf[List[Component]]
bindings = for (component <- this.components) yield {
if (component.intf.isDefined) newDelegatingProxy(component)
else newSubclassingProxy(component)
else newSubclassingProxy(component)
}
//camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy)
//for (method <- component.intf.getDeclaredMethods.toList) registerMethodForUri(method, component.name)
//log.debug("Registering active object in Camel context under the name [%s]", component.target.getName)
val deps = new java.util.ArrayList[DependencyBinding](bindings.size)
for (b <- bindings) deps.add(b)
modules.add(new ActiveObjectGuiceModule(deps))
@ -105,7 +85,8 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
@ -125,9 +106,11 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
val proxy = ActiveObject.newInstance(
targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
RemoteServer
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
@ -147,8 +130,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
override def supervise: ActiveObjectConfiguratorBase = synchronized {
if (injector eq null) inject
supervisor = Some(ActiveObject.supervise(restartStrategy, supervised))
//camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
//camelContext.start
supervisor.get.start
ConfiguratorRepository.registerConfigurator(this)
this
@ -170,14 +151,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
modules.add(module)
this
}
/*
override def addRoutes(routes: Routes): ActiveObjectConfiguratorBase = synchronized {
camelContext.addRoutes(routes)
this
}
override def getCamelContext: CamelContext = camelContext
*/
def getGuiceModules: java.util.List[Module] = modules
def reset = synchronized {
@ -187,21 +161,10 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
methodToUriRegistry = new HashMap[Method, String]
injector = null
restartStrategy = null
//camelContext = new DefaultCamelContext
}
def stop = synchronized {
//camelContext.stop
if (supervisor.isDefined) supervisor.get.stop
}
// def registerMethodForUri(method: Method, componentName: String) =
// methodToUriRegistry += method -> buildUri(method, componentName)
// def lookupUriFor(method: Method): String =
// methodToUriRegistry.getOrElse(method, throw new IllegalStateException("Could not find URI for method [" + method.getName + "]"))
// def buildUri(method: Method, componentName: String): String =
// AKKA_CAMEL_ROUTING_SCHEME + ":" + componentName + "." + method.getName
}

View file

@ -16,7 +16,7 @@ private[akka] trait Configurator {
* @param clazz the class for the active object
* @return the active object for the class
*/
def getInstance[T](clazz: Class[T]): T
def getInstance[T](clazz: Class[T]): List[T]
def getComponentInterfaces: List[Class[_]]

View file

@ -19,6 +19,12 @@ object DataFlow {
case object Start
case object Exit
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFuture
def thread(body: => Unit) = {
val thread = new IsolatedEventBasedThread(body).start
thread send Start

View file

@ -15,7 +15,7 @@ import net.lag.logging.Logger
*/
class PerformanceTest extends JUnitSuite {
@Test
// @Test
def benchAkkaActorsVsScalaActors = {
def stressTestAkkaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {