diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 674afeb6ad..fae6b2e70d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -28,6 +28,8 @@ import java.util.concurrent.locks.{Lock, ReentrantLock} * Implements the Transactor abstraction. E.g. a transactional actor. *
* Equivalent to invoking themakeTransactionRequired method in the body of the ActorJonas Bonér
*/
trait Transactor extends Actor {
makeTransactionRequired
@@ -37,6 +39,8 @@ trait Transactor extends Actor {
* Extend this abstract class to create a remote actor.
*
* Equivalent to invoking the makeRemote(..) method in the body of the ActorJonas Bonér
*/
abstract class RemoteActor(hostname: String, port: Int) extends Actor {
makeRemote(hostname, port)
@@ -66,6 +70,8 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
}
/**
+ * Utility class with factory methods for creating Actors.
+ *
* @author Jonas Bonér
*/
object Actor extends Logging {
diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala
new file mode 100644
index 0000000000..6f9bad0df4
--- /dev/null
+++ b/akka-core/src/main/scala/actor/Agent.scala
@@ -0,0 +1,155 @@
+// Copyright © 2008-10 The original author or authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.stm.Ref
+
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.CountDownLatch
+
+/**
+* The Agent class was strongly inspired by the agent principle in Clojure.
+* Essentially, an agent wraps a shared mutable state and hides it behind
+* a message-passing interface. Agents accept messages and process them on
+* behalf of the wrapped state.
+*
+* Typically agents accept functions / commands as messages and ensure the
+* submitted commands are executed against the internal agent's state in a
+* thread-safe manner (sequentially).
+*
+* The submitted functions / commands take the internal state as a parameter
+* and their output becomes the new internal state value.
+*
+* The code that is submitted to an agent doesn't need to pay attention to
+* threading or synchronization, the agent will provide such guarantees by itself.
+*
+* See the examples of use for more details.
+*
+* @author Vaclav Pech
+* Date: Oct 18, 2009
+*
+* AKKA port by
+* @author Viktor Klang
+* Date: Jan 24 2010
+*/
+sealed class Agent[T] private (initialValue: T) extends Transactor {
+ import Agent._
+ private lazy val value = Ref[T]()
+
+ start
+ this ! ValueHolder(initialValue)
+
+ /**
+ * Periodically handles incoming messages
+ */
+ def receive = {
+ case ValueHolder(x: T) => updateData(x)
+ case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
+ case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
+ }
+
+ /**
+ * Specifies how a copy of the value is made, defaults to using identity
+ */
+ protected def copyStrategy(t: T): T = t
+
+
+ /**
+ * Updates the internal state with the value provided as a by-name parameter
+ */
+ private final def updateData(newData: => T): Unit = value.swap(newData)
+
+ /**
+ * Submits a request to read the internal state.
+ *
+ * A copy of the internal state will be returned, depending on the underlying
+ * effective copyStrategy. Internally leverages the asynchronous getValue()
+ * method and then waits for its result on a CountDownLatch.
+ */
+ final def get: T = {
+ val ref = new AtomicReference[T]
+ val latch = new CountDownLatch(1)
+ get((x: T) => {ref.set(x); latch.countDown})
+ latch.await
+ ref.get
+ }
+
+ /**
+ * Asynchronously submits a request to read the internal state. The supplied function
+ * will be executed on the returned internal state value. A copy of the internal state
+ * will be used, depending on the underlying effective copyStrategy.
+ */
+ final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)
+
+ /**
+ * Submits a request to read the internal state. A copy of the internal state will be
+ * returned, depending on the underlying effective copyStrategy. Internally leverages
+ * the asynchronous getValue() method and then waits for its result on a CountDownLatch.
+ */
+ final def apply(): T = get
+
+ /**
+ * Asynchronously submits a request to read the internal state. The supplied function
+ * will be executed on the returned internal state value. A copy of the internal state
+ * will be used, depending on the underlying effective copyStrategy.
+ */
+// final def apply(message: (T => Unit)) : Unit = get(message)
+
+ /**
+ * Submits the provided function for execution against the internal agent's state
+ */
+ final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
+
+ /**
+ * Submits a new value to be set as the new agent's internal state
+ */
+ final def apply(message: T): Unit = this ! ValueHolder(message)
+
+ /**
+ * Submits the provided function for execution against the internal agent's state
+ */
+ final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
+
+ /**
+ * Submits a new value to be set as the new agent's internal state
+ */
+ final def update(message: T): Unit = this ! ValueHolder(message)
+}
+
+/**
+* Provides factory methods to create Agents.
+*/
+object Agent {
+
+ /*
+ * The internal messages for passing around requests
+ */
+ private case class ProcedureHolder[T](val fun: ((T) => Unit))
+ private case class FunctionHolder[T](val fun: ((T) => T))
+ private case class ValueHolder[T](val value: T)
+
+ /**
+ * Creates a new Agent of type T with the initial value of value
+ */
+ def apply[T](value:T): Agent[T] = new Agent(value)
+
+ /**
+ * Creates a new Agent of type T with the initial value of value and with the
+ * specified copy function
+ */
+ def apply[T](value:T, newCopyStrategy: (T) => T) = new Agent(value) {
+ override def copyStrategy(t : T) = newCopyStrategy(t)
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/test/scala/AgentTest.scala b/akka-core/src/test/scala/AgentTest.scala
index 013cd13ada..d6776c2d23 100644
--- a/akka-core/src/test/scala/AgentTest.scala
+++ b/akka-core/src/test/scala/AgentTest.scala
@@ -7,23 +7,21 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Test}
-/*
@RunWith(classOf[JUnitRunner])
-class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {
+class AgentTest extends junit.framework.TestCase
+with Suite with MustMatchers
+with ActorTestUtil with Logging {
@Test def testAgent = verify(new TestActor {
def test = {
- atomic {
- val t = Agent(5)
- handle(t) {
- t.update(_ + 1)
- t.update(_ * 2)
+ val agent = Agent(5)
+ handle(agent) {
+ agent update (_ + 1)
+ agent update (_ * 2)
- val r = t()
- r must be(12)
- }
+ val result = agent()
+ result must be(12)
}
}
})
}
-*/
diff --git a/akka-core/src/test/scala/PerformanceTest.scala b/akka-core/src/test/scala/PerformanceTest.scala
index fe2495c356..9c58e6e0f6 100644
--- a/akka-core/src/test/scala/PerformanceTest.scala
+++ b/akka-core/src/test/scala/PerformanceTest.scala
@@ -15,6 +15,9 @@ import net.lag.logging.Logger
*/
class PerformanceTest extends JUnitSuite {
+ @Test
+ def dummyTest = assert(true)
+
// @Test
def benchAkkaActorsVsScalaActors = {