From 4a43c933a1eb9d88af1f8e40642d7f2ec77cd9bc Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 26 Oct 2010 14:23:55 +0200 Subject: [PATCH 1/3] Switching to non-SSL repo for jBoss --- akka-sbt-plugin/src/main/scala/AkkaProject.scala | 2 +- project/build/AkkaProject.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index 2bde073df8..82ccbe401a 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -4,7 +4,7 @@ object AkkaRepositories { val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5b93e30044..6add9542ee 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") From 2b46fcee5066a01afb7667f7bd712a6fe4033e87 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 26 Oct 2010 15:53:34 +0200 Subject: [PATCH 2/3] Added support for remote agent --- akka-actor/src/main/scala/actor/Agent.scala | 63 +++++++++++++------ .../test/scala/remote/RemoteAgentSpec.scala | 37 +++++++++++ 2 files changed, 82 insertions(+), 18 deletions(-) create mode 100644 akka-remote/src/test/scala/remote/RemoteAgentSpec.scala diff --git a/akka-actor/src/main/scala/actor/Agent.scala b/akka-actor/src/main/scala/actor/Agent.scala index 6b9385ca4e..00dceba21c 100644 --- a/akka-actor/src/main/scala/actor/Agent.scala +++ b/akka-actor/src/main/scala/actor/Agent.scala @@ -9,6 +9,7 @@ import se.scalablesolutions.akka.AkkaException import se.scalablesolutions.akka.japi.{ Function => JFunc, Procedure => JProc } import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.config.RemoteAddress class AgentException private[akka](message: String) extends AkkaException(message) @@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag * @author Viktor Klang * @author Jonas Bonér */ -sealed class Agent[T] private (initialValue: T) { +sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) { + import Agent._ import Actor._ - - private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start + val dispatcher = remote match { + case Some(address) => + val d = actorOf(new AgentDispatcher[T]()) + d.makeRemote(remote.get.hostname,remote.get.port) + d.start + d ! Value(initialValue) + d + case None => + actorOf(new AgentDispatcher(initialValue)).start + } /** * Submits a request to read the internal state. @@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) { if (dispatcher.isTransactionInScope) throw new AgentException( "Can't call Agent.get within an enclosing transaction."+ "\n\tWould block indefinitely.\n\tPlease refactor your code.") - val ref = new AtomicReference[T] - val latch = new CountDownLatch(1) - sendProc((v: T) => {ref.set(v); latch.countDown}) - latch.await - ref.get + val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await + if (f.exception.isDefined) throw f.exception.get + else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out")) } /** @@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) { * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def map[B](f: (T) => B): Agent[B] = Agent(f(get)) + final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)()) + final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote) /** * Applies function with type 'T => B' to the agent's internal state. @@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) { * Does not change the value of the agent (this). * Java API */ - final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get)) + final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). * Java API */ - final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)()) + final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote) /** * Applies procedure with type T to the agent's internal state. @@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) { * @author Jonas Bonér */ object Agent { - + import Actor._ /* * The internal messages for passing around requests. */ private[akka] case class Value[T](value: T) private[akka] case class Function[T](fun: ((T) => T)) private[akka] case class Procedure[T](fun: ((T) => Unit)) + private[akka] case object Read /** * Creates a new Agent of type T with the initial value of value. */ - def apply[T](value: T): Agent[T] = new Agent(value) + def apply[T](value: T): Agent[T] = + apply(value,None) + + /** + * Creates an Agent backed by a client managed Actor if Some(remoteAddress) + * or a local agent if None + */ + def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] = + new Agent[T](value,remoteAddress) + + /** + * Creates an Agent backed by a client managed Actor + */ + def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] = + apply(value,Some(remoteAddress)) } /** @@ -254,12 +277,15 @@ object Agent { * * @author Jonas Bonér */ -final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor { +final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor { import Agent._ - import Actor._ - log.debug("Starting up Agent [%s]", self.uuid) - private val value = Ref[T](initialValue) + private[akka] def this(initialValue: T) = this(Ref(initialValue)) + private[akka] def this() = this(Ref[T]()) + + private val value = ref + + log.debug("Starting up Agent [%s]", self.uuid) /** * Periodically handles incoming messages. @@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto def receive = { case Value(v: T) => swap(v) + case Read => self.reply_?(value.get()) case Function(fun: (T => T)) => swap(fun(value.getOrWait)) case Procedure(proc: (T => Unit)) => diff --git a/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala new file mode 100644 index 0000000000..02a93e949b --- /dev/null +++ b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala @@ -0,0 +1,37 @@ +package se.scalablesolutions.akka.actor.remote + +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} +import se.scalablesolutions.akka.config.RemoteAddress +import se.scalablesolutions.akka.actor.Agent +import se.scalablesolutions.akka.remote. {RemoteClient, RemoteServer} + + +class RemoteAgentSpec extends JUnitSuite { + var server: RemoteServer = _ + + val HOSTNAME = "localhost" + val PORT = 9992 + + @Before def startServer { + val s = new RemoteServer() + s.start(HOSTNAME, PORT) + server = s + Thread.sleep(1000) + } + + @After def stopServer { + val s = server + server = null + s.shutdown + RemoteClient.shutdownAll + } + + @Test def remoteAgentShouldInitializeProperly { + val a = Agent(10,RemoteAddress(HOSTNAME,PORT)) + assert(a() == 10, "Remote agent should have the proper initial value") + a(20) + assert(a() == 20, "Remote agent should be updated properly") + a.close + } +} \ No newline at end of file From 9b59bffa9bc7cb0c78342f0a7360c7671cf34594 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 26 Oct 2010 16:40:09 +0200 Subject: [PATCH 3/3] Adding possibility to take naps between scans for finished future, closing ticket #449 --- akka-actor/src/main/scala/dispatch/Future.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index ea06ebb4ec..57ea36d5d0 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -37,10 +37,16 @@ object Futures { def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - def awaitOne(futures: List[Future[_]]): Future[_] = { + /** + * Returns the First Future that is completed + * if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans + */ + def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = { var future: Option[Future[_]] = None do { future = futures.find(_.isCompleted) + if (sleepMs > 0 && future.isEmpty) + Thread.sleep(sleepMs) } while (future.isEmpty) future.get } @@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] { // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { - private val TIME_UNIT = TimeUnit.MILLISECONDS + import TimeUnit.{MILLISECONDS => TIME_UNIT} def this() = this(0) val timeoutInNanos = TIME_UNIT.toNanos(timeout)