Merged patterns code into module

This commit is contained in:
Viktor Klang 2010-02-13 21:45:35 +01:00
parent a92a90bb72
commit 8829ceacbb
5 changed files with 370 additions and 0 deletions

View file

@ -21,5 +21,19 @@
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- For Testing -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest</artifactId>
<version>1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.5</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View file

@ -0,0 +1,144 @@
// ScalaAgent
//
// Copyright © 2008-9 The original author or authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package se.scalablesolutions.akka.actor
import java.util.concurrent.atomic.AtomicReference
import se.scalablesolutions.akka.state.TransactionalState
import java.util.concurrent.{CountDownLatch}
/**
* The Agent class was strongly inspired by the agent principle in Clojure. Essentially, an agent wraps a shared mutable state
* and hides it behind a message-passing interface. Agents accept messages and process them on behalf of the wrapped state.
* Typically agents accept functions / commands as messages and ensure the submitted commands are executed against the internal
* agent's state in a thread-safe manner (sequentially).
* The submitted functions / commands take the internal state as a parameter and their output becomes the new internal state value.
* The code that is submitted to an agent doesn't need to pay attention to threading or synchronization, the agent will
* provide such guarantees by itself.
* See the examples of use for more details.
*
* @author Vaclav Pech
* Date: Oct 18, 2009
*
* AKKA retrofit by
* @Author Viktor Klang
* Date: Jan 24 2010
*/
sealed class Agent[T] private (initialValue: T) extends Actor {
import Agent._
private val value = TransactionalState.newRef[T]
updateData(initialValue)
/**
* Periodically handles incoming messages
*/
def receive = {
case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
case ValueHolder(x: T) => updateData(x)
case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
}
/**
* Specifies how a copy of the value is made, defaults to using identity
*/
protected def copyStrategy(t : T) : T = t
/**
* Updates the internal state with the value provided as a by-name parameter
*/
private final def updateData(newData: => T) : Unit = value.swap(newData)
/**
* Submits a request to read the internal state.
* A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
* Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
final def get : T = {
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
get((x: T) => {ref.set(x); latch.countDown})
latch.await
ref.get
}
/**
* Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
* A copy of the internal state will be used, depending on the underlying effective copyStrategy.
*/
final def get(message: (T => Unit)) : Unit = this ! ProcedureHolder(message)
/**
* Submits a request to read the internal state.
* A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
* Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
final def apply() : T = get
/**
* Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
* A copy of the internal state will be used, depending on the underlying effective copyStrategy.
*/
// final def apply(message: (T => Unit)) : Unit = get(message)
/**
* Submits the provided function for execution against the internal agent's state
*/
final def apply(message: (T => T)) : Unit = this ! FunctionHolder(message)
/**
* Submits a new value to be set as the new agent's internal state
*/
final def apply(message: T) : Unit = this ! ValueHolder(message)
/**
* Submits the provided function for execution against the internal agent's state
*/
final def update(message: (T => T)) : Unit = this ! FunctionHolder(message)
/**
* Submits a new value to be set as the new agent's internal state
*/
final def update(message: T) : Unit = this ! ValueHolder(message)
}
/**
* Provides factory methods to create Agents.
*/
object Agent {
/**
* The internal messages for passing around requests
*/
private case class ProcedureHolder[T](val fun: ((T) => Unit))
private case class FunctionHolder[T](val fun: ((T) => T))
private case class ValueHolder[T](val value: T)
/**
* Creates a new Agent of type T with the initial value of value
*/
def apply[T](value:T) : Agent[T] = new Agent(value)
/**
* Creates a new Agent of type T with the initial value of value and with the specified copy function
*/
def apply[T](value:T, newCopyStrategy: (T) => T) = new Agent(value) {
override def copyStrategy(t : T) = newCopyStrategy(t)
}
}

View file

