From 089cf01b40f081de007e6ccfa5928380a4e87dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 31 Mar 2010 19:53:00 +0200 Subject: [PATCH] Agent is now monadic, added more tests to AgentTest --- akka-core/src/main/scala/actor/Agent.scala | 143 +++++++++++------- .../src/main/scala/stm/Transaction.scala | 14 +- akka-core/src/test/scala/AgentTest.scala | 71 +++++++-- akka-core/src/test/scala/ShutdownSpec.scala | 4 +- 4 files changed, 154 insertions(+), 78 deletions(-) diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index 697b53d797..d202a0d2be 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -44,8 +44,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes *
 * val agent = Agent(5)
 *
-* agent update (_ + 1)
-* agent update (_ * 2)
+* agent send (_ + 1)
+* agent send (_ * 2)
 *
 * val result = agent()
 * ... // use result
@@ -53,17 +53,26 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
 * agent.close
 * 
* +* Example of monadic usage: +*
+* val agent1 = Agent(3)
+* val agent2 = Agent(5)
+*
+* for {
+*   first <- agent1
+*   second <- agent2
+*   if first == second
+* } process(first, second)
+*
+* agent1.close
+* agent2.close
+* 
+* * NOTE: You can't call 'agent.get' or 'agent()' within an enclosing transaction since -* that will block the transaction indefinitely. But 'agent.update' or 'Agent(value)' +* that will block the transaction indefinitely. But 'agent.send' or 'Agent(value)' * is fine. * -* Original author: -* @author Vaclav Pech -* -* Inital AKKA port by: * @author Viktor Klang -* -* Modifications by: * @author Jonas Bonér */ sealed class Agent[T] private (initialValue: T) extends Transactor { @@ -71,15 +80,18 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { private lazy val value = Ref[T]() start - this ! ValueHolder(initialValue) + this !! Value(initialValue) /** - * Periodically handles incoming messages. - */ + * Periodically handles incoming messages. + */ def receive = { - case ValueHolder(x: T) => updateData(x) - case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait)) - case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait)) + case Value(v: T) => + swap(v) + case Function(fun: (T => T)) => + swap(fun(value.getOrWait)) + case Procedure(proc: (T => Unit)) => + proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))) } /** @@ -87,11 +99,11 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { */ protected def copyStrategy(t: T): T = t - /** - * Updates the internal state with the value provided as a by-name parameter. - */ - private final def updateData(newData: => T): Unit = value.swap(newData) + * Performs a CAS operation, atomically swapping the internal state with the value + * provided as a by-name parameter. + */ + private final def swap(newData: => T): Unit = value.swap(newData) /** * Submits a request to read the internal state. @@ -105,46 +117,64 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { "Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.") val ref = new AtomicReference[T] val latch = new CountDownLatch(1) - get((x: T) => {ref.set(x); latch.countDown}) + sendProc((x: T) => {ref.set(x); latch.countDown}) latch.await ref.get } /** - * Asynchronously submits a request to read the internal state. The supplied function - * will be executed on the returned internal state value. A copy of the internal state - * will be used, depending on the underlying effective copyStrategy. - */ - final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message) - - /** - * Submits a request to read the internal state. A copy of the internal state will be - * returned, depending on the underlying effective copyStrategy. Internally leverages - * the asynchronous getValue() method and then waits for its result on a CountDownLatch. - */ + * Submits a request to read the internal state. A copy of the internal state will be + * returned, depending on the underlying effective copyStrategy. Internally leverages + * the asynchronous getValue() method and then waits for its result on a CountDownLatch. + */ final def apply(): T = get + + /** + * Submits the provided function for execution against the internal agent's state. + */ + final def apply(message: (T => T)): Unit = this ! Function(message) + + /** + * Submits a new value to be set as the new agent's internal state. + */ + final def apply(message: T): Unit = this ! Value(message) + + /** + * Submits the provided function of type 'T => T' for execution against the internal agent's state. + */ + final def send(message: (T) => T): Unit = this ! Function(message) + + /** + * Submits a new value to be set as the new agent's internal state. + */ + final def send(message: T): Unit = this ! Value(message) /** - * Submits the provided function for execution against the internal agent's state. - */ - final def apply(message: (T => T)): Unit = this ! FunctionHolder(message) - + * Asynchronously submits a procedure of type 'T => Unit' to read the internal state. + * The supplied procedure will be executed on the returned internal state value. A copy + * of the internal state will be used, depending on the underlying effective copyStrategy. + * Does not change the value of the agent (this). + */ + final def sendProc(f: (T) => Unit): Unit = this ! Procedure(f) + /** - * Submits a new value to be set as the new agent's internal state. - */ - final def apply(message: T): Unit = this ! ValueHolder(message) - + * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. + * Does not change the value of the agent (this). + */ + final def map[B](f: (T) => B): Agent[B] = Agent(f(get)) + /** - * Submits the provided function for execution against the internal agent's state. - */ - final def update(message: (T => T)): Unit = this ! FunctionHolder(message) - + * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. + * Does not change the value of the agent (this). + */ + final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)()) + /** - * Submits a new value to be set as the new agent's internal state. - */ - // FIXME Change to 'send' when we have Scala 2.8 and we can remove the Actor.send method - final def update(message: T): Unit = this ! ValueHolder(message) - + * Applies function with type 'T => B' to the agent's internal state. + * Does not change the value of the agent (this). + */ + final def foreach(f: (T) => Unit): Unit = f(get) + /** * Closes the agents and makes it eligable for garbage collection. * @@ -154,16 +184,19 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { } /** -* Provides factory methods to create Agents. -*/ -object Agent { + * Provides factory methods to create Agents. + * + * @author Viktor Klang + * @author Jonas Bonér + */ +object Agent { /* * The internal messages for passing around requests. */ - private case class ProcedureHolder[T](fun: ((T) => Unit)) - private case class FunctionHolder[T](fun: ((T) => T)) - private case class ValueHolder[T](value: T) + private case class Value[T](value: T) + private case class Function[T](fun: ((T) => T)) + private case class Procedure[T](fun: ((T) => Unit)) /** * Creates a new Agent of type T with the initial value of value. @@ -177,4 +210,4 @@ object Agent { def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) { override def copyStrategy(t: T) = newCopyStrategy(t) } -} \ No newline at end of file +} diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index c86abe662b..29d4c586e9 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -103,33 +103,29 @@ object Transaction extends TransactionManagement with Logging { /** * See ScalaDoc on Transaction class. */ - def map[T](f: => T)(implicit transactionFamilyName: String): T = - atomic {f} + def map[T](f: => T): T = atomic {f} /** * See ScalaDoc on Transaction class. */ - def flatMap[T](f: => T)(implicit transactionFamilyName: String): T = - atomic {f} + def flatMap[T](f: => T): T = atomic {f} /** * See ScalaDoc on Transaction class. */ - def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit = - atomic {f} + def foreach(f: => Unit): Unit = atomic {f} /** * See ScalaDoc on Transaction class. */ - def atomic[T](body: => T)(implicit transactionFamilyName: String): T = { + def atomic[T](body: => T): T = { var isTopLevelTransaction = true new TransactionTemplate[T]() { def execute(mtx: MultiverseTransaction): T = { val result = body val txSet = getTransactionSetInScope - log.trace("Committing transaction [%s]\n\twith family name [%s]\n\tby joining transaction set [%s]", - mtx, transactionFamilyName, txSet) + log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) txSet.joinCommit(mtx) // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) diff --git a/akka-core/src/test/scala/AgentTest.scala b/akka-core/src/test/scala/AgentTest.scala index a81a945439..de2929490e 100644 --- a/akka-core/src/test/scala/AgentTest.scala +++ b/akka-core/src/test/scala/AgentTest.scala @@ -16,15 +16,12 @@ class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging { - implicit val txFamilyName = "test" - @Test def testSendFun = verify(new TestActor { def test = { val agent = Agent(5) handle(agent) { - agent update (_ + 1) - agent update (_ * 2) - + agent send (_ + 1) + agent send (_ * 2) val result = agent() result must be(12) } @@ -35,21 +32,34 @@ with ActorTestUtil with Logging { def test = { val agent = Agent(5) handle(agent) { - agent update 6 + agent send 6 val result = agent() result must be(6) } } }) - @Test def testOneAgentUpdateWithinEnlosingTransactionSuccess = { + @Test def testSendProc = verify(new TestActor { + def test = { + val agent = Agent(5) + var result = 0 + handle(agent) { + agent sendProc (result += _) + agent sendProc (result += _) + Thread.sleep(1000) + result must be(10) + } + } + }) + + @Test def testOneAgentsendWithinEnlosingTransactionSuccess = { case object Go val agent = Agent(5) val tx = transactor { - case Go => agent update (_ + 1) + case Go => agent send (_ + 1) } - tx send Go - Thread.sleep(5000) + tx ! Go + Thread.sleep(1000) val result = agent() result must be(6) agent.close @@ -63,16 +73,53 @@ with ActorTestUtil with Logging { val agent = Agent(5) val tx = transactor { case Go => - agent update (_ * 2) + agent send (_ * 2) try { agent() } catch { case _ => latch.countDown } } - tx send Go + tx ! Go latch.await // FIXME should await with timeout and fail if timeout agent.close tx.stop assert(true) } + + @Test def testAgentForeach = verify(new TestActor { + def test = { + val agent1 = Agent(3) + var result = 0 + for (first <- agent1) { + result = first + 1 + } + result must be(4) + agent1.close + } + }) + + @Test def testAgentMap = verify(new TestActor { + def test = { + val agent1 = Agent(3) + val result = for (first <- agent1) yield first + 1 + result() must be(4) + result.close + agent1.close + } + }) + + @Test def testAgentFlatMap = verify(new TestActor { + def test = { + val agent1 = Agent(3) + val agent2 = Agent(5) + val result = for { + first <- agent1 + second <- agent2 + } yield second + first + result() must be(8) + result.close + agent1.close + agent2.close + } + }) } diff --git a/akka-core/src/test/scala/ShutdownSpec.scala b/akka-core/src/test/scala/ShutdownSpec.scala index 20927bbfb1..8790715983 100644 --- a/akka-core/src/test/scala/ShutdownSpec.scala +++ b/akka-core/src/test/scala/ShutdownSpec.scala @@ -13,7 +13,7 @@ object ActorShutdownRunner { val myActor = new MyActor myActor.start - myActor.send("test") + myActor ! "test" myActor.stop } } @@ -34,4 +34,4 @@ object RemoteServerAndClusterShutdownRunner { s2.shutdown s3.shutdown } -} \ No newline at end of file +}