Remove akka-agent #26184
This commit is contained in:
parent
2e99b2921d
commit
a8bd5af1fb
13 changed files with 8 additions and 943 deletions
|
|
@ -1,23 +0,0 @@
|
|||
####################################
|
||||
# Akka Agent Reference Config File #
|
||||
####################################
|
||||
|
||||
# This is the reference config file that contains all the default settings.
|
||||
# Make your edits/overrides in your application.conf.
|
||||
|
||||
akka {
|
||||
agent {
|
||||
|
||||
# The dispatcher used for agent-send-off actor
|
||||
send-off-dispatcher {
|
||||
executor = thread-pool-executor
|
||||
type = PinnedDispatcher
|
||||
}
|
||||
|
||||
# The dispatcher used for agent-alter-off actor
|
||||
alter-off-dispatcher {
|
||||
executor = thread-pool-executor
|
||||
type = PinnedDispatcher
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,261 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.agent
|
||||
|
||||
import scala.concurrent.stm._
|
||||
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||
import akka.util.SerializedSuspendableExecutionContext
|
||||
|
||||
@deprecated(
|
||||
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
|
||||
since = "2.5.0")
|
||||
object Agent {
|
||||
|
||||
/**
|
||||
* Factory method for creating an Agent.
|
||||
*/
|
||||
@deprecated(
|
||||
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
|
||||
since = "2.5.0")
|
||||
def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = new SecretAgent(initialValue, context)
|
||||
|
||||
/**
|
||||
* Java API: Factory method for creating an Agent.
|
||||
* @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead.i
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated(
|
||||
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
|
||||
since = "2.5.0")
|
||||
def create[T](initialValue: T, context: ExecutionContext): Agent[T] = Agent(initialValue)(context)
|
||||
|
||||
/**
|
||||
* Default agent implementation.
|
||||
*/
|
||||
private final class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] {
|
||||
private val ref = Ref(initialValue)
|
||||
private val updater = SerializedSuspendableExecutionContext(10)(context)
|
||||
|
||||
def get(): T = ref.single.get
|
||||
|
||||
def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })
|
||||
|
||||
def send(f: T => T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })
|
||||
|
||||
def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit =
|
||||
withinTransaction(new Runnable {
|
||||
def run =
|
||||
try updater.suspend()
|
||||
finally ec.execute(new Runnable {
|
||||
def run =
|
||||
try ref.single.transform(f)
|
||||
finally updater.resume()
|
||||
})
|
||||
})
|
||||
|
||||
def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })
|
||||
|
||||
def alter(f: T => T): Future[T] = doAlter(ref.single.transformAndGet(f))
|
||||
|
||||
def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T] = {
|
||||
val result = Promise[T]()
|
||||
withinTransaction(new Runnable {
|
||||
def run = {
|
||||
updater.suspend()
|
||||
result.completeWith(
|
||||
Future(try ref.single.transformAndGet(f)
|
||||
finally updater.resume()))
|
||||
}
|
||||
})
|
||||
result.future
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal helper method
|
||||
*/
|
||||
private final def withinTransaction(run: Runnable): Unit = {
|
||||
Txn.findCurrent match {
|
||||
case Some(txn) => Txn.afterCommit(_ => updater.execute(run))(txn)
|
||||
case _ => updater.execute(run)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal helper method
|
||||
*/
|
||||
private final def doAlter(f: => T): Future[T] = {
|
||||
Txn.findCurrent match {
|
||||
case Some(txn) =>
|
||||
val result = Promise[T]()
|
||||
Txn.afterCommit(status => result.completeWith(Future(f)(updater)))(txn)
|
||||
result.future
|
||||
case _ => Future(f)(updater)
|
||||
}
|
||||
}
|
||||
|
||||
def future(): Future[T] = Future(ref.single.get)(updater)
|
||||
|
||||
def map[B](f: T => B): Agent[B] = Agent(f(get))(updater)
|
||||
|
||||
def flatMap[B](f: T => Agent[B]): Agent[B] = f(get)
|
||||
|
||||
def foreach[U](f: T => U): Unit = f(get)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The Agent class was inspired by agents in Clojure.
|
||||
*
|
||||
* Agents provide asynchronous change of individual locations. Agents
|
||||
* are bound to a single storage location for their lifetime, and only
|
||||
* allow mutation of that location (to a new state) to occur as a result
|
||||
* of an action. Update actions are functions that are asynchronously
|
||||
* applied to the Agent's state and whose return value becomes the
|
||||
* Agent's new state. The state of an Agent should be immutable.
|
||||
*
|
||||
* While updates to Agents are asynchronous, the state of an Agent is
|
||||
* always immediately available for reading by any thread (using ''get''
|
||||
* or ''apply'') without any messages.
|
||||
*
|
||||
* Agents are reactive. The update actions of all Agents get interleaved
|
||||
* amongst threads in a thread pool. At any point in time, at most one
|
||||
* ''send'' action for each Agent is being executed. Actions dispatched to
|
||||
* an agent from another thread will occur in the order they were sent,
|
||||
* potentially interleaved with actions dispatched to the same agent from
|
||||
* other sources.
|
||||
*
|
||||
* Example of usage:
|
||||
* {{{
|
||||
* val agent = Agent(5)
|
||||
*
|
||||
* agent send (_ * 2)
|
||||
*
|
||||
* ...
|
||||
*
|
||||
* val result = agent()
|
||||
* // use result ...
|
||||
*
|
||||
* }}}
|
||||
*
|
||||
* Agent is also monadic, which means that you can compose operations using
|
||||
* for-comprehensions. In monadic usage the original agents are not touched
|
||||
* but new agents are created. So the old values (agents) are still available
|
||||
* as-is. They are so-called 'persistent'.
|
||||
*
|
||||
* Example of monadic usage:
|
||||
* {{{
|
||||
* val agent1 = Agent(3)
|
||||
* val agent2 = Agent(5)
|
||||
*
|
||||
* for (value <- agent1) {
|
||||
* result = value + 1
|
||||
* }
|
||||
*
|
||||
* val agent3 = for (value <- agent1) yield value + 1
|
||||
*
|
||||
* val agent4 = for {
|
||||
* value1 <- agent1
|
||||
* value2 <- agent2
|
||||
* } yield value1 + value2
|
||||
*
|
||||
* }}}
|
||||
*
|
||||
* ==DEPRECATED STM SUPPORT==
|
||||
*
|
||||
* Agents participating in enclosing STM transaction is a deprecated feature in 2.3.
|
||||
*
|
||||
* If an Agent is used within an enclosing transaction, then it will
|
||||
* participate in that transaction. Agents are integrated with the STM -
|
||||
* any dispatches made in a transaction are held until that transaction
|
||||
* commits, and are discarded if it is retried or aborted.
|
||||
*
|
||||
* @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead.
|
||||
*/
|
||||
@deprecated(
|
||||
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
|
||||
since = "2.5.0")
|
||||
abstract class Agent[T] {
|
||||
|
||||
/**
|
||||
* Java API: Read the internal state of the agent.
|
||||
*/
|
||||
def get(): T
|
||||
|
||||
/**
|
||||
* Read the internal state of the agent.
|
||||
*/
|
||||
def apply(): T = get
|
||||
|
||||
/**
|
||||
* Dispatch a new value for the internal state. Behaves the same
|
||||
* as sending a function (x => newValue).
|
||||
*/
|
||||
def send(newValue: T): Unit
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def send(f: T => T): Unit
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state but on its own thread.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `sendOff` or `send` will
|
||||
* still be executed in order.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit
|
||||
|
||||
/**
|
||||
* Dispatch an update to the internal state, and return a Future where
|
||||
* that new state can be obtained.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def alter(newValue: T): Future[T]
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state, and return a Future where
|
||||
* that new state can be obtained.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def alter(f: T => T): Future[T]
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state but on its own thread,
|
||||
* and return a Future where that new state can be obtained.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `alterOff` or `alter` will
|
||||
* still be executed in order.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T]
|
||||
|
||||
/**
|
||||
* A future to the current value that will be completed after any currently
|
||||
* queued updates.
|
||||
*/
|
||||
def future(): Future[T]
|
||||
|
||||
/**
|
||||
* Map this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def map[B](f: T => B): Agent[B]
|
||||
|
||||
/**
|
||||
* Flatmap this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def flatMap[B](f: T => Agent[B]): Agent[B]
|
||||
|
||||
/**
|
||||
* Applies the function to the internal state. Does not change the value of this agent.
|
||||
* In Java, pass in an instance of `akka.dispatch.Foreach`.
|
||||
*/
|
||||
def foreach[U](f: T => U): Unit
|
||||
}
|
||||
|
|
@ -1,187 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.agent
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.util.Timeout
|
||||
import akka.testkit._
|
||||
import scala.concurrent.stm._
|
||||
import java.util.concurrent.{ CountDownLatch }
|
||||
|
||||
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
|
||||
val latch = new CountDownLatch(num)
|
||||
def apply(a: A) = { latch.countDown(); a }
|
||||
def await(timeout: Duration) = latch.await(timeout.length, timeout.unit)
|
||||
}
|
||||
|
||||
class AgentSpec extends AkkaSpec {
|
||||
|
||||
implicit val timeout = Timeout(5.seconds.dilated)
|
||||
import system.dispatcher
|
||||
"Agent" must {
|
||||
"update with send dispatches in order sent" in {
|
||||
val countDown = new CountDownFunction[String]
|
||||
|
||||
val agent = Agent("a")
|
||||
agent.send(_ + "b")
|
||||
agent.send(_ + "c")
|
||||
agent.send(_ + "d")
|
||||
agent.send(countDown)
|
||||
|
||||
countDown.await(5 seconds)
|
||||
agent() should ===("abcd")
|
||||
}
|
||||
|
||||
"maintain order between send and sendOff" in {
|
||||
val countDown = new CountDownFunction[String]
|
||||
val l1, l2 = new TestLatch(1)
|
||||
val agent = Agent("a")
|
||||
agent.send(_ + "b")
|
||||
agent.sendOff((s: String) => { l1.countDown; Await.ready(l2, timeout.duration); s + "c" })
|
||||
Await.ready(l1, timeout.duration)
|
||||
agent.send(_ + "d")
|
||||
agent.send(countDown)
|
||||
l2.countDown
|
||||
countDown.await(5 seconds)
|
||||
agent() should ===("abcd")
|
||||
}
|
||||
|
||||
"maintain order between alter and alterOff" in {
|
||||
val l1, l2 = new TestLatch(1)
|
||||
val agent = Agent("a")
|
||||
|
||||
val r1 = agent.alter(_ + "b")
|
||||
val r2 = agent.alterOff(s => { l1.countDown; Await.ready(l2, timeout.duration); s + "c" })
|
||||
Await.ready(l1, timeout.duration)
|
||||
val r3 = agent.alter(_ + "d")
|
||||
val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":"))
|
||||
l2.countDown
|
||||
|
||||
Await.result(result, 5 seconds) should ===("ab:abc:abcd")
|
||||
|
||||
agent() should ===("abcd")
|
||||
}
|
||||
|
||||
"be immediately readable" in {
|
||||
val countDown = new CountDownFunction[Int]
|
||||
val readLatch = new TestLatch(1)
|
||||
val readTimeout = 5 seconds
|
||||
|
||||
val agent = Agent(5)
|
||||
val f1 = (i: Int) => {
|
||||
Await.ready(readLatch, readTimeout)
|
||||
i + 5
|
||||
}
|
||||
agent.send(f1)
|
||||
val read = agent()
|
||||
readLatch.countDown()
|
||||
agent.send(countDown)
|
||||
|
||||
countDown.await(5 seconds)
|
||||
read should ===(5)
|
||||
agent() should ===(10)
|
||||
}
|
||||
|
||||
"be readable within a transaction" in {
|
||||
val agent = Agent(5)
|
||||
val value = atomic { t =>
|
||||
agent()
|
||||
}
|
||||
value should ===(5)
|
||||
}
|
||||
|
||||
"dispatch sends in successful transactions" in {
|
||||
val countDown = new CountDownFunction[Int]
|
||||
|
||||
val agent = Agent(5)
|
||||
atomic { t =>
|
||||
agent.send(_ * 2)
|
||||
}
|
||||
agent.send(countDown)
|
||||
|
||||
countDown.await(5 seconds)
|
||||
agent() should ===(10)
|
||||
}
|
||||
|
||||
"not dispatch sends in aborted transactions" in {
|
||||
val countDown = new CountDownFunction[Int]
|
||||
|
||||
val agent = Agent(5)
|
||||
|
||||
try {
|
||||
atomic { t =>
|
||||
agent.send(_ * 2)
|
||||
throw new RuntimeException("Expected failure")
|
||||
}
|
||||
} catch { case NonFatal(_) => }
|
||||
|
||||
agent.send(countDown)
|
||||
|
||||
countDown.await(5 seconds)
|
||||
agent() should ===(5)
|
||||
}
|
||||
|
||||
"be able to return a 'queued' future" in {
|
||||
val agent = Agent("a")
|
||||
agent.send(_ + "b")
|
||||
agent.send(_ + "c")
|
||||
|
||||
Await.result(agent.future, timeout.duration) should ===("abc")
|
||||
}
|
||||
|
||||
"be able to await the value after updates have completed" in {
|
||||
val agent = Agent("a")
|
||||
agent.send(_ + "b")
|
||||
agent.send(_ + "c")
|
||||
|
||||
Await.result(agent.future, timeout.duration) should ===("abc")
|
||||
}
|
||||
|
||||
"be able to be mapped" in {
|
||||
val agent1 = Agent(5)
|
||||
val agent2 = agent1.map(_ * 2)
|
||||
|
||||
agent1() should ===(5)
|
||||
agent2() should ===(10)
|
||||
}
|
||||
|
||||
"be able to be used in a 'foreach' for comprehension" in {
|
||||
val agent = Agent(3)
|
||||
var result = 0
|
||||
|
||||
for (value <- agent) {
|
||||
result += value
|
||||
}
|
||||
|
||||
result should ===(3)
|
||||
}
|
||||
|
||||
"be able to be used in a 'map' for comprehension" in {
|
||||
val agent1 = Agent(5)
|
||||
val agent2 = for (value <- agent1) yield value * 2
|
||||
|
||||
agent1() should ===(5)
|
||||
agent2() should ===(10)
|
||||
}
|
||||
|
||||
"be able to be used in a 'flatMap' for comprehension" in {
|
||||
val agent1 = Agent(1)
|
||||
val agent2 = Agent(2)
|
||||
|
||||
val agent3 = for {
|
||||
value1 <- agent1
|
||||
value2 <- agent2
|
||||
} yield value1 + value2
|
||||
|
||||
agent1() should ===(1)
|
||||
agent2() should ===(2)
|
||||
agent3() should ===(3)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,168 +1,5 @@
|
|||
# Agents
|
||||
|
||||
## Dependency
|
||||
|
||||
To use Agents, you must add the following dependency in your project:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
group="com.typesafe.akka"
|
||||
artifact="akka-agent_$scala.binary_version$"
|
||||
version="$akka.version$"
|
||||
}
|
||||
|
||||
## Introduction
|
||||
|
||||
Agents in Akka are inspired by [agents in Clojure](http://clojure.org/agents).
|
||||
|
||||
@@@ warning { title="Deprecation warning" }
|
||||
|
||||
Agents have been deprecated and are scheduled for removal
|
||||
in the next major version. We have found that their leaky abstraction (they do not
|
||||
work over the network) make them inferior to pure Actors, and in face of the soon
|
||||
inclusion of Akka Typed we see little value in maintaining the current Agents.
|
||||
|
||||
@@@
|
||||
|
||||
Agents provide asynchronous change of individual locations. Agents are bound to
|
||||
a single storage location for their lifetime, and only allow mutation of that
|
||||
location (to a new state) to occur as a result of an action. Update actions are
|
||||
functions that are asynchronously applied to the Agent's state and whose return
|
||||
value becomes the Agent's new state. The state of an Agent should be immutable.
|
||||
|
||||
While updates to Agents are asynchronous, the state of an Agent is always
|
||||
immediately available for reading by any thread (using `get` @scala[or `apply`])
|
||||
without any messages.
|
||||
|
||||
Agents are reactive. The update actions of all Agents get interleaved amongst
|
||||
threads in an `ExecutionContext`. At any point in time, at most one `send` action for
|
||||
each Agent is being executed. Actions dispatched to an agent from another thread
|
||||
will occur in the order they were sent, potentially interleaved with actions
|
||||
dispatched to the same agent from other threads.
|
||||
|
||||
@@@ note
|
||||
|
||||
Agents are local to the node on which they are created. This implies that you
|
||||
should generally not include them in messages that may be passed to remote Actors
|
||||
or as constructor parameters for remote Actors; those remote Actors will not be able to
|
||||
read or update the Agent.
|
||||
|
||||
@@@
|
||||
|
||||
## Creating Agents
|
||||
|
||||
Agents are created by invoking @scala[`Agent(value)`] @java[`new Agent<ValueType>(value, executionContext)`] passing in the Agent's initial
|
||||
value and providing an @scala[implicit] `ExecutionContext` to be used for it,
|
||||
@scala[for these examples we're going to use the default global one, but YMMV:]
|
||||
|
||||
Scala
|
||||
: @@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #create }
|
||||
|
||||
Java
|
||||
: @@snip [AgentDocTest.java](/akka-docs/src/test/java/jdocs/agent/AgentDocTest.java) { #import-agent #create type=java }
|
||||
|
||||
## Reading an Agent's value
|
||||
|
||||
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
|
||||
with @scala[parentheses] @java[`get()`] like this:
|
||||
|
||||
Scala
|
||||
: @@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #read-apply #read-get }
|
||||
|
||||
Java
|
||||
: @@snip [AgentDocTest.java](/akka-docs/src/test/java/jdocs/agent/AgentDocTest.java) { #read-get type=java }
|
||||
|
||||
Reading an Agent's current value does not involve any message passing and
|
||||
happens immediately. So while updates to an Agent are asynchronous, reading the
|
||||
state of an Agent is synchronous.
|
||||
|
||||
@@@ div { .group-java }
|
||||
|
||||
You can also get a `Future` to the Agents value, that will be completed after the
|
||||
currently queued updates have completed:
|
||||
|
||||
@@snip [AgentDocTest.java](/akka-docs/src/test/java/jdocs/agent/AgentDocTest.java) { #import-future #read-future type=java }
|
||||
|
||||
See @ref:[Futures](futures.md) for more information on `Futures`.
|
||||
|
||||
@@@
|
||||
|
||||
## Updating Agents (send & alter)
|
||||
|
||||
You update an Agent by sending a function @java[(`akka.dispatch.Mapper`)] that transforms the current value or
|
||||
by sending just a new value. The Agent will apply the new value or function
|
||||
atomically and asynchronously. The update is done in a fire-forget manner and
|
||||
you are only guaranteed that it will be applied. There is no guarantee of when
|
||||
the update will be applied but dispatches to an Agent from a single thread will
|
||||
occur in order. You apply a value or a function by invoking the `send`
|
||||
function.
|
||||
|
||||
Scala
|
||||
: @@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #send }
|
||||
|
||||
Java
|
||||
: @@snip [AgentDocTest.java](/akka-docs/src/test/java/jdocs/agent/AgentDocTest.java) { #import-function #send type=java }
|
||||
|
||||
You can also dispatch a function to update the internal state but on its own
|
||||
thread. This does not use the reactive thread pool and can be used for
|
||||
long-running or blocking operations. You do this with the `sendOff`
|
||||
method. Dispatches using either `sendOff` or `send` will still be executed
|
||||
in order.
|
||||
|
||||
Scala
|
||||
: @@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #send-off }
|
||||
|
||||
Java
|
||||
: @@snip [AgentDocTest.java](/akka-docs/src/test/java/jdocs/agent/AgentDocTest.java) { #import-function #send-off type=java }
|
||||
|
||||
All `send` methods also have a corresponding `alter` method that returns a `Future`.
|
||||
See @ref:[`Future`s](futures.md) for more information on `Future`s.
|
||||
|
||||
Scala
|
||||
: @@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #alter #alter-off }
|
||||
|
||||
Java
|
||||
: @@snip [AgentDocTest.java](/akka-docs/src/test/java/jdocs/agent/AgentDocTest.java) { #import-future #import-function #alter #alter-off type=java }
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
## Awaiting an Agent's value
|
||||
|
||||
You can also get a `Future` to the Agents value, that will be completed after the
|
||||
currently queued updates have completed:
|
||||
|
||||
@@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #read-future }
|
||||
|
||||
See @ref:[`Future`s](futures.md) for more information on `Future`s.
|
||||
|
||||
## Monadic usage
|
||||
|
||||
Agents are also monadic, allowing you to compose operations using
|
||||
for-comprehensions. In monadic usage, new Agents are created leaving the
|
||||
original Agents untouched. So the old values (Agents) are still available
|
||||
as-is. They are so-called 'persistent'.
|
||||
|
||||
Example of monadic usage:
|
||||
|
||||
@@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #monadic-example }
|
||||
|
||||
@@@
|
||||
|
||||
## Configuration
|
||||
|
||||
There are several configuration properties for the agents module, please refer
|
||||
to the @ref:[reference configuration](general/configuration.md#config-akka-agent).
|
||||
|
||||
## Deprecated Transactional Agents
|
||||
|
||||
Agents participating in enclosing STM transaction is a deprecated feature in 2.3.
|
||||
|
||||
If an Agent is used within an enclosing @java[`Scala STM`] transaction, then it will participate in
|
||||
that transaction. If you send to an Agent within a transaction then the dispatch
|
||||
to the Agent will be held until that transaction commits, and discarded if the
|
||||
transaction is aborted. @scala[Here's an example:]
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
@@snip [AgentDocSpec.scala](/akka-docs/src/test/scala/docs/agent/AgentDocSpec.scala) { #transfer-example }
|
||||
|
||||
@@@
|
||||
The akka-agent module was deprecated in Akka 2.5 and has been removed in
|
||||
2.6. If there is interest it may be moved to a separate, community-maintained
|
||||
repository.
|
||||
|
|
|
|||
|
|
@ -424,11 +424,6 @@ Each Akka module has a reference configuration file with the default values.
|
|||
|
||||
@@snip [reference.conf](/akka-actor/src/main/resources/reference.conf)
|
||||
|
||||
<a id="config-akka-agent"></a>
|
||||
### akka-agent
|
||||
|
||||
@@snip [reference.conf](/akka-agent/src/main/resources/reference.conf)
|
||||
|
||||
<a id="config-akka-cluster"></a>
|
||||
### akka-cluster
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,9 @@
|
|||
# Futures and Agents
|
||||
# Futures
|
||||
|
||||
@@toc { depth=2 }
|
||||
|
||||
@@@ index
|
||||
|
||||
* [futures](futures.md)
|
||||
* [agents](agents.md)
|
||||
|
||||
@@@
|
||||
@@@
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ To use Utilities, you must add the following dependency in your project:
|
|||
|
||||
@@@ index
|
||||
|
||||
* [futures](futures.md)
|
||||
* [event-bus](event-bus.md)
|
||||
* [logging](logging.md)
|
||||
* [scheduler](scheduler.md)
|
||||
|
|
@ -22,4 +23,4 @@ To use Utilities, you must add the following dependency in your project:
|
|||
* [java8-compat](java8-compat.md)
|
||||
* [extending-akka](extending-akka.md)
|
||||
|
||||
@@@
|
||||
@@@
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
* [index-network](index-network.md)
|
||||
* [discovery](discovery/index.md)
|
||||
* [coordination](coordination.md)
|
||||
* [index-futures](index-futures.md)
|
||||
* [index-utilities](index-utilities.md)
|
||||
* [common/other-modules](common/other-modules.md)
|
||||
* [howto](howto.md)
|
||||
|
|
|
|||
|
|
@ -1,120 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.agent;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
// #import-agent
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import akka.agent.Agent;
|
||||
import akka.dispatch.ExecutionContexts;
|
||||
// #import-agent
|
||||
|
||||
// #import-function
|
||||
import akka.dispatch.Mapper;
|
||||
// #import-function
|
||||
|
||||
// #import-future
|
||||
import scala.concurrent.Future;
|
||||
// #import-future
|
||||
|
||||
public class AgentDocTest extends jdocs.AbstractJavaTest {
|
||||
|
||||
private static ExecutionContext ec = ExecutionContexts.global();
|
||||
|
||||
@Test
|
||||
public void createAndRead() throws Exception {
|
||||
// #create
|
||||
ExecutionContext ec = ExecutionContexts.global();
|
||||
Agent<Integer> agent = Agent.create(5, ec);
|
||||
// #create
|
||||
|
||||
// #read-get
|
||||
Integer result = agent.get();
|
||||
// #read-get
|
||||
|
||||
// #read-future
|
||||
Future<Integer> future = agent.future();
|
||||
// #read-future
|
||||
|
||||
assertEquals(result, new Integer(5));
|
||||
assertEquals(Await.result(future, Duration.create(5, "s")), new Integer(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendAndSendOffAndReadAwait() throws Exception {
|
||||
Agent<Integer> agent = Agent.create(5, ec);
|
||||
|
||||
// #send
|
||||
// send a value, enqueues this change
|
||||
// of the value of the Agent
|
||||
agent.send(7);
|
||||
|
||||
// send a Mapper, enqueues this change
|
||||
// to the value of the Agent
|
||||
agent.send(
|
||||
new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 2;
|
||||
}
|
||||
});
|
||||
// #send
|
||||
|
||||
Mapper<Integer, Integer> longRunningOrBlockingFunction =
|
||||
new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 1;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutionContext theExecutionContextToExecuteItIn = ec;
|
||||
// #send-off
|
||||
// sendOff a function
|
||||
agent.sendOff(longRunningOrBlockingFunction, theExecutionContextToExecuteItIn);
|
||||
// #send-off
|
||||
|
||||
assertEquals(Await.result(agent.future(), Duration.create(5, "s")), new Integer(14));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void alterAndAlterOff() throws Exception {
|
||||
Agent<Integer> agent = Agent.create(5, ec);
|
||||
|
||||
// #alter
|
||||
// alter a value
|
||||
Future<Integer> f1 = agent.alter(7);
|
||||
|
||||
// alter a function (Mapper)
|
||||
Future<Integer> f2 =
|
||||
agent.alter(
|
||||
new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 2;
|
||||
}
|
||||
});
|
||||
// #alter
|
||||
|
||||
Mapper<Integer, Integer> longRunningOrBlockingFunction =
|
||||
new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 1;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutionContext theExecutionContextToExecuteItIn = ec;
|
||||
// #alter-off
|
||||
// alterOff a function (Mapper)
|
||||
Future<Integer> f3 =
|
||||
agent.alterOff(longRunningOrBlockingFunction, theExecutionContextToExecuteItIn);
|
||||
// #alter-off
|
||||
|
||||
assertEquals(Await.result(f3, Duration.create(5, "s")), new Integer(14));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,158 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.agent
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.agent.Agent
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ Await, ExecutionContext }
|
||||
import akka.testkit._
|
||||
import scala.concurrent.Future
|
||||
|
||||
class AgentDocSpec extends AkkaSpec {
|
||||
"create" in {
|
||||
//#create
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import akka.agent.Agent
|
||||
val agent = Agent(5)
|
||||
//#create
|
||||
}
|
||||
|
||||
"read value" in {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val agent = Agent(0)
|
||||
|
||||
{
|
||||
//#read-apply
|
||||
val result = agent()
|
||||
//#read-apply
|
||||
result should be(0)
|
||||
}
|
||||
{
|
||||
//#read-get
|
||||
// Or by using the get method:
|
||||
val result = agent.get
|
||||
//#read-get
|
||||
result should be(0)
|
||||
}
|
||||
|
||||
{
|
||||
//#read-future
|
||||
val future = agent.future
|
||||
//#read-future
|
||||
Await.result(future, 5 seconds) should be(0)
|
||||
}
|
||||
}
|
||||
|
||||
"send and sendOff" in {
|
||||
val agent = Agent(0)(ExecutionContext.global)
|
||||
//#send
|
||||
// send a value, enqueues this change
|
||||
// of the value of the Agent
|
||||
agent.send(7)
|
||||
|
||||
// send a function, enqueues this change
|
||||
// to the value of the Agent
|
||||
agent.send(_ + 1)
|
||||
agent.send(_ * 2)
|
||||
//#send
|
||||
|
||||
def longRunningOrBlockingFunction = (i: Int) => i * 1 // Just for the example code
|
||||
def someExecutionContext() = scala.concurrent.ExecutionContext.Implicits.global // Just for the example code
|
||||
//#send-off
|
||||
// the ExecutionContext you want to run the function on
|
||||
implicit val ec = someExecutionContext()
|
||||
// sendOff a function
|
||||
agent.sendOff(longRunningOrBlockingFunction)
|
||||
//#send-off
|
||||
|
||||
Await.result(agent.future, 5 seconds) should be(16)
|
||||
}
|
||||
|
||||
"alter and alterOff" in {
|
||||
val agent = Agent(0)(ExecutionContext.global)
|
||||
//#alter
|
||||
// alter a value
|
||||
val f1: Future[Int] = agent.alter(7)
|
||||
|
||||
// alter a function
|
||||
val f2: Future[Int] = agent.alter(_ + 1)
|
||||
val f3: Future[Int] = agent.alter(_ * 2)
|
||||
//#alter
|
||||
|
||||
def longRunningOrBlockingFunction = (i: Int) => i * 1 // Just for the example code
|
||||
def someExecutionContext() = ExecutionContext.global // Just for the example code
|
||||
|
||||
//#alter-off
|
||||
// the ExecutionContext you want to run the function on
|
||||
implicit val ec = someExecutionContext()
|
||||
// alterOff a function
|
||||
val f4: Future[Int] = agent.alterOff(longRunningOrBlockingFunction)
|
||||
//#alter-off
|
||||
|
||||
Await.result(f4, 5 seconds) should be(16)
|
||||
}
|
||||
|
||||
"transfer example" in {
|
||||
//#transfer-example
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import akka.agent.Agent
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.stm._
|
||||
|
||||
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
|
||||
atomic { txn =>
|
||||
if (from.get < amount) false
|
||||
else {
|
||||
from.send(_ - amount)
|
||||
to.send(_ + amount)
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val from = Agent(100)
|
||||
val to = Agent(20)
|
||||
val ok = transfer(from, to, 50)
|
||||
|
||||
val fromValue = from.future // -> 50
|
||||
val toValue = to.future // -> 70
|
||||
//#transfer-example
|
||||
|
||||
Await.result(fromValue, 5 seconds) should be(50)
|
||||
Await.result(toValue, 5 seconds) should be(70)
|
||||
ok should be(true)
|
||||
}
|
||||
|
||||
"monadic example" in {
|
||||
def println(a: Any) = ()
|
||||
//#monadic-example
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val agent1 = Agent(3)
|
||||
val agent2 = Agent(5)
|
||||
|
||||
// uses foreach
|
||||
for (value <- agent1)
|
||||
println(value)
|
||||
|
||||
// uses map
|
||||
val agent3 = for (value <- agent1) yield value + 1
|
||||
|
||||
// or using map directly
|
||||
val agent4 = agent1.map(_ + 1)
|
||||
|
||||
// uses flatMap
|
||||
val agent5 = for {
|
||||
value1 <- agent1
|
||||
value2 <- agent2
|
||||
} yield value1 + value2
|
||||
//#monadic-example
|
||||
|
||||
agent3() should be(4)
|
||||
agent4() should be(4)
|
||||
agent5() should be(8)
|
||||
}
|
||||
}
|
||||
12
build.sbt
12
build.sbt
|
|
@ -40,7 +40,6 @@ lazy val aggregatedProjects: Seq[ProjectReference] = List[ProjectReference](
|
|||
actorTestkitTyped,
|
||||
actorTyped,
|
||||
actorTypedTests,
|
||||
agent,
|
||||
benchJmh,
|
||||
benchJmhTyped,
|
||||
cluster,
|
||||
|
|
@ -96,16 +95,8 @@ lazy val actorTests = akkaModule("akka-actor-tests")
|
|||
.enablePlugins(NoPublish)
|
||||
.disablePlugins(MimaPlugin, WhiteSourcePlugin)
|
||||
|
||||
lazy val agent = akkaModule("akka-agent")
|
||||
.dependsOn(actor, testkit % "test->test")
|
||||
.settings(Dependencies.agent)
|
||||
.settings(AutomaticModuleName.settings("akka.agent"))
|
||||
.settings(OSGi.agent)
|
||||
.enablePlugins(ScaladocNoVerificationOfDiagrams)
|
||||
|
||||
lazy val akkaScalaNightly = akkaModule("akka-scala-nightly")
|
||||
// remove dependencies that we have to build ourselves (Scala STM)
|
||||
.aggregate(aggregatedProjects.diff(List[ProjectReference](agent, docs)): _*)
|
||||
.aggregate(aggregatedProjects: _*)
|
||||
.disablePlugins(MimaPlugin)
|
||||
.disablePlugins(ValidatePullRequest, MimaPlugin, CopyrightHeaderInPr)
|
||||
|
||||
|
|
@ -206,7 +197,6 @@ lazy val docs = akkaModule("akka-docs")
|
|||
cluster,
|
||||
clusterMetrics,
|
||||
slf4j,
|
||||
agent,
|
||||
osgi,
|
||||
persistenceTck,
|
||||
persistenceQuery,
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ object Dependencies {
|
|||
import DependencyHelpers._
|
||||
|
||||
lazy val scalaTestVersion = settingKey[String]("The version of ScalaTest to use.")
|
||||
lazy val scalaStmVersion = settingKey[String]("The version of ScalaSTM to use.")
|
||||
lazy val scalaCheckVersion = settingKey[String]("The version of ScalaCheck to use.")
|
||||
lazy val java8CompatVersion = settingKey[String]("The version of scala-java8-compat to use.")
|
||||
val junitVersion = "4.12"
|
||||
|
|
@ -23,7 +22,6 @@ object Dependencies {
|
|||
val Versions = Seq(
|
||||
crossScalaVersions := Seq("2.12.8", "2.13.0-M5"),
|
||||
scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head),
|
||||
scalaStmVersion := sys.props.get("akka.build.scalaStmVersion").getOrElse("0.9"),
|
||||
scalaCheckVersion := sys.props.get("akka.build.scalaCheckVersion").getOrElse("1.14.0"),
|
||||
scalaTestVersion := "3.0.7",
|
||||
java8CompatVersion := {
|
||||
|
|
@ -39,7 +37,6 @@ object Dependencies {
|
|||
// when updating config version, update links ActorSystem ScalaDoc to link to the updated version
|
||||
val config = "com.typesafe" % "config" % "1.3.3" // ApacheV2
|
||||
val netty = "io.netty" % "netty" % "3.10.6.Final" // ApacheV2
|
||||
val scalaStm = Def.setting { "org.scala-stm" %% "scala-stm" % scalaStmVersion.value } // Modified BSD (Scala)
|
||||
|
||||
val scalaXml = "org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion // Scala License
|
||||
val scalaReflect = ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _) // Scala License
|
||||
|
|
@ -177,8 +174,6 @@ object Dependencies {
|
|||
|
||||
val slf4j = l ++= Seq(slf4jApi, Test.logback)
|
||||
|
||||
val agent = l ++= Seq(scalaStm.value, Test.scalatest.value, Test.junit)
|
||||
|
||||
val persistence = l ++= Seq(
|
||||
Provided.levelDB,
|
||||
Provided.levelDBNative,
|
||||
|
|
|
|||
|
|
@ -46,8 +46,6 @@ object OSGi {
|
|||
|
||||
val actorTyped = exports(Seq("akka.actor.typed.*"))
|
||||
|
||||
val agent = exports(Seq("akka.agent.*"))
|
||||
|
||||
val cluster = exports(Seq("akka.cluster.*"))
|
||||
|
||||
val clusterTools = exports(Seq("akka.cluster.singleton.*", "akka.cluster.client.*", "akka.cluster.pubsub.*"))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue