diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index fae6b2e70d..f9a5cfbd9b 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -101,6 +101,23 @@ object Actor extends Logging { def receive: PartialFunction[Any, Unit] = body } + /** + * Use to create an anonymous transactional event-driven actor. + * The actor is started when created. + * Example: + *
+ * import Actor._
+ *
+ * val a = transactor {
+ * case msg => ... // handle message
+ * }
+ *
+ */
+ def transactor(body: PartialFunction[Any, Unit]): Actor = new Transactor() {
+ start
+ def receive: PartialFunction[Any, Unit] = body
+ }
+
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
* The actor is started when created.
@@ -202,7 +219,7 @@ object Actor extends Logging {
*
* @author Jonas Bonér
*/
-trait Actor extends TransactionManagement {
+trait Actor extends TransactionManagement with Logging {
implicit protected val self: Option[Actor] = Some(this)
implicit protected val transactionFamilyName: String = this.getClass.getName
@@ -341,7 +358,7 @@ trait Actor extends TransactionManagement {
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
* start if there is no one running, else it joins the existing transaction.
*/
- @volatile protected var isTransactionRequiresNew = false
+ @volatile protected var isTransactor = false
/**
* User overridable callback/setting.
@@ -649,7 +666,7 @@ trait Actor extends TransactionManagement {
def makeTransactionRequired = synchronized {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
- else isTransactionRequiresNew = true
+ else isTransactor = true
}
/**
@@ -788,18 +805,13 @@ trait Actor extends TransactionManagement {
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
val actor = actorClass.newInstance.asInstanceOf[T]
- if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
- actor.dispatcher = dispatcher
- }
+ if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actor.dispatcher = dispatcher
actor
}
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
- if (isTransactionSetInScope) {
- log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
- getTransactionSetInScope.incParties
- }
-
+ joinTransaction(message)
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -815,14 +827,14 @@ trait Actor extends TransactionManagement {
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
- if(sender.isDefined) {
+ if (sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
- val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
+ val (host, port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
- log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
+ Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
@@ -835,20 +847,15 @@ trait Actor extends TransactionManagement {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
- else
- invocation.send
+ else invocation.send
}
- clearTransactionSet
}
-
+
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = {
- if (isTransactionSetInScope) {
- log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
- getTransactionSetInScope.incParties
- }
+ joinTransaction(message)
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
@@ -863,7 +870,6 @@ trait Actor extends TransactionManagement {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
- clearTransactionSet
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
@@ -874,11 +880,17 @@ trait Actor extends TransactionManagement {
_mailbox.add(invocation)
invocation.send
} else invocation.send
- clearTransactionSet
future
}
}
+ private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
+ // FIXME test to run bench without this trace call
+ Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
+ getTransactionSetInScope, toString, message)
+ getTransactionSetInScope.incParties
+ }
+
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
@@ -921,8 +933,9 @@ trait Actor extends TransactionManagement {
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
else {
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
- if (isTransactionRequiresNew) {
- log.trace("Creating a new transaction set (top-level transaction) \nfor actor %s \nwith message %s", toString, messageHandle)
+ if (isTransactor) {
+ Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
+ toString, messageHandle)
Some(createNewTransactionSet)
} else None
}
@@ -932,11 +945,6 @@ trait Actor extends TransactionManagement {
senderFuture = messageHandle.future
sender = messageHandle.sender
- def clearTx = {
- clearTransactionSet
- clearTransaction
- }
-
def proceed = {
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
@@ -946,7 +954,7 @@ trait Actor extends TransactionManagement {
}
try {
- if (isTransactionRequiresNew) {
+ if (isTransactor) {
atomic {
proceed
}
@@ -955,16 +963,21 @@ trait Actor extends TransactionManagement {
case e: IllegalStateException => {}
case e =>
// abort transaction set
- if (isTransactionSetInScope) try { getTransactionSetInScope.abort } catch { case e: IllegalStateException => {} }
+ if (isTransactionSetInScope) try {
+ getTransactionSetInScope.abort
+ } catch { case e: IllegalStateException => {} }
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
- clearTx // need to clear currentTransaction before call to supervisor
+
+ clearTransaction
+ if (topLevelTransaction) clearTransactionSet
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
} finally {
- clearTx
+ clearTransaction
+ if (topLevelTransaction) clearTransactionSet
}
}
diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala
index 6f9bad0df4..ab08b48ec9 100644
--- a/akka-core/src/main/scala/actor/Agent.scala
+++ b/akka-core/src/main/scala/actor/Agent.scala
@@ -18,6 +18,8 @@ import se.scalablesolutions.akka.stm.Ref
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
+
+class AgentException private[akka](message: String) extends RuntimeException(message)
/**
* The Agent class was strongly inspired by the agent principle in Clojure.
@@ -35,14 +37,27 @@ import java.util.concurrent.CountDownLatch
* The code that is submitted to an agent doesn't need to pay attention to
* threading or synchronization, the agent will provide such guarantees by itself.
*
-* See the examples of use for more details.
+* Example of usage:
+* +* val agent = Agent(5) +* +* agent update (_ + 1) +* agent update (_ * 2) +* +* val result = agent() +* ... // use result +* +* agent.close +** * @author Vaclav Pech * Date: Oct 18, 2009 * -* AKKA port by +* Inital AKKA port by * @author Viktor Klang * Date: Jan 24 2010 +* +* @author Jonas Bonér */ sealed class Agent[T] private (initialValue: T) extends Transactor { import Agent._ @@ -52,7 +67,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { this ! ValueHolder(initialValue) /** - * Periodically handles incoming messages + * Periodically handles incoming messages. */ def receive = { case ValueHolder(x: T) => updateData(x) @@ -61,13 +76,13 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { } /** - * Specifies how a copy of the value is made, defaults to using identity + * Specifies how a copy of the value is made, defaults to using identity. */ protected def copyStrategy(t: T): T = t /** - * Updates the internal state with the value provided as a by-name parameter + * Updates the internal state with the value provided as a by-name parameter. */ private final def updateData(newData: => T): Unit = value.swap(newData) @@ -79,6 +94,8 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * method and then waits for its result on a CountDownLatch. */ final def get: T = { + if (isTransactionInScope) throw new AgentException( + "Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.") val ref = new AtomicReference[T] val latch = new CountDownLatch(1) get((x: T) => {ref.set(x); latch.countDown}) @@ -99,33 +116,34 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * the asynchronous getValue() method and then waits for its result on a CountDownLatch. */ final def apply(): T = get - + /** - * Asynchronously submits a request to read the internal state. The supplied function - * will be executed on the returned internal state value. A copy of the internal state - * will be used, depending on the underlying effective copyStrategy. - */ -// final def apply(message: (T => Unit)) : Unit = get(message) - - /** - * Submits the provided function for execution against the internal agent's state + * Submits the provided function for execution against the internal agent's state. */ final def apply(message: (T => T)): Unit = this ! FunctionHolder(message) /** - * Submits a new value to be set as the new agent's internal state + * Submits a new value to be set as the new agent's internal state. */ final def apply(message: T): Unit = this ! ValueHolder(message) /** - * Submits the provided function for execution against the internal agent's state + * Submits the provided function for execution against the internal agent's state. */ final def update(message: (T => T)): Unit = this ! FunctionHolder(message) /** - * Submits a new value to be set as the new agent's internal state + * Submits a new value to be set as the new agent's internal state. */ + // FIXME Change to 'send' when we have Scala 2.8 and we can remove the Actor.send method final def update(message: T): Unit = this ! ValueHolder(message) + + /** + * Closes the agents and makes it eligable for garbage collection. + * + * A closed agent can never be used again. + */ + def close = stop } /** @@ -134,22 +152,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { object Agent { /* - * The internal messages for passing around requests + * The internal messages for passing around requests. */ - private case class ProcedureHolder[T](val fun: ((T) => Unit)) - private case class FunctionHolder[T](val fun: ((T) => T)) - private case class ValueHolder[T](val value: T) + private case class ProcedureHolder[T](fun: ((T) => Unit)) + private case class FunctionHolder[T](fun: ((T) => T)) + private case class ValueHolder[T](value: T) /** - * Creates a new Agent of type T with the initial value of value + * Creates a new Agent of type T with the initial value of value. */ - def apply[T](value:T): Agent[T] = new Agent(value) + def apply[T](value: T): Agent[T] = new Agent(value) /** * Creates a new Agent of type T with the initial value of value and with the - * specified copy function + * specified copy function. */ - def apply[T](value:T, newCopyStrategy: (T) => T) = new Agent(value) { - override def copyStrategy(t : T) = newCopyStrategy(t) + def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) { + override def copyStrategy(t: T) = newCopyStrategy(t) } } \ No newline at end of file diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index a7184e969d..72f97a2d0b 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -97,66 +97,43 @@ class TransactionRetryException(message: String) extends RuntimeException(messag * * @author Jonas Bonér */ -object Transaction extends TransactionManagement { +object Transaction extends TransactionManagement with Logging { val idFactory = new AtomicLong(-1L) -/* - import AlphaStm._ - private val defaultTxBuilder = new AlphaTransactionFactoryBuilder - defaultTxBuilder.setReadonly(false) - defaultTxBuilder.setInterruptible(INTERRUPTIBLE) - defaultTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES) - defaultTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW) - defaultTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING) - defaultTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR) - defaultTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy) - private val readOnlyTxBuilder = new AlphaStm.AlphaTransactionFactoryBuilder - readOnlyTxBuilder.setReadonly(true) - readOnlyTxBuilder.setInterruptible(INTERRUPTIBLE) - readOnlyTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES) - readOnlyTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW) - readOnlyTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING) - readOnlyTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR) - readOnlyTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy) - */ + /** - * See ScalaDoc on class. + * See ScalaDoc on Transaction class. */ def map[T](f: => T)(implicit transactionFamilyName: String): T = atomic {f} /** - * See ScalaDoc on class. + * See ScalaDoc on Transaction class. */ def flatMap[T](f: => T)(implicit transactionFamilyName: String): T = atomic {f} /** - * See ScalaDoc on class. + * See ScalaDoc on Transaction class. */ def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit = atomic {f} /** - * Creates a "pure" STM atomic transaction and by-passes all transactions hooks - * such as persistence etc. - * Only for internal usage. - */ - private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() { - def execute(mtx: MultiverseTransaction): T = body - }.execute() - - /** - * See ScalaDoc on class. + * See ScalaDoc on Transaction class. */ def atomic[T](body: => T)(implicit transactionFamilyName: String): T = { + // FIXME use Transaction Builder and set the transactionFamilyName // defaultTxBuilder.setFamilyName(transactionFamilyName) // new TransactionTemplate[T](defaultTxBuilder.build) { - new TransactionTemplate[T]() { // FIXME take factory + var isTopLevelTransaction = true + new TransactionTemplate[T]() { def execute(mtx: MultiverseTransaction): T = { val result = body - log.trace("Committing transaction [%s] \nwith family name [%s] \nby joining transaction set") - getTransactionSetInScope.joinCommit(mtx) + val txSet = getTransactionSetInScope + log.trace("Committing transaction [%s]\n\twith family name [%s]\n\tby joining transaction set [%s]", + mtx, transactionFamilyName, txSet) + txSet.joinCommit(mtx) // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) @@ -166,8 +143,11 @@ object Transaction extends TransactionManagement { } override def onStart(mtx: MultiverseTransaction) = { - val txSet = if (!isTransactionSetInScope) createNewTransactionSet - else getTransactionSetInScope + val txSet = + if (!isTransactionSetInScope) { + isTopLevelTransaction = true + createNewTransactionSet + } else getTransactionSetInScope val tx = new Transaction tx.transaction = Some(mtx) setTransaction(Some(tx)) @@ -197,6 +177,16 @@ object Transaction extends TransactionManagement { def orelserun(t: MultiverseTransaction) = secondBody }.execute() } + + /** + * Creates a STM atomic transaction and by-passes all transactions hooks + * such as persistence etc. + * + * Only for internal usage. + */ + private[akka] def atomic0[T](body: => T): T = new TransactionTemplate[T]() { + def execute(mtx: MultiverseTransaction): T = body + }.execute() } /** @@ -205,18 +195,19 @@ object Transaction extends TransactionManagement { @serializable class Transaction extends Logging { import Transaction._ - log.trace("Creating %s", toString) val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: Option[MultiverseTransaction] = None private[this] val persistentStateMap = new HashMap[String, Committable] private[akka] val depth = new AtomicInteger(0) + log.trace("Creating %s", toString) + // --- public methods --------- def commit = synchronized { log.trace("Committing transaction %s", toString) - pureAtomic { + atomic0 { persistentStateMap.values.foreach(_.commit) } status = TransactionStatus.Completed @@ -261,21 +252,21 @@ object Transaction extends TransactionManagement { "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) // For reinitialize transaction after sending it over the wire - private[akka] def reinit = synchronized { +/* private[akka] def reinit = synchronized { import net.lag.logging.{Logger, Level} if (log eq null) { log = Logger.get(this.getClass.getName) log.setLevel(Level.ALL) // TODO: preserve logging level } } - +*/ override def equals(that: Any): Boolean = synchronized { that != null && - that.isInstanceOf[Transaction] && - that.asInstanceOf[Transaction].id == this.id + that.isInstanceOf[Transaction] && + that.asInstanceOf[Transaction].id == this.id } - override def hashCode(): Int = synchronized { id.toInt } + override def hashCode: Int = synchronized { id.toInt } override def toString = synchronized { "Transaction[" + id + ", " + status + "]" } } diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 96742b9363..48c8c7dd95 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -6,8 +6,6 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicBoolean -import se.scalablesolutions.akka.util.Logging - import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier @@ -40,8 +38,8 @@ object TransactionManagement extends TransactionManagement { private[akka] def getTransactionSet: CountDownCommitBarrier = { val option = transactionSet.get - if ((option eq null) || option.isEmpty) throw new IllegalStateException("No TransactionSet in scope") - option.get + if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction set in scope") + else option.get } private[akka] def getTransaction: Transaction = { @@ -51,7 +49,7 @@ object TransactionManagement extends TransactionManagement { } } -trait TransactionManagement extends Logging { +trait TransactionManagement { private[akka] def createNewTransactionSet: CountDownCommitBarrier = { val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS) diff --git a/akka-core/src/test/scala/AgentTest.scala b/akka-core/src/test/scala/AgentTest.scala index d6776c2d23..a81a945439 100644 --- a/akka-core/src/test/scala/AgentTest.scala +++ b/akka-core/src/test/scala/AgentTest.scala @@ -1,10 +1,14 @@ package se.scalablesolutions.akka.actor -import org.scalatest.Suite +import se.scalablesolutions.akka.actor.Actor.transactor +import se.scalablesolutions.akka.stm.Transaction.atomic import se.scalablesolutions.akka.util.Logging -import org.junit.runner.RunWith + +import org.scalatest.Suite import org.scalatest.junit.JUnitRunner import org.scalatest.matchers.MustMatchers + +import org.junit.runner.RunWith import org.junit.{Test} @RunWith(classOf[JUnitRunner]) @@ -12,7 +16,9 @@ class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging { - @Test def testAgent = verify(new TestActor { + implicit val txFamilyName = "test" + + @Test def testSendFun = verify(new TestActor { def test = { val agent = Agent(5) handle(agent) { @@ -24,4 +30,49 @@ with ActorTestUtil with Logging { } } }) + + @Test def testSendValue = verify(new TestActor { + def test = { + val agent = Agent(5) + handle(agent) { + agent update 6 + val result = agent() + result must be(6) + } + } + }) + + @Test def testOneAgentUpdateWithinEnlosingTransactionSuccess = { + case object Go + val agent = Agent(5) + val tx = transactor { + case Go => agent update (_ + 1) + } + tx send Go + Thread.sleep(5000) + val result = agent() + result must be(6) + agent.close + tx.stop + } + + @Test def testDoingAgentGetInEnlosingTransactionShouldYieldException = { + import java.util.concurrent.CountDownLatch + case object Go + val latch = new CountDownLatch(1) + val agent = Agent(5) + val tx = transactor { + case Go => + agent update (_ * 2) + try { agent() } + catch { + case _ => latch.countDown + } + } + tx send Go + latch.await // FIXME should await with timeout and fail if timeout + agent.close + tx.stop + assert(true) + } } diff --git a/akka-util/src/main/scala/Logging.scala b/akka-util/src/main/scala/Logging.scala index b988c73f22..978ceaa479 100644 --- a/akka-util/src/main/scala/Logging.scala +++ b/akka-util/src/main/scala/Logging.scala @@ -17,7 +17,7 @@ import java.net.UnknownHostException * @author Jonas Bonér */ trait Logging { - @transient @volatile var log = Logger.get(this.getClass.getName) + @transient @volatile lazy val log = Logger.get(this.getClass.getName) } /** diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 6c442e3ef1..d95fc17873 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -8,7 +8,7 @@