diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index c5d13b73f8..4f3d218804 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -23,13 +23,13 @@ import org.codehaus.aspectwerkz.proxy.Uuid import org.multiverse.utils.ThreadLocalTransaction._ -@serializable sealed abstract class LifecycleMessage -case class Init(config: AnyRef) extends LifecycleMessage -case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage -case class Restart(reason: AnyRef) extends LifecycleMessage -case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage -case class Kill(killer: Actor) extends LifecycleMessage -//case object TransactionalInit extends LifecycleMessage +@serializable sealed abstract class LifeCycleMessage +case class Init(config: AnyRef) extends LifeCycleMessage +case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage +case class Restart(reason: AnyRef) extends LifeCycleMessage +case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage +case class Kill(killer: Actor) extends LifeCycleMessage +//case object TransactionalInit extends LifeCycleMessage class ActorKilledException(val killed: Actor, val killer: Actor) extends RuntimeException("Actor [" + killed + "] killed by [" + killer + "]") @@ -54,6 +54,17 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { object Actor { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) + + def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { + start + def receive = body + } + + def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() { + lifeCycle = Some(lifeCycleConfig) + start + def receive = body + } } /** @@ -97,7 +108,7 @@ trait Actor extends Logging with TransactionManagement { * This field is used for logging etc. but also as the identifier for persistence, which means that you can * use a custom name to be able to retrieve the "correct" persisted state upon restart, remote restart etc. */ - protected[akka] var id: String = this.getClass.toString + protected[akka] var id: String = this.getClass.getName /** * User overridable callback/setting. @@ -701,5 +712,5 @@ trait Actor extends Logging with TransactionManagement { } else message } else message - override def toString(): String = "Actor[" + id+ ":" + uuid + "]" + override def toString(): String = "Actor[" + id + ":" + uuid + "]" } diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index 902e0db0ec..20ea9ede9f 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -87,6 +87,11 @@ object Transaction extends TransactionManagement { def foreach(f: Transaction => Unit): Unit = atomic { f(getTransactionInScope) } // -- atomic block -------------------------- + private[akka] def atomically[T](body: => T): T = new AtomicTemplate[T]( + getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) { + def execute(mtx: MultiverseTransaction): T = body + }.execute() + def atomic[T](body: => T): T = new AtomicTemplate[T]( getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) { def execute(mtx: MultiverseTransaction): T = body @@ -96,12 +101,11 @@ object Transaction extends TransactionManagement { setTransaction(Some(tx)) } override def postCommit = { - if (isTransactionInScope) {}///getTransactionInScope.commit + if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } }.execute() -// FIXME: add these other atomic methods def atomic[T](retryCount: Int)(body: => T): T = { new AtomicTemplate[T](getGlobalStmInstance, "akka", false, false, retryCount) { def execute(mtx: MultiverseTransaction): T = body @@ -111,7 +115,7 @@ object Transaction extends TransactionManagement { setTransaction(Some(tx)) } override def postCommit = { - if (isTransactionInScope) {}///getTransactionInScope.commit + if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } }.execute @@ -126,7 +130,7 @@ object Transaction extends TransactionManagement { setTransaction(Some(tx)) } override def postCommit = { - if (isTransactionInScope) {}///getTransactionInScope.commit + if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } }.execute @@ -141,7 +145,7 @@ object Transaction extends TransactionManagement { setTransaction(Some(tx)) } override def postCommit = { - if (isTransactionInScope) {}///getTransactionInScope.commit + if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } }.execute @@ -172,7 +176,7 @@ object Transaction extends TransactionManagement { // --- public methods --------- def commit = synchronized { - atomic { + atomically { persistentStateMap.values.foreach(_.commit) TransactionManagement.clearTransaction } diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index ad6c49463e..06e65d0ceb 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -43,14 +43,12 @@ trait TransactionManagement extends Logging { private[akka] def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) { val tx = transaction.get - //log.debug("Setting transaction [%s]", transaction.get) currentTransaction.set(transaction) if (tx.transaction.isDefined) setThreadLocalTransaction(tx.transaction.get) else throw new IllegalStateException("No transaction defined") } private[akka] def clearTransaction = { - //if (isTransactionInScope) log.debug("Clearing transaction [%s]", getTransactionInScope) currentTransaction.set(None) setThreadLocalTransaction(null) } @@ -59,14 +57,10 @@ trait TransactionManagement extends Logging { private[akka] def isTransactionInScope = currentTransaction.get.isDefined - private[akka] def incrementTransaction = - if (isTransactionInScope) getTransactionInScope.increment - //else throw new IllegalStateException("No transaction in scope") + private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment + + private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement - private[akka] def decrementTransaction = - if (isTransactionInScope) getTransactionInScope.decrement - //else throw new IllegalStateException("No transaction in scope") - private[akka] def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx } } diff --git a/akka.iws b/akka.iws index fbc3432f32..9dbd34f457 100644 --- a/akka.iws +++ b/akka.iws @@ -5,8 +5,9 @@ + - + @@ -98,25 +99,52 @@ - + - + + + + + + + + + + + + + + + + + + + - + - + + + + + + + + + + - + @@ -125,7 +153,7 @@ - + @@ -146,7 +174,6 @@ @@ -217,6 +245,7 @@ + @@ -249,6 +278,84 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + - + - - + + @@ -692,13 +854,13 @@ - + - + @@ -854,6 +1016,14 @@