Merge pull request #172 from jboner/migrate-agent

Migrate agent to scala stm
This commit is contained in:
Peter Vlugter 2011-12-19 13:19:31 -08:00
commit 00c7fe5bbb
12 changed files with 603 additions and 189 deletions

View file

@ -4,17 +4,17 @@
package akka.agent package akka.agent
import akka.actor.ActorSystem
import akka.actor._ import akka.actor._
import akka.stm._
import akka.japi.{ Function JFunc, Procedure JProc } import akka.japi.{ Function JFunc, Procedure JProc }
import akka.dispatch._ import akka.dispatch._
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.stm._
/** /**
* Used internally to send functions. * Used internally to send functions.
*/ */
private[akka] case class Update[T](function: T T) private[akka] case class Update[T](function: T T)
private[akka] case class Alter[T](function: T T)
private[akka] case object Get private[akka] case object Get
/** /**
@ -101,7 +101,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/** /**
* Read the internal state of the agent. * Read the internal state of the agent.
*/ */
def get() = ref.get def get() = ref.single.get
/** /**
* Read the internal state of the agent. * Read the internal state of the agent.
@ -111,9 +111,10 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/** /**
* Dispatch a function to update the internal state. * Dispatch a function to update the internal state.
*/ */
def send(f: T T) { def send(f: T T): Unit = {
def dispatch = updater ! Update(f) def dispatch = updater ! Update(f)
if (Stm.activeTransaction) { get; deferred(dispatch) } val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status dispatch)(txn.get)
else dispatch else dispatch
} }
@ -122,11 +123,11 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* that new state can be obtained within the given timeout. * that new state can be obtained within the given timeout.
*/ */
def alter(f: T T)(timeout: Timeout): Future[T] = { def alter(f: T T)(timeout: Timeout): Future[T] = {
def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]] def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]]
if (Stm.activeTransaction) { val txn = Txn.findCurrent
if (txn.isDefined) {
val result = Promise[T]()(system.dispatcher) val result = Promise[T]()(system.dispatcher)
get //Join xa Txn.afterCommit(status result completeWith dispatch)(txn.get)
deferred { result completeWith dispatch } //Attach deferred-block to current transaction
result result
} else dispatch } else dispatch
} }
@ -172,7 +173,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
suspend() suspend()
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration) val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration)
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]]
value value
}) })
result result
@ -283,28 +284,35 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* Agent updater actor. Used internally for `send` actions. * Agent updater actor. Used internally for `send` actions.
*/ */
class AgentUpdater[T](agent: Agent[T]) extends Actor { class AgentUpdater[T](agent: Agent[T]) extends Actor {
val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false)
def receive = { def receive = {
case update: Update[_] sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] }) case u: Update[_] update(u.function.asInstanceOf[T T])
case Get sender.tell(agent.get) case a: Alter[_] sender ! update(a.function.asInstanceOf[T T])
case _ case Get sender ! agent.get
case _
} }
def update(function: T T): T = agent.ref.single.transformAndGet(function)
} }
/** /**
* Thread-based agent updater actor. Used internally for `sendOff` actions. * Thread-based agent updater actor. Used internally for `sendOff` actions.
*/ */
class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
def receive = { def receive = {
case update: Update[_] try { case u: Update[_] try {
sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] }) update(u.function.asInstanceOf[T T])
} finally {
agent.resume()
context.stop(self)
}
case a: Alter[_] try {
sender ! update(a.function.asInstanceOf[T T])
} finally { } finally {
agent.resume() agent.resume()
context.stop(self) context.stop(self)
} }
case _ context.stop(self) case _ context.stop(self)
} }
def update(function: T T): T = agent.ref.single.transformAndGet(function)
} }

View file

@ -1,17 +1,12 @@
package akka.agent.test package akka.agent
import org.scalatest.WordSpec import akka.dispatch.Await
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.util.Timeout
import akka.agent.Agent
import akka.stm._
import akka.util.Duration import akka.util.Duration
import akka.util.duration._ import akka.util.duration._
import java.util.concurrent.CountDownLatch import akka.util.Timeout
import akka.testkit.AkkaSpec
import akka.testkit._ import akka.testkit._
import akka.dispatch.Await import scala.concurrent.stm._
import java.util.concurrent.CountDownLatch
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num) val latch = new CountDownLatch(num)
@ -96,7 +91,7 @@ class AgentSpec extends AkkaSpec {
"be readable within a transaction" in { "be readable within a transaction" in {
val agent = Agent(5) val agent = Agent(5)
val value = atomic { agent() } val value = atomic { t agent() }
value must be(5) value must be(5)
agent.close() agent.close()
} }
@ -105,7 +100,7 @@ class AgentSpec extends AkkaSpec {
val countDown = new CountDownFunction[Int] val countDown = new CountDownFunction[Int]
val agent = Agent(5) val agent = Agent(5)
atomic { atomic { t
agent send (_ * 2) agent send (_ * 2)
} }
agent send countDown agent send countDown
@ -122,7 +117,7 @@ class AgentSpec extends AkkaSpec {
val agent = Agent(5) val agent = Agent(5)
try { try {
atomic(DefaultTransactionFactory) { atomic { t
agent send (_ * 2) agent send (_ * 2)
throw new RuntimeException("Expected failure") throw new RuntimeException("Expected failure")
} }

View file

@ -32,7 +32,7 @@ class IncludeCode(Directive):
document = self.state.document document = self.state.document
arg0 = self.arguments[0] arg0 = self.arguments[0]
(filename, sep, section) = arg0.partition('#') (filename, sep, section) = arg0.partition('#')
if not document.settings.file_insertion_enabled: if not document.settings.file_insertion_enabled:
return [document.reporter.warning('File insertion disabled', return [document.reporter.warning('File insertion disabled',
line=self.lineno)] line=self.lineno)]
@ -126,8 +126,9 @@ class IncludeCode(Directive):
retnode = nodes.literal_block(text, text, source=fn) retnode = nodes.literal_block(text, text, source=fn)
retnode.line = 1 retnode.line = 1
retnode.attributes['line_number'] = self.lineno retnode.attributes['line_number'] = self.lineno
if self.options.get('language', ''): language = self.options.get('language')
retnode['language'] = self.options['language'] if language:
retnode['language'] = language
if 'linenos' in self.options: if 'linenos' in self.options:
retnode['linenos'] = True retnode['linenos'] = True
document.settings.env.note_dependency(rel_fn) document.settings.env.note_dependency(rel_fn)

View file

@ -1,147 +0,0 @@
Agents (Scala)
==============
.. sidebar:: Contents
.. contents:: :local:
Agents in Akka were inspired by `agents in Clojure <http://clojure.org/agents>`_.
Agents provide asynchronous change of individual locations. Agents are bound to a single storage location for their lifetime, and only allow mutation of that location (to a new state) to occur as a result of an action. Update actions are functions that are asynchronously applied to the Agent's state and whose return value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always immediately available for reading by any thread (using ``get`` or ``apply``) without any messages.
Agents are reactive. The update actions of all Agents get interleaved amongst threads in a thread pool. At any point in time, at most one ``send`` action for each Agent is being executed. Actions dispatched to an agent from another thread will occur in the order they were sent, potentially interleaved with actions dispatched to the same agent from other sources.
If an Agent is used within an enclosing transaction, then it will participate in that transaction. Agents are integrated with the STM - any dispatches made in a transaction are held until that transaction commits, and are discarded if it is retried or aborted.
Creating and stopping Agents
----------------------------
Agents are created by invoking ``Agent(value)`` passing in the Agent's initial value.
.. code-block:: scala
val agent = Agent(5)
An Agent will be running until you invoke ``close`` on it. Then it will be eligible for garbage collection (unless you hold on to it in some way).
.. code-block:: scala
agent.close()
Updating Agents
---------------
You update an Agent by sending a function that transforms the current value or by sending just a new value. The Agent will apply the new value or function atomically and asynchronously. The update is done in a fire-forget manner and you are only guaranteed that it will be applied. There is no guarantee of when the update will be applied but dispatches to an Agent from a single thread will occur in order. You apply a value or a function by invoking the ``send`` function.
.. code-block:: scala
// send a value
agent send 7
// send a function
agent send (_ + 1)
agent send (_ * 2)
You can also dispatch a function to update the internal state but on its own thread. This does not use the reactive thread pool and can be used for long-running or blocking operations. You do this with the ``sendOff`` method. Dispatches using either ``sendOff`` or ``send`` will still be executed in order.
.. code-block:: scala
// sendOff a function
agent sendOff (longRunningOrBlockingFunction)
Reading an Agent's value
------------------------
Agents can be dereferenced, e.g. you can get an Agent's value, by invoking the Agent with parenthesis like this:
.. code-block:: scala
val result = agent()
Or by using the get method.
.. code-block:: scala
val result = agent.get
Reading an Agent's current value does not involve any message passing and happens immediately. So while updates to an Agent are asynchronous, reading the state of an Agent is synchronous.
Awaiting an Agent's value
-------------------------
It is also possible to read the value after all currently queued ``send``\s have completed. You can do this with ``await``:
.. code-block:: scala
val result = agent.await
You can also get a ``Future`` to this value, that will be completed after the currently queued updates have completed:
.. code-block:: scala
val future = agent.future
// ...
val result = future.await.result.get
Transactional Agents
--------------------
If an Agent is used within an enclosing transaction, then it will participate in that transaction. If you send to an Agent within a transaction then the dispatch to the Agent will be held until that transaction commits, and discarded if the transaction is aborted.
.. code-block:: scala
import akka.agent.Agent
import akka.stm._
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
atomic {
if (from.get < amount) false
else {
from send (_ - amount)
to send (_ + amount)
true
}
}
}
val from = Agent(100)
val to = Agent(20)
val ok = transfer(from, to, 50)
from() // -> 50
to() // -> 70
Monadic usage
-------------
Agents are also monadic, allowing you to compose operations using for-comprehensions. In a monadic usage, new Agents are created leaving the original Agents untouched. So the old values (Agents) are still available as-is. They are so-called 'persistent'.
Example of a monadic usage:
.. code-block:: scala
val agent1 = Agent(3)
val agent2 = Agent(5)
// uses foreach
var result = 0
for (value <- agent1) {
result = value + 1
}
// uses map
val agent3 =
for (value <- agent1) yield value + 1
// uses flatMap
val agent4 = for {
value1 <- agent1
value2 <- agent2
} yield value1 + value2
agent1.close()
agent2.close()
agent3.close()
agent4.close()

112
akka-docs/java/agents.rst Normal file
View file

@ -0,0 +1,112 @@
.. _agents-java:
##############
Agents (Java)
##############
.. sidebar:: Contents
.. contents:: :local:
Agents in Akka are inspired by `agents in Clojure`_.
.. _agents in Clojure: http://clojure.org/agents
Agents provide asynchronous change of individual locations. Agents are bound to
a single storage location for their lifetime, and only allow mutation of that
location (to a new state) to occur as a result of an action. Update actions are
functions that are asynchronously applied to the Agent's state and whose return
value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always
immediately available for reading by any thread (using ``get``) without any
messages.
Agents are reactive. The update actions of all Agents get interleaved amongst
threads in a thread pool. At any point in time, at most one ``send`` action for
each Agent is being executed. Actions dispatched to an agent from another thread
will occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources.
If an Agent is used within an enclosing transaction, then it will participate in
that transaction. Agents are integrated with the STM - any dispatches made in
a transaction are held until that transaction commits, and are discarded if it
is retried or aborted.
Creating and stopping Agents
============================
Agents are created by invoking ``new Agent(value, system)`` passing in the
Agent's initial value and a reference to the ``ActorSystem`` for your
application. An ``ActorSystem`` is required to create the underlying Actors. See
:ref:`actor-systems` for more information about actor systems.
Here is an example of creating an Agent:
.. includecode:: code/akka/docs/agent/AgentDocTest.java
:include: import-system,import-agent
:language: java
.. includecode:: code/akka/docs/agent/AgentDocTest.java#create
:language: java
An Agent will be running until you invoke ``close`` on it. Then it will be
eligible for garbage collection (unless you hold on to it in some way).
.. includecode:: code/akka/docs/agent/AgentDocTest.java#close
:language: java
Updating Agents
===============
You update an Agent by sending a function that transforms the current value or
by sending just a new value. The Agent will apply the new value or function
atomically and asynchronously. The update is done in a fire-forget manner and
you are only guaranteed that it will be applied. There is no guarantee of when
the update will be applied but dispatches to an Agent from a single thread will
occur in order. You apply a value or a function by invoking the ``send``
function.
.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-function
:language: java
.. includecode:: code/akka/docs/agent/AgentDocTest.java#send
:language: java
You can also dispatch a function to update the internal state but on its own
thread. This does not use the reactive thread pool and can be used for
long-running or blocking operations. You do this with the ``sendOff``
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
in order.
.. includecode:: code/akka/docs/agent/AgentDocTest.java#send-off
:language: java
Reading an Agent's value
========================
Agents can be dereferenced (you can get an Agent's value) by calling the get
method:
.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-get
:language: java
Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous.
Awaiting an Agent's value
=========================
It is also possible to read the value after all currently queued sends have
completed. You can do this with ``await``:
.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-timeout
:language: java
.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-await
:language: java

View file

@ -0,0 +1,7 @@
package akka.docs.agent
import org.scalatest.junit.JUnitWrapperSuite
class AgentDocJavaSpec extends JUnitWrapperSuite(
"akka.docs.agent.AgentDocTest",
Thread.currentThread.getContextClassLoader)

View file

@ -0,0 +1,106 @@
package akka.docs.agent;
import static org.junit.Assert.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.testkit.AkkaSpec;
//#import-system
import akka.actor.ActorSystem;
//#import-system
//#import-agent
import akka.agent.Agent;
//#import-agent
//#import-function
import akka.japi.Function;
//#import-function
//#import-timeout
import akka.util.Duration;
import akka.util.Timeout;
import static java.util.concurrent.TimeUnit.SECONDS;
//#import-timeout
public class AgentDocTest {
private static ActorSystem testSystem;
@BeforeClass
public static void beforeAll() {
testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf());
}
@AfterClass
public static void afterAll() {
testSystem.shutdown();
testSystem = null;
}
@Test
public void createAndClose() {
//#create
ActorSystem system = ActorSystem.create("app");
Agent<Integer> agent = new Agent<Integer>(5, system);
//#create
//#close
agent.close();
//#close
system.shutdown();
}
@Test
public void sendAndSendOffAndReadAwait() {
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
//#send
// send a value
agent.send(7);
// send a function
agent.send(new Function<Integer, Integer>() {
public Integer apply(Integer i) {
return i * 2;
}
});
//#send
Function<Integer, Integer> longRunningOrBlockingFunction = new Function<Integer, Integer>() {
public Integer apply(Integer i) {
return i * 1;
}
};
//#send-off
// sendOff a function
agent.sendOff(longRunningOrBlockingFunction);
//#send-off
//#read-await
Integer result = agent.await(new Timeout(Duration.create(5, SECONDS)));
//#read-await
assertEquals(result, new Integer(14));
agent.close();
}
@Test
public void readWithGet() {
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
//#read-get
Integer result = agent.get();
//#read-get
assertEquals(result, new Integer(5));
agent.close();
}
}

View file

@ -17,5 +17,6 @@ Java API
routing routing
remoting remoting
serialization serialization
agents
extending-akka extending-akka
transactors transactors

View file

@ -1,4 +1,135 @@
Agents (Scala) .. _agents-scala:
==============
The Akka Agents module has not been migrated to Akka 2.0-SNAPSHOT yet. ################
Agents (Scala)
################
.. sidebar:: Contents
.. contents:: :local:
Agents in Akka are inspired by `agents in Clojure`_.
.. _agents in Clojure: http://clojure.org/agents
Agents provide asynchronous change of individual locations. Agents are bound to
a single storage location for their lifetime, and only allow mutation of that
location (to a new state) to occur as a result of an action. Update actions are
functions that are asynchronously applied to the Agent's state and whose return
value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always
immediately available for reading by any thread (using ``get`` or ``apply``)
without any messages.
Agents are reactive. The update actions of all Agents get interleaved amongst
threads in a thread pool. At any point in time, at most one ``send`` action for
each Agent is being executed. Actions dispatched to an agent from another thread
will occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources.
If an Agent is used within an enclosing transaction, then it will participate in
that transaction. Agents are integrated with Scala STM - any dispatches made in
a transaction are held until that transaction commits, and are discarded if it
is retried or aborted.
Creating and stopping Agents
============================
Agents are created by invoking ``Agent(value)`` passing in the Agent's initial
value:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create
Note that creating an Agent requires an implicit ``ActorSystem`` (for creating
the underlying actors). See :ref:`actor-systems` for more information about
actor systems. An ActorSystem can be in implicit scope when creating an Agent:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create-implicit-system
Or the ActorSystem can be passed explicitly when creating an Agent:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create-explicit-system
An Agent will be running until you invoke ``close`` on it. Then it will be
eligible for garbage collection (unless you hold on to it in some way).
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#close
Updating Agents
===============
You update an Agent by sending a function that transforms the current value or
by sending just a new value. The Agent will apply the new value or function
atomically and asynchronously. The update is done in a fire-forget manner and
you are only guaranteed that it will be applied. There is no guarantee of when
the update will be applied but dispatches to an Agent from a single thread will
occur in order. You apply a value or a function by invoking the ``send``
function.
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#send
You can also dispatch a function to update the internal state but on its own
thread. This does not use the reactive thread pool and can be used for
long-running or blocking operations. You do this with the ``sendOff``
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
in order.
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#send-off
Reading an Agent's value
========================
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
with parentheses like this:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-apply
Or by using the get method:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-get
Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous.
Awaiting an Agent's value
=========================
It is also possible to read the value after all currently queued sends have
completed. You can do this with ``await``:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-await
You can also get a ``Future`` to this value, that will be completed after the
currently queued updates have completed:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-future
Transactional Agents
====================
If an Agent is used within an enclosing transaction, then it will participate in
that transaction. If you send to an Agent within a transaction then the dispatch
to the Agent will be held until that transaction commits, and discarded if the
transaction is aborted. Here's an example:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#transfer-example
Monadic usage
=============
Agents are also monadic, allowing you to compose operations using
for-comprehensions. In monadic usage, new Agents are created leaving the
original Agents untouched. So the old values (Agents) are still available
as-is. They are so-called 'persistent'.
Example of monadic usage:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#monadic-example

View file

@ -0,0 +1,187 @@
package akka.docs.agent
import akka.agent.Agent
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
class AgentDocSpec extends AkkaSpec {
"create and close" in {
//#create
import akka.agent.Agent
val agent = Agent(5)
//#create
//#close
agent.close()
//#close
}
"create with implicit system" in {
//#create-implicit-system
import akka.actor.ActorSystem
import akka.agent.Agent
implicit val system = ActorSystem("app")
val agent = Agent(5)
//#create-implicit-system
agent.close()
system.shutdown()
}
"create with explicit system" in {
//#create-explicit-system
import akka.actor.ActorSystem
import akka.agent.Agent
val system = ActorSystem("app")
val agent = Agent(5)(system)
//#create-explicit-system
agent.close()
system.shutdown()
}
"send and sendOff" in {
val agent = Agent(0)
//#send
// send a value
agent send 7
// send a function
agent send (_ + 1)
agent send (_ * 2)
//#send
def longRunningOrBlockingFunction = (i: Int) i * 1
//#send-off
// sendOff a function
agent sendOff (longRunningOrBlockingFunction)
//#send-off
val result = agent.await(Timeout(5 seconds))
result must be === 16
}
"read with apply" in {
val agent = Agent(0)
//#read-apply
val result = agent()
//#read-apply
result must be === 0
}
"read with get" in {
val agent = Agent(0)
//#read-get
val result = agent.get
//#read-get
result must be === 0
}
"read with await" in {
val agent = Agent(0)
//#read-await
import akka.util.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val result = agent.await
//#read-await
result must be === 0
}
"read with future" in {
val agent = Agent(0)
//#read-future
import akka.dispatch.Await
implicit val timeout = Timeout(5 seconds)
val future = agent.future
val result = Await.result(future, timeout.duration)
//#read-future
result must be === 0
}
"transfer example" in {
//#transfer-example
import akka.agent.Agent
import akka.util.duration._
import akka.util.Timeout
import scala.concurrent.stm._
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
atomic { txn
if (from.get < amount) false
else {
from send (_ - amount)
to send (_ + amount)
true
}
}
}
val from = Agent(100)
val to = Agent(20)
val ok = transfer(from, to, 50)
implicit val timeout = Timeout(5 seconds)
val fromValue = from.await // -> 50
val toValue = to.await // -> 70
//#transfer-example
fromValue must be === 50
toValue must be === 70
}
"monadic example" in {
//#monadic-example
val agent1 = Agent(3)
val agent2 = Agent(5)
// uses foreach
var result = 0
for (value agent1) {
result = value + 1
}
// uses map
val agent3 = for (value agent1) yield value + 1
// or using map directly
val agent4 = agent1 map (_ + 1)
// uses flatMap
val agent5 = for {
value1 agent1
value2 agent2
} yield value1 + value2
//#monadic-example
result must be === 4
agent3() must be === 4
agent4() must be === 4
agent5() must be === 8
agent1.close()
agent2.close()
agent3.close()
agent4.close()
agent5.close()
}
}

View file

@ -18,7 +18,7 @@ Scala API
remoting remoting
serialization serialization
fsm fsm
agents
testing testing
extending-akka extending-akka
agents
transactors transactors

View file

@ -30,7 +30,7 @@ object AkkaBuild extends Build {
Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id)
), ),
aggregate = Seq(actor, testkit, actorTests, remote, slf4j, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs)
) )
lazy val actor = Project( lazy val actor = Project(
@ -94,6 +94,15 @@ object AkkaBuild extends Build {
) )
) )
lazy val agent = Project(
id = "akka-agent",
base = file("akka-agent"),
dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.agent
)
)
// lazy val amqp = Project( // lazy val amqp = Project(
// id = "akka-amqp", // id = "akka-amqp",
// base = file("akka-amqp"), // base = file("akka-amqp"),
@ -256,7 +265,7 @@ object AkkaBuild extends Build {
lazy val docs = Project( lazy val docs = Project(
id = "akka-docs", id = "akka-docs",
base = file("akka-docs"), base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", remote, slf4j, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox),
settings = defaultSettings ++ Seq( settings = defaultSettings ++ Seq(
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
libraryDependencies ++= Dependencies.docs, libraryDependencies ++= Dependencies.docs,
@ -369,6 +378,8 @@ object Dependencies {
val slf4j = Seq(slf4jApi) val slf4j = Seq(slf4jApi)
val agent = Seq(scalaStm, Test.scalatest, Test.junit)
val amqp = Seq(rabbit, commonsIo, protobuf) val amqp = Seq(rabbit, commonsIo, protobuf)
val mailboxes = Seq(Test.scalatest, Test.junit) val mailboxes = Seq(Test.scalatest, Test.junit)
@ -409,11 +420,12 @@ object Dependency {
val Logback = "0.9.28" val Logback = "0.9.28"
val Netty = "3.2.5.Final" val Netty = "3.2.5.Final"
val Protobuf = "2.4.1" val Protobuf = "2.4.1"
val Rabbit = "2.3.1"
val ScalaStm = "0.4"
val Scalatest = "1.6.1" val Scalatest = "1.6.1"
val Slf4j = "1.6.4" val Slf4j = "1.6.4"
val Spring = "3.0.5.RELEASE" val Spring = "3.0.5.RELEASE"
val Zookeeper = "3.4.0" val Zookeeper = "3.4.0"
val Rabbit = "2.3.1"
} }
// Compile // Compile
@ -438,6 +450,7 @@ object Dependency {
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License
val redis = "net.debasishg" %% "redisclient" % "2.4.0" // ApacheV2 val redis = "net.debasishg" %% "redisclient" % "2.4.0" // ApacheV2
val scalaStm = "org.scala-tools" %% "scala-stm" % V.ScalaStm // Modified BSD (Scala)
val sjson = "net.debasishg" %% "sjson" % "0.15" // ApacheV2 val sjson = "net.debasishg" %% "sjson" % "0.15" // ApacheV2
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2 val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2