@ -0,0 +1,101 @@
package se.scalablesolutions.akka.actor.patterns
import se.scalablesolutions.akka.actor.Actor
object Patterns {
type PF[A,B] = PartialFunction[A,B]
/**
* Creates a new PartialFunction whose isDefinedAt is a combination
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
*/
def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = {
case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
filter(a)
filtered(a)
}
/**
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
*/
def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter(
{ case a if a.isInstanceOf[A] => interceptor(a) },
interceptee
)
//FIXME 2.8, use default params with CyclicIterator
def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer {
val seq = actors
}
//FIXME 2.8, use default params with CyclicIterator
/*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor(
new CyclicIterator(actors())
) */
def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
override def transform(msg : Any) = msgTransformer(msg)
def routes = routing
}
def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher {
def routes = routing
}
def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor (
{ case _ => actorToLog },
logger
)
}
trait Dispatcher { self : Actor =>
protected def transform(msg : Any) : Any = msg
protected def routes : PartialFunction[Any,Actor]
protected def dispatch : PartialFunction[Any,Unit] = {
case a if routes.isDefinedAt(a) => {
if(self.sender.isDefined)
routes(a) forward transform(a)
else
routes(a) send transform(a)
}
}
def receive = dispatch
}
trait LoadBalancer extends Dispatcher { self : Actor =>
protected def seq : InfiniteIterator[Actor]
protected def routes = { case x if seq.hasNext => seq.next }
}
trait InfiniteIterator[T] extends Iterator[T]
class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
@volatile private[this] var current : List[T] = items
def hasNext = items != Nil
def next = {
val nc = if(current == Nil) items else current
current = nc.tail
nc.head
}
}
//Agent
/*
val a = agent(startValue)
a.set(_ + 5)
a.get
a.foreach println(_)
*/
object Agent {
sealed trait AgentMessage
case class FunMessage[T](f : (T) => T) extends AgentMessage
case class ProcMessage[T](f : (T) => Unit) extends AgentMessage
case class ValMessage[T](t : T) extends AgentMessage
}
sealed private[akka] class Agent[T] {
}

View file

@ -0,0 +1,87 @@
package se.scalablesolutions.akka.actor
import config.ScalaConfig._
import org.scalatest.Suite
import patterns.Patterns
import se.scalablesolutions.akka.util.Logging
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Before, After, Test}
import scala.collection.mutable.HashSet
@RunWith(classOf[JUnitRunner])
class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {
import Actor._
import Patterns._
@Test def testDispatcher = verify(new TestActor {
def test = {
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
var targetOk = 0
val t1 = actor() receive {
case `testMsg1` => targetOk += 2
case `testMsg2` => targetOk += 4
}
val t2 = actor() receive {
case `testMsg3` => targetOk += 8
}
val d = dispatcherActor {
case `testMsg1`|`testMsg2` => t1
case `testMsg3` => t2
}
handle(d,t1,t2){
d ! testMsg1
d ! testMsg2
d ! testMsg3
Thread.sleep(1000)
targetOk must be(14)
}
}
})
@Test def testLogger = verify(new TestActor {
def test = {
val msgs = new HashSet[Any]
val t1 = actor() receive {
case _ =>
}
val l = loggerActor(t1,(x) => msgs += x)
handle(t1,l) {
val t1 : Any = "foo"
val t2 : Any = "bar"
l ! t1
l ! t2
Thread.sleep(1000)
msgs must ( have size (2) and contain (t1) and contain (t2) )
}
}
})
}
trait ActorTestUtil {
def handle[T](actors : Actor*)(test : => T) : T = {
for(a <- actors) a.start
try {
test
}
finally {
for(a <- actors) a.stop
}
}
def verify(actor : TestActor) : Unit = handle(actor) {
actor.test
}
}
abstract class TestActor extends Actor with ActorTestUtil {
def test : Unit
def receive = { case _ => }
}

View file

@ -0,0 +1,24 @@
package se.scalablesolutions.akka.actor
import org.scalatest.Suite
import se.scalablesolutions.akka.util.Logging
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Test}
@RunWith(classOf[JUnitRunner])
class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {
@Test def testAgent = verify(new TestActor {
def test = {
val t = Agent(5)
handle(t){
t.update( _ + 1 )
t.update( _ * 2 )
val r = t()
r must be (12)
}
}
})
}