From b2eeffe6a116dba2d8b956ac91d7fea20924e442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 18 Mar 2010 08:53:58 +0100 Subject: [PATCH] Fixed problem with Agent, now tests pass --- akka-core/src/main/scala/actor/Actor.scala | 6 + akka-core/src/main/scala/actor/Agent.scala | 155 ++++++++++++++++++ akka-core/src/test/scala/AgentTest.scala | 20 +-- .../src/test/scala/PerformanceTest.scala | 3 + 4 files changed, 173 insertions(+), 11 deletions(-) create mode 100644 akka-core/src/main/scala/actor/Agent.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 674afeb6ad..fae6b2e70d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -28,6 +28,8 @@ import java.util.concurrent.locks.{Lock, ReentrantLock} * Implements the Transactor abstraction. E.g. a transactional actor. *

* Equivalent to invoking the makeTransactionRequired method in the body of the ActorJonas Bonér */ trait Transactor extends Actor { makeTransactionRequired @@ -37,6 +39,8 @@ trait Transactor extends Actor { * Extend this abstract class to create a remote actor. *

* Equivalent to invoking the makeRemote(..) method in the body of the ActorJonas Bonér */ abstract class RemoteActor(hostname: String, port: Int) extends Actor { makeRemote(hostname, port) @@ -66,6 +70,8 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { } /** + * Utility class with factory methods for creating Actors. + * * @author Jonas Bonér */ object Actor extends Logging { diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala new file mode 100644 index 0000000000..6f9bad0df4 --- /dev/null +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -0,0 +1,155 @@ +// Copyright © 2008-10 The original author or authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package se.scalablesolutions.akka.actor + +import se.scalablesolutions.akka.stm.Ref + +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.CountDownLatch + +/** +* The Agent class was strongly inspired by the agent principle in Clojure. +* Essentially, an agent wraps a shared mutable state and hides it behind +* a message-passing interface. Agents accept messages and process them on +* behalf of the wrapped state. +* +* Typically agents accept functions / commands as messages and ensure the +* submitted commands are executed against the internal agent's state in a +* thread-safe manner (sequentially). +* +* The submitted functions / commands take the internal state as a parameter +* and their output becomes the new internal state value. +* +* The code that is submitted to an agent doesn't need to pay attention to +* threading or synchronization, the agent will provide such guarantees by itself. +* +* See the examples of use for more details. +* +* @author Vaclav Pech +* Date: Oct 18, 2009 +* +* AKKA port by +* @author Viktor Klang +* Date: Jan 24 2010 +*/ +sealed class Agent[T] private (initialValue: T) extends Transactor { + import Agent._ + private lazy val value = Ref[T]() + + start + this ! ValueHolder(initialValue) + + /** + * Periodically handles incoming messages + */ + def receive = { + case ValueHolder(x: T) => updateData(x) + case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait)) + case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait)) + } + + /** + * Specifies how a copy of the value is made, defaults to using identity + */ + protected def copyStrategy(t: T): T = t + + + /** + * Updates the internal state with the value provided as a by-name parameter + */ + private final def updateData(newData: => T): Unit = value.swap(newData) + + /** + * Submits a request to read the internal state. + * + * A copy of the internal state will be returned, depending on the underlying + * effective copyStrategy. Internally leverages the asynchronous getValue() + * method and then waits for its result on a CountDownLatch. + */ + final def get: T = { + val ref = new AtomicReference[T] + val latch = new CountDownLatch(1) + get((x: T) => {ref.set(x); latch.countDown}) + latch.await + ref.get + } + + /** + * Asynchronously submits a request to read the internal state. The supplied function + * will be executed on the returned internal state value. A copy of the internal state + * will be used, depending on the underlying effective copyStrategy. + */ + final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message) + + /** + * Submits a request to read the internal state. A copy of the internal state will be + * returned, depending on the underlying effective copyStrategy. Internally leverages + * the asynchronous getValue() method and then waits for its result on a CountDownLatch. + */ + final def apply(): T = get + + /** + * Asynchronously submits a request to read the internal state. The supplied function + * will be executed on the returned internal state value. A copy of the internal state + * will be used, depending on the underlying effective copyStrategy. + */ +// final def apply(message: (T => Unit)) : Unit = get(message) + + /** + * Submits the provided function for execution against the internal agent's state + */ + final def apply(message: (T => T)): Unit = this ! FunctionHolder(message) + + /** + * Submits a new value to be set as the new agent's internal state + */ + final def apply(message: T): Unit = this ! ValueHolder(message) + + /** + * Submits the provided function for execution against the internal agent's state + */ + final def update(message: (T => T)): Unit = this ! FunctionHolder(message) + + /** + * Submits a new value to be set as the new agent's internal state + */ + final def update(message: T): Unit = this ! ValueHolder(message) +} + +/** +* Provides factory methods to create Agents. +*/ +object Agent { + + /* + * The internal messages for passing around requests + */ + private case class ProcedureHolder[T](val fun: ((T) => Unit)) + private case class FunctionHolder[T](val fun: ((T) => T)) + private case class ValueHolder[T](val value: T) + + /** + * Creates a new Agent of type T with the initial value of value + */ + def apply[T](value:T): Agent[T] = new Agent(value) + + /** + * Creates a new Agent of type T with the initial value of value and with the + * specified copy function + */ + def apply[T](value:T, newCopyStrategy: (T) => T) = new Agent(value) { + override def copyStrategy(t : T) = newCopyStrategy(t) + } +} \ No newline at end of file diff --git a/akka-core/src/test/scala/AgentTest.scala b/akka-core/src/test/scala/AgentTest.scala index 013cd13ada..d6776c2d23 100644 --- a/akka-core/src/test/scala/AgentTest.scala +++ b/akka-core/src/test/scala/AgentTest.scala @@ -7,23 +7,21 @@ import org.scalatest.junit.JUnitRunner import org.scalatest.matchers.MustMatchers import org.junit.{Test} -/* @RunWith(classOf[JUnitRunner]) -class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging { +class AgentTest extends junit.framework.TestCase +with Suite with MustMatchers +with ActorTestUtil with Logging { @Test def testAgent = verify(new TestActor { def test = { - atomic { - val t = Agent(5) - handle(t) { - t.update(_ + 1) - t.update(_ * 2) + val agent = Agent(5) + handle(agent) { + agent update (_ + 1) + agent update (_ * 2) - val r = t() - r must be(12) - } + val result = agent() + result must be(12) } } }) } -*/ diff --git a/akka-core/src/test/scala/PerformanceTest.scala b/akka-core/src/test/scala/PerformanceTest.scala index fe2495c356..9c58e6e0f6 100644 --- a/akka-core/src/test/scala/PerformanceTest.scala +++ b/akka-core/src/test/scala/PerformanceTest.scala @@ -15,6 +15,9 @@ import net.lag.logging.Logger */ class PerformanceTest extends JUnitSuite { + @Test + def dummyTest = assert(true) + // @Test def benchAkkaActorsVsScalaActors = {