diff --git a/akka-patterns/pom.xml b/akka-patterns/pom.xml index f741d5401c..6d3f8e0b41 100644 --- a/akka-patterns/pom.xml +++ b/akka-patterns/pom.xml @@ -21,5 +21,19 @@ ${project.groupId} ${project.version} + + + + org.scalatest + scalatest + 1.0 + test + + + junit + junit + 4.5 + test + diff --git a/akka-patterns/src/main/scala/Agent.scala b/akka-patterns/src/main/scala/Agent.scala new file mode 100644 index 0000000000..ceb657d473 --- /dev/null +++ b/akka-patterns/src/main/scala/Agent.scala @@ -0,0 +1,144 @@ +// ScalaAgent +// +// Copyright © 2008-9 The original author or authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package se.scalablesolutions.akka.actor + +import java.util.concurrent.atomic.AtomicReference +import se.scalablesolutions.akka.state.TransactionalState +import java.util.concurrent.{CountDownLatch} + +/** +* The Agent class was strongly inspired by the agent principle in Clojure. Essentially, an agent wraps a shared mutable state +* and hides it behind a message-passing interface. Agents accept messages and process them on behalf of the wrapped state. +* Typically agents accept functions / commands as messages and ensure the submitted commands are executed against the internal +* agent's state in a thread-safe manner (sequentially). +* The submitted functions / commands take the internal state as a parameter and their output becomes the new internal state value. +* The code that is submitted to an agent doesn't need to pay attention to threading or synchronization, the agent will +* provide such guarantees by itself. +* See the examples of use for more details. +* +* @author Vaclav Pech +* Date: Oct 18, 2009 +* +* AKKA retrofit by +* @Author Viktor Klang +* Date: Jan 24 2010 +*/ +sealed class Agent[T] private (initialValue: T) extends Actor { + import Agent._ + + private val value = TransactionalState.newRef[T] + + updateData(initialValue) + + /** + * Periodically handles incoming messages + */ + def receive = { + case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait)) + + case ValueHolder(x: T) => updateData(x) + + case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait)) + } + + /** + * Specifies how a copy of the value is made, defaults to using identity + */ + protected def copyStrategy(t : T) : T = t + + + /** + * Updates the internal state with the value provided as a by-name parameter + */ + private final def updateData(newData: => T) : Unit = value.swap(newData) + + /** + * Submits a request to read the internal state. + * A copy of the internal state will be returned, depending on the underlying effective copyStrategy. + * Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch. + */ + final def get : T = { + val ref = new AtomicReference[T] + val latch = new CountDownLatch(1) + get((x: T) => {ref.set(x); latch.countDown}) + latch.await + ref.get + } + + /** + * Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value. + * A copy of the internal state will be used, depending on the underlying effective copyStrategy. + */ + final def get(message: (T => Unit)) : Unit = this ! ProcedureHolder(message) + + /** + * Submits a request to read the internal state. + * A copy of the internal state will be returned, depending on the underlying effective copyStrategy. + * Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch. + */ + final def apply() : T = get + + /** + * Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value. + * A copy of the internal state will be used, depending on the underlying effective copyStrategy. + */ +// final def apply(message: (T => Unit)) : Unit = get(message) + + /** + * Submits the provided function for execution against the internal agent's state + */ + final def apply(message: (T => T)) : Unit = this ! FunctionHolder(message) + + /** + * Submits a new value to be set as the new agent's internal state + */ + final def apply(message: T) : Unit = this ! ValueHolder(message) + + /** + * Submits the provided function for execution against the internal agent's state + */ + final def update(message: (T => T)) : Unit = this ! FunctionHolder(message) + + /** + * Submits a new value to be set as the new agent's internal state + */ + final def update(message: T) : Unit = this ! ValueHolder(message) +} + +/** +* Provides factory methods to create Agents. +*/ +object Agent { + /** + * The internal messages for passing around requests + */ + private case class ProcedureHolder[T](val fun: ((T) => Unit)) + private case class FunctionHolder[T](val fun: ((T) => T)) + private case class ValueHolder[T](val value: T) + + /** + * Creates a new Agent of type T with the initial value of value + */ + def apply[T](value:T) : Agent[T] = new Agent(value) + + /** + * Creates a new Agent of type T with the initial value of value and with the specified copy function + */ + def apply[T](value:T, newCopyStrategy: (T) => T) = new Agent(value) { + override def copyStrategy(t : T) = newCopyStrategy(t) + } +} diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala new file mode 100644 index 0000000000..09cb1d4917 --- /dev/null +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -0,0 +1,101 @@ +package se.scalablesolutions.akka.actor.patterns + +import se.scalablesolutions.akka.actor.Actor + +object Patterns { + type PF[A,B] = PartialFunction[A,B] + + /** + * Creates a new PartialFunction whose isDefinedAt is a combination + * of the two parameters, and whose apply is first to call filter.apply and then filtered.apply + */ + def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = { + case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) => + filter(a) + filtered(a) + } + + /** + * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true + */ + def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter( + { case a if a.isInstanceOf[A] => interceptor(a) }, + interceptee + ) + + //FIXME 2.8, use default params with CyclicIterator + def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer { + val seq = actors + } + + //FIXME 2.8, use default params with CyclicIterator + /*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor( + new CyclicIterator(actors()) + ) */ + + def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher { + override def transform(msg : Any) = msgTransformer(msg) + def routes = routing + } + + def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher { + def routes = routing + } + + def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor ( + { case _ => actorToLog }, + logger + ) +} + +trait Dispatcher { self : Actor => + + protected def transform(msg : Any) : Any = msg + protected def routes : PartialFunction[Any,Actor] + + protected def dispatch : PartialFunction[Any,Unit] = { + case a if routes.isDefinedAt(a) => { + if(self.sender.isDefined) + routes(a) forward transform(a) + else + routes(a) send transform(a) + } + } + + def receive = dispatch +} + +trait LoadBalancer extends Dispatcher { self : Actor => + protected def seq : InfiniteIterator[Actor] + + protected def routes = { case x if seq.hasNext => seq.next } +} + +trait InfiniteIterator[T] extends Iterator[T] + +class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] { + @volatile private[this] var current : List[T] = items + def hasNext = items != Nil + def next = { + val nc = if(current == Nil) items else current + current = nc.tail + nc.head + } +} + +//Agent +/* +val a = agent(startValue) +a.set(_ + 5) +a.get +a.foreach println(_) +*/ +object Agent { + sealed trait AgentMessage + case class FunMessage[T](f : (T) => T) extends AgentMessage + case class ProcMessage[T](f : (T) => Unit) extends AgentMessage + case class ValMessage[T](t : T) extends AgentMessage +} +sealed private[akka] class Agent[T] { + +} \ No newline at end of file diff --git a/akka-patterns/src/test/scala/ActorPatternsTest.scala b/akka-patterns/src/test/scala/ActorPatternsTest.scala new file mode 100644 index 0000000000..11f2664640 --- /dev/null +++ b/akka-patterns/src/test/scala/ActorPatternsTest.scala @@ -0,0 +1,87 @@ +package se.scalablesolutions.akka.actor + + +import config.ScalaConfig._ + +import org.scalatest.Suite +import patterns.Patterns +import se.scalablesolutions.akka.util.Logging +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.matchers.MustMatchers +import org.junit.{Before, After, Test} +import scala.collection.mutable.HashSet + +@RunWith(classOf[JUnitRunner]) +class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging { + import Actor._ + import Patterns._ + @Test def testDispatcher = verify(new TestActor { + def test = { + val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4") + + var targetOk = 0 + val t1 = actor() receive { + case `testMsg1` => targetOk += 2 + case `testMsg2` => targetOk += 4 + } + + val t2 = actor() receive { + case `testMsg3` => targetOk += 8 + } + + val d = dispatcherActor { + case `testMsg1`|`testMsg2` => t1 + case `testMsg3` => t2 + } + + handle(d,t1,t2){ + d ! testMsg1 + d ! testMsg2 + d ! testMsg3 + Thread.sleep(1000) + targetOk must be(14) + } + } + }) + + @Test def testLogger = verify(new TestActor { + def test = { + val msgs = new HashSet[Any] + val t1 = actor() receive { + case _ => + } + val l = loggerActor(t1,(x) => msgs += x) + handle(t1,l) { + val t1 : Any = "foo" + val t2 : Any = "bar" + l ! t1 + l ! t2 + Thread.sleep(1000) + msgs must ( have size (2) and contain (t1) and contain (t2) ) + } + } + }) +} + +trait ActorTestUtil { + + def handle[T](actors : Actor*)(test : => T) : T = { + for(a <- actors) a.start + try { + test + } + finally { + for(a <- actors) a.stop + } + } + + def verify(actor : TestActor) : Unit = handle(actor) { + actor.test + } +} + +abstract class TestActor extends Actor with ActorTestUtil { + def test : Unit + def receive = { case _ => } +} \ No newline at end of file diff --git a/akka-patterns/src/test/scala/AgentTest.scala b/akka-patterns/src/test/scala/AgentTest.scala new file mode 100644 index 0000000000..17ccce8e0a --- /dev/null +++ b/akka-patterns/src/test/scala/AgentTest.scala @@ -0,0 +1,24 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.Suite +import se.scalablesolutions.akka.util.Logging +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.matchers.MustMatchers +import org.junit.{Test} + +@RunWith(classOf[JUnitRunner]) +class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging { + @Test def testAgent = verify(new TestActor { + def test = { + val t = Agent(5) + handle(t){ + t.update( _ + 1 ) + t.update( _ * 2 ) + + val r = t() + r must be (12) + } + } + }) +} \ No newline at end of file