From e9df98c9459dcc4455dadb2a6a3fda0ba708ae54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 18 Mar 2010 08:37:44 +0100 Subject: [PATCH] Changed Supervisors actor map to hold a list of actors per class entry --- .../src/main/scala/actor/Supervisor.scala | 32 +++++++--- .../config/ActiveObjectConfigurator.scala | 34 +++++----- .../ActiveObjectGuiceConfigurator.scala | 63 ++++--------------- .../src/main/scala/config/Configurator.scala | 2 +- .../src/main/scala/stm/DataFlowVariable.scala | 6 ++ .../src/test/scala/PerformanceTest.scala | 2 +- 6 files changed, 61 insertions(+), 78 deletions(-) diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index ac5dc32303..632e849ad9 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -84,11 +84,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep faultHandler = Some(handler) dispatcher = Dispatchers.newThreadBasedDispatcher(this) - private val actors = new ConcurrentHashMap[String, Actor] + private val actors = new ConcurrentHashMap[String, List[Actor]] // Cheating, should really go through the dispatcher rather than direct access to a CHM - def getInstance[T](clazz: Class[T]) = actors.get(clazz.getName).asInstanceOf[T] - def getComponentInterfaces: List[Class[_]] = actors.values.toArray.toList.map(_.getClass) + def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]] + + def getComponentInterfaces: List[Class[_]] = List.flatten( + actors.values.toArray.toList.asInstanceOf[List[List[Class[_]]]]).map(_.getClass) + def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName) override def start: Actor = synchronized { @@ -106,7 +109,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep } def receive = { - case unknown => throw new IllegalArgumentException("Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]") + case unknown => throw new IllegalArgumentException( + "Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]") } def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { @@ -114,15 +118,29 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep servers.map(server => server match { case Supervise(actor, lifeCycle, remoteAddress) => - actors.put(actor.getClass.getName, actor) + val className = actor.getClass.getName + val currentActors = { + val list = actors.get(className) + if (list eq null) List[Actor]() + else list + } + actors.put(className, actor :: currentActors) actor.lifeCycle = Some(lifeCycle) startLink(actor) - remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.getId, actor)) + remoteAddress.foreach(address => RemoteServer.actorsFor( + RemoteServer.Address(address.hostname, address.port)) + .actors.put(actor.getId, actor)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val supervisor = factory.newInstanceFor(supervisorConfig).start supervisor.lifeCycle = Some(LifeCycle(Permanent)) - actors.put(supervisor.getClass.getName, supervisor) + val className = supervisor.getClass.getName + val currentSupervisors = { + val list = actors.get(className) + if (list eq null) List[Actor]() + else list + } + actors.put(className, supervisor :: currentSupervisors) link(supervisor) }) } diff --git a/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala index 264b526002..7e9b6b6c69 100644 --- a/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala @@ -8,9 +8,9 @@ import JavaConfig._ import com.google.inject._ -import java.util._ -//import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext} -//import org.apache.camel.{Endpoint, Routes} +import org.scala_tools.javautils.Imports._ + +import java.util.{List=>JList, ArrayList} /** * Configurator for the Active Objects. Used to do declarative configuration of supervision. @@ -27,12 +27,20 @@ class ActiveObjectConfigurator { private val INSTANCE = new ActiveObjectGuiceConfigurator /** - * Returns the active abject that has been put under supervision for the class specified. + * Returns the a list with all active objects that has been put under supervision for the class specified. + * + * @param clazz the class for the active object + * @return a list with all the active objects for the class + */ + def getInstances[T](clazz: Class[T]): JList[T] = INSTANCE.getInstance(clazz).asJava + + /** + * Returns the first item in a list of all active objects that has been put under supervision for the class specified. * * @param clazz the class for the active object * @return the active object for the class */ - def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz) + def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head def configure(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectConfigurator = { INSTANCE.configure( @@ -56,13 +64,7 @@ class ActiveObjectConfigurator { this } - //def addRoutes(routes: Routes): ActiveObjectConfigurator = { - // INSTANCE.addRoutes(routes) - // this - // } - - - def getComponentInterfaces: List[Class[_]] = { + def getComponentInterfaces: JList[Class[_]] = { val al = new ArrayList[Class[_]] for (c <- INSTANCE.getComponentInterfaces) al.add(c) al @@ -70,14 +72,8 @@ class ActiveObjectConfigurator { def getExternalDependency[T](clazz: Class[T]): T = INSTANCE.getExternalDependency(clazz) - //def getRoutingEndpoint(uri: String): Endpoint = INSTANCE.getRoutingEndpoint(uri) - - //def getRoutingEndpoints: java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints - - //def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints(uri) - // TODO: should this be exposed? - def getGuiceModules: List[Module] = INSTANCE.getGuiceModules + def getGuiceModules: JList[Module] = INSTANCE.getGuiceModules def reset = INSTANCE.reset diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 9868edb98b..e01f91f92a 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -11,22 +11,18 @@ import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher} import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.util.Logging -//import org.apache.camel.impl.{DefaultCamelContext} -//import org.apache.camel.{CamelContext, Endpoint, Routes} - import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.lang.reflect.Method /** - * This is an class for internal usage. Instead use the se.scalablesolutions.akka.config.ActiveObjectConfigurator class for creating ActiveObjects. + * This is an class for internal usage. Instead use the se.scalablesolutions.akka.config.ActiveObjectConfigurator + * class for creating ActiveObjects. * * @author Jonas Bonér */ -private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { // with CamelConfigurator { - //val AKKA_CAMEL_ROUTING_SCHEME = "akka" - +private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { private var injector: Injector = _ private var supervisor: Option[Supervisor] = None private var restartStrategy: RestartStrategy = _ @@ -35,7 +31,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat private var bindings: List[DependencyBinding] = Nil private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]] - //private var camelContext = new DefaultCamelContext private var modules = new java.util.ArrayList[Module] private var methodToUriRegistry = new HashMap[Method, String] @@ -43,9 +38,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat * Returns the active abject that has been put under supervision for the class specified. * * @param clazz the class for the active object - * @return the active object for the class + * @return the active objects for the class */ - override def getInstance[T](clazz: Class[T]): T = synchronized { + override def getInstance[T](clazz: Class[T]): List[T] = synchronized { log.debug("Retrieving active object [%s]", clazz.getName) if (injector eq null) throw new IllegalStateException( "inject() and/or supervise() must be called before invoking getInstance(clazz)") @@ -54,7 +49,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat "Class [" + clazz.getName + "] has not been put under supervision" + "\n(by passing in the config to the 'configure' and then invoking 'supervise') method")) injector.injectMembers(targetInstance) - proxy.asInstanceOf[T] + List(proxy.asInstanceOf[T]) } override def isDefined(clazz: Class[_]): Boolean = synchronized { @@ -70,30 +65,15 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat if (c.intf.isDefined) c.intf.get else c.target } - /* - override def getRoutingEndpoint(uri: String): Endpoint = synchronized { - camelContext.getEndpoint(uri) - } - override def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized { - camelContext.getEndpoints - } - - override def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized { - camelContext.getEndpoints(uri) - } - */ override def configure(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectConfiguratorBase = synchronized { this.restartStrategy = restartStrategy this.components = components.toArray.toList.asInstanceOf[List[Component]] bindings = for (component <- this.components) yield { if (component.intf.isDefined) newDelegatingProxy(component) - else newSubclassingProxy(component) + else newSubclassingProxy(component) } - //camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) - //for (method <- component.intf.getDeclaredMethods.toList) registerMethodForUri(method, component.name) - //log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) val deps = new java.util.ArrayList[DependencyBinding](bindings.size) for (b <- bindings) deps.add(b) modules.add(new ActiveObjectGuiceModule(deps)) @@ -105,7 +85,8 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks) if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get val remoteAddress = - if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) + if (component.remoteAddress.isDefined) + Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) else None val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] if (remoteAddress.isDefined) { @@ -125,9 +106,11 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks) if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get val remoteAddress = - if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) + if (component.remoteAddress.isDefined) + Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) else None - val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] + val proxy = ActiveObject.newInstance( + targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] if (remoteAddress.isDefined) { RemoteServer .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) @@ -147,8 +130,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat override def supervise: ActiveObjectConfiguratorBase = synchronized { if (injector eq null) inject supervisor = Some(ActiveObject.supervise(restartStrategy, supervised)) - //camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) - //camelContext.start supervisor.get.start ConfiguratorRepository.registerConfigurator(this) this @@ -170,14 +151,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat modules.add(module) this } - /* - override def addRoutes(routes: Routes): ActiveObjectConfiguratorBase = synchronized { - camelContext.addRoutes(routes) - this - } - override def getCamelContext: CamelContext = camelContext - */ def getGuiceModules: java.util.List[Module] = modules def reset = synchronized { @@ -187,21 +161,10 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat methodToUriRegistry = new HashMap[Method, String] injector = null restartStrategy = null - //camelContext = new DefaultCamelContext } def stop = synchronized { - //camelContext.stop if (supervisor.isDefined) supervisor.get.stop } - -// def registerMethodForUri(method: Method, componentName: String) = -// methodToUriRegistry += method -> buildUri(method, componentName) - -// def lookupUriFor(method: Method): String = -// methodToUriRegistry.getOrElse(method, throw new IllegalStateException("Could not find URI for method [" + method.getName + "]")) - -// def buildUri(method: Method, componentName: String): String = -// AKKA_CAMEL_ROUTING_SCHEME + ":" + componentName + "." + method.getName } \ No newline at end of file diff --git a/akka-core/src/main/scala/config/Configurator.scala b/akka-core/src/main/scala/config/Configurator.scala index 22ffd41214..fcb354a1f7 100644 --- a/akka-core/src/main/scala/config/Configurator.scala +++ b/akka-core/src/main/scala/config/Configurator.scala @@ -16,7 +16,7 @@ private[akka] trait Configurator { * @param clazz the class for the active object * @return the active object for the class */ - def getInstance[T](clazz: Class[T]): T + def getInstance[T](clazz: Class[T]): List[T] def getComponentInterfaces: List[Class[_]] diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index cb1b828db1..f9a848a3a2 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -19,6 +19,12 @@ object DataFlow { case object Start case object Exit +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.dispatch.CompletableFuture + def thread(body: => Unit) = { val thread = new IsolatedEventBasedThread(body).start thread send Start diff --git a/akka-core/src/test/scala/PerformanceTest.scala b/akka-core/src/test/scala/PerformanceTest.scala index 43a4d46650..fe2495c356 100644 --- a/akka-core/src/test/scala/PerformanceTest.scala +++ b/akka-core/src/test/scala/PerformanceTest.scala @@ -15,7 +15,7 @@ import net.lag.logging.Logger */ class PerformanceTest extends JUnitSuite { - @Test +// @Test def benchAkkaActorsVsScalaActors = { def stressTestAkkaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {