Add docs and tests for java api for agents. Fixes #1545
This commit is contained in:
parent
70d8cd3c51
commit
a144c35c41
5 changed files with 230 additions and 3 deletions
|
|
@ -32,7 +32,7 @@ class IncludeCode(Directive):
|
|||
document = self.state.document
|
||||
arg0 = self.arguments[0]
|
||||
(filename, sep, section) = arg0.partition('#')
|
||||
|
||||
|
||||
if not document.settings.file_insertion_enabled:
|
||||
return [document.reporter.warning('File insertion disabled',
|
||||
line=self.lineno)]
|
||||
|
|
@ -126,8 +126,9 @@ class IncludeCode(Directive):
|
|||
retnode = nodes.literal_block(text, text, source=fn)
|
||||
retnode.line = 1
|
||||
retnode.attributes['line_number'] = self.lineno
|
||||
if self.options.get('language', ''):
|
||||
retnode['language'] = self.options['language']
|
||||
language = self.options.get('language')
|
||||
if language:
|
||||
retnode['language'] = language
|
||||
if 'linenos' in self.options:
|
||||
retnode['linenos'] = True
|
||||
document.settings.env.note_dependency(rel_fn)
|
||||
|
|
|
|||
112
akka-docs/java/agents.rst
Normal file
112
akka-docs/java/agents.rst
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
.. _agents-java:
|
||||
|
||||
##############
|
||||
Agents (Java)
|
||||
##############
|
||||
|
||||
.. sidebar:: Contents
|
||||
|
||||
.. contents:: :local:
|
||||
|
||||
Agents in Akka are inspired by `agents in Clojure`_.
|
||||
|
||||
.. _agents in Clojure: http://clojure.org/agents
|
||||
|
||||
Agents provide asynchronous change of individual locations. Agents are bound to
|
||||
a single storage location for their lifetime, and only allow mutation of that
|
||||
location (to a new state) to occur as a result of an action. Update actions are
|
||||
functions that are asynchronously applied to the Agent's state and whose return
|
||||
value becomes the Agent's new state. The state of an Agent should be immutable.
|
||||
|
||||
While updates to Agents are asynchronous, the state of an Agent is always
|
||||
immediately available for reading by any thread (using ``get``) without any
|
||||
messages.
|
||||
|
||||
Agents are reactive. The update actions of all Agents get interleaved amongst
|
||||
threads in a thread pool. At any point in time, at most one ``send`` action for
|
||||
each Agent is being executed. Actions dispatched to an agent from another thread
|
||||
will occur in the order they were sent, potentially interleaved with actions
|
||||
dispatched to the same agent from other sources.
|
||||
|
||||
If an Agent is used within an enclosing transaction, then it will participate in
|
||||
that transaction. Agents are integrated with the STM - any dispatches made in
|
||||
a transaction are held until that transaction commits, and are discarded if it
|
||||
is retried or aborted.
|
||||
|
||||
|
||||
Creating and stopping Agents
|
||||
============================
|
||||
|
||||
Agents are created by invoking ``new Agent(value, system)`` passing in the
|
||||
Agent's initial value and a reference to the ``ActorSystem`` for your
|
||||
application. An ``ActorSystem`` is required to create the underlying Actors. See
|
||||
:ref:`actor-systems` for more information about actor systems.
|
||||
|
||||
Here is an example of creating an Agent:
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java
|
||||
:include: import-system,import-agent
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#create
|
||||
:language: java
|
||||
|
||||
An Agent will be running until you invoke ``close`` on it. Then it will be
|
||||
eligible for garbage collection (unless you hold on to it in some way).
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#close
|
||||
:language: java
|
||||
|
||||
|
||||
Updating Agents
|
||||
===============
|
||||
|
||||
You update an Agent by sending a function that transforms the current value or
|
||||
by sending just a new value. The Agent will apply the new value or function
|
||||
atomically and asynchronously. The update is done in a fire-forget manner and
|
||||
you are only guaranteed that it will be applied. There is no guarantee of when
|
||||
the update will be applied but dispatches to an Agent from a single thread will
|
||||
occur in order. You apply a value or a function by invoking the ``send``
|
||||
function.
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-function
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#send
|
||||
:language: java
|
||||
|
||||
You can also dispatch a function to update the internal state but on its own
|
||||
thread. This does not use the reactive thread pool and can be used for
|
||||
long-running or blocking operations. You do this with the ``sendOff``
|
||||
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
|
||||
in order.
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#send-off
|
||||
:language: java
|
||||
|
||||
|
||||
Reading an Agent's value
|
||||
========================
|
||||
|
||||
Agents can be dereferenced (you can get an Agent's value) by calling the get
|
||||
method:
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-get
|
||||
:language: java
|
||||
|
||||
Reading an Agent's current value does not involve any message passing and
|
||||
happens immediately. So while updates to an Agent are asynchronous, reading the
|
||||
state of an Agent is synchronous.
|
||||
|
||||
|
||||
Awaiting an Agent's value
|
||||
=========================
|
||||
|
||||
It is also possible to read the value after all currently queued sends have
|
||||
completed. You can do this with ``await``:
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-timeout
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-await
|
||||
:language: java
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
package akka.docs.agent
|
||||
|
||||
import org.scalatest.junit.JUnitWrapperSuite
|
||||
|
||||
class AgentDocJavaSpec extends JUnitWrapperSuite(
|
||||
"akka.docs.agent.AgentDocTest",
|
||||
Thread.currentThread.getContextClassLoader)
|
||||
106
akka-docs/java/code/akka/docs/agent/AgentDocTest.java
Normal file
106
akka-docs/java/code/akka/docs/agent/AgentDocTest.java
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
package akka.docs.agent;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import akka.testkit.AkkaSpec;
|
||||
|
||||
//#import-system
|
||||
import akka.actor.ActorSystem;
|
||||
//#import-system
|
||||
|
||||
//#import-agent
|
||||
import akka.agent.Agent;
|
||||
//#import-agent
|
||||
|
||||
//#import-function
|
||||
import akka.japi.Function;
|
||||
//#import-function
|
||||
|
||||
//#import-timeout
|
||||
import akka.util.Duration;
|
||||
import akka.util.Timeout;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
//#import-timeout
|
||||
|
||||
public class AgentDocTest {
|
||||
|
||||
private static ActorSystem testSystem;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAll() {
|
||||
testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterAll() {
|
||||
testSystem.shutdown();
|
||||
testSystem = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createAndClose() {
|
||||
//#create
|
||||
ActorSystem system = ActorSystem.create("app");
|
||||
|
||||
Agent<Integer> agent = new Agent<Integer>(5, system);
|
||||
//#create
|
||||
|
||||
//#close
|
||||
agent.close();
|
||||
//#close
|
||||
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendAndSendOffAndReadAwait() {
|
||||
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
|
||||
|
||||
//#send
|
||||
// send a value
|
||||
agent.send(7);
|
||||
|
||||
// send a function
|
||||
agent.send(new Function<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 2;
|
||||
}
|
||||
});
|
||||
//#send
|
||||
|
||||
Function<Integer, Integer> longRunningOrBlockingFunction = new Function<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 1;
|
||||
}
|
||||
};
|
||||
|
||||
//#send-off
|
||||
// sendOff a function
|
||||
agent.sendOff(longRunningOrBlockingFunction);
|
||||
//#send-off
|
||||
|
||||
//#read-await
|
||||
Integer result = agent.await(new Timeout(Duration.create(5, SECONDS)));
|
||||
//#read-await
|
||||
|
||||
assertEquals(result, new Integer(14));
|
||||
|
||||
agent.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void readWithGet() {
|
||||
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
|
||||
|
||||
//#read-get
|
||||
Integer result = agent.get();
|
||||
//#read-get
|
||||
|
||||
assertEquals(result, new Integer(5));
|
||||
|
||||
agent.close();
|
||||
}
|
||||
}
|
||||
|
|
@ -17,5 +17,6 @@ Java API
|
|||
routing
|
||||
remoting
|
||||
serialization
|
||||
agents
|
||||
extending-akka
|
||||
transactors
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue