Update agent and agent spec
This commit is contained in:
parent
3b87e82228
commit
70361e2868
2 changed files with 12 additions and 9 deletions
|
|
@ -9,12 +9,15 @@ import akka.actor.Actor
|
|||
import akka.japi.{Function => JFunc, Procedure => JProc}
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
/**
|
||||
* Used internally to send functions.
|
||||
*/
|
||||
private[akka] case class Update[T](function: T => T)
|
||||
|
||||
/**
|
||||
* Factory method for creating an Agent.
|
||||
*/
|
||||
object Agent {
|
||||
private[akka] case class Update[T](function: T => T)
|
||||
|
||||
def apply[T](initialValue: T) = new Agent(initialValue)
|
||||
}
|
||||
|
||||
|
|
@ -89,8 +92,6 @@ object Agent {
|
|||
* }}}
|
||||
*/
|
||||
class Agent[T](initialValue: T) {
|
||||
import Agent._
|
||||
|
||||
private[akka] val ref = Ref(initialValue)
|
||||
private[akka] val updater = Actor.actorOf(new AgentUpdater(this)).start
|
||||
|
||||
|
|
@ -215,8 +216,6 @@ class Agent[T](initialValue: T) {
|
|||
* Agent updater actor. Used internally for `send` actions.
|
||||
*/
|
||||
class AgentUpdater[T](agent: Agent[T]) extends Actor {
|
||||
import Agent._
|
||||
|
||||
val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false)
|
||||
|
||||
def receive = {
|
||||
|
|
@ -230,8 +229,6 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
|
|||
* Thread-based agent updater actor. Used internally for `sendOff` actions.
|
||||
*/
|
||||
class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
|
||||
import Agent._
|
||||
|
||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
||||
|
||||
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
|
||||
|
|
|
|||
|
|
@ -51,11 +51,17 @@ class AgentSpec extends WordSpec with MustMatchers {
|
|||
|
||||
"be immediately readable" in {
|
||||
val countDown = new CountDownFunction[Int]
|
||||
val readLatch = new CountDownLatch(1)
|
||||
val readTimeout = 5 seconds
|
||||
|
||||
val agent = Agent(5)
|
||||
val f1 = (i: Int) => { Thread.sleep(2000); i + 5 }
|
||||
val f1 = (i: Int) => {
|
||||
readLatch.await(readTimeout.length, readTimeout.unit)
|
||||
i + 5
|
||||
}
|
||||
agent send f1
|
||||
val read = agent()
|
||||
readLatch.countDown
|
||||
agent send countDown
|
||||
|
||||
countDown.await(5 seconds)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue