From 671ed5bf7e186c297ba60c1b6c279785fd8f669f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 17 Feb 2010 22:18:14 +0100 Subject: [PATCH] added check that transactional ref is only touched within a transaction --- akka-core/src/main/scala/actor/Actor.scala | 10 ++- .../main/scala/stm/TransactionalState.scala | 71 +++++++++++++++---- akka-patterns/src/main/scala/Agent.scala | 12 ++-- 3 files changed, 68 insertions(+), 25 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index b5313afb7c..41b2673bfa 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -795,17 +795,17 @@ trait Actor extends TransactionManagement { .setIsEscaped(false) val id = registerSupervisorAsRemoteActor - if(id.isDefined) + if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) // set the source fields used to reply back to the original sender // (i.e. not the remote proxy actor) - if(sender.isDefined) { + if (sender.isDefined) { val s = sender.get requestBuilder.setSourceTarget(s.getClass.getName) requestBuilder.setSourceUuid(s.uuid) - val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT)) + val (host, port) = s._replyToAddress.map(actor => (actor.getHostName, actor.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) @@ -819,9 +819,7 @@ trait Actor extends TransactionManagement { if (_isEventBased) { _mailbox.add(invocation) if (_isSuspended) invocation.send - } - else - invocation.send + } else invocation.send } } diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index b2e69c7d63..6003a89f89 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.state import se.scalablesolutions.akka.stm.Transaction.atomic +import se.scalablesolutions.akka.stm.NoTransactionInScopeException import se.scalablesolutions.akka.collection._ import se.scalablesolutions.akka.util.UUID @@ -72,45 +73,87 @@ object TransactionalRef { * @author Jonas Bonér */ class TransactionalRef[T] extends Transactional { - implicit val txInitName = "TransactionalRef:Init" import org.multiverse.api.ThreadLocalTransaction._ + + implicit val txInitName = "TransactionalRef:Init" val uuid = UUID.newUuid.toString private[this] val ref: Ref[T] = atomic { new Ref } - def swap(elem: T) = ref.set(elem) - + def swap(elem: T) = { + ensureIsInTransaction + ref.set(elem) + } + def get: Option[T] = { + ensureIsInTransaction if (ref.isNull) None else Some(ref.get) } - def getOrWait: T = ref.getOrAwait + def getOrWait: T = { + ensureIsInTransaction + ref.getOrAwait + } def getOrElse(default: => T): T = { + ensureIsInTransaction if (ref.isNull) default else ref.get } - def isDefined: Boolean = !ref.isNull + def isDefined: Boolean = { + ensureIsInTransaction + !ref.isNull + } - def isEmpty: Boolean = ref.isNull + def isEmpty: Boolean = { + ensureIsInTransaction + ref.isNull + } - def map[B](f: T => B): Option[B] = if (isEmpty) None else Some(f(ref.get)) + def map[B](f: T => B): Option[B] = { + ensureIsInTransaction + if (isEmpty) None else Some(f(ref.get)) + } - def flatMap[B](f: T => Option[B]): Option[B] = if (isEmpty) None else f(ref.get) + def flatMap[B](f: T => Option[B]): Option[B] = { + ensureIsInTransaction + if (isEmpty) None else f(ref.get) + } - def filter(p: T => Boolean): Option[T] = if (isEmpty || p(ref.get)) Some(ref.get) else None + def filter(p: T => Boolean): Option[T] = { + ensureIsInTransaction + if (isEmpty || p(ref.get)) Some(ref.get) else None + } - def foreach(f: T => Unit) { if (!isEmpty) f(ref.get) } + def foreach(f: T => Unit) { + ensureIsInTransaction + if (!isEmpty) f(ref.get) + } - def elements: Iterator[T] = if (isEmpty) Iterator.empty else Iterator.fromValues(ref.get) + def elements: Iterator[T] = { + ensureIsInTransaction + if (isEmpty) Iterator.empty else Iterator.fromValues(ref.get) + } - def toList: List[T] = if (isEmpty) List() else List(ref.get) + def toList: List[T] = { + ensureIsInTransaction + if (isEmpty) List() else List(ref.get) + } - def toRight[X](left: => X) = if (isEmpty) Left(left) else Right(ref.get) + def toRight[X](left: => X) = { + ensureIsInTransaction + if (isEmpty) Left(left) else Right(ref.get) + } - def toLeft[X](right: => X) = if (isEmpty) Right(right) else Left(ref.get) + def toLeft[X](right: => X) = { + ensureIsInTransaction + if (isEmpty) Right(right) else Left(ref.get) + } + + private def ensureIsInTransaction = + if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException } object TransactionalMap { diff --git a/akka-patterns/src/main/scala/Agent.scala b/akka-patterns/src/main/scala/Agent.scala index ceb657d473..4dd8640c32 100644 --- a/akka-patterns/src/main/scala/Agent.scala +++ b/akka-patterns/src/main/scala/Agent.scala @@ -16,8 +16,10 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.atomic.AtomicReference import se.scalablesolutions.akka.state.TransactionalState +import se.scalablesolutions.akka.stm.Transaction.atomic + +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{CountDownLatch} /** @@ -34,7 +36,7 @@ import java.util.concurrent.{CountDownLatch} * Date: Oct 18, 2009 * * AKKA retrofit by -* @Author Viktor Klang +* @author Viktor Klang * Date: Jan 24 2010 */ sealed class Agent[T] private (initialValue: T) extends Actor { @@ -48,11 +50,11 @@ sealed class Agent[T] private (initialValue: T) extends Actor { * Periodically handles incoming messages */ def receive = { - case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait)) + case FunctionHolder(fun: (T => T)) => atomic { updateData(fun(value.getOrWait)) } case ValueHolder(x: T) => updateData(x) - case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait)) + case ProcedureHolder(fun: (T => Unit)) => atomic { fun(copyStrategy(value.getOrWait)) } } /** @@ -64,7 +66,7 @@ sealed class Agent[T] private (initialValue: T) extends Actor { /** * Updates the internal state with the value provided as a by-name parameter */ - private final def updateData(newData: => T) : Unit = value.swap(newData) + private final def updateData(newData: => T) : Unit = atomic { value.swap(newData) } /** * Submits a request to read the internal state.