diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala
index ac5dc32303..632e849ad9 100644
--- a/akka-core/src/main/scala/actor/Supervisor.scala
+++ b/akka-core/src/main/scala/actor/Supervisor.scala
@@ -84,11 +84,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
faultHandler = Some(handler)
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
- private val actors = new ConcurrentHashMap[String, Actor]
+ private val actors = new ConcurrentHashMap[String, List[Actor]]
// Cheating, should really go through the dispatcher rather than direct access to a CHM
- def getInstance[T](clazz: Class[T]) = actors.get(clazz.getName).asInstanceOf[T]
- def getComponentInterfaces: List[Class[_]] = actors.values.toArray.toList.map(_.getClass)
+ def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]]
+
+ def getComponentInterfaces: List[Class[_]] = List.flatten(
+ actors.values.toArray.toList.asInstanceOf[List[List[Class[_]]]]).map(_.getClass)
+
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
override def start: Actor = synchronized {
@@ -106,7 +109,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
}
def receive = {
- case unknown => throw new IllegalArgumentException("Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
+ case unknown => throw new IllegalArgumentException(
+ "Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
}
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
@@ -114,15 +118,29 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
servers.map(server =>
server match {
case Supervise(actor, lifeCycle, remoteAddress) =>
- actors.put(actor.getClass.getName, actor)
+ val className = actor.getClass.getName
+ val currentActors = {
+ val list = actors.get(className)
+ if (list eq null) List[Actor]()
+ else list
+ }
+ actors.put(className, actor :: currentActors)
actor.lifeCycle = Some(lifeCycle)
startLink(actor)
- remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.getId, actor))
+ remoteAddress.foreach(address => RemoteServer.actorsFor(
+ RemoteServer.Address(address.hostname, address.port))
+ .actors.put(actor.getId, actor))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start
supervisor.lifeCycle = Some(LifeCycle(Permanent))
- actors.put(supervisor.getClass.getName, supervisor)
+ val className = supervisor.getClass.getName
+ val currentSupervisors = {
+ val list = actors.get(className)
+ if (list eq null) List[Actor]()
+ else list
+ }
+ actors.put(className, supervisor :: currentSupervisors)
link(supervisor)
})
}
diff --git a/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala
index 264b526002..7e9b6b6c69 100644
--- a/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala
+++ b/akka-core/src/main/scala/config/ActiveObjectConfigurator.scala
@@ -8,9 +8,9 @@ import JavaConfig._
import com.google.inject._
-import java.util._
-//import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
-//import org.apache.camel.{Endpoint, Routes}
+import org.scala_tools.javautils.Imports._
+
+import java.util.{List=>JList, ArrayList}
/**
* Configurator for the Active Objects. Used to do declarative configuration of supervision.
@@ -27,12 +27,20 @@ class ActiveObjectConfigurator {
private val INSTANCE = new ActiveObjectGuiceConfigurator
/**
- * Returns the active abject that has been put under supervision for the class specified.
+ * Returns the a list with all active objects that has been put under supervision for the class specified.
+ *
+ * @param clazz the class for the active object
+ * @return a list with all the active objects for the class
+ */
+ def getInstances[T](clazz: Class[T]): JList[T] = INSTANCE.getInstance(clazz).asJava
+
+ /**
+ * Returns the first item in a list of all active objects that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
*/
- def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz)
+ def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head
def configure(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectConfigurator = {
INSTANCE.configure(
@@ -56,13 +64,7 @@ class ActiveObjectConfigurator {
this
}
- //def addRoutes(routes: Routes): ActiveObjectConfigurator = {
- // INSTANCE.addRoutes(routes)
- // this
- // }
-
-
- def getComponentInterfaces: List[Class[_]] = {
+ def getComponentInterfaces: JList[Class[_]] = {
val al = new ArrayList[Class[_]]
for (c <- INSTANCE.getComponentInterfaces) al.add(c)
al
@@ -70,14 +72,8 @@ class ActiveObjectConfigurator {
def getExternalDependency[T](clazz: Class[T]): T = INSTANCE.getExternalDependency(clazz)
- //def getRoutingEndpoint(uri: String): Endpoint = INSTANCE.getRoutingEndpoint(uri)
-
- //def getRoutingEndpoints: java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints
-
- //def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints(uri)
-
// TODO: should this be exposed?
- def getGuiceModules: List[Module] = INSTANCE.getGuiceModules
+ def getGuiceModules: JList[Module] = INSTANCE.getGuiceModules
def reset = INSTANCE.reset
diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 9868edb98b..e01f91f92a 100644
--- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -11,22 +11,18 @@ import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher}
import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
-//import org.apache.camel.impl.{DefaultCamelContext}
-//import org.apache.camel.{CamelContext, Endpoint, Routes}
-
import scala.collection.mutable.HashMap
import java.net.InetSocketAddress
import java.lang.reflect.Method
/**
- * This is an class for internal usage. Instead use the se.scalablesolutions.akka.config.ActiveObjectConfigurator class for creating ActiveObjects.
+ * This is an class for internal usage. Instead use the se.scalablesolutions.akka.config.ActiveObjectConfigurator
+ * class for creating ActiveObjects.
*
* @author Jonas Bonér
*/
-private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { // with CamelConfigurator {
- //val AKKA_CAMEL_ROUTING_SCHEME = "akka"
-
+private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging {
private var injector: Injector = _
private var supervisor: Option[Supervisor] = None
private var restartStrategy: RestartStrategy = _
@@ -35,7 +31,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
- //private var camelContext = new DefaultCamelContext
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
@@ -43,9 +38,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
* Returns the active abject that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
- * @return the active object for the class
+ * @return the active objects for the class
*/
- override def getInstance[T](clazz: Class[T]): T = synchronized {
+ override def getInstance[T](clazz: Class[T]): List[T] = synchronized {
log.debug("Retrieving active object [%s]", clazz.getName)
if (injector eq null) throw new IllegalStateException(
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
@@ -54,7 +49,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
"Class [" + clazz.getName + "] has not been put under supervision" +
"\n(by passing in the config to the 'configure' and then invoking 'supervise') method"))
injector.injectMembers(targetInstance)
- proxy.asInstanceOf[T]
+ List(proxy.asInstanceOf[T])
}
override def isDefined(clazz: Class[_]): Boolean = synchronized {
@@ -70,30 +65,15 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
if (c.intf.isDefined) c.intf.get
else c.target
}
- /*
- override def getRoutingEndpoint(uri: String): Endpoint = synchronized {
- camelContext.getEndpoint(uri)
- }
- override def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized {
- camelContext.getEndpoints
- }
-
- override def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized {
- camelContext.getEndpoints(uri)
- }
- */
override def configure(restartStrategy: RestartStrategy, components: List[Component]):
ActiveObjectConfiguratorBase = synchronized {
this.restartStrategy = restartStrategy
this.components = components.toArray.toList.asInstanceOf[List[Component]]
bindings = for (component <- this.components) yield {
if (component.intf.isDefined) newDelegatingProxy(component)
- else newSubclassingProxy(component)
+ else newSubclassingProxy(component)
}
- //camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy)
- //for (method <- component.intf.getDeclaredMethods.toList) registerMethodForUri(method, component.name)
- //log.debug("Registering active object in Camel context under the name [%s]", component.target.getName)
val deps = new java.util.ArrayList[DependencyBinding](bindings.size)
for (b <- bindings) deps.add(b)
modules.add(new ActiveObjectGuiceModule(deps))
@@ -105,7 +85,8 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
- if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ if (component.remoteAddress.isDefined)
+ Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
@@ -125,9 +106,11 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
- if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ if (component.remoteAddress.isDefined)
+ Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
- val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ val proxy = ActiveObject.newInstance(
+ targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
RemoteServer
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
@@ -147,8 +130,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
override def supervise: ActiveObjectConfiguratorBase = synchronized {
if (injector eq null) inject
supervisor = Some(ActiveObject.supervise(restartStrategy, supervised))
- //camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
- //camelContext.start
supervisor.get.start
ConfiguratorRepository.registerConfigurator(this)
this
@@ -170,14 +151,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
modules.add(module)
this
}
- /*
- override def addRoutes(routes: Routes): ActiveObjectConfiguratorBase = synchronized {
- camelContext.addRoutes(routes)
- this
- }
- override def getCamelContext: CamelContext = camelContext
- */
def getGuiceModules: java.util.List[Module] = modules
def reset = synchronized {
@@ -187,21 +161,10 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
methodToUriRegistry = new HashMap[Method, String]
injector = null
restartStrategy = null
- //camelContext = new DefaultCamelContext
}
def stop = synchronized {
- //camelContext.stop
if (supervisor.isDefined) supervisor.get.stop
}
-
-// def registerMethodForUri(method: Method, componentName: String) =
-// methodToUriRegistry += method -> buildUri(method, componentName)
-
-// def lookupUriFor(method: Method): String =
-// methodToUriRegistry.getOrElse(method, throw new IllegalStateException("Could not find URI for method [" + method.getName + "]"))
-
-// def buildUri(method: Method, componentName: String): String =
-// AKKA_CAMEL_ROUTING_SCHEME + ":" + componentName + "." + method.getName
}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/config/Configurator.scala b/akka-core/src/main/scala/config/Configurator.scala
index 22ffd41214..fcb354a1f7 100644
--- a/akka-core/src/main/scala/config/Configurator.scala
+++ b/akka-core/src/main/scala/config/Configurator.scala
@@ -16,7 +16,7 @@ private[akka] trait Configurator {
* @param clazz the class for the active object
* @return the active object for the class
*/
- def getInstance[T](clazz: Class[T]): T
+ def getInstance[T](clazz: Class[T]): List[T]
def getComponentInterfaces: List[Class[_]]
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index cb1b828db1..f9a848a3a2 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -19,6 +19,12 @@ object DataFlow {
case object Start
case object Exit
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
+
+import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.dispatch.CompletableFuture
+
def thread(body: => Unit) = {
val thread = new IsolatedEventBasedThread(body).start
thread send Start
diff --git a/akka-core/src/test/scala/PerformanceTest.scala b/akka-core/src/test/scala/PerformanceTest.scala
index 43a4d46650..fe2495c356 100644
--- a/akka-core/src/test/scala/PerformanceTest.scala
+++ b/akka-core/src/test/scala/PerformanceTest.scala
@@ -15,7 +15,7 @@ import net.lag.logging.Logger
*/
class PerformanceTest extends JUnitSuite {
- @Test
+// @Test
def benchAkkaActorsVsScalaActors = {
def stressTestAkkaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {