diff --git a/akka-actor/src/main/scala/actor/Agent.scala b/akka-actor/src/main/scala/actor/Agent.scala
index 6b9385ca4e..00dceba21c 100644
--- a/akka-actor/src/main/scala/actor/Agent.scala
+++ b/akka-actor/src/main/scala/actor/Agent.scala
@@ -9,6 +9,7 @@ import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.japi.{ Function => JFunc, Procedure => JProc }
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
+import se.scalablesolutions.akka.config.RemoteAddress
class AgentException private[akka](message: String) extends AkkaException(message)
@@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag
* @author Viktor Klang
* @author Jonas Bonér
*/
-sealed class Agent[T] private (initialValue: T) {
+sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) {
+
import Agent._
import Actor._
-
- private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
+ val dispatcher = remote match {
+ case Some(address) =>
+ val d = actorOf(new AgentDispatcher[T]())
+ d.makeRemote(remote.get.hostname,remote.get.port)
+ d.start
+ d ! Value(initialValue)
+ d
+ case None =>
+ actorOf(new AgentDispatcher(initialValue)).start
+ }
/**
* Submits a request to read the internal state.
@@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) {
if (dispatcher.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction."+
"\n\tWould block indefinitely.\n\tPlease refactor your code.")
- val ref = new AtomicReference[T]
- val latch = new CountDownLatch(1)
- sendProc((v: T) => {ref.set(v); latch.countDown})
- latch.await
- ref.get
+ val f = (dispatcher.!!).await
+ if (f.exception.isDefined) throw f.exception.get
+ else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out"))
}
/**
@@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) {
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
- final def map[B](f: (T) => B): Agent[B] = Agent(f(get))
+ final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
- final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())
+ final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote)
/**
* Applies function with type 'T => B' to the agent's internal state.
@@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) {
* Does not change the value of the agent (this).
* Java API
*/
- final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get))
+ final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
* Java API
*/
- final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)())
+ final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote)
/**
* Applies procedure with type T to the agent's internal state.
@@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) {
* @author Jonas Bonér
*/
object Agent {
-
+ import Actor._
/*
* The internal messages for passing around requests.
*/
private[akka] case class Value[T](value: T)
private[akka] case class Function[T](fun: ((T) => T))
private[akka] case class Procedure[T](fun: ((T) => Unit))
+ private[akka] case object Read
/**
* Creates a new Agent of type T with the initial value of value.
*/
- def apply[T](value: T): Agent[T] = new Agent(value)
+ def apply[T](value: T): Agent[T] =
+ apply(value,None)
+
+ /**
+ * Creates an Agent backed by a client managed Actor if Some(remoteAddress)
+ * or a local agent if None
+ */
+ def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] =
+ new Agent[T](value,remoteAddress)
+
+ /**
+ * Creates an Agent backed by a client managed Actor
+ */
+ def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] =
+ apply(value,Some(remoteAddress))
}
/**
@@ -254,12 +277,15 @@ object Agent {
*
* @author Jonas Bonér
*/
-final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor {
+final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor {
import Agent._
- import Actor._
- log.debug("Starting up Agent [%s]", self.uuid)
- private val value = Ref[T](initialValue)
+ private[akka] def this(initialValue: T) = this(Ref(initialValue))
+ private[akka] def this() = this(Ref[T]())
+
+ private val value = ref
+
+ log.debug("Starting up Agent [%s]", self.uuid)
/**
* Periodically handles incoming messages.
@@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
def receive = {
case Value(v: T) =>
swap(v)
+ case Read => self.reply_?(value.get())
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala
index ea06ebb4ec..57ea36d5d0 100644
--- a/akka-actor/src/main/scala/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/dispatch/Future.scala
@@ -37,10 +37,16 @@ object Futures {
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
- def awaitOne(futures: List[Future[_]]): Future[_] = {
+ /**
+ * Returns the First Future that is completed
+ * if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans
+ */
+ def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
+ if (sleepMs > 0 && future.isEmpty)
+ Thread.sleep(sleepMs)
} while (future.isEmpty)
future.get
}
@@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] {
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
- private val TIME_UNIT = TimeUnit.MILLISECONDS
+ import TimeUnit.{MILLISECONDS => TIME_UNIT}
def this() = this(0)
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
diff --git a/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala
new file mode 100644
index 0000000000..02a93e949b
--- /dev/null
+++ b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala
@@ -0,0 +1,37 @@
+package se.scalablesolutions.akka.actor.remote
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.{Test, Before, After}
+import se.scalablesolutions.akka.config.RemoteAddress
+import se.scalablesolutions.akka.actor.Agent
+import se.scalablesolutions.akka.remote. {RemoteClient, RemoteServer}
+
+
+class RemoteAgentSpec extends JUnitSuite {
+ var server: RemoteServer = _
+
+ val HOSTNAME = "localhost"
+ val PORT = 9992
+
+ @Before def startServer {
+ val s = new RemoteServer()
+ s.start(HOSTNAME, PORT)
+ server = s
+ Thread.sleep(1000)
+ }
+
+ @After def stopServer {
+ val s = server
+ server = null
+ s.shutdown
+ RemoteClient.shutdownAll
+ }
+
+ @Test def remoteAgentShouldInitializeProperly {
+ val a = Agent(10,RemoteAddress(HOSTNAME,PORT))
+ assert(a() == 10, "Remote agent should have the proper initial value")
+ a(20)
+ assert(a() == 20, "Remote agent should be updated properly")
+ a.close
+ }
+}
\ No newline at end of file
diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala
index 2bde073df8..82ccbe401a 100644
--- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala
+++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala
@@ -4,7 +4,7 @@ object AkkaRepositories {
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
- val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
+ val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 5b93e30044..6add9542ee 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
- lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
+ lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")