diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala index 6ef7f4bd3f..3167b02099 100644 --- a/akka-actors/src/main/scala/actor/ActiveObject.scala +++ b/akka-actors/src/main/scala/actor/ActiveObject.scala @@ -12,13 +12,11 @@ import nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} import config.ScalaConfig._ import util._ -import serialization.Serializer -import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice, Advice} import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} -import org.codehaus.aspectwerkz.aspect.management.Aspects +import se.scalablesolutions.akka.serialization.Serializer sealed class ActiveObjectException(msg: String) extends RuntimeException(msg) class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg) @@ -124,18 +122,6 @@ class ActiveObjectFactory { private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = ActiveObject.supervise(restartStrategy, components) - - /* - def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = { - val actor = new Dispatcher(None)(target.getName) - ActiveObject.newInstance(target, actor) - } - - def newInstanceAndLink[T](intf: Class[T], target: AnyRef, supervisor: AnyRef): T = { - val actor = new Dispatcher(None)(target.getName) - ActiveObject.newInstance(intf, target, actor) - } - */ } /** @@ -144,8 +130,6 @@ class ActiveObjectFactory { * @author Jonas Bonér */ object ActiveObject { - - val MATCH_ALL = "execution(* *.*(..))" val AKKA_CAMEL_ROUTING_SCHEME = "akka" def newInstance[T](target: Class[T], timeout: Long): T = @@ -221,8 +205,7 @@ object ActiveObject { } private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - //if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP") - if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) + //if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) val proxy = Proxy.newInstance(target, false, true) actor.initialize(target, proxy) actor.timeout = timeout @@ -232,8 +215,7 @@ object ActiveObject { } private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - //if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP") - if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) + //if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) actor.initialize(target.getClass, target) actor.timeout = timeout @@ -281,7 +263,7 @@ sealed class ActiveObjectAspect { var remoteAddress: Option[InetSocketAddress] = _ var timeout: Long = _ - @Around("execution(* *..*(..))") + @Around("execution(* *.*(..))") def invoke(joinpoint: JoinPoint): AnyRef = { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinpoint.getThis) @@ -431,9 +413,9 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends if (postRestart.isDefined) postRestart.get.setAccessible(true) // see if we have a method annotated with @inittransactionalstate, if so invoke it - initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) - if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") - if (initTxState.isDefined) initTxState.get.setAccessible(true) + //initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) + //if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") + //if (initTxState.isDefined) initTxState.get.setAccessible(true) } override def receive: PartialFunction[Any, Unit] = { @@ -457,12 +439,11 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends } catch { case e: InvocationTargetException => throw e.getCause } } - override protected def initTransactionalState() { - try { - if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) - } catch { case e: InvocationTargetException => throw e.getCause } - } - + //override protected def initTransactionalState = { + // try { + // if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + // } catch { case e: InvocationTargetException => throw e.getCause } + //} private def serializeArguments(joinpoint: JoinPoint) = { val args = joinpoint.getRtti.asInstanceOf[MethodRtti].getParameterValues diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index c20884d5ca..cd4ae2556c 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -7,16 +7,16 @@ package se.scalablesolutions.akka.actor import com.google.protobuf.ByteString import java.net.InetSocketAddress -import java.util.concurrent.CopyOnWriteArraySet +import java.util.HashSet import reactor._ import config.ScalaConfig._ import stm.TransactionManagement -import util.Helpers.ReadWriteLock import nio.protobuf.RemoteProtocol.RemoteRequest -import util.Logging -import serialization.{Serializer, Serializable, SerializationProtocol} import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} +import serialization.{Serializer, Serializable, SerializationProtocol} +import util.Helpers.ReadWriteLock +import util.Logging import org.multiverse.utils.TransactionThreadLocal._ @@ -70,7 +70,7 @@ trait Actor extends Logging with TransactionManagement { protected[Actor] var mailbox: MessageQueue = _ protected[this] var senderFuture: Option[CompletableFutureResult] = None - protected[this] val linkedActors = new CopyOnWriteArraySet[Actor] + protected[this] val linkedActors = new HashSet[Actor] protected[actor] var lifeCycleConfig: Option[LifeCycle] = None val name = this.getClass.getName @@ -224,6 +224,8 @@ trait Actor extends Logging with TransactionManagement { def stop = synchronized { if (isRunning) { dispatcher.unregisterHandler(this) + if (dispatcher.isInstanceOf[ThreadBasedDispatcher]) dispatcher.shutdown + // FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down isRunning = false shutdown } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") @@ -509,6 +511,11 @@ trait Actor extends Logging with TransactionManagement { try { if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision + if (TransactionManagement.threadBoundTx.get.isDefined && !TransactionManagement.threadBoundTx.get.get.isActive) { + TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor + setThreadLocalTransaction(null) + } + if (isInExistingTransaction) joinExistingTransaction else if (isTransactional) startNewTransaction @@ -536,10 +543,8 @@ trait Actor extends Logging with TransactionManagement { } private def getResultOrThrowException[T](future: FutureResult): Option[T] = - if (future.exception.isDefined) { - val (_, cause) = future.exception.get - throw cause - } else future.result.asInstanceOf[Option[T]] + if (future.exception.isDefined) throw future.exception.get._2 + else future.result.asInstanceOf[Option[T]] private def rescheduleClashedMessages = if (messageToReschedule.isDefined) { val handle = messageToReschedule.get diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index 6e384e609a..3da964a10e 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -79,14 +79,10 @@ object Transaction { private[this] val depth = new AtomicInteger(0) - def increment = synchronized { depth.incrementAndGet } - def decrement = synchronized { depth.decrementAndGet } - def isTopLevel = synchronized { depth.get == 0 } + def increment = depth.incrementAndGet + def decrement = depth.decrementAndGet + def isTopLevel = depth.compareAndSet(0, 0) - def register(transactional: Transactional) = synchronized { - ensureIsActiveOrNew - } - def begin(participant: String) = synchronized { ensureIsActiveOrNew transaction = Multiverse.STM.startUpdateTransaction("akka") @@ -150,12 +146,12 @@ object Transaction { participants ::= participant } - def isNew = status == TransactionStatus.New - def isActive = status == TransactionStatus.Active - def isCompleted = status == TransactionStatus.Completed - def isAborted = status == TransactionStatus.Aborted + def isNew = synchronized { status == TransactionStatus.New } + def isActive = synchronized { status == TransactionStatus.Active } + def isCompleted = synchronized { status == TransactionStatus.Completed } + def isAborted = synchronized { status == TransactionStatus.Aborted } - private def reset = { + private def reset = synchronized { participants = Nil precommitted = Nil } @@ -174,7 +170,7 @@ object Transaction { import net.lag.logging.{Logger, Level} if (log == null) { log = Logger.get(this.getClass.getName) - log.setLevel(Level.ALL) + log.setLevel(Level.ALL) // TODO: preserve logging level } } @@ -184,11 +180,9 @@ object Transaction { that.asInstanceOf[Transaction].id == this.id } - override def hashCode(): Int = id.toInt + override def hashCode(): Int = synchronized { id.toInt } - override def toString(): String = synchronized { - "Transaction[" + id + ", " + status + "]" - } + override def toString(): String = synchronized { "Transaction[" + id + ", " + status + "]" } } diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index 63b3efab9d..9b74a63fba 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -59,7 +59,6 @@ trait TransactionManagement extends Logging { val currentTx = cflowTx.get currentTx.join(uuid) activeTx = Some(currentTx) - log.debug("Joining transaction [%s]", currentTx) } } @@ -91,11 +90,11 @@ trait TransactionManagement extends Logging { do { Thread.sleep(TransactionManagement.TIME_WAITING_FOR_COMPLETION) nrRetries += 1 - log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries) + log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get.id, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries) failed = !tryToCommitTransaction } while(nrRetries < TransactionManagement.NR_OF_TIMES_WAITING_FOR_COMPLETION && failed) if (failed) { - log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get, latestMessage) + log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get.id, latestMessage) rollback(activeTx) if (TransactionManagement.RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get) else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]") @@ -112,7 +111,7 @@ trait TransactionManagement extends Logging { protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement - protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) { activeTx = None } + protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) activeTx = None protected def reenteringExistingTransaction= if (activeTx.isDefined) { val cflowTx = threadBoundTx.get diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala index 96548f4621..72bfe8c925 100644 --- a/akka-actors/src/test/scala/InMemoryActorSpec.scala +++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala @@ -93,9 +93,9 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state - Thread.sleep(100) + Thread.sleep(1000) stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - Thread.sleep(100) + Thread.sleep(1000) assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get) } @@ -115,9 +115,9 @@ class InMemoryActorSpec extends TestCase { val failer = new InMemFailerActor failer.start stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state - Thread.sleep(100) + Thread.sleep(1000) stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method - Thread.sleep(100) + Thread.sleep(1000) assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state } @@ -140,9 +140,9 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful ! SetVectorStateOneWay("init") // set init state - Thread.sleep(100) + Thread.sleep(1000) stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - Thread.sleep(100) + Thread.sleep(1000) assertEquals(2, (stateful !! GetVectorSize).get) } @@ -160,12 +160,12 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful ! SetVectorStateOneWay("init") // set init state - Thread.sleep(100) + Thread.sleep(1000) val failer = new InMemFailerActor failer.start stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method - Thread.sleep(100) - assertEquals(1, (stateful !! GetVectorSize).get) // check that state is == init state + Thread.sleep(1000) + assertEquals(1, (stateful !! GetVectorSize).get) } @Test @@ -179,7 +179,7 @@ class InMemoryActorSpec extends TestCase { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => {}} - assertEquals(1, (stateful !! GetVectorSize).get) // check that state is == init state + assertEquals(1, (stateful !! GetVectorSize).get) } @Test @@ -187,9 +187,9 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful ! SetRefStateOneWay("init") // set init state - Thread.sleep(100) + Thread.sleep(1000) stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - Thread.sleep(100) + Thread.sleep(1000) assertEquals("new state", (stateful !! GetRefState).get) } @@ -207,11 +207,11 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful ! SetRefStateOneWay("init") // set init state - Thread.sleep(100) + Thread.sleep(1000) val failer = new InMemFailerActor failer.start stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method - Thread.sleep(100) + Thread.sleep(1000) assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java index 91f097036a..bca67d68d1 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java @@ -27,7 +27,11 @@ public class InMemNestedStateTest extends TestCase { new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000) //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) }).inject().supervise(); - Config.config(); + Config.config(); + InMemStateful stateful = conf.getInstance(InMemStateful.class); + stateful.init(); + InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); + nested.init(); } protected void tearDown() { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java index 7378df5c36..22ff23bcab 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java @@ -11,12 +11,15 @@ public class InMemStateful { private TransactionalMap mapState; private TransactionalVector vectorState; private TransactionalRef refState; - - @inittransactionalstate + private boolean isInitialized = false; + public void init() { - mapState = TransactionalState.newMap(); - vectorState = TransactionalState.newVector(); - refState = TransactionalState.newRef(); + if (!isInitialized) { + mapState = TransactionalState.newMap(); + vectorState = TransactionalState.newVector(); + refState = TransactionalState.newRef(); + isInitialized = true; + } } public String getMapState(String key) { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java index 7bb3826a5a..b89a555c58 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java @@ -6,77 +6,80 @@ import se.scalablesolutions.akka.state.*; @transactionrequired public class InMemStatefulNested { - private TransactionalMap mapState; - private TransactionalVector vectorState; - private TransactionalRef refState; + private TransactionalMap mapState; + private TransactionalVector vectorState; + private TransactionalRef refState; + private boolean isInitialized = false; - @inittransactionalstate - public void init() { - mapState = TransactionalState.newMap(); - vectorState = TransactionalState.newVector(); - refState = TransactionalState.newRef(); - } - - public String getMapState(String key) { - return (String)mapState.get(key).get(); - } + public void init() { + if (!isInitialized) { + mapState = TransactionalState.newMap(); + vectorState = TransactionalState.newVector(); + refState = TransactionalState.newRef(); + isInitialized = true; + } + } - - public String getVectorState() { - return (String)vectorState.last(); - } + public String getMapState(String key) { + return (String) mapState.get(key).get(); + } - - public String getRefState() { - return (String)refState.get().get(); - } - - public void setMapState(String key, String msg) { - mapState.put(key, msg); - } + public String getVectorState() { + return (String) vectorState.last(); + } - - public void setVectorState(String msg) { - vectorState.add(msg); - } - - public void setRefState(String msg) { - refState.swap(msg); - } + public String getRefState() { + return (String) refState.get().get(); + } - - public void success(String key, String msg) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); - } - - public String failure(String key, String msg, InMemFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); - failer.fail(); - return msg; - } + public void setMapState(String key, String msg) { + mapState.put(key, msg); + } - - public void thisMethodHangs(String key, String msg, InMemFailer failer) { - setMapState(key, msg); - } - /* - public void clashOk(String key, String msg, InMemClasher clasher) { - mapState.put(key, msg); - clasher.clash(); - } + public void setVectorState(String msg) { + vectorState.add(msg); + } - public void clashNotOk(String key, String msg, InMemClasher clasher) { - mapState.put(key, msg); - clasher.clash(); - this.success("clash", "clash"); - } - */ + + public void setRefState(String msg) { + refState.swap(msg); + } + + + public void success(String key, String msg) { + mapState.put(key, msg); + vectorState.add(msg); + refState.swap(msg); + } + + + public String failure(String key, String msg, InMemFailer failer) { + mapState.put(key, msg); + vectorState.add(msg); + refState.swap(msg); + failer.fail(); + return msg; + } + + + public void thisMethodHangs(String key, String msg, InMemFailer failer) { + setMapState(key, msg); + } + + /* + public void clashOk(String key, String msg, InMemClasher clasher) { + mapState.put(key, msg); + clasher.clash(); + } + + public void clashNotOk(String key, String msg, InMemClasher clasher) { + mapState.put(key, msg); + clasher.clash(); + this.success("clash", "clash"); + } + */ } \ No newline at end of file diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 9c1e36f5aa..35879be8eb 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -32,6 +32,8 @@ public class InMemoryStateTest extends TestCase { new LifeCycle(new Permanent(), 1000), 10000) }).inject().supervise(); + InMemStateful stateful = conf.getInstance(InMemStateful.class); + stateful.init(); } protected void tearDown() { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java index b663cf7903..b7693ecd07 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java @@ -32,6 +32,7 @@ public class RemoteInMemoryStateTest extends TestCase { public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + stateful.init(); stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); @@ -39,6 +40,7 @@ public class RemoteInMemoryStateTest extends TestCase { public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + stateful.init(); stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class); try { @@ -51,6 +53,7 @@ public class RemoteInMemoryStateTest extends TestCase { public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + stateful.init(); stateful.setVectorState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired assertEquals("new state", stateful.getVectorState()); @@ -58,6 +61,7 @@ public class RemoteInMemoryStateTest extends TestCase { public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + stateful.init(); stateful.setVectorState("init"); // set init state InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class); try { @@ -70,6 +74,7 @@ public class RemoteInMemoryStateTest extends TestCase { public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + stateful.init(); stateful.setRefState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired assertEquals("new state", stateful.getRefState()); @@ -77,6 +82,7 @@ public class RemoteInMemoryStateTest extends TestCase { public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + stateful.init(); stateful.setRefState("init"); // set init state InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class); try { diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 5dd8bcf190..24efe78eff 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -45,8 +45,6 @@ object Kernel extends Logging { printBanner log.info("Starting Akka...") - runApplicationBootClasses - if (RUN_REMOTE_SERVICE) startRemoteService if (RUN_REST_SERVICE) startREST @@ -56,29 +54,7 @@ object Kernel extends Logging { } } - - private[akka] def runApplicationBootClasses = { - val loader = - if (HOME.isDefined) { - val CONFIG = HOME.get + "/config" - val DEPLOY = HOME.get + "/deploy" - val DEPLOY_DIR = new File(DEPLOY) - if (!DEPLOY_DIR.exists) {log.error("Could not find a deploy directory at [" + DEPLOY + "]"); System.exit(-1)} - val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL - //val toDeploy = DEPLOY_DIR.toURL :: (for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL) - log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) - new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) - } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { - getClass.getClassLoader - } else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") - for (clazz <- BOOT_CLASSES) { - log.info("Loading boot class [%s]", clazz) - loader.loadClass(clazz).newInstance - } - applicationLoader = Some(loader) - } - - private[akka] def startRemoteService = { + def startRemoteService = { // FIXME manage remote serve thread for graceful shutdown val remoteServerThread = new Thread(new Runnable() { def run = RemoteServer.start(applicationLoader) @@ -87,6 +63,7 @@ object Kernel extends Logging { } def startREST = { + runApplicationBootClasses val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() val scheme = uri.getScheme @@ -112,6 +89,29 @@ object Kernel extends Logging { log.info("REST service started successfully. Listening to port [" + REST_PORT + "]") } + private def runApplicationBootClasses = { + val loader = + if (HOME.isDefined) { + val CONFIG = HOME.get + "/config" + val DEPLOY = HOME.get + "/deploy" + val DEPLOY_DIR = new File(DEPLOY) + if (!DEPLOY_DIR.exists) { + log.error("Could not find a deploy directory at [" + DEPLOY + "]") + System.exit(-1) + } + val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL + log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) + new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) + } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { + getClass.getClassLoader + } else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") + for (clazz <- BOOT_CLASSES) { + log.info("Loading boot class [%s]", clazz) + loader.loadClass(clazz).newInstance + } + applicationLoader = Some(loader) + } + private def printBanner = { log.info( """============================== diff --git a/akka-util/src/main/scala/Config.scala b/akka-util/src/main/scala/Config.scala index 4894a8de8e..c57dd5ea0c 100644 --- a/akka-util/src/main/scala/Config.scala +++ b/akka-util/src/main/scala/Config.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka import util.Logging -import net.lag.configgy.{Config => ConfiggyConfig, Configgy, RuntimeEnvironment, ParseException} +import net.lag.configgy.{Config => ConfiggyConfig, Configgy, ParseException} /** * @author Jonas Bonér @@ -14,9 +14,12 @@ import net.lag.configgy.{Config => ConfiggyConfig, Configgy, RuntimeEnvironment, object Config extends Logging { val VERSION = "0.6" val HOME = { - val home = System.getenv("AKKA_HOME") - if (home == null) None - else Some(home) + val systemHome = System.getenv("AKKA_HOME") + if (systemHome == null || systemHome.length == 0) { + val optionHome = System.getProperty("akka.home", "") + if (optionHome.length != 0) Some(optionHome) + else None + } else Some(systemHome) } val config = { diff --git a/akka.iml b/akka.iml index f748697a86..607be142dc 100644 --- a/akka.iml +++ b/akka.iml @@ -1,5 +1,5 @@ - + diff --git a/akka.ipr b/akka.ipr index 7f0eb5d60c..03a6a74314 100644 --- a/akka.ipr +++ b/akka.ipr @@ -244,7 +244,6 @@ - @@ -616,83 +615,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -759,6 +681,61 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -869,26 +846,15 @@ - + - + - + - - - - - - - - - - - - + @@ -902,15 +868,26 @@ - + - + - + - + + + + + + + + + + + + @@ -946,17 +923,6 @@ - - - - - - - - - - - @@ -968,323 +934,70 @@ - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -1353,6 +1066,182 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1375,59 +1264,26 @@ - + - + - + - + - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + diff --git a/akka.iws b/akka.iws index dc2f513d29..11c532ab33 100644 --- a/akka.iws +++ b/akka.iws @@ -1,27 +1,30 @@ - + + - - + - + + - - - - + - - - + + + + @@ -33,33 +36,7 @@ - - - - - + - + + + + + + + + + + + + + + + + + + @@ -114,19 +108,91 @@ - + - + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -142,21 +208,22 @@ @@ -212,10 +279,6 @@ - - - - @@ -250,8 +313,36 @@ + + + + + + + + + + + + + + + + + + @@ -273,101 +364,7 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + + @@ -398,6 +400,7 @@ + + + @@ -430,7 +442,6 @@ @@ -440,6 +451,7 @@ @@ -457,13 +469,13 @@ @@ -476,6 +488,7 @@ @@ -494,36 +507,11 @@ - - - - - - - - - @@ -545,9 +532,7 @@ - @@ -563,7 +548,6 @@ @@ -571,18 +555,66 @@ - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + localhost @@ -623,27 +655,27 @@ - + - - + + - - + + + - @@ -682,68 +714,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -751,21 +721,103 @@ - + + + + + + + + + + + + + - + + + + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +