Agent is now monadic, added more tests to AgentTest
This commit is contained in:
parent
7d465f6d4f
commit
089cf01b40
4 changed files with 154 additions and 78 deletions
|
|
@ -44,8 +44,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
|
|||
* <pre>
|
||||
* val agent = Agent(5)
|
||||
*
|
||||
* agent update (_ + 1)
|
||||
* agent update (_ * 2)
|
||||
* agent send (_ + 1)
|
||||
* agent send (_ * 2)
|
||||
*
|
||||
* val result = agent()
|
||||
* ... // use result
|
||||
|
|
@ -53,17 +53,26 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
|
|||
* agent.close
|
||||
* </pre>
|
||||
*
|
||||
* Example of monadic usage:
|
||||
* <pre>
|
||||
* val agent1 = Agent(3)
|
||||
* val agent2 = Agent(5)
|
||||
*
|
||||
* for {
|
||||
* first <- agent1
|
||||
* second <- agent2
|
||||
* if first == second
|
||||
* } process(first, second)
|
||||
*
|
||||
* agent1.close
|
||||
* agent2.close
|
||||
* </pre>
|
||||
*
|
||||
* NOTE: You can't call 'agent.get' or 'agent()' within an enclosing transaction since
|
||||
* that will block the transaction indefinitely. But 'agent.update' or 'Agent(value)'
|
||||
* that will block the transaction indefinitely. But 'agent.send' or 'Agent(value)'
|
||||
* is fine.
|
||||
*
|
||||
* Original author:
|
||||
* @author Vaclav Pech
|
||||
*
|
||||
* Inital AKKA port by:
|
||||
* @author Viktor Klang
|
||||
*
|
||||
* Modifications by:
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||
|
|
@ -71,15 +80,18 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
|||
private lazy val value = Ref[T]()
|
||||
|
||||
start
|
||||
this ! ValueHolder(initialValue)
|
||||
this !! Value(initialValue)
|
||||
|
||||
/**
|
||||
* Periodically handles incoming messages.
|
||||
*/
|
||||
* Periodically handles incoming messages.
|
||||
*/
|
||||
def receive = {
|
||||
case ValueHolder(x: T) => updateData(x)
|
||||
case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
|
||||
case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
|
||||
case Value(v: T) =>
|
||||
swap(v)
|
||||
case Function(fun: (T => T)) =>
|
||||
swap(fun(value.getOrWait))
|
||||
case Procedure(proc: (T => Unit)) =>
|
||||
proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -87,11 +99,11 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
|||
*/
|
||||
protected def copyStrategy(t: T): T = t
|
||||
|
||||
|
||||
/**
|
||||
* Updates the internal state with the value provided as a by-name parameter.
|
||||
*/
|
||||
private final def updateData(newData: => T): Unit = value.swap(newData)
|
||||
* Performs a CAS operation, atomically swapping the internal state with the value
|
||||
* provided as a by-name parameter.
|
||||
*/
|
||||
private final def swap(newData: => T): Unit = value.swap(newData)
|
||||
|
||||
/**
|
||||
* Submits a request to read the internal state.
|
||||
|
|
@ -105,46 +117,64 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
|||
"Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
|
||||
val ref = new AtomicReference[T]
|
||||
val latch = new CountDownLatch(1)
|
||||
get((x: T) => {ref.set(x); latch.countDown})
|
||||
sendProc((x: T) => {ref.set(x); latch.countDown})
|
||||
latch.await
|
||||
ref.get
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously submits a request to read the internal state. The supplied function
|
||||
* will be executed on the returned internal state value. A copy of the internal state
|
||||
* will be used, depending on the underlying effective copyStrategy.
|
||||
*/
|
||||
final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)
|
||||
|
||||
/**
|
||||
* Submits a request to read the internal state. A copy of the internal state will be
|
||||
* returned, depending on the underlying effective copyStrategy. Internally leverages
|
||||
* the asynchronous getValue() method and then waits for its result on a CountDownLatch.
|
||||
*/
|
||||
* Submits a request to read the internal state. A copy of the internal state will be
|
||||
* returned, depending on the underlying effective copyStrategy. Internally leverages
|
||||
* the asynchronous getValue() method and then waits for its result on a CountDownLatch.
|
||||
*/
|
||||
final def apply(): T = get
|
||||
|
||||
/**
|
||||
* Submits the provided function for execution against the internal agent's state.
|
||||
*/
|
||||
final def apply(message: (T => T)): Unit = this ! Function(message)
|
||||
|
||||
/**
|
||||
* Submits a new value to be set as the new agent's internal state.
|
||||
*/
|
||||
final def apply(message: T): Unit = this ! Value(message)
|
||||
|
||||
/**
|
||||
* Submits the provided function of type 'T => T' for execution against the internal agent's state.
|
||||
*/
|
||||
final def send(message: (T) => T): Unit = this ! Function(message)
|
||||
|
||||
/**
|
||||
* Submits a new value to be set as the new agent's internal state.
|
||||
*/
|
||||
final def send(message: T): Unit = this ! Value(message)
|
||||
|
||||
/**
|
||||
* Submits the provided function for execution against the internal agent's state.
|
||||
*/
|
||||
final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
|
||||
|
||||
* Asynchronously submits a procedure of type 'T => Unit' to read the internal state.
|
||||
* The supplied procedure will be executed on the returned internal state value. A copy
|
||||
* of the internal state will be used, depending on the underlying effective copyStrategy.
|
||||
* Does not change the value of the agent (this).
|
||||
*/
|
||||
final def sendProc(f: (T) => Unit): Unit = this ! Procedure(f)
|
||||
|
||||
/**
|
||||
* Submits a new value to be set as the new agent's internal state.
|
||||
*/
|
||||
final def apply(message: T): Unit = this ! ValueHolder(message)
|
||||
|
||||
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
|
||||
* Does not change the value of the agent (this).
|
||||
*/
|
||||
final def map[B](f: (T) => B): Agent[B] = Agent(f(get))
|
||||
|
||||
/**
|
||||
* Submits the provided function for execution against the internal agent's state.
|
||||
*/
|
||||
final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
|
||||
|
||||
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
|
||||
* Does not change the value of the agent (this).
|
||||
*/
|
||||
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())
|
||||
|
||||
/**
|
||||
* Submits a new value to be set as the new agent's internal state.
|
||||
*/
|
||||
// FIXME Change to 'send' when we have Scala 2.8 and we can remove the Actor.send method
|
||||
final def update(message: T): Unit = this ! ValueHolder(message)
|
||||
|
||||
* Applies function with type 'T => B' to the agent's internal state.
|
||||
* Does not change the value of the agent (this).
|
||||
*/
|
||||
final def foreach(f: (T) => Unit): Unit = f(get)
|
||||
|
||||
/**
|
||||
* Closes the agents and makes it eligable for garbage collection.
|
||||
*
|
||||
|
|
@ -154,16 +184,19 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Provides factory methods to create Agents.
|
||||
*/
|
||||
object Agent {
|
||||
* Provides factory methods to create Agents.
|
||||
*
|
||||
* @author Viktor Klang
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Agent {
|
||||
|
||||
/*
|
||||
* The internal messages for passing around requests.
|
||||
*/
|
||||
private case class ProcedureHolder[T](fun: ((T) => Unit))
|
||||
private case class FunctionHolder[T](fun: ((T) => T))
|
||||
private case class ValueHolder[T](value: T)
|
||||
private case class Value[T](value: T)
|
||||
private case class Function[T](fun: ((T) => T))
|
||||
private case class Procedure[T](fun: ((T) => Unit))
|
||||
|
||||
/**
|
||||
* Creates a new Agent of type T with the initial value of value.
|
||||
|
|
@ -177,4 +210,4 @@ object Agent {
|
|||
def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) {
|
||||
override def copyStrategy(t: T) = newCopyStrategy(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,33 +103,29 @@ object Transaction extends TransactionManagement with Logging {
|
|||
/**
|
||||
* See ScalaDoc on Transaction class.
|
||||
*/
|
||||
def map[T](f: => T)(implicit transactionFamilyName: String): T =
|
||||
atomic {f}
|
||||
def map[T](f: => T): T = atomic {f}
|
||||
|
||||
/**
|
||||
* See ScalaDoc on Transaction class.
|
||||
*/
|
||||
def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
|
||||
atomic {f}
|
||||
def flatMap[T](f: => T): T = atomic {f}
|
||||
|
||||
/**
|
||||
* See ScalaDoc on Transaction class.
|
||||
*/
|
||||
def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
|
||||
atomic {f}
|
||||
def foreach(f: => Unit): Unit = atomic {f}
|
||||
|
||||
/**
|
||||
* See ScalaDoc on Transaction class.
|
||||
*/
|
||||
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
|
||||
def atomic[T](body: => T): T = {
|
||||
var isTopLevelTransaction = true
|
||||
new TransactionTemplate[T]() {
|
||||
def execute(mtx: MultiverseTransaction): T = {
|
||||
val result = body
|
||||
|
||||
val txSet = getTransactionSetInScope
|
||||
log.trace("Committing transaction [%s]\n\twith family name [%s]\n\tby joining transaction set [%s]",
|
||||
mtx, transactionFamilyName, txSet)
|
||||
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
|
||||
txSet.joinCommit(mtx)
|
||||
|
||||
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
|
|
|
|||
|
|
@ -16,15 +16,12 @@ class AgentTest extends junit.framework.TestCase
|
|||
with Suite with MustMatchers
|
||||
with ActorTestUtil with Logging {
|
||||
|
||||
implicit val txFamilyName = "test"
|
||||
|
||||
@Test def testSendFun = verify(new TestActor {
|
||||
def test = {
|
||||
val agent = Agent(5)
|
||||
handle(agent) {
|
||||
agent update (_ + 1)
|
||||
agent update (_ * 2)
|
||||
|
||||
agent send (_ + 1)
|
||||
agent send (_ * 2)
|
||||
val result = agent()
|
||||
result must be(12)
|
||||
}
|
||||
|
|
@ -35,21 +32,34 @@ with ActorTestUtil with Logging {
|
|||
def test = {
|
||||
val agent = Agent(5)
|
||||
handle(agent) {
|
||||
agent update 6
|
||||
agent send 6
|
||||
val result = agent()
|
||||
result must be(6)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@Test def testOneAgentUpdateWithinEnlosingTransactionSuccess = {
|
||||
@Test def testSendProc = verify(new TestActor {
|
||||
def test = {
|
||||
val agent = Agent(5)
|
||||
var result = 0
|
||||
handle(agent) {
|
||||
agent sendProc (result += _)
|
||||
agent sendProc (result += _)
|
||||
Thread.sleep(1000)
|
||||
result must be(10)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@Test def testOneAgentsendWithinEnlosingTransactionSuccess = {
|
||||
case object Go
|
||||
val agent = Agent(5)
|
||||
val tx = transactor {
|
||||
case Go => agent update (_ + 1)
|
||||
case Go => agent send (_ + 1)
|
||||
}
|
||||
tx send Go
|
||||
Thread.sleep(5000)
|
||||
tx ! Go
|
||||
Thread.sleep(1000)
|
||||
val result = agent()
|
||||
result must be(6)
|
||||
agent.close
|
||||
|
|
@ -63,16 +73,53 @@ with ActorTestUtil with Logging {
|
|||
val agent = Agent(5)
|
||||
val tx = transactor {
|
||||
case Go =>
|
||||
agent update (_ * 2)
|
||||
agent send (_ * 2)
|
||||
try { agent() }
|
||||
catch {
|
||||
case _ => latch.countDown
|
||||
}
|
||||
}
|
||||
tx send Go
|
||||
tx ! Go
|
||||
latch.await // FIXME should await with timeout and fail if timeout
|
||||
agent.close
|
||||
tx.stop
|
||||
assert(true)
|
||||
}
|
||||
|
||||
@Test def testAgentForeach = verify(new TestActor {
|
||||
def test = {
|
||||
val agent1 = Agent(3)
|
||||
var result = 0
|
||||
for (first <- agent1) {
|
||||
result = first + 1
|
||||
}
|
||||
result must be(4)
|
||||
agent1.close
|
||||
}
|
||||
})
|
||||
|
||||
@Test def testAgentMap = verify(new TestActor {
|
||||
def test = {
|
||||
val agent1 = Agent(3)
|
||||
val result = for (first <- agent1) yield first + 1
|
||||
result() must be(4)
|
||||
result.close
|
||||
agent1.close
|
||||
}
|
||||
})
|
||||
|
||||
@Test def testAgentFlatMap = verify(new TestActor {
|
||||
def test = {
|
||||
val agent1 = Agent(3)
|
||||
val agent2 = Agent(5)
|
||||
val result = for {
|
||||
first <- agent1
|
||||
second <- agent2
|
||||
} yield second + first
|
||||
result() must be(8)
|
||||
result.close
|
||||
agent1.close
|
||||
agent2.close
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ object ActorShutdownRunner {
|
|||
|
||||
val myActor = new MyActor
|
||||
myActor.start
|
||||
myActor.send("test")
|
||||
myActor ! "test"
|
||||
myActor.stop
|
||||
}
|
||||
}
|
||||
|
|
@ -34,4 +34,4 @@ object RemoteServerAndClusterShutdownRunner {
|
|||
s2.shutdown
|
||||
s3.shutdown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue