();
+ for (int i = 0; i < components.length; i++) {
+ final Component c = components[i];
+ final ActiveObjectProxy activeObjectProxy = new ActiveObjectProxy(c.intf(), c.target(), c.timeout());
+ workers.add(c.newWorker(activeObjectProxy));
+ activeObjectRegistry.put(c.intf(), activeObjectProxy);
+ }
+ supervisor = activeObjectFactory.supervise(restartStrategy.transform(), workers);
+ return this;
+ }
+
+
+ /**
+ * Add additional services to be wired in.
+ *
+ * ActiveObjectGuiceConfigurator.addExternalGuiceModule(new AbstractModule {
+ * protected void configure() {
+ * bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
+ * bind(BarImpl.class);
+ * link(Bar.class).to(BarImpl.class);
+ * bindConstant(named("port")).to(8080);
+ * }})
+ *
+ */
+ public synchronized ActiveObjectGuiceConfigurator addExternalGuiceModule(Module module) {
+ modules.add(module);
+ return this;
+ }
+
+ public List getGuiceModules() {
+ return modules;
+ }
+
+ public synchronized void reset() {
+ modules = new ArrayList();
+ configRegistry = new HashMap();
+ activeObjectRegistry = new HashMap();
+ injector = null;
+ restartStrategy = null;
+ }
+
+ public synchronized void stop() {
+ // TODO: fix supervisor.stop();
+ }
+}
diff --git a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
new file mode 100755
index 0000000000..ca00180c27
--- /dev/null
+++ b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package com.scalablesolutions.akka.api;
+
+import com.google.inject.Inject;
+import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
+
+import scala.actors.annotation.oneway;
+
+import junit.framework.TestCase;
+
+public class ActiveObjectGuiceConfiguratorTest extends TestCase {
+ static String messageLog = "";
+
+ final private ActiveObjectGuiceConfigurator conf = new ActiveObjectGuiceConfigurator();
+
+ protected void setUp() {
+ conf.addExternalGuiceModule(new AbstractModule() {
+ protected void configure() {
+ bind(Ext.class).to(ExtImpl.class).in(Scopes.SINGLETON);
+ }
+ }).configureActiveObjects(
+ new RestartStrategy(new AllForOne(), 3, 100), new Component[]{
+ new Component(
+ Foo.class,
+ FooImpl.class,
+ new LifeCycle(new Permanent(), 100),
+ 1000),
+ new Component(
+ Bar.class,
+ BarImpl.class,
+ new LifeCycle(new Permanent(), 100),
+ 1000)
+ }).inject().supervise();
+
+ }
+
+ public void testGuiceActiveObjectInjection() {
+ messageLog = "";
+ Foo foo = conf.getActiveObject(Foo.class);
+ Bar bar = conf.getActiveObject(Bar.class);
+ assertTrue(foo.getBar().toString().equals(bar.toString()));
+ }
+
+ public void testGuiceExternalDependencyInjection() {
+ messageLog = "";
+ Bar bar = conf.getActiveObject(Bar.class);
+ Ext ext = conf.getExternalDependency(Ext.class);
+ assertTrue(bar.getExt().toString().equals(ext.toString()));
+ }
+
+ public void testActiveObjectInvocation() throws InterruptedException {
+ messageLog = "";
+ Foo foo = conf.getActiveObject(Foo.class);
+ messageLog += foo.foo("foo ");
+ foo.bar("bar ");
+ messageLog += "before_bar ";
+ Thread.sleep(500);
+ assertEquals("foo return_foo before_bar ", messageLog);
+ }
+
+ public void testActiveObjectInvocationsInvocation() throws InterruptedException {
+ messageLog = "";
+ Foo foo = conf.getActiveObject(Foo.class);
+ Bar bar = conf.getActiveObject(Bar.class);
+ messageLog += foo.foo("foo ");
+ foo.bar("bar ");
+ messageLog += "before_bar ";
+ Thread.sleep(500);
+ assertEquals("foo return_foo before_bar ", messageLog);
+ }
+
+ public void testForcedTimeout() {
+ messageLog = "";
+ Foo foo = conf.getActiveObject(Foo.class);
+ try {
+ foo.longRunning();
+ fail("exception should have been thrown");
+ } catch (com.scalablesolutions.akka.kernel.ActiveObjectInvocationTimeoutException e) {
+ }
+ }
+
+ public void testForcedException() {
+ messageLog = "";
+ Foo foo = conf.getActiveObject(Foo.class);
+ try {
+ foo.throwsException();
+ fail("exception should have been thrown");
+ } catch (RuntimeException e) {
+ }
+ }
+}
+
+// ============== TEST SERVICES ===============
+
+interface Foo {
+ public String foo(String msg);
+
+ public
+ @oneway
+ void bar(String msg);
+
+ public void longRunning();
+
+ public void throwsException();
+
+ public Bar getBar();
+}
+
+class FooImpl implements Foo {
+ @Inject
+ private Bar bar;
+
+ public Bar getBar() {
+ return bar;
+ }
+
+ public String foo(String msg) {
+ return msg + "return_foo ";
+ }
+
+ public void bar(String msg) {
+ bar.bar(msg);
+ }
+
+ public void longRunning() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ public void throwsException() {
+ throw new RuntimeException("expected");
+ }
+}
+
+interface Bar {
+ @oneway
+ void bar(String msg);
+
+ Ext getExt();
+}
+
+class BarImpl implements Bar {
+ @Inject
+ private Ext ext;
+
+ public Ext getExt() {
+ return ext;
+ }
+
+ public void bar(String msg) {
+ }
+}
+
+interface Ext {
+ void ext();
+}
+
+class ExtImpl implements Ext {
+ public void ext() {
+ }
+}
+
+
\ No newline at end of file
diff --git a/api-java/testng.xml b/api-java/testng.xml
index 8133747c6f..b894d3880b 100755
--- a/api-java/testng.xml
+++ b/api-java/testng.xml
@@ -10,7 +10,7 @@
-
+
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 1bdd19f43c..76c1be5e58 100755
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -14,37 +14,6 @@
${akka.version}
-
-
- repo1.maven
- Maven Main Repository
- http://repo1.maven.org/maven2
-
-
- scala-tools-snapshots
- Scala-Tools Maven2 Snapshot Repository
- http://scala-tools.org/repo-snapshots
-
-
- scala-tools
- Scala-Tools Maven2 Repository
- http://scala-tools.org/repo-releases
-
-
- lag
- Configgy's' Repository
- http://www.lag.net/repo
-
-
-
-
-
- scala-tools.org
- Scala-Tools Maven2 Repository
- http://scala-tools.org/repo-releases
-
-
-
org.scala-lang
diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala
index f6c1ac5bf6..6e07aa8159 100755
--- a/kernel/src/main/scala/ActiveObject.scala
+++ b/kernel/src/main/scala/ActiveObject.scala
@@ -1,115 +1,113 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package com.scalablesolutions.akka.kernel
-
-import scala.actors.behavior._
-
-import java.util.{List => JList, ArrayList}
-
-import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
-import java.lang.annotation.Annotation
-
-sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
-class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
-
-/**
- * @author Jonas Bonér
- */
-object ActiveObject {
-
- def newInstance[T](intf: Class[T] forSome {type T}, target: AnyRef, timeout: Int): T = {
- val proxy = new ActiveObjectProxy(target, timeout)
- supervise(proxy)
- newInstance(intf, proxy)
- }
-
- def newInstance[T](intf: Class[T] forSome {type T}, proxy: ActiveObjectProxy): T = {
- Proxy.newProxyInstance(
- proxy.target.getClass.getClassLoader,
- Array(intf),
- proxy).asInstanceOf[T]
- }
-
- def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
- object factory extends SupervisorFactory {
- override def getSupervisorConfig: SupervisorConfig = {
- SupervisorConfig(restartStrategy, components)
- }
- }
- val supervisor = factory.newSupervisor
- supervisor ! scala.actors.behavior.Start
- supervisor
- }
-
- private def supervise(proxy: ActiveObjectProxy): Supervisor =
- supervise(
- RestartStrategy(OneForOne, 5, 1000),
- Worker(
- proxy.server,
- LifeCycle(Permanent, 100))
- :: Nil)
-}
-
-/**
- * @author Jonas Bonér
- */
-class ActiveObjectProxy(val target: AnyRef, val timeout: Int) extends InvocationHandler {
- private val oneway = classOf[scala.actors.annotation.oneway]
-
- private[ActiveObjectProxy] object dispatcher extends GenericServer {
- override def body: PartialFunction[Any, Unit] = {
- case invocation: Invocation =>
- try {
- reply(ErrRef(invocation.invoke))
- } catch {
- case e: InvocationTargetException => reply(ErrRef({ throw e.getTargetException }))
- case e => reply(ErrRef({ throw e }))
- }
- case 'exit => exit; reply()
- case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
- }
- }
-
- private[akka] val server = new GenericServerContainer(target.getClass.getName, () => dispatcher)
- server.setTimeout(timeout)
-
- def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = invoke(Invocation(m, args, target))
-
- def invoke(invocation: Invocation): AnyRef = {
- if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
- else {
- val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException("proxy invocation timed out after " + timeout + " milliseconds") }))
- result()
- }
- }
-}
-
-/**
- * Represents a snapshot of the current invocation.
- *
- * @author Jonas Bonér
- */
-case class Invocation(val method: Method, val args: Array[Object], val target: AnyRef) {
- def invoke: AnyRef = method.invoke(target, args: _*)
-
- override def toString: String = "Invocation [method: " + method.getName + ", args: " + args + ", target: " + target + "]"
-
- override def hashCode(): Int = {
- var result = HashCode.SEED
- result = HashCode.hash(result, method)
- result = HashCode.hash(result, args)
- result = HashCode.hash(result, target)
- result
- }
-
- override def equals(that: Any): Boolean = {
- that != null &&
- that.isInstanceOf[Invocation] &&
- that.asInstanceOf[Invocation].method == method &&
- that.asInstanceOf[Invocation].args == args
- that.asInstanceOf[Invocation].target == target
- }
-}
-
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package com.scalablesolutions.akka.kernel
+
+import scala.actors.behavior._
+
+import java.util.{List => JList, ArrayList}
+
+import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
+import java.lang.annotation.Annotation
+
+sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
+class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
+
+/**
+ * @author Jonas Bonér
+ */
+class ActiveObjectFactory {
+ def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = ActiveObject.newInstance(intf, proxy)
+
+ def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor =
+ ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]])
+}
+
+
+/**
+ * @author Jonas Bonér
+ */
+object ActiveObject {
+ def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
+ Proxy.newProxyInstance(
+ intf.getClassLoader,
+ Array(intf),
+ proxy).asInstanceOf[T]
+ }
+
+ def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
+ object factory extends SupervisorFactory {
+ override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
+ }
+ val supervisor = factory.newSupervisor
+ supervisor ! scala.actors.behavior.Start
+ supervisor
+ }
+}
+
+/**
+ * @author Jonas Bonér
+ */
+class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
+ private val oneway = classOf[scala.actors.annotation.oneway]
+ private var targetInstance: AnyRef = _
+ private[akka] def setTargetInstance(instance: AnyRef) = targetInstance = instance
+
+ private[ActiveObjectProxy] object dispatcher extends GenericServer {
+ override def body: PartialFunction[Any, Unit] = {
+ case invocation: Invocation =>
+ try {
+ reply(ErrRef(invocation.invoke))
+ } catch {
+ case e: InvocationTargetException => reply(ErrRef({ throw e.getTargetException }))
+ case e => reply(ErrRef({ throw e }))
+ }
+ case 'exit => exit; reply()
+ case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
+ }
+ }
+
+ private[akka] val server = new GenericServerContainer(target.getName, () => dispatcher)
+ server.setTimeout(timeout)
+
+ def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = invoke(Invocation(m, args, targetInstance))
+
+ def invoke(invocation: Invocation): AnyRef = {
+ if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
+ else {
+ val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException("proxy invocation timed out after " + timeout + " milliseconds") }))
+ result()
+ }
+ }
+}
+
+/**
+ * Represents a snapshot of the current invocation.
+ *
+ * @author Jonas Bonér
+ */
+case class Invocation(val method: Method, val args: Array[Object], val target: AnyRef) {
+ method.setAccessible(true);
+
+ def invoke: AnyRef = method.invoke(target, args: _*)
+
+ override def toString: String = "Invocation[method: " + method.getName + ", args: " + args + ", target: " + target + "]"
+
+ override def hashCode(): Int = {
+ var result = HashCode.SEED
+ result = HashCode.hash(result, method)
+ result = HashCode.hash(result, args)
+ result = HashCode.hash(result, target)
+ result
+ }
+
+ override def equals(that: Any): Boolean = {
+ that != null &&
+ that.isInstanceOf[Invocation] &&
+ that.asInstanceOf[Invocation].method == method &&
+ that.asInstanceOf[Invocation].args == args
+ that.asInstanceOf[Invocation].target == target
+ }
+}
+
diff --git a/kernel/src/main/scala/Configuration.scala b/kernel/src/main/scala/Configuration.scala
index 4923c74c96..3ba50aaa6d 100755
--- a/kernel/src/main/scala/Configuration.scala
+++ b/kernel/src/main/scala/Configuration.scala
@@ -1,91 +1,105 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package com.scalablesolutions.akka.api
-
-import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy}
-
-import java.util.{List => JList}
-
-import scala.actors.behavior._
-import scala.reflect.BeanProperty
-
-sealed abstract class Configuration
-
-class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
- def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
-}
-class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
- def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime)
-}
-
-abstract class Scope extends Configuration {
- def transform: scala.actors.behavior.Scope
-}
-class Permanent extends Scope {
- override def transform = scala.actors.behavior.Permanent
-}
-class Transient extends Scope {
- override def transform = scala.actors.behavior.Transient
-}
-class Temporary extends Scope {
- override def transform = scala.actors.behavior.Temporary
-}
-
-abstract class FailOverScheme extends Configuration {
- def transform: scala.actors.behavior.FailOverScheme
-}
-class AllForOne extends FailOverScheme {
- override def transform = scala.actors.behavior.AllForOne
-}
-class OneForOne extends FailOverScheme {
- override def transform = scala.actors.behavior.OneForOne
-}
-
-abstract class Server extends Configuration
-class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
- def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Component]].map(_.transform))
-}
-class Component(@BeanProperty val proxy: ActiveObjectProxy, @BeanProperty val lifeCycle: LifeCycle) extends Server {
- def transform = scala.actors.behavior.Worker(proxy.server, lifeCycle.transform)
-}
-
-
-object Configuration {
- import com.google.inject.{AbstractModule, CreationException, Guice, Injector, Provides, Singleton}
- import com.google.inject.spi.CloseFailedException
- import com.google.inject.jsr250.{ResourceProviderFactory}
- import javax.annotation.Resource
- import javax.naming.Context
-
- def supervise(restartStrategy: RestartStrategy, components: JList[Component]): Supervisor = {
- val componentList = components.toArray.toList.asInstanceOf[List[Component]]
-
- val injector = Guice.createInjector(new AbstractModule {
- protected def configure = {
- bind(classOf[ResourceProviderFactory[_]])
- componentList.foreach(c => bind(c.getClass).in(classOf[Singleton]))
- }
-
- // @Provides
- // def createJndiContext: Context = {
- // val answer = new JndiContext
- // answer.bind("foo", new AnotherBean("Foo"))
- // answer.bind("xyz", new AnotherBean("XYZ"))
- // answer
- // }
- })
-
- val injectedComponents = componentList.map(c => injector.getInstance(c.getClass))
-
- // TODO: swap 'target' in proxy before running supervise
-
- ActiveObject.supervise(
- restartStrategy.transform,
- componentList.map(c => scala.actors.behavior.Worker(c.proxy.server, c.lifeCycle.transform)))
-
- }
-}
-
-
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package com.scalablesolutions.akka.api
+
+import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy}
+import google.inject.{AbstractModule}
+
+import java.util.{List => JList, ArrayList}
+
+import scala.actors.behavior._
+import scala.reflect.BeanProperty
+
+// ============================================
+// Java version of the configuration API
+
+sealed abstract class Configuration
+
+class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
+ def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
+}
+class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
+ def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime)
+}
+
+abstract class Scope extends Configuration {
+ def transform: scala.actors.behavior.Scope
+}
+class Permanent extends Scope {
+ override def transform = scala.actors.behavior.Permanent
+}
+class Transient extends Scope {
+ override def transform = scala.actors.behavior.Transient
+}
+class Temporary extends Scope {
+ override def transform = scala.actors.behavior.Temporary
+}
+
+abstract class FailOverScheme extends Configuration {
+ def transform: scala.actors.behavior.FailOverScheme
+}
+class AllForOne extends FailOverScheme {
+ override def transform = scala.actors.behavior.AllForOne
+}
+class OneForOne extends FailOverScheme {
+ override def transform = scala.actors.behavior.OneForOne
+}
+
+abstract class Server extends Configuration
+//class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
+// def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
+//}
+class Component(@BeanProperty val intf: Class[_],
+ @BeanProperty val target: Class[_],
+ @BeanProperty val lifeCycle: LifeCycle,
+ @BeanProperty val timeout: Int) extends Server {
+ def newWorker(proxy: ActiveObjectProxy) = scala.actors.behavior.Worker(proxy.server, lifeCycle.transform)
+}
+
+
+// ============================================
+
+/**
+ * @author Jonas Bonér
+ */
+//object Configuration {
+// import com.google.inject.{Module, AbstractModule, CreationException, Guice, Injector, Provides, Singleton, Binder}
+// import com.google.inject.jsr250.{ResourceProviderFactory}
+//
+// private val modules = new ArrayList[Module]
+//
+// def addModule(module: Module) = modules.add(module)
+//
+// def supervise(restartStrategy: RestartStrategy, components: Array[Component]): Supervisor = {
+// val componentList = components.toList.asInstanceOf[List[Component]]
+//
+// object defaultModule extends AbstractModule {
+// protected def configure {
+// bind(classOf[ResourceProviderFactory[_]])
+// //componentList.foreach(c => bind(c.proxy.intf.asInstanceOf[Class[_]]).to(c.proxy.target.getClass.asInstanceOf[Class[_]]).in(classOf[Singleton]))
+// }
+//
+// // @Provides
+// // def createJndiContext: Context = {
+// // val answer = new JndiContext
+// // answer.bind("foo", new AnotherBean("Foo"))
+// // answer.bind("xyz", new AnotherBean("XYZ"))
+// // answer
+// // }
+// }
+// modules.add(defaultModule)
+// val injector = Guice.createInjector(modules)
+//
+// // swap 'target' in proxy before running supervise
+// // componentList.foreach(c => c.proxy.target = injector.getInstance(c.proxy.targetClass))
+//
+// ActiveObject.supervise(
+// restartStrategy.transform,
+// componentList.map(c => scala.actors.behavior.Worker(c.proxy.server, c.lifeCycle.transform)))
+//
+// }
+//}
+
+
diff --git a/pom.xml b/pom.xml
index 2075618ac8..4e2395692f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -23,4 +23,35 @@
api-java
+
+
+ repo1.maven
+ Maven Main Repository
+ http://repo1.maven.org/maven2
+
+
+ scala-tools-snapshots
+ Scala-Tools Maven2 Snapshot Repository
+ http://scala-tools.org/repo-snapshots
+
+
+ scala-tools
+ Scala-Tools Maven2 Repository
+ http://scala-tools.org/repo-releases
+
+
+ lag
+ Configgy's' Repository
+ http://www.lag.net/repo
+
+
+
+
+
+ scala-tools.org
+ Scala-Tools Maven2 Repository
+ http://scala-tools.org/repo-releases
+
+
+