diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index 697b53d797..d202a0d2be 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -44,8 +44,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes *
* val agent = Agent(5) * -* agent update (_ + 1) -* agent update (_ * 2) +* agent send (_ + 1) +* agent send (_ * 2) * * val result = agent() * ... // use result @@ -53,17 +53,26 @@ class AgentException private[akka](message: String) extends RuntimeException(mes * agent.close ** +* Example of monadic usage: +*
+* val agent1 = Agent(3)
+* val agent2 = Agent(5)
+*
+* for {
+* first <- agent1
+* second <- agent2
+* if first == second
+* } process(first, second)
+*
+* agent1.close
+* agent2.close
+*
+*
* NOTE: You can't call 'agent.get' or 'agent()' within an enclosing transaction since
-* that will block the transaction indefinitely. But 'agent.update' or 'Agent(value)'
+* that will block the transaction indefinitely. But 'agent.send' or 'Agent(value)'
* is fine.
*
-* Original author:
-* @author Vaclav Pech
-*
-* Inital AKKA port by:
* @author Viktor Klang
-*
-* Modifications by:
* @author Jonas Bonér
*/
sealed class Agent[T] private (initialValue: T) extends Transactor {
@@ -71,15 +80,18 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
private lazy val value = Ref[T]()
start
- this ! ValueHolder(initialValue)
+ this !! Value(initialValue)
/**
- * Periodically handles incoming messages.
- */
+ * Periodically handles incoming messages.
+ */
def receive = {
- case ValueHolder(x: T) => updateData(x)
- case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
- case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
+ case Value(v: T) =>
+ swap(v)
+ case Function(fun: (T => T)) =>
+ swap(fun(value.getOrWait))
+ case Procedure(proc: (T => Unit)) =>
+ proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))))
}
/**
@@ -87,11 +99,11 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
*/
protected def copyStrategy(t: T): T = t
-
/**
- * Updates the internal state with the value provided as a by-name parameter.
- */
- private final def updateData(newData: => T): Unit = value.swap(newData)
+ * Performs a CAS operation, atomically swapping the internal state with the value
+ * provided as a by-name parameter.
+ */
+ private final def swap(newData: => T): Unit = value.swap(newData)
/**
* Submits a request to read the internal state.
@@ -105,46 +117,64 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
"Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
- get((x: T) => {ref.set(x); latch.countDown})
+ sendProc((x: T) => {ref.set(x); latch.countDown})
latch.await
ref.get
}
/**
- * Asynchronously submits a request to read the internal state. The supplied function
- * will be executed on the returned internal state value. A copy of the internal state
- * will be used, depending on the underlying effective copyStrategy.
- */
- final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)
-
- /**
- * Submits a request to read the internal state. A copy of the internal state will be
- * returned, depending on the underlying effective copyStrategy. Internally leverages
- * the asynchronous getValue() method and then waits for its result on a CountDownLatch.
- */
+ * Submits a request to read the internal state. A copy of the internal state will be
+ * returned, depending on the underlying effective copyStrategy. Internally leverages
+ * the asynchronous getValue() method and then waits for its result on a CountDownLatch.
+ */
final def apply(): T = get
+
+ /**
+ * Submits the provided function for execution against the internal agent's state.
+ */
+ final def apply(message: (T => T)): Unit = this ! Function(message)
+
+ /**
+ * Submits a new value to be set as the new agent's internal state.
+ */
+ final def apply(message: T): Unit = this ! Value(message)
+
+ /**
+ * Submits the provided function of type 'T => T' for execution against the internal agent's state.
+ */
+ final def send(message: (T) => T): Unit = this ! Function(message)
+
+ /**
+ * Submits a new value to be set as the new agent's internal state.
+ */
+ final def send(message: T): Unit = this ! Value(message)
/**
- * Submits the provided function for execution against the internal agent's state.
- */
- final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
-
+ * Asynchronously submits a procedure of type 'T => Unit' to read the internal state.
+ * The supplied procedure will be executed on the returned internal state value. A copy
+ * of the internal state will be used, depending on the underlying effective copyStrategy.
+ * Does not change the value of the agent (this).
+ */
+ final def sendProc(f: (T) => Unit): Unit = this ! Procedure(f)
+
/**
- * Submits a new value to be set as the new agent's internal state.
- */
- final def apply(message: T): Unit = this ! ValueHolder(message)
-
+ * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
+ * Does not change the value of the agent (this).
+ */
+ final def map[B](f: (T) => B): Agent[B] = Agent(f(get))
+
/**
- * Submits the provided function for execution against the internal agent's state.
- */
- final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
-
+ * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
+ * Does not change the value of the agent (this).
+ */
+ final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())
+
/**
- * Submits a new value to be set as the new agent's internal state.
- */
- // FIXME Change to 'send' when we have Scala 2.8 and we can remove the Actor.send method
- final def update(message: T): Unit = this ! ValueHolder(message)
-
+ * Applies function with type 'T => B' to the agent's internal state.
+ * Does not change the value of the agent (this).
+ */
+ final def foreach(f: (T) => Unit): Unit = f(get)
+
/**
* Closes the agents and makes it eligable for garbage collection.
*
@@ -154,16 +184,19 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
}
/**
-* Provides factory methods to create Agents.
-*/
-object Agent {
+ * Provides factory methods to create Agents.
+ *
+ * @author Viktor Klang
+ * @author Jonas Bonér
+ */
+object Agent {
/*
* The internal messages for passing around requests.
*/
- private case class ProcedureHolder[T](fun: ((T) => Unit))
- private case class FunctionHolder[T](fun: ((T) => T))
- private case class ValueHolder[T](value: T)
+ private case class Value[T](value: T)
+ private case class Function[T](fun: ((T) => T))
+ private case class Procedure[T](fun: ((T) => Unit))
/**
* Creates a new Agent of type T with the initial value of value.
@@ -177,4 +210,4 @@ object Agent {
def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) {
override def copyStrategy(t: T) = newCopyStrategy(t)
}
-}
\ No newline at end of file
+}
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index c86abe662b..29d4c586e9 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -103,33 +103,29 @@ object Transaction extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction class.
*/
- def map[T](f: => T)(implicit transactionFamilyName: String): T =
- atomic {f}
+ def map[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction class.
*/
- def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
- atomic {f}
+ def flatMap[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction class.
*/
- def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
- atomic {f}
+ def foreach(f: => Unit): Unit = atomic {f}
/**
* See ScalaDoc on Transaction class.
*/
- def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
+ def atomic[T](body: => T): T = {
var isTopLevelTransaction = true
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
val result = body
val txSet = getTransactionSetInScope
- log.trace("Committing transaction [%s]\n\twith family name [%s]\n\tby joining transaction set [%s]",
- mtx, transactionFamilyName, txSet)
+ log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
txSet.joinCommit(mtx)
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
diff --git a/akka-core/src/test/scala/AgentTest.scala b/akka-core/src/test/scala/AgentTest.scala
index a81a945439..de2929490e 100644
--- a/akka-core/src/test/scala/AgentTest.scala
+++ b/akka-core/src/test/scala/AgentTest.scala
@@ -16,15 +16,12 @@ class AgentTest extends junit.framework.TestCase
with Suite with MustMatchers
with ActorTestUtil with Logging {
- implicit val txFamilyName = "test"
-
@Test def testSendFun = verify(new TestActor {
def test = {
val agent = Agent(5)
handle(agent) {
- agent update (_ + 1)
- agent update (_ * 2)
-
+ agent send (_ + 1)
+ agent send (_ * 2)
val result = agent()
result must be(12)
}
@@ -35,21 +32,34 @@ with ActorTestUtil with Logging {
def test = {
val agent = Agent(5)
handle(agent) {
- agent update 6
+ agent send 6
val result = agent()
result must be(6)
}
}
})
- @Test def testOneAgentUpdateWithinEnlosingTransactionSuccess = {
+ @Test def testSendProc = verify(new TestActor {
+ def test = {
+ val agent = Agent(5)
+ var result = 0
+ handle(agent) {
+ agent sendProc (result += _)
+ agent sendProc (result += _)
+ Thread.sleep(1000)
+ result must be(10)
+ }
+ }
+ })
+
+ @Test def testOneAgentsendWithinEnlosingTransactionSuccess = {
case object Go
val agent = Agent(5)
val tx = transactor {
- case Go => agent update (_ + 1)
+ case Go => agent send (_ + 1)
}
- tx send Go
- Thread.sleep(5000)
+ tx ! Go
+ Thread.sleep(1000)
val result = agent()
result must be(6)
agent.close
@@ -63,16 +73,53 @@ with ActorTestUtil with Logging {
val agent = Agent(5)
val tx = transactor {
case Go =>
- agent update (_ * 2)
+ agent send (_ * 2)
try { agent() }
catch {
case _ => latch.countDown
}
}
- tx send Go
+ tx ! Go
latch.await // FIXME should await with timeout and fail if timeout
agent.close
tx.stop
assert(true)
}
+
+ @Test def testAgentForeach = verify(new TestActor {
+ def test = {
+ val agent1 = Agent(3)
+ var result = 0
+ for (first <- agent1) {
+ result = first + 1
+ }
+ result must be(4)
+ agent1.close
+ }
+ })
+
+ @Test def testAgentMap = verify(new TestActor {
+ def test = {
+ val agent1 = Agent(3)
+ val result = for (first <- agent1) yield first + 1
+ result() must be(4)
+ result.close
+ agent1.close
+ }
+ })
+
+ @Test def testAgentFlatMap = verify(new TestActor {
+ def test = {
+ val agent1 = Agent(3)
+ val agent2 = Agent(5)
+ val result = for {
+ first <- agent1
+ second <- agent2
+ } yield second + first
+ result() must be(8)
+ result.close
+ agent1.close
+ agent2.close
+ }
+ })
}
diff --git a/akka-core/src/test/scala/ShutdownSpec.scala b/akka-core/src/test/scala/ShutdownSpec.scala
index 20927bbfb1..8790715983 100644
--- a/akka-core/src/test/scala/ShutdownSpec.scala
+++ b/akka-core/src/test/scala/ShutdownSpec.scala
@@ -13,7 +13,7 @@ object ActorShutdownRunner {
val myActor = new MyActor
myActor.start
- myActor.send("test")
+ myActor ! "test"
myActor.stop
}
}
@@ -34,4 +34,4 @@ object RemoteServerAndClusterShutdownRunner {
s2.shutdown
s3.shutdown
}
-}
\ No newline at end of file
+}