Merge branch 'master' into fsm

This commit is contained in:
unknown 2010-10-27 15:47:27 +02:00
commit 3abd282c49
5 changed files with 92 additions and 22 deletions

View file

@ -9,6 +9,7 @@ import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.japi.{ Function => JFunc, Procedure => JProc }
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.config.RemoteAddress
class AgentException private[akka](message: String) extends AkkaException(message)
@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag
* @author Viktor Klang
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Agent[T] private (initialValue: T) {
sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) {
import Agent._
import Actor._
private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
val dispatcher = remote match {
case Some(address) =>
val d = actorOf(new AgentDispatcher[T]())
d.makeRemote(remote.get.hostname,remote.get.port)
d.start
d ! Value(initialValue)
d
case None =>
actorOf(new AgentDispatcher(initialValue)).start
}
/**
* Submits a request to read the internal state.
@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) {
if (dispatcher.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction."+
"\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
sendProc((v: T) => {ref.set(v); latch.countDown})
latch.await
ref.get
val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await
if (f.exception.isDefined) throw f.exception.get
else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out"))
}
/**
@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) {
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def map[B](f: (T) => B): Agent[B] = Agent(f(get))
final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote)
/**
* Applies function with type 'T => B' to the agent's internal state.
@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) {
* Does not change the value of the agent (this).
* Java API
*/
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get))
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
* Java API
*/
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)())
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote)
/**
* Applies procedure with type T to the agent's internal state.
@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Agent {
import Actor._
/*
* The internal messages for passing around requests.
*/
private[akka] case class Value[T](value: T)
private[akka] case class Function[T](fun: ((T) => T))
private[akka] case class Procedure[T](fun: ((T) => Unit))
private[akka] case object Read
/**
* Creates a new Agent of type T with the initial value of value.
*/
def apply[T](value: T): Agent[T] = new Agent(value)
def apply[T](value: T): Agent[T] =
apply(value,None)
/**
* Creates an Agent backed by a client managed Actor if Some(remoteAddress)
* or a local agent if None
*/
def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] =
new Agent[T](value,remoteAddress)
/**
* Creates an Agent backed by a client managed Actor
*/
def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] =
apply(value,Some(remoteAddress))
}
/**
@ -254,12 +277,15 @@ object Agent {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor {
final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor {
import Agent._
import Actor._
log.debug("Starting up Agent [%s]", self.uuid)
private val value = Ref[T](initialValue)
private[akka] def this(initialValue: T) = this(Ref(initialValue))
private[akka] def this() = this(Ref[T]())
private val value = ref
log.debug("Starting up Agent [%s]", self.uuid)
/**
* Periodically handles incoming messages.
@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
def receive = {
case Value(v: T) =>
swap(v)
case Read => self.reply_?(value.get())
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>

View file

@ -37,10 +37,16 @@ object Futures {
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[Future[_]]): Future[_] = {
/**
* Returns the First Future that is completed
* if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans
*/
def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
if (sleepMs > 0 && future.isEmpty)
Thread.sleep(sleepMs)
} while (future.isEmpty)
future.get
}
@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] {
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private val TIME_UNIT = TimeUnit.MILLISECONDS
import TimeUnit.{MILLISECONDS => TIME_UNIT}
def this() = this(0)
val timeoutInNanos = TIME_UNIT.toNanos(timeout)

View file

@ -0,0 +1,37 @@
package se.scalablesolutions.akka.actor.remote
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.config.RemoteAddress
import se.scalablesolutions.akka.actor.Agent
import se.scalablesolutions.akka.remote. {RemoteClient, RemoteServer}
class RemoteAgentSpec extends JUnitSuite {
var server: RemoteServer = _
val HOSTNAME = "localhost"
val PORT = 9992
@Before def startServer {
val s = new RemoteServer()
s.start(HOSTNAME, PORT)
server = s
Thread.sleep(1000)
}
@After def stopServer {
val s = server
server = null
s.shutdown
RemoteClient.shutdownAll
}
@Test def remoteAgentShouldInitializeProperly {
val a = Agent(10,RemoteAddress(HOSTNAME,PORT))
assert(a() == 10, "Remote agent should have the proper initial value")
a(20)
assert(a() == 20, "Remote agent should be updated properly")
a.close
}
}

View file

@ -4,7 +4,7 @@ object AkkaRepositories {
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")

View file

@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")