diff --git a/akka.iws b/akka.iws index 03d248f4cb..56432c0e34 100644 --- a/akka.iws +++ b/akka.iws @@ -2,25 +2,38 @@ - + - - - + + - + + - - - - - - + + + + + + + + + + + + + + + + + + + - + @@ -36,73 +49,6 @@ - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -235,19 +118,44 @@ - + - - + + - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -297,6 +205,118 @@ @@ -1070,7 +1150,7 @@ - + @@ -1202,7 +1282,7 @@ - @@ -1213,7 +1293,7 @@ @@ -1262,7 +1342,7 @@ - @@ -1283,10 +1363,16 @@ + + + - @@ -1310,7 +1396,7 @@ - @@ -1340,18 +1426,18 @@ - - - - - - - - - + + + + + + + + localhost @@ -1406,18 +1492,18 @@ - + - + - + @@ -1468,116 +1554,116 @@ - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + + + + + + + + @@ -1607,7 +1693,7 @@ - @@ -1667,7 +1753,7 @@ - diff --git a/api-java/.classpath b/api-java/.classpath index 53228f60a3..6c657284ba 100644 --- a/api-java/.classpath +++ b/api-java/.classpath @@ -56,5 +56,6 @@ + diff --git a/api-java/pom.xml b/api-java/pom.xml index d36821bc75..514030f5ec 100755 --- a/api-java/pom.xml +++ b/api-java/pom.xml @@ -24,31 +24,37 @@ com.sun.grizzly grizzly-servlet-webserver 1.9.9 + test com.sun.jersey jersey-server 1.0.3 + test com.sun.jersey jersey-json 1.0.3 + test com.sun.jersey jersey-client 1.1.0-ea + test com.sun.jersey jersey-atom 1.0.3 + test junit junit 4.5 + test org.jmock diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java index 0419097865..22f43013cb 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java @@ -32,17 +32,14 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { }).configureActiveObjects( new RestartStrategy(new AllForOne(), 3, 5000), new Component[]{ new Component( - "foo", Foo.class, - FooImpl.class, new LifeCycle(new Permanent(), 1000), - 1000), + 10000), new Component( - "bar", Bar.class, BarImpl.class, new LifeCycle(new Permanent(), 1000), - 1000) + 10000) }).inject().supervise(); } @@ -51,7 +48,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { messageLog = ""; Foo foo = conf.getActiveObject(Foo.class); Bar bar = conf.getActiveObject(Bar.class); - assertTrue(foo.getBar().toString().equals(bar.toString())); + assertEquals(foo.getBar(), bar); } public void testGuiceExternalDependencyInjection() { @@ -66,7 +63,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { String str = conf.getActiveObject(String.class); fail("exception should have been thrown"); } catch (Exception e) { - assertEquals("Class string has not been put under supervision (by passing in the config to the supervise() method", e.getMessage()); + assertEquals("Class [java.lang.String] has not been put under supervision (by passing in the config to the 'configureActiveObjects' and then invoking 'supervise') method", e.getMessage()); } } @@ -113,57 +110,4 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { } } -interface Foo { - public String foo(String msg); - @oneway public void bar(String msg); - public void longRunning(); - public void throwsException(); - public Bar getBar(); -} - -class FooImpl implements Foo { - @Inject private Bar bar; - public Bar getBar() { - return bar; - } - public String foo(String msg) { - return msg + "return_foo "; - } - public void bar(String msg) { - bar.bar(msg); - } - public void longRunning() { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - } - } - public void throwsException() { - throw new RuntimeException("expected"); - } -} - -interface Bar { - @oneway void bar(String msg); - Ext getExt(); -} - -class BarImpl implements Bar { - @Inject private Ext ext; - public Ext getExt() { - return ext; - } - public void bar(String msg) { - } -} - -interface Ext { - void ext(); -} - -class ExtImpl implements Ext { - public void ext() { - } -} - diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/Bar.java b/api-java/src/test/java/se/scalablesolutions/akka/api/Bar.java new file mode 100644 index 0000000000..9a3ff80aca --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/Bar.java @@ -0,0 +1,9 @@ +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.annotation.oneway; + +public interface Bar { + @oneway + void bar(String msg); + Ext getExt(); +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java new file mode 100644 index 0000000000..bb93a1ad03 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java @@ -0,0 +1,13 @@ +package se.scalablesolutions.akka.api; + +import com.google.inject.Inject; + +public class BarImpl implements Bar { + @Inject + private Ext ext; + public Ext getExt() { + return ext; + } + public void bar(String msg) { + } +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/Ext.java b/api-java/src/test/java/se/scalablesolutions/akka/api/Ext.java new file mode 100644 index 0000000000..1929058fac --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/Ext.java @@ -0,0 +1,6 @@ +package se.scalablesolutions.akka.api; + +public interface Ext { + void ext(); +} + diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java new file mode 100644 index 0000000000..3c9c9fd3f4 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java @@ -0,0 +1,6 @@ +package se.scalablesolutions.akka.api; + +public class ExtImpl implements Ext { + public void ext() { + } +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/Foo.java b/api-java/src/test/java/se/scalablesolutions/akka/api/Foo.java new file mode 100644 index 0000000000..d1a4dce5c7 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/Foo.java @@ -0,0 +1,28 @@ +package se.scalablesolutions.akka.api; + +import com.google.inject.Inject; +import se.scalablesolutions.akka.annotation.oneway; + +public class Foo { + @Inject + private Bar bar; + public Bar getBar() { + return bar; + } + public String foo(String msg) { + return msg + "return_foo "; + } + @oneway + public void bar(String msg) { + bar.bar(msg); + } + public void longRunning() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + } + } + public void throwsException() { + throw new RuntimeException("expected"); + } +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java index ed985ee0df..1af3300300 100644 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java @@ -1,5 +1,7 @@ package se.scalablesolutions.akka.api; -public interface InMemFailer { - public void fail(); +public class InMemFailer { + public void fail() { + throw new RuntimeException("expected"); + } } diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java index 4358ce1c05..69d1012f59 100644 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java @@ -1,23 +1,68 @@ package se.scalablesolutions.akka.api; +import se.scalablesolutions.akka.annotation.state; import se.scalablesolutions.akka.annotation.transactional; +import se.scalablesolutions.akka.kernel.*; -public interface InMemStateful { - // transactional - @transactional - public void success(String key, String msg); +public class InMemStateful { + @state private TransactionalMap mapState = new InMemoryTransactionalMap(); + @state private TransactionalVector vectorState = new InMemoryTransactionalVector(); + @state private TransactionalRef refState = new TransactionalRef(); + + public String getMapState(String key) { + return (String)mapState.get(key).get(); + } + + public String getVectorState() { + return (String)vectorState.last(); + } + + public String getRefState() { + return (String)refState.get().get(); + } + + public void setMapState(String key, String msg) { + mapState.put(key, msg); + } + + public void setVectorState(String msg) { + vectorState.add(msg); + } + + public void setRefState(String msg) { + refState.swap(msg); + } @transactional - public void failure(String key, String msg, InMemFailer failer); + public void success(String key, String msg) { + mapState.put(key, msg); + vectorState.add(msg); + refState.swap(msg); + } - //@transactional - //public void clashOk(String key, String msg, InMemClasher clasher); + @transactional + public void failure(String key, String msg, InMemFailer failer) { + mapState.put(key, msg); + vectorState.add(msg); + refState.swap(msg); + failer.fail(); + } - //@transactional - //public void clashNotOk(String key, String msg, InMemClasher clasher); + @transactional + public void thisMethodHangs(String key, String msg, InMemFailer failer) { + setMapState(key, msg); + } - // non-transactional - public String getState(String key); + /* + public void clashOk(String key, String msg, InMemClasher clasher) { + mapState.put(key, msg); + clasher.clash(); + } - public void setState(String key, String value); -} + public void clashNotOk(String key, String msg, InMemClasher clasher) { + mapState.put(key, msg); + clasher.clash(); + this.success("clash", "clash"); + } + */ +} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulImpl.java deleted file mode 100644 index 31420012fe..0000000000 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.annotation.state; -import se.scalablesolutions.akka.kernel.TransactionalMap; -import se.scalablesolutions.akka.kernel.InMemoryTransactionalMap; - -public class InMemStatefulImpl implements InMemStateful { - @state - private TransactionalMap state = new InMemoryTransactionalMap(); - - public String getState(String key) { - return state.get(key).get(); - } - - public void setState(String key, String msg) { - state.put(key, msg); - } - - public void success(String key, String msg) { - state.put(key, msg); - } - - public void failure(String key, String msg, InMemFailer failer) { - state.put(key, msg); - failer.fail(); - } - - /* - public void clashOk(String key, String msg, InMemClasher clasher) { - state.put(key, msg); - clasher.clash(); - } - - public void clashNotOk(String key, String msg, InMemClasher clasher) { - state.put(key, msg); - clasher.clash(); - this.success("clash", "clash"); - } - */ -} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 00b5514b1c..97a7fda6d0 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -4,11 +4,8 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.*; -import se.scalablesolutions.akka.kernel.config.*; import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; -import se.scalablesolutions.akka.kernel.TransactionalMap; -import se.scalablesolutions.akka.kernel.InMemoryTransactionalMap; +import se.scalablesolutions.akka.kernel.config.*; import junit.framework.TestCase; @@ -22,35 +19,68 @@ public class InMemoryStateTest extends TestCase { new RestartStrategy(new AllForOne(), 3, 5000), new Component[] { // FIXME: remove string-name, add ctor to only accept target class - new Component("inmem-stateful", InMemStateful.class, InMemStatefulImpl.class, new LifeCycle(new Permanent(), 1000), 10000000), - new Component("inmem-failer", InMemFailer.class, InMemFailerImpl.class, new LifeCycle(new Permanent(), 1000), 1000) + new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), + new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000) //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) }).inject().supervise(); } - + protected void tearDown() { conf.stop(); } - public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = conf.getActiveObject(InMemStateful.class); - stateful.setState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state + stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit - assertEquals("new state", stateful.getState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); + assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } - public void testShouldRollbackStateForStatefulServerInCaseOfFailure() { + public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } + + public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = conf.getActiveObject(InMemStateful.class); - stateful.setState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - InMemFailer failer = conf.getActiveObject(InMemFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + stateful.setVectorState("init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit + assertEquals("new state", stateful.getVectorState()); } + + public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setVectorState("init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getVectorState()); // check that state is == init state + } + + public void testNestedNonTransactionalMethodHangs() { + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } + /* */ // public void testShouldRollbackStateForStatefulServerInCaseOfMessageClash() diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java b/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java index 7b92f1c8a2..6828ba421f 100644 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java @@ -5,8 +5,10 @@ import javax.ws.rs.GET; import javax.ws.rs.Produces; @Path("/foo") -public interface JerseyFoo { +public class JerseyFoo { @GET @Produces({"application/json"}) - public String foo(); -} + public String foo() { + return "hello foo"; + } +} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFooImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFooImpl.java deleted file mode 100644 index 51e176eb89..0000000000 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/JerseyFooImpl.java +++ /dev/null @@ -1,14 +0,0 @@ -package se.scalablesolutions.akka.api; - -import javax.ws.rs.Path; -import javax.ws.rs.GET; -import javax.ws.rs.Produces; - -//@Path("/foo") -public class JerseyFooImpl implements JerseyFoo { - //@GET - //@Produces({"application/json"}) - public String foo() { - return "hello foo"; - } -} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java new file mode 100644 index 0000000000..14a39ea659 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java @@ -0,0 +1,29 @@ +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.kernel.TransactionalMap; +import se.scalablesolutions.akka.kernel.CassandraPersistentTransactionalMap; + +public class PersistentClasher { + private TransactionalMap state = new CassandraPersistentTransactionalMap(this); + + public String getState(String key) { + return (String)state.get(key).get(); + } + + public void setState(String key, String msg) { + state.put(key, msg); + } + + public void clash() { + state.put("clasher", "was here"); + // spend some time here + + // FIXME: this statement gives me this error: + // se.scalablesolutions.akka.kernel.ActiveObjectException: + // Unexpected message [!(scala.actors.Channel@c2b2f6,ErrRef[Right(null)])] + // to + // [GenericServer[se.scalablesolutions.akka.api.StatefulImpl]] from + // [GenericServer[se.scalablesolutions.akka.api.ClasherImpl]]] + // try { Thread.sleep(1000); } catch (InterruptedException e) {} + } +} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailerImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java similarity index 67% rename from api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailerImpl.java rename to api-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java index d8614f3850..e1bb422bf7 100644 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailerImpl.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java @@ -1,6 +1,6 @@ package se.scalablesolutions.akka.api; -public class InMemFailerImpl implements InMemFailer { +public class PersistentFailer { public void fail() { throw new RuntimeException("expected"); } diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java index 21197a885a..e531528bc8 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java @@ -6,11 +6,6 @@ package se.scalablesolutions.akka.api; import se.scalablesolutions.akka.annotation.*; import se.scalablesolutions.akka.kernel.config.*; -import se.scalablesolutions.akka.kernel.config.JavaConfig.AllForOne; -import se.scalablesolutions.akka.kernel.config.JavaConfig.Component; -import se.scalablesolutions.akka.kernel.config.JavaConfig.LifeCycle; -import se.scalablesolutions.akka.kernel.config.JavaConfig.Permanent; -import se.scalablesolutions.akka.kernel.config.JavaConfig.RestartStrategy; import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; import se.scalablesolutions.akka.kernel.Kernel; import se.scalablesolutions.akka.kernel.TransactionalMap; @@ -29,11 +24,11 @@ public class PersistentStateTest extends TestCase { protected void setUp() { conf.configureActiveObjects( - new JavaConfig.RestartStrategy(new JavaConfig.AllForOne(), 3, 5000), + new RestartStrategy(new AllForOne(), 3, 5000), new Component[] { - new Component("persistent-stateful", PersistentStateful.class, PersistentStatefulImpl.class, new LifeCycle(new Permanent(), 1000), 10000000), - new Component("persistent-failer", PersistentFailer.class, PersistentFailerImpl.class, new LifeCycle(new Permanent(), 1000), 1000), - new Component("persistent-clasher", PersistentClasher.class, PersistentClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) + new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), + new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000), + new Component(PersistentClasher.class, new LifeCycle(new Permanent(), 1000), 100000) }).supervise(); } @@ -61,96 +56,3 @@ public class PersistentStateTest extends TestCase { assertEquals("init", stateful.getState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state } } - -interface PersistentStateful { - @transactional - public void success(String key, String msg); - - @transactional - public void failure(String key, String msg, PersistentFailer failer); - - @transactional - public void clashOk(String key, String msg, PersistentClasher clasher); - - @transactional - public void clashNotOk(String key, String msg, PersistentClasher clasher); - - public String getState(String key); - - public void setState(String key, String value); -} - -class PersistentStatefulImpl implements PersistentStateful { - private TransactionalMap state = new CassandraPersistentTransactionalMap(this); - - public String getState(String key) { - return (String)state.get(key).get(); - } - - public void setState(String key, String msg) { - state.put(key, msg); - } - - public void success(String key, String msg) { - state.put(key, msg); - } - - public void failure(String key, String msg, PersistentFailer failer) { - state.put(key, msg); - failer.fail(); - } - - public void clashOk(String key, String msg, PersistentClasher clasher) { - state.put(key, msg); - clasher.clash(); - } - - public void clashNotOk(String key, String msg, PersistentClasher clasher) { - state.put(key, msg); - clasher.clash(); - clasher.clash(); - } -} - -interface PersistentFailer { - public void fail(); -} - -class PersistentFailerImpl implements PersistentFailer { - public void fail() { - throw new RuntimeException("expected"); - } -} - -interface PersistentClasher { - public void clash(); - - public String getState(String key); - - public void setState(String key, String value); -} - -class PersistentClasherImpl implements PersistentClasher { - private TransactionalMap state = new CassandraPersistentTransactionalMap(this); - - public String getState(String key) { - return (String)state.get(key).get(); - } - - public void setState(String key, String msg) { - state.put(key, msg); - } - - public void clash() { - state.put("clasher", "was here"); - // spend some time here - - // FIXME: this statement gives me this error: - // se.scalablesolutions.akka.kernel.ActiveObjectException: - // Unexpected message [!(scala.actors.Channel@c2b2f6,ErrRef[Right(null)])] - // to - // [GenericServer[se.scalablesolutions.akka.api.StatefulImpl]] from - // [GenericServer[se.scalablesolutions.akka.api.ClasherImpl]]] - // try { Thread.sleep(1000); } catch (InterruptedException e) {} - } -} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java new file mode 100644 index 0000000000..f7044b5146 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -0,0 +1,42 @@ +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.kernel.TransactionalMap; +import se.scalablesolutions.akka.kernel.CassandraPersistentTransactionalMap; +import se.scalablesolutions.akka.annotation.transactional; + +public class PersistentStateful { + private TransactionalMap state = new CassandraPersistentTransactionalMap(this); + + public String getState(String key) { + return (String)state.get(key).get(); + } + + public void setState(String key, String msg) { + state.put(key, msg); + } + + @transactional + public void success(String key, String msg) { + state.put(key, msg); + } + + @transactional + public void failure(String key, String msg, PersistentFailer failer) { + state.put(key, msg); + failer.fail(); + } + + @transactional + public void clashOk(String key, String msg, PersistentClasher clasher) { + state.put(key, msg); + clasher.clash(); + } + + @transactional + public void clashNotOk(String key, String msg, PersistentClasher clasher) { + state.put(key, msg); + clasher.clash(); + clasher.clash(); + } +} + diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java index 7687d1744c..def5859b1c 100644 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java @@ -4,7 +4,6 @@ package se.scalablesolutions.akka.api; -import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.WebResource; import com.sun.grizzly.http.SelectorThread; @@ -18,7 +17,6 @@ import javax.servlet.Servlet; import org.junit.*; import static org.junit.Assert.*; -import java.util.Map; import java.io.IOException; import java.net.URI; @@ -39,9 +37,7 @@ public class RestTest extends TestSuite { new JavaConfig.RestartStrategy(new JavaConfig.AllForOne(), 3, 5000), new JavaConfig.Component[] { new JavaConfig.Component( - "jersey-foo", JerseyFoo.class, - JerseyFooImpl.class, new JavaConfig.LifeCycle(new JavaConfig.Permanent(), 1000), 10000000) }).inject().supervise(); selector = startJersey(); diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index a87148be9e..f094d5b672 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -10,10 +10,11 @@ import config.ScalaConfig._ import java.util.{List => JList, ArrayList} import java.lang.reflect.{Method, Field} +import java.lang.annotation.Annotation + import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice} import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.proxy.Proxy -import java.lang.annotation.Annotation import org.apache.camel.{Processor, Exchange} @@ -39,11 +40,11 @@ class ActiveObjectFactory { } def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = { - ActiveObject.newInstance(intf, target, server) + ActiveObject.newInstance(intf, target, server) } - def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor = - ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]]) + def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = + ActiveObject.supervise(restartStrategy, components) } /** @@ -61,13 +62,13 @@ object ActiveObject { def newInstance[T](target: Class[T], server: GenericServerContainer): T = { val proxy = Proxy.newInstance(target, false, true) // FIXME switch to weaving in the aspect at compile time - proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new ActorAroundAdvice(target, proxy, server)) + proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new TransactionalAroundAdvice(target, proxy, server)) proxy.asInstanceOf[T] } def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = { val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) - proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new ActorAroundAdvice(intf, target, server)) + proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new TransactionalAroundAdvice(intf, target, server)) proxy.asInstanceOf[T] } @@ -79,16 +80,6 @@ object ActiveObject { supervisor ! se.scalablesolutions.akka.kernel.Start supervisor } - - /* - private def supervise(proxy: AnyRef): Supervisor = - supervise( - RestartStrategy(OneForOne, 5, 1000), - Worker( - proxy.server, - LifeCycle(Permanent, 100)) - :: Nil) - */ } /** @@ -96,30 +87,27 @@ object ActiveObject { */ // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts -sealed class ActorAroundAdvice(target: Class[_], - targetInstance: AnyRef, - val server: GenericServerContainer) extends AroundAdvice { - val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) +sealed class TransactionalAroundAdvice(target: Class[_], + targetInstance: AnyRef, + server: GenericServerContainer) extends AroundAdvice { + val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) server.transactionalRefs = refs server.transactionalMaps = maps server.transactionalVectors = vectors import ActiveObject.threadBoundTx private[this] var activeTx: Option[Transaction] = None - + def invoke(joinpoint: JoinPoint): AnyRef = { // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint - val method = joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod + val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] + val method = rtti.getMethod if (method.isAnnotationPresent(Annotations.transactional)) { if (activeTx.isDefined) { val tx = activeTx.get - //val cflowTx = threadBoundTx.get - // if (cflowTx.isDefined && cflowTx.get != tx) { - // new tx in scope; try to commit tx.commit(server) threadBoundTx.set(None) activeTx = None - // } } // FIXME: check if we are already in a transaction if so NEST (set parent) val newTx = new Transaction @@ -134,37 +122,14 @@ sealed class ActorAroundAdvice(target: Class[_], activeTx = Some(currentTx) } activeTx = threadBoundTx.get - invoke(joinpoint, activeTx) - //invoke(Invocation(method, joinpoint.getRtti.asInstanceOf[MethodRtti].getParameterValues, targetInstance, activeTx)) - } - private def invoke(joinpoint: JoinPoint, tx: Option[Transaction]): AnyRef = { + // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint val result: AnyRef = -/* - if (joinpoint.target.isInstanceOf[MessageDriven] && - joinpoint.method.getName == "onMessage") { - val m = joinpoint.method - - val endpointName = m.getDeclaringClass.getName + "." + m.getName - val activeObjectName = m.getDeclaringClass.getName - val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m)) - val producer = endpoint.createProducer - val exchange = endpoint.createExchange - exchange.getIn().setBody(joinpoint) - producer.process(exchange) - val fault = exchange.getException(); - if (fault != null) throw new InvocationTargetException(fault) - - // FIXME: need some timeout and future here... - exchange.getOut.getBody - - } else */ - // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint - if (joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod.isAnnotationPresent(Annotations.oneway)) { - server ! (tx, joinpoint) + if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) { + server ! (activeTx, joinpoint) } else { val result: ErrRef[AnyRef] = - server !!! ((tx, joinpoint), { + server !!! ((activeTx, joinpoint), { var ref = ErrRef(activeTx) ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds") ref @@ -172,8 +137,7 @@ sealed class ActorAroundAdvice(target: Class[_], try { result() } catch { - case e => - println("$$$$$$$$$$$$$$ " + joinpoint) + case e => rollback(result.tx) throw e } @@ -190,29 +154,41 @@ sealed class ActorAroundAdvice(target: Class[_], threadBoundTx.set(Some(tx)) } - private def getTransactionalItemsFor(targetInstance: AnyRef): + /** + * Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top. + */ + private def getTransactionalItemsFor(targetInstance: AnyRef): Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = { require(targetInstance != null) - var maps: List[TransactionalMap[_, _]] = Nil - var vectors: List[TransactionalVector[_]] = Nil - var refs: List[TransactionalRef[_]] = Nil + var maps: List[TransactionalMap[_, _]] = Nil + var refs: List[TransactionalRef[_]] = Nil + var vectors: List[TransactionalVector[_]] = Nil + + def getTransactionalItemsFor(target: Class[_]): + Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = { for { - field <- targetInstance.getClass.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]] - fieldType = field.getType - if fieldType == classOf[TransactionalMap[_, _]] || - fieldType == classOf[TransactionalVector[_]] || - fieldType == classOf[TransactionalRef[_]] - txItem = { - field.setAccessible(true) - field.get(targetInstance) + field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]] + fieldType = field.getType + if fieldType == classOf[TransactionalMap[_, _]] || + fieldType == classOf[TransactionalVector[_]] || + fieldType == classOf[TransactionalRef[_]] + txItem = { + field.setAccessible(true) + field.get(targetInstance) + } + if txItem != null + } { + if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]] + else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]] + else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]] } - if txItem != null - } { - if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]] - else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]] - else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]] + val parent = target.getSuperclass + if (parent == null) (maps, vectors, refs) + else getTransactionalItemsFor(parent) } - (maps, vectors, refs) + + // start the search for transactional items, crawl the class hierarchy up until we reach 'null' + getTransactionalItemsFor(targetInstance.getClass) } } @@ -244,60 +220,10 @@ private[kernel] class Dispatcher(val targetName: String) extends GenericServer { case unexpected => throw new ActiveObjectException("Unexpected message [" + unexpected + "] to [" + this + "] from [" + sender + "]") } - + override def toString(): String = "GenericServer[" + targetName + "]" } -/** - * Represents a snapshot of the current invocation. - * - * @author Jonas Bonér - */ -private[kernel] case class Invocation(val method: Method, - val args: Array[AnyRef], - val target: AnyRef, - val tx: Option[Transaction]) { - method.setAccessible(true) - - def invoke: AnyRef = synchronized { - println("======== " + this.toString) - if (method.getDeclaringClass.isInterface) { - target.getClass.getDeclaredMethod(method.getName, method.getParameterTypes).invoke(target, args:_*) - } else method.invoke(target, args:_*) - } - - override def toString: String = synchronized { - "Invocation [method: " + method.getName + ", args: " + argsToString(args) + ", target: " + target + "]" - } - - override def hashCode(): Int = synchronized { - var result = HashCode.SEED - result = HashCode.hash(result, method) - result = HashCode.hash(result, args) - result = HashCode.hash(result, target) - result - } - - override def equals(that: Any): Boolean = synchronized { - that != null && - that.isInstanceOf[Invocation] && - that.asInstanceOf[Invocation].method == method && - that.asInstanceOf[Invocation].target == target && - isEqual(that.asInstanceOf[Invocation].args, args) - } - - private[this] def isEqual(a1: Array[Object], a2: Array[Object]): Boolean = - (a1 == null && a2 == null) || - (a1 != null && - a2 != null && - a1.size == a2.size && - a1.zip(a2).find(t => t._1 == t._2).isDefined) - - private[this] def argsToString(array: Array[Object]): String = - array.foldLeft("(")(_ + " " + _) + ")" -} - - /* ublic class CamelInvocationHandler implements InvocationHandler { private final Endpoint endpoint; @@ -332,4 +258,24 @@ ublic class CamelInvocationHandler implements InvocationHandler { } } } + + if (joinpoint.target.isInstanceOf[MessageDriven] && + joinpoint.method.getName == "onMessage") { + val m = joinpoint.method + + val endpointName = m.getDeclaringClass.getName + "." + m.getName + val activeObjectName = m.getDeclaringClass.getName + val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m)) + val producer = endpoint.createProducer + val exchange = endpoint.createExchange + exchange.getIn().setBody(joinpoint) + producer.process(exchange) + val fault = exchange.getException(); + if (fault != null) throw new InvocationTargetException(fault) + + // FIXME: need some timeout and future here... + exchange.getOut.getBody + + } else + */ \ No newline at end of file diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala index 9ecf942619..205ce99568 100644 --- a/kernel/src/main/scala/GenericServer.scala +++ b/kernel/src/main/scala/GenericServer.scala @@ -23,6 +23,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServ * @author Jonas Bonér */ trait GenericServer extends Actor { + private val actorScheduler = new ManagedActorScheduler /** * Template method implementing the server logic. @@ -62,6 +63,8 @@ trait GenericServer extends Actor { def act = loop { react { genericBase orElse actorBase } } + //override protected def scheduler = actorScheduler + private def actorBase: PartialFunction[Any, Unit] = hotswap getOrElse body private var hotswap: Option[PartialFunction[Any, Unit]] = None @@ -226,6 +229,7 @@ class GenericServerContainer( */ def !!![T](message: Any, errorHandler: => T, time: Int): T = { require(server != null) + println("---------- SERVER " + server + " MESSAGE " + message) val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } future.receiveWithin(time) match { case None => errorHandler diff --git a/kernel/src/main/scala/ManagedActorScheduler.scala b/kernel/src/main/scala/ManagedActorScheduler.scala index cede9e6a5b..64960fbd34 100644 --- a/kernel/src/main/scala/ManagedActorScheduler.scala +++ b/kernel/src/main/scala/ManagedActorScheduler.scala @@ -1,6 +1,6 @@ /** -* Copyright (C) 2009 Scalable Solutions. -*/ + * Copyright (C) 2009 Scalable Solutions. + */ package se.scalablesolutions.akka.kernel @@ -12,9 +12,45 @@ import scala.actors.{FJTaskScheduler2, Scheduler, IScheduler, Actor} // FIXME: add managing interface to this class using JMX // FIXME: configure one instance per GenericServer -object ManagedActorScheduler extends Logging { + +class ManagedActorScheduler extends IScheduler with Logging { + protected var threadPoolSize = 10 + protected val threadPool = Executors.newFixedThreadPool(threadPoolSize) + + def execute(fun: => Unit): Unit = threadPool.execute(new Runnable { + def run = { + try { + fun + } catch { + case e => log.error("Actor scheduler", e) + } + } + }) + + def execute(task: Runnable) = threadPool.execute(new Runnable { + def run = { + try { + task.run + } catch { + case e => log.error("Actor scheduler", e) + } + } + }) + + def tick(a: Actor): Unit = {} + + def shutdown: Unit = threadPool.shutdown + + def onLockup(handler: () => Unit): Unit = {} + + def onLockup(millis: Int)(handler: () => Unit): Unit = {} + + def printActorDump: Unit = {} +} + +object GlobalManagedActorScheduler extends Logging { @volatile private var isRegistered = false - private var threadPoolSize = 10 + private var threadPoolSize = 10 def register = synchronized { if (!isRegistered) { @@ -24,38 +60,8 @@ object ManagedActorScheduler extends Logging { fj.shutdown case _ => } - - Scheduler.impl = { - val threadPool = Executors.newFixedThreadPool(threadPoolSize) - new IScheduler { - def execute(fun: => Unit): Unit = threadPool.execute(new Runnable { - def run = { - try { - fun - } catch { - case e => log.error("Actor scheduler", e) - } - } - }) - - def execute(task: Runnable) = es.execute(new Runnable { - def run = { - try { - task.run - } catch { - case e => log.error("Actor scheduler", e) - } - } - }) - - def tick(a: Actor): Unit = {} - def shutdown: Unit = { threadPool.shutdown } - def onLockup(handler: () => Unit): Unit = {} - def onLockup(millis: Int)(handler: () => Unit): Unit = {} - def printActorDump: Unit = {} - } - } + Scheduler.impl = new ManagedActorScheduler + isRegistered = true } - isRegistered = true } } \ No newline at end of file diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 6b0732f015..2a5a52e45f 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -29,9 +29,10 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC private var supervisor: Supervisor = _ private var restartStrategy: RestartStrategy = _ private var components: List[Component] = _ + private var workers: List[Worker] = Nil private var bindings: List[DependencyBinding] = Nil private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? - private var activeObjectRegistry = new HashMap[Class[_], Tuple2[Component, GenericServerContainer]] + private var activeObjectRegistry = new HashMap[Class[_], Tuple4[AnyRef, AnyRef, Component, GenericServerContainer]] private var activeObjectFactory = new ActiveObjectFactory private var camelContext = new DefaultCamelContext private var modules = new java.util.ArrayList[Module] @@ -44,23 +45,14 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC * @return the active object for the class */ override def getActiveObject[T](clazz: Class[T]): T = synchronized { - log.debug("Creating new active object [%s]", clazz.getName) + log.debug("Retrieving active object [%s]", clazz.getName) if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getActiveObject(clazz)") - val activeObjectOption: Option[Tuple2[Component, GenericServerContainer]] = activeObjectRegistry.get(clazz) - if (activeObjectOption.isDefined) { - val (component, server) = activeObjectOption.get - server.setTimeout(component.timeout) - val proxy = if (component.intf == null) { // subclassing proxy - activeObjectFactory.newInstance(component.target, server).asInstanceOf[T] - } else { // delegating proxy - component.target.getConstructor(Array[Class[_]]()).setAccessible(true) - val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry - activeObjectFactory.newInstance(component.intf, targetInstance, server).asInstanceOf[T] - } - injector.injectMembers(proxy) - proxy - } else throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'supervise') method") + val (proxy, targetInstance, component, server) = + activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'configureActiveObjects' and then invoking 'supervise') method")) + injector.injectMembers(targetInstance) + proxy.asInstanceOf[T] } + /* override def getActiveObjectProxy(clazz: Class[_]): ActiveObjectProxy = synchronized { log.debug("Looking up active object proxy [%s]", clazz.getName) @@ -74,8 +66,12 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC injector.getInstance(clazz).asInstanceOf[T] } - override def getComponentInterfaces: List[Class[_]] = components.map(_.intf) - + override def getComponentInterfaces: List[Class[_]] = + for (c <- components) yield { + if (c.intf.isDefined) c.intf.get + else c.target + } + override def getRoutingEndpoint(uri: String): Endpoint = synchronized { camelContext.getEndpoint(uri) } @@ -88,17 +84,46 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC camelContext.getEndpoints(uri) } - override def configureActiveObjects(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectConfigurator = synchronized { + override def configureActiveObjects( + restartStrategy: RestartStrategy, + components: List[Component]): ActiveObjectConfigurator = synchronized { this.restartStrategy = restartStrategy this.components = components.toArray.toList.asInstanceOf[List[Component]] - bindings = for (c <- this.components) - yield new DependencyBinding(c.intf, c.target) // build up the Guice interface class -> impl class bindings + bindings = for (component <- this.components) yield { + if (component.intf.isDefined) newDelegatingProxy(component) + else newSubclassingProxy(component) + } + //camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) + //for (method <- component.intf.getDeclaredMethods.toList) registerMethodForUri(method, component.name) + //log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) val deps = new java.util.ArrayList[DependencyBinding](bindings.size) for (b <- bindings) deps.add(b) modules.add(new ActiveObjectGuiceModule(deps)) this } + private def newSubclassingProxy(component: Component): DependencyBinding = { + val targetClass = component.target + val server = new GenericServerContainer(targetClass.getName, () => new Dispatcher(component.target.getName)) + server.setTimeout(component.timeout) + workers ::= Worker(server, component.lifeCycle) + val proxy = activeObjectFactory.newInstance(targetClass, server).asInstanceOf[AnyRef] + activeObjectRegistry.put(targetClass, (proxy, proxy, component, server)) + new DependencyBinding(targetClass, proxy) + } + + private def newDelegatingProxy(component: Component): DependencyBinding = { + val targetClass = component.intf.get + val server = new GenericServerContainer(targetClass.getName, () => new Dispatcher(component.target.getName)) + server.setTimeout(component.timeout) + workers ::= Worker(server, component.lifeCycle) + component.target.getConstructor(Array[Class[_]]()).setAccessible(true) + val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry + val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, server).asInstanceOf[AnyRef] + activeObjectRegistry.put(targetClass, (proxy, targetInstance, component, server)) + new DependencyBinding(targetClass, proxy) + } + override def inject: ActiveObjectConfigurator = synchronized { if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator") injector = Guice.createInjector(modules) @@ -107,17 +132,6 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC override def supervise: ActiveObjectConfigurator = synchronized { if (injector == null) inject - var workers = new java.util.ArrayList[Worker] - for (component <- components) { - val target = if (component.intf != null) component.intf // TODO: use Option - else component.target - val server = new GenericServerContainer(target.getName, () => new Dispatcher(component.target.getName)) - activeObjectRegistry.put(target, (component, server)) - workers.add(Worker(server, component.lifeCycle)) - //camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) - for (method <- component.intf.getDeclaredMethods.toList) registerMethodForUri(method, component.name) - log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) - } supervisor = activeObjectFactory.supervise(restartStrategy, workers) //camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) //camelContext.start @@ -154,7 +168,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC def reset = synchronized { modules = new java.util.ArrayList[Module] configRegistry = new HashMap[Class[_], Component] - activeObjectRegistry = new HashMap[Class[_], Tuple2[Component, GenericServerContainer]] + activeObjectRegistry = new HashMap[Class[_], Tuple4[AnyRef, AnyRef, Component, GenericServerContainer]] methodToUriRegistry = new HashMap[Method, String] injector = null restartStrategy = null diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala index 68da4fe643..b74f2de230 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala @@ -57,6 +57,7 @@ class ActiveObjectGuiceConfiguratorForJava { this } + def getComponentInterfaces: List[Class[_]] = { val al = new ArrayList[Class[_]] for (c <- INSTANCE.getComponentInterfaces) al.add(c) diff --git a/kernel/src/main/scala/config/Config.scala b/kernel/src/main/scala/config/Config.scala index 4eeb106b3b..e1f6c5fdf3 100644 --- a/kernel/src/main/scala/config/Config.scala +++ b/kernel/src/main/scala/config/Config.scala @@ -33,11 +33,18 @@ object ScalaConfig { case object Transient extends Scope case object Temporary extends Scope - case class Component(val name: String, - val intf: Class[_], + class Component(_intf: Class[_], val target: Class[_], val lifeCycle: LifeCycle, - val timeout: Int) extends Server + val timeout: Int) extends Server { + val intf: Option[Class[_]] = if (_intf == null) None else Some(_intf) + } + object Component { + def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) = + new Component(intf, target, lifeCycle, timeout) + def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int) = + new Component(null, target, lifeCycle, timeout) + } } /** @@ -81,13 +88,14 @@ object JavaConfig { } abstract class Server extends ConfigElement - class Component(@BeanProperty val name: String, - @BeanProperty val intf: Class[_], + class Component(@BeanProperty val intf: Class[_], @BeanProperty val target: Class[_], @BeanProperty val lifeCycle: LifeCycle, @BeanProperty val timeout: Int) extends Server { + def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int) = + this(null, target, lifeCycle, timeout) def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Component( - name, intf, target, lifeCycle.transform, timeout) + intf, target, lifeCycle.transform, timeout) def newWorker(server: GenericServerContainer) = se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(server, lifeCycle.transform) } diff --git a/kernel/src/test/scala/JerseySpec.scala b/kernel/src/test/scala/JerseySpec.scala index f2c6c12eac..fbc836a737 100644 --- a/kernel/src/test/scala/JerseySpec.scala +++ b/kernel/src/test/scala/JerseySpec.scala @@ -32,30 +32,29 @@ class JerseySpec extends Spec with ShouldMatchers { describe("A Jersey REST service") { it("should ...") { - //val selector = startJersey - //oselector.start - + val selector = startJersey + selector.start + /* val conf = new ActiveObjectGuiceConfigurator conf.configureActiveObjects( RestartStrategy(AllForOne, 3, 5000), Component( - "jerseyfoo", classOf[resource.JerseyFoo], - classOf[resource.JerseyFooImpl], LifeCycle(Permanent, 1000), 1000) :: Nil).supervise conf.getActiveObject(classOf[resource.JerseyFoo]) + */ -/* + /* val client = Client.create val webResource = client.resource(UriBuilder.fromUri("http://localhost/").port(9998).build) //val webResource = client.resource("http://localhost:9998/foo") val responseMsg = webResource.get(classOf[String]) responseMsg should equal ("Hello World") selector.stopEndpoint -*/ + */ } } @@ -79,20 +78,14 @@ class JerseySpec extends Spec with ShouldMatchers { package resource { import javax.ws.rs.{Produces, Path, GET} - trait JerseyFoo { - def foo: String - } - trait JerseyBar { - def bar(msg: String): String - } - - @Path("/foo") - class JerseyFooImpl extends JerseyFoo { + class JerseyFoo { @GET @Produces(Array("application/json")) def foo: String = { val ret = "JerseyFoo.foo"; println(ret); ret } } - class JerseyBarImpl extends JerseyBar { + @Path("/foo") + class JerseyFooSub extends JerseyFoo + class JerseyBar { def bar(msg: String) = msg + "return_bar " } } diff --git a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java index 4e135bd887..89632f5d79 100644 --- a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java +++ b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java @@ -24,7 +24,9 @@ public class ActiveObjectGuiceModule extends AbstractModule { bind(ResourceProviderFactory.class); for (int i = 0; i < bindings.size(); i++) { final DependencyBinding db = bindings.get(i); - bind((Class) db.getInterface()).to((Class) db.getTarget()).in(Singleton.class); + //if (db.getInterface() != null) bind((Class) db.getInterface()).to((Class) db.getTarget()).in(Singleton.class); + //else + this.bind(db.getInterface()).toInstance(db.getTarget()); } } } diff --git a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java index 42176be9dc..40d45869a8 100644 --- a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java +++ b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java @@ -9,16 +9,16 @@ package se.scalablesolutions.akka.kernel.config; */ public class DependencyBinding { private final Class intf; - private final Class target; + private final Object target; - public DependencyBinding(final Class intf, final Class target) { + public DependencyBinding(final Class intf, final Object target) { this.intf = intf; this.target = target; } public Class getInterface() { return intf; } - public Class getTarget() { + public Object getTarget() { return target; } }