diff --git a/akka.ipr b/akka.ipr index 5257a9ab72..db730f4b1d 100644 --- a/akka.ipr +++ b/akka.ipr @@ -142,6 +142,8 @@ + + diff --git a/akka.iws b/akka.iws index 51a8ff6cd6..730f471357 100644 --- a/akka.iws +++ b/akka.iws @@ -1,34 +1,31 @@ - - + + - - + + + + - + - - - - + - + - - - - - + + + @@ -49,20 +46,7 @@ - - - + - - - - @@ -177,75 +135,10 @@ - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -254,7 +147,79 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -342,6 +307,80 @@ @@ -1846,19 +1535,19 @@ - + - + - - + + @@ -1870,13 +1559,13 @@ - + - + @@ -1889,7 +1578,7 @@ - + - - - - - - - - - + + + + + + + + + + + + + + + + + + - + - - - - - - - - - - - - + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + @@ -2414,22 +2187,25 @@ - + - - + + - - - + + + - - - + + + + + + localhost @@ -2510,8 +2286,8 @@ - - + + @@ -2520,7 +2296,7 @@ - + @@ -2575,117 +2351,115 @@ - + - + - + - + - + - + - + - + - + - + - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 9d7711a6af..39b1ed8ece 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -17,14 +17,15 @@ public class InMemoryStateTest extends TestCase { final private ActiveObjectFactory factory = new ActiveObjectFactory(); protected void setUp() { + conf.configureActiveObjects( new RestartStrategy(new AllForOne(), 3, 5000), - new Component[] { - // FIXME: remove string-name, add ctor to only accept target class - new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), - new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000) - //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) - }).inject().supervise(); + new Component[]{ + // FIXME: remove string-name, add ctor to only accept target class + new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), + new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000) + //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) + }).inject().supervise(); } protected void tearDown() { @@ -32,23 +33,23 @@ public class InMemoryStateTest extends TestCase { } public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); // conf.getActiveObject(InMemStateful.class); + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { - InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); // conf.getActiveObject(InMemStateful.class); - stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state - } + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = conf.getActiveObject(InMemStateful.class); @@ -59,16 +60,16 @@ public class InMemoryStateTest extends TestCase { } public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { - InMemStateful stateful = conf.getActiveObject(InMemStateful.class); - stateful.setVectorState("init"); // set init state - InMemFailer failer = conf.getActiveObject(InMemFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getVectorState()); // check that state is == init state - } + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setVectorState("init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getVectorState()); // check that state is == init state + } public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = conf.getActiveObject(InMemStateful.class); @@ -79,29 +80,29 @@ public class InMemoryStateTest extends TestCase { } public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { - InMemStateful stateful = conf.getActiveObject(InMemStateful.class); - stateful.setRefState("init"); // set init state - InMemFailer failer = conf.getActiveObject(InMemFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getRefState()); // check that state is == init state - } - /* - public void testNestedNonTransactionalMethodHangs() { - InMemStateful stateful = conf.getActiveObject(InMemStateful.class); - stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - InMemFailer failer = conf.getActiveObject(InMemFailer.class); - try { - stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state - } - */ + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setRefState("init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getRefState()); // check that state is == init state + } + /* + public void testNestedNonTransactionalMethodHangs() { + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } + */ // public void testShouldRollbackStateForStatefulServerInCaseOfMessageClash() // { // InMemStateful stateful = conf.getActiveObject(InMemStateful.class); diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java new file mode 100644 index 0000000000..b5fe68e463 --- /dev/null +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java @@ -0,0 +1,143 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.kernel.config.*; +import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; +import se.scalablesolutions.akka.kernel.actor.*; +import se.scalablesolutions.akka.kernel.nio.NettyServer; + +import junit.framework.TestCase; + +public class RemoteInMemoryStateTest extends TestCase { + static String messageLog = ""; + + static { + new Thread(new Runnable() { + public void run() { + NettyServer server = new NettyServer(); + server.connect(); + } + }).start(); + } + final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); + final private se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory factory = new se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory(); + + protected void setUp() { + new se.scalablesolutions.akka.kernel.nio.NettyServer(); + conf.configureActiveObjects( + new RestartStrategy(new AllForOne(), 3, 5000), + new Component[]{ + // FIXME: remove string-name, add ctor to only accept target class + new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), + new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000) + //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) + }).inject().supervise(); + } + + protected void tearDown() { + conf.stop(); + } + + public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); + stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); + } + + public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { + InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } + + public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); + stateful.setVectorState("init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit + assertEquals("new state", stateful.getVectorState()); + } + + public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { + InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); + stateful.setVectorState("init"); // set init state + InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getVectorState()); // check that state is == init state + } + + public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); + stateful.setRefState("init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit + assertEquals("new state", stateful.getRefState()); + } + + public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { + InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); + stateful.setRefState("init"); // set init state + InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getRefState()); // check that state is == init state + } + /* + public void testNestedNonTransactionalMethodHangs() { + InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + InMemFailer failer = conf.getActiveObject(InMemFailer.class); + try { + stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } + */ + // public void testShouldRollbackStateForStatefulServerInCaseOfMessageClash() + // { + // InMemStateful stateful = conf.getActiveObject(InMemStateful.class); + // stateful.setState("stateful", "init"); // set init state + // + // InMemClasher clasher = conf.getActiveObject(InMemClasher.class); + // clasher.setState("clasher", "init"); // set init state + // + // // try { + // // stateful.clashOk("stateful", "new state", clasher); + // // } catch (RuntimeException e) { } // expected + // // assertEquals("new state", stateful.getState("stateful")); // check that + // // state is == init state + // // assertEquals("was here", clasher.getState("clasher")); // check that + // // state is == init state + // + // try { + // stateful.clashNotOk("stateful", "new state", clasher); + // fail("should have thrown an exception"); + // } catch (RuntimeException e) { + // System.out.println(e); + // } // expected + // assertEquals("init", stateful.getState("stateful")); // check that state is + // // == init state + // // assertEquals("init", clasher.getState("clasher")); // check that state + // is + // // == init state + // } +} \ No newline at end of file diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java new file mode 100644 index 0000000000..0f5015ef24 --- /dev/null +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.annotation.*; +import se.scalablesolutions.akka.kernel.config.*; +import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; +import se.scalablesolutions.akka.kernel.Kernel; +import se.scalablesolutions.akka.kernel.state.TransactionalMap; +import se.scalablesolutions.akka.kernel.state.CassandraPersistentTransactionalMap; +import se.scalablesolutions.akka.kernel.actor.*; +import se.scalablesolutions.akka.kernel.nio.NettyServer; +import junit.framework.TestCase; + +public class RemotePersistentStateTest extends TestCase { + static String messageLog = ""; + + static { + System.setProperty("storage-config", "config"); + Kernel.startCassandra(); + new Thread(new Runnable() { + public void run() { + NettyServer server = new NettyServer(); + server.connect(); + } + }).start(); + } + final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); + + final private ActiveObjectFactory factory = new ActiveObjectFactory(); + + protected void setUp() { + conf.configureActiveObjects( + new RestartStrategy(new AllForOne(), 3, 5000), + new Component[] { + new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), + new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000) + //new Component(PersistentClasher.class, new LifeCycle(new Permanent(), 1000), 100000) + }).supervise(); + } + + protected void tearDown() { + conf.stop(); + } + + public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); + stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); + } + + public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { + PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + } + + public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); + stateful.setVectorState("init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + assertEquals("init", stateful.getVectorState(0)); + assertEquals("new state", stateful.getVectorState(1)); + } + + public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { + PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); + stateful.setVectorState("init"); // set init state + PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getVectorState(0)); // check that state is == init state + } + + public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); + stateful.setRefState("init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + assertEquals("new state", stateful.getRefState()); + } + + public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { + PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); + stateful.setRefState("init"); // set init state + PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getRefState()); // check that state is == init state + } +} \ No newline at end of file diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala index 1a4e45d519..cc39caed0a 100644 --- a/kernel/src/main/scala/actor/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -133,7 +133,7 @@ object ActiveObject { // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts @serializable sealed class SequentialTransactionalAroundAdvice( - target: Class[_], targetInstance: AnyRef, actor: Actor, val remote: Boolean) extends AroundAdvice { + target: Class[_], targetInstance: AnyRef, actor: Actor, val isRemote: Boolean) extends AroundAdvice { private val changeSet = new ChangeSet(target.getName) private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) @@ -147,7 +147,7 @@ object ActiveObject { dispatcher.start import ActiveObject.threadBoundTx - private[this] var activeTx: Option[Transaction] = None + private[kernel] var activeTx: Option[Transaction] = None // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint def invoke(joinpoint: JoinPoint): AnyRef = { @@ -164,9 +164,9 @@ object ActiveObject { } try { incrementTransaction - if (remote) { + if (isRemote) { val future = NettyClient.send( - new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, isOneWay, false)) + new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, activeTx, isOneWay, false)) if (isOneWay) null // for void methods else { future.await_? @@ -232,9 +232,7 @@ object ActiveObject { removeTransactionIfTopLevel true } else false - - - + private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = { try { result() @@ -271,17 +269,6 @@ object ActiveObject { else true } else true - /* - private def sendOneWay(joinpoint: JoinPoint) = - mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult)) - - private def sendAndReceiveEventually(joinpoint: JoinPoint): ResultOrFailure[AnyRef] = { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout(Invocation(joinpoint, activeTx), 1000) // FIXME configure - future.await_? - getResultOrThrowException(future) - } - */ - private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = { val future = new DefaultCompletableFutureResult(timeout) mailbox.append(new MessageHandle(this, message, future)) @@ -292,15 +279,13 @@ object ActiveObject { if (future.exception.isDefined) { val (_, cause) = future.exception.get throw new TransactionAwareException(cause, activeTx) - } else future.result.asInstanceOf[Option[T]] -/* - if (future.exception.isDefined) { - var resultOrFailure = ResultOrFailure(activeTx) - val (toBlame, cause) = future.exception.get - resultOrFailure() = throw cause - resultOrFailure - } else ResultOrFailure(future.result.get, activeTx) - */ + } else { + if (future.result.isDefined) { + val (res, tx) = future.result.get.asInstanceOf[Tuple2[AnyRef, Option[Transaction]]] + Some(res).asInstanceOf[Option[T]] + } else None + } + /** * Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top. */ @@ -387,11 +372,7 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor { } catch { case e => throw new TransactionAwareException(e, tx) -/* - val resultOrFailure = ResultOrFailure(tx) - resultOrFailure() = throw e - reply(resultOrFailure) -*/ } + } case unexpected => throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]") diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala index 75be5aa015..6c290cdbf1 100644 --- a/kernel/src/main/scala/actor/Actor.scala +++ b/kernel/src/main/scala/actor/Actor.scala @@ -8,6 +8,8 @@ import java.util.concurrent.{CopyOnWriteArraySet, TimeUnit} import kernel.reactor._ import kernel.config.ScalaConfig._ +import kernel.nio.{NettyClient, RemoteRequest} +import kernel.stm.Transaction import kernel.util.Logging import kernel.util.Helpers._ @@ -32,7 +34,8 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler { trait Actor extends Logging { var timeout: Long = 5000L - + var isRemote = false + @volatile private[this] var isRunning: Boolean = false protected[this] var id: String = super.toString protected[this] var dispatcher: MessageDispatcher = _ @@ -50,6 +53,9 @@ trait Actor extends Logging { // ==== USER CALLBACKS TO OVERRIDE ==== // ==================================== + /** + * TODO: document + */ protected var faultHandler: Option[FaultHandlingStrategy] = None /** @@ -61,7 +67,7 @@ trait Actor extends Logging { /** * Set trapExit to true if actor should be able to trap linked actors exit messages. */ - @volatile protected[this] var trapExit: Boolean = false + protected[this] var trapExit: Boolean = false /** * Partial function implementing the server logic. @@ -112,9 +118,10 @@ trait Actor extends Logging { // ==== API ==== // ============= - def !(message: AnyRef) = - if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult)) - else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + def !(message: AnyRef) = if (isRunning) { + if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false)) + else mailbox.append(new MessageHandle(this, message, new NullFutureResult)) + } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) @@ -130,8 +137,13 @@ trait Actor extends Logging { getResultOrThrowException(future).get } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") - // FIXME can be deadlock prone - HOWTO? - def link(actor: Actor) = synchronized { actor.synchronized { + protected[this] def reply(message: AnyRef) = senderFuture match { + case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." ) + case Some(future) => future.completeWithResult(message) + } + + // FIXME can be deadlock prone if cyclic linking? - HOWTO? + protected[this] def link(actor: Actor) = synchronized { actor.synchronized { if (isRunning) { linkedActors.add(actor) if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") @@ -140,8 +152,8 @@ trait Actor extends Logging { } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") }} - // FIXME can be deadlock prone - HOWTO? - def unlink(actor: Actor) = synchronized { actor.synchronized { + // FIXME can be deadlock prone if cyclic linking? - HOWTO? + protected[this] def unlink(actor: Actor) = synchronized { actor.synchronized { if (isRunning) { if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink") linkedActors.remove(actor) @@ -150,14 +162,6 @@ trait Actor extends Logging { } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") }} - /** - * Atomically start and link actor. - */ - def spawnLink(actor: Actor) = actor.synchronized { - actor.start - link(actor) - } - def start = synchronized { if (!isRunning) { dispatcherType match { @@ -184,11 +188,39 @@ trait Actor extends Logging { } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } - protected def reply(message: AnyRef) = senderFuture match { - case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so switch to '!!' to wait using a future." ) - case Some(future) => future.completeWithResult(message) + /** + * Atomically start and link actor. + */ + def spawnLink(actor: Actor) = actor.synchronized { + actor.start + link(actor) } + /** + * Atomically start and link a remote actor. + */ + def spawnLinkRemote(actor: Actor) = actor.synchronized { + actor.makeRemote + actor.start + link(actor) + } + + def spawn(actorClass: Class[_]) = { + + // FIXME: should pass in dispatcher etc. - inherit + } + + def spawnRemote(actorClass: Class[_]) = { + } + + def spawnLink(actorClass: Class[_]) = { + } + + def spawnLinkRemote(actorClass: Class[_]) = { + } + + def makeRemote = isRemote = true + // ================================ // ==== IMPLEMENTATION DETAILS ==== // ================================ @@ -205,18 +237,38 @@ trait Actor extends Logging { } } - private def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: AnyRef, timeout: Long): CompletableFutureResult = { - val future = new DefaultCompletableFutureResult(timeout) - mailbox.append(new MessageHandle(this, message, future)) - future + private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = { + if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false)) + else { + val future = new DefaultCompletableFutureResult(timeout) + mailbox.append(new MessageHandle(this, message, future)) + future + } } private def getResultOrThrowException[T](future: FutureResult): Option[T] = + if (isRemote) getRemoteResultOrThrowException(future) + else { + if (future.exception.isDefined) { + val (_, cause) = future.exception.get + throw cause + } else { + future.result.asInstanceOf[Option[T]] + } + } + + // FIXME: UGLY - Either: make the remote tx work + // FIXME: OR: remove this method along with the tx tuple making in NettyClient.messageReceived and ActiveObject.getResultOrThrowException + private def getRemoteResultOrThrowException[T](future: FutureResult): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get - throw cause - } else future.result.asInstanceOf[Option[T]] + throw cause // throw new TransactionAwareException(cause, activeTx) + } else { + if (future.result.isDefined) { + val (res, tx) = future.result.get.asInstanceOf[Tuple2[Option[T], Option[Transaction]]] + res + } else None + } private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive) diff --git a/kernel/src/main/scala/collection/HashTrie.scala b/kernel/src/main/scala/collection/HashTrie.scala index efc89b42b1..02ad19ffe9 100755 --- a/kernel/src/main/scala/collection/HashTrie.scala +++ b/kernel/src/main/scala/collection/HashTrie.scala @@ -42,6 +42,7 @@ package se.scalablesolutions.akka.collection * @author Daniel Spiewak * @author Rich Hickey */ +@serializable final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] { lazy val size = root.size @@ -73,6 +74,7 @@ object HashTrie { // ============================================================================ // nodes +@serializable private[collection] sealed trait Node[K, +V] { val size: Int @@ -85,7 +87,7 @@ private[collection] sealed trait Node[K, +V] { def elements: Iterator[(K, V)] } - +@serializable private[collection] class EmptyNode[K] extends Node[K, Nothing] { val size = 0 diff --git a/kernel/src/main/scala/collection/Vector.scala b/kernel/src/main/scala/collection/Vector.scala index 976b46fcbe..437f263551 100755 --- a/kernel/src/main/scala/collection/Vector.scala +++ b/kernel/src/main/scala/collection/Vector.scala @@ -42,6 +42,7 @@ import Vector._ * @author Daniel Spiewak * @author Rich Hickey */ +@serializable class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail: Array[AnyRef]) extends RandomAccessSeq[T] { outer => private val tailOff = length - tail.length diff --git a/kernel/src/main/scala/nio/NettyClient.scala b/kernel/src/main/scala/nio/NettyClient.scala index 2a8df9893f..bb76accb02 100644 --- a/kernel/src/main/scala/nio/NettyClient.scala +++ b/kernel/src/main/scala/nio/NettyClient.scala @@ -116,8 +116,9 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu val result = event.getMessage if (result.isInstanceOf[RemoteReply]) { val reply = result.asInstanceOf[RemoteReply] + val tx = reply.tx val future = futures.get(reply.id) - if (reply.successful) future.completeWithResult(reply.message) + if (reply.successful) future.completeWithResult((reply.message, tx)) else future.completeWithException(null, reply.exception) } else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result) } catch { diff --git a/kernel/src/main/scala/nio/NettyServer.scala b/kernel/src/main/scala/nio/NettyServer.scala index 109cff8898..b7ff74f459 100644 --- a/kernel/src/main/scala/nio/NettyServer.scala +++ b/kernel/src/main/scala/nio/NettyServer.scala @@ -7,7 +7,8 @@ package se.scalablesolutions.akka.kernel.nio import java.lang.reflect.InvocationTargetException import java.net.InetSocketAddress import java.util.concurrent.{ConcurrentHashMap, Executors} -import kernel.actor.{ActiveObjectFactory, Dispatcher, ActiveObject, Invocation} +import kernel.actor._ +import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} import kernel.util.Logging import java.util.ArrayList import java.util.List @@ -15,6 +16,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.logging.Level import java.util.logging.Logger +import org.codehaus.aspectwerkz.intercept.Advisable import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory @@ -22,10 +24,16 @@ import org.jboss.netty.handler.codec.serialization.ObjectDecoder import org.jboss.netty.handler.codec.serialization.ObjectEncoder class NettyServer extends Logging { + def connect = NettyServer.connect +} + +object NettyServer extends Logging { private val HOSTNAME = "localhost" private val PORT = 9999 private val CONNECTION_TIMEOUT_MILLIS = 100 + @volatile private var isRunning = false + private val factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool, Executors.newCachedThreadPool) @@ -42,14 +50,23 @@ class NettyServer extends Logging { bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS) - log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT) - bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT)) + def connect = synchronized { + if (!isRunning) { + log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT) + bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT)) + isRunning = true + } + } } @ChannelPipelineCoverage {val value = "all"} class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { private val activeObjectFactory = new ActiveObjectFactory private val activeObjects = new ConcurrentHashMap[String, AnyRef] + private val actors = new ConcurrentHashMap[String, Actor] + + private val MESSAGE_HANDLE = classOf[Actor].getDeclaredMethod( + "handle", Array[Class[_]](classOf[AnyRef], classOf[CompletableFutureResult])) override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { @@ -82,37 +99,68 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { try { log.debug(request.toString) - if (request.isActor) { - log.debug("Dispatching to [receive :: %s]", request.target) - throw new UnsupportedOperationException("TODO: remote actors") - } else { - log.debug("Dispatching to [%s :: %s]", request.method, request.target) - val activeObject = createActiveObject(request.target) - val args = request.message.asInstanceOf[scala.List[AnyRef]] - val argClazzes = args.map(_.getClass) - val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes) - val method = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses) - try { - if (request.isOneWay) method.invoke(activeObject, unescapedArgs) - else { - val result = method.invoke(activeObject, unescapedArgs) - log.debug("Returning result [%s]", result) - channel.write(request.newReplyWithMessage(result)) - } - } catch { - case e: InvocationTargetException => - log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e.getCause) - e.getCause.printStackTrace - channel.write(request.newReplyWithException(e.getCause)) - } - } - } catch { + if (request.isActor) dispatchToActor(request, channel) + else dispatchToActiveObject(request, channel) + } catch { case e: Exception => log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e) e.printStackTrace } } + private def dispatchToActor(request: RemoteRequest, channel: Channel) = { + log.debug("Dispatching to actor [%s]", request.target) + val actor = createActor(request.target) + actor.start + if (request.isOneWay) actor ! request.message + else { + try { + val resultOrNone = actor !! request.message + val result = if (resultOrNone.isDefined) resultOrNone else null + log.debug("Returning result from actor invocation [%s]", result) + channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get)) + } catch { + case e: InvocationTargetException => + log.error("Could not invoke remote actor [%s] due to: %s", request.target, e.getCause) + e.getCause.printStackTrace + channel.write(request.newReplyWithException(e.getCause)) + } + } + } + + private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = { + log.debug("Dispatching to [%s :: %s]", request.method, request.target) + val activeObject = createActiveObject(request.target) + + val args = request.message.asInstanceOf[scala.List[AnyRef]] + val argClazzes = args.map(_.getClass) + val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes) + + continueTransaction(request) + try { + val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses) + if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs) + else { + val result = messageReceiver.invoke(activeObject, unescapedArgs) + log.debug("Returning result from active object invocation [%s]", result) + channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get)) + } + } catch { + case e: InvocationTargetException => + log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.method, request.target, e.getCause) + e.getCause.printStackTrace + channel.write(request.newReplyWithException(e.getCause)) + } + } + + private def continueTransaction(request: RemoteRequest) = { + val tx = request.tx + if (tx.isDefined) { + tx.get.reinit + ActiveObject.threadBoundTx.set(tx) + } else ActiveObject.threadBoundTx.set(None) + } + private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = { val unescapedArgs = new Array[AnyRef](args.size) val unescapedArgClasses = new Array[Class[_]](args.size) @@ -131,7 +179,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { (unescapedArgs, unescapedArgClasses) } - private def createActiveObject(name: String) = { + private def createActiveObject(name: String): AnyRef = { val activeObjectOrNull = activeObjects.get(name) if (activeObjectOrNull == null) { val clazz = Class.forName(name) @@ -145,8 +193,25 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { case e => log.debug("Could not create remote active object instance due to: %s", e) e.printStackTrace - throw e + throw e } - } else activeObjectOrNull + } else activeObjectOrNull + } + + private def createActor(name: String): Actor = { + val actorOrNull = actors.get(name) + if (actorOrNull == null) { + val clazz = Class.forName(name) + try { + val newInstance = clazz.newInstance.asInstanceOf[Actor] + actors.put(name, newInstance) + newInstance + } catch { + case e => + log.debug("Could not create remote actor instance due to: %s", e) + e.printStackTrace + throw e + } + } else actorOrNull } } diff --git a/kernel/src/main/scala/nio/RequestReply.scala b/kernel/src/main/scala/nio/RequestReply.scala index ef06da25ae..c7de631087 100644 --- a/kernel/src/main/scala/nio/RequestReply.scala +++ b/kernel/src/main/scala/nio/RequestReply.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.kernel.nio import java.util.concurrent.atomic.AtomicLong +import kernel.stm.Transaction import kernel.util.HashCode object IdFactory { @@ -18,13 +19,15 @@ object IdFactory { val message: AnyRef, val method: String, val target: String, + val tx: Option[Transaction], val isOneWay: Boolean, val isEscaped: Boolean) { private[RemoteRequest] var _id = IdFactory.nextId def id = _id override def toString: String = synchronized { - "RemoteRequest[isActor: " + isActor + " | message: " + message + " | method: " + method + " | target: " + target + " | isOneWay: " + isOneWay + "]" + "RemoteRequest[isActor: " + isActor + " | message: " + message + " | method: " + method + + " | target: " + target + " | tx: " + tx + " | isOneWay: " + isOneWay + "]" } override def hashCode(): Int = synchronized { @@ -45,20 +48,25 @@ object IdFactory { that.asInstanceOf[RemoteRequest].target == target } - def newReplyWithMessage(message: AnyRef) = synchronized { new RemoteReply(true, id, message, null) } + def newReplyWithMessage(message: AnyRef, tx: Option[Transaction]) = synchronized { new RemoteReply(true, id, message, null, tx) } - def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error) } + def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error, None) } def cloneWithNewMessage(message: AnyRef, isEscaped: Boolean) = synchronized { - val request = new RemoteRequest(isActor, message, method, target, isOneWay, isEscaped) + val request = new RemoteRequest(isActor, message, method, target, tx, isOneWay, isEscaped) request._id = id request } } -@serializable class RemoteReply(val successful: Boolean, val id: Long, val message: AnyRef, val exception: Throwable) { +@serializable class RemoteReply(val successful: Boolean, + val id: Long, + val message: AnyRef, + val exception: Throwable, + val tx: Option[Transaction]) { override def toString: String = synchronized { - "RemoteReply[successful: " + successful + " | id: " + id + " | message: " + message + " | exception: " + exception + "]" + "RemoteReply[successful: " + successful + " | id: " + id + " | message: " + + message + " | exception: " + exception + " | tx: " + tx + "]" } override def hashCode(): Int = synchronized { diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index fec5b0871c..a49d296de4 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -82,6 +82,7 @@ class TransactionalState { /** * @author Jonas Bonér */ +@serializable trait Transactional { val uuid = Uuid.newUuid.toString diff --git a/kernel/src/main/scala/stm/ChangeSet.scala b/kernel/src/main/scala/stm/ChangeSet.scala index e7ba12974b..378e97e44f 100644 --- a/kernel/src/main/scala/stm/ChangeSet.scala +++ b/kernel/src/main/scala/stm/ChangeSet.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.kernel.stm import kernel.state.{Transactional, TransactionalMap, TransactionalVector, TransactionalRef} import kernel.util.Helpers.ReadWriteLock +@serializable class ChangeSet(val id: String) { private val lock = new ReadWriteLock diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala index 5d7d1071f5..197e365063 100644 --- a/kernel/src/main/scala/stm/Transaction.scala +++ b/kernel/src/main/scala/stm/Transaction.scala @@ -6,6 +6,7 @@ package se.scalablesolutions.akka.kernel.stm import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import kernel.util.Logging + import scala.collection.mutable.{HashSet, HashMap} @serializable sealed abstract class TransactionStatus @@ -38,7 +39,7 @@ object TransactionIdFactory { log.debug("Creating a new transaction with id [%s]", id) // FIXME: add support for nested transactions - private[this] var parent: Option[Transaction] = None + //private[this] var parent: Option[Transaction] = None private[this] val participants = new HashSet[ChangeSet] private[this] val precommitted = new HashSet[ChangeSet] private[this] val depth = new AtomicInteger(0) @@ -94,6 +95,8 @@ object TransactionIdFactory { def join(changeSet: ChangeSet) = synchronized { ensureIsActive + println(" --- log ;" + log) + println(" --- changeset ;" + changeSet) log.debug("TX JOIN - Server [%s] is joining transaction [%s]" , changeSet.id, this) changeSet.full.foreach(_.begin) participants + changeSet @@ -113,6 +116,14 @@ object TransactionIdFactory { private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]") + private[kernel] def reinit = { + import net.lag.logging.{Logger, Level} + if (log == null) { + log = Logger.get(this.getClass.getName) + log.setLevel(Level.ALL) + } + } + override def equals(that: Any): Boolean = synchronized { that != null && that.isInstanceOf[Transaction] && diff --git a/kernel/src/main/scala/util/Helpers.scala b/kernel/src/main/scala/util/Helpers.scala index 85a81525d7..45fbfc34c9 100644 --- a/kernel/src/main/scala/util/Helpers.scala +++ b/kernel/src/main/scala/util/Helpers.scala @@ -19,6 +19,7 @@ class SystemFailure(cause: Throwable) extends RuntimeException(cause) object Helpers extends Logging { // ================================================ + @serializable class ReadWriteLock { private val rwl = new ReentrantReadWriteLock private val readLock = rwl.readLock diff --git a/kernel/src/main/scala/util/Logging.scala b/kernel/src/main/scala/util/Logging.scala index 3440b60cdd..eebb667882 100755 --- a/kernel/src/main/scala/util/Logging.scala +++ b/kernel/src/main/scala/util/Logging.scala @@ -20,12 +20,11 @@ import java.net.UnknownHostException; * @author Jonas Bonér */ trait Logging { - @transient val log = { + @transient var log = { val log = Logger.get(this.getClass.getName) log.setLevel(Level.ALL) log } - } /** diff --git a/kernel/src/test/scala/ActorTest.scala b/kernel/src/test/scala/ActorTest.scala index deb559538d..2db753b123 100644 --- a/kernel/src/test/scala/ActorTest.scala +++ b/kernel/src/test/scala/ActorTest.scala @@ -18,7 +18,6 @@ class ActorTest { case "Failure" => throw new RuntimeException("expected") } - def restart(config: Option[AnyRef]) = {} } @Test @@ -29,7 +28,6 @@ class ActorTest { def receive: PartialFunction[Any, Unit] = { case "OneWay" => oneWay = "received" } - def restart(config: Option[AnyRef]) = {} } actor.start val result = actor ! "OneWay" diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala new file mode 100644 index 0000000000..fb3736092c --- /dev/null +++ b/kernel/src/test/scala/RemoteActorSpec.scala @@ -0,0 +1,92 @@ +package se.scalablesolutions.akka.kernel.actor + +import concurrent.Lock +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.TimeUnit +import kernel.nio.{NettyClient, NettyServer} +import reactor._ + +import org.junit.{Test, Before} +import org.junit.Assert._ + +object Global { + var oneWay = "nada" +} +class RemoteActorSpecActorUnidirectional extends Actor { + def receive: PartialFunction[Any, Unit] = { + case "OneWay" => + Global.oneWay = "received" + } +} + +class RemoteActorSpecActorBidirectional extends Actor { + def receive: PartialFunction[Any, Unit] = { + case "Hello" => + reply("World") + case "Failure" => + throw new RuntimeException("expected") + } +} + +class RemoteActorSpec { + + new Thread(new Runnable() { + def run = { + val server = new NettyServer + server.connect + } + }).start + NettyClient.connect + + private val unit = TimeUnit.MILLISECONDS + + @Test + def sendOneWay = { + implicit val timeout = 5000L + val actor = new RemoteActorSpecActorUnidirectional + actor.makeRemote + actor.start + val result = actor ! "OneWay" + Thread.sleep(100) + assertEquals("received", Global.oneWay) + actor.stop + } + + @Test + def sendReplySync = { + implicit val timeout = 5000L + val actor = new RemoteActorSpecActorBidirectional + actor.makeRemote + actor.start + val result: String = actor !? "Hello" + assertEquals("World", result) + actor.stop + } + + @Test + def sendReplyAsync = { + implicit val timeout = 5000L + val actor = new RemoteActorSpecActorBidirectional + actor.makeRemote + actor.start + val result = actor !! "Hello" + assertEquals("World", result.get.asInstanceOf[String]) + actor.stop + } + + @Test + def sendReceiveException = { + implicit val timeout = 5000L + val actor = new RemoteActorSpecActorBidirectional + actor.isRemote = true + actor.start + try { + actor !! "Failure" + fail("Should have thrown an exception") + } catch { + case e => + assertEquals("expected", e.getMessage()) + } + actor.stop + } +}