diff --git a/akka-actor/src/main/scala/actor/Agent.scala b/akka-actor/src/main/scala/actor/Agent.scala index c9b91b3ca8..28a98bc7c5 100644 --- a/akka-actor/src/main/scala/actor/Agent.scala +++ b/akka-actor/src/main/scala/actor/Agent.scala @@ -5,11 +5,12 @@ package akka.actor import akka.stm.Ref +import akka.config.RemoteAddress +import akka.japi.{Function => JFunc, Procedure => JProc} import akka.AkkaException -import akka.japi.{ Function => JFunc, Procedure => JProc } + import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.CountDownLatch -import akka.config.RemoteAddress class AgentException private[akka](message: String) extends AkkaException(message) @@ -105,10 +106,11 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = import Agent._ import Actor._ + val dispatcher = remote match { case Some(address) => val d = actorOf(new AgentDispatcher[T]()) - d.makeRemote(remote.get.hostname,remote.get.port) + d.makeRemote(remote.get.hostname, remote.get.port) d.start d ! Value(initialValue) d @@ -127,7 +129,7 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = if (dispatcher.isTransactionInScope) throw new AgentException( "Can't call Agent.get within an enclosing transaction."+ "\n\tWould block indefinitely.\n\tPlease refactor your code.") - val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await + val f = (dispatcher.!!![T](Read, java.lang.Long.MAX_VALUE)).await if (f.exception.isDefined) throw f.exception.get else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out")) } @@ -187,19 +189,19 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = * Does not change the value of the agent (this). * Java API */ - final def sendProc(f: JProc[T]): Unit = dispatcher ! Procedure((t:T) => f(t)) + final def sendProc(f: JProc[T]): Unit = dispatcher ! Procedure((t: T) => f(t)) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote) + final def map[B](f: (T) => B): Agent[B] = Agent(f(get), remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote) + final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(), remote) /** * Applies function with type 'T => B' to the agent's internal state. @@ -212,14 +214,14 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = * Does not change the value of the agent (this). * Java API */ - final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote) + final def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get), remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). * Java API */ - final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote) + final def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = Agent(f(get)(), remote) /** * Applies procedure with type T to the agent's internal state. @@ -244,7 +246,8 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = */ object Agent { import Actor._ - /* + + /** * The internal messages for passing around requests. */ private[akka] case class Value[T](value: T) @@ -256,20 +259,20 @@ object Agent { * Creates a new Agent of type T with the initial value of value. */ def apply[T](value: T): Agent[T] = - apply(value,None) + apply(value, None) /** * Creates an Agent backed by a client managed Actor if Some(remoteAddress) * or a local agent if None */ def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] = - new Agent[T](value,remoteAddress) + new Agent[T](value, remoteAddress) /** * Creates an Agent backed by a client managed Actor */ def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] = - apply(value,Some(remoteAddress)) + apply(value, Some(remoteAddress)) } /** @@ -291,13 +294,11 @@ final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor { * Periodically handles incoming messages. */ def receive = { - case Value(v: T) => - swap(v) - case Read => self.reply_?(value.get()) - case Function(fun: (T => T)) => - swap(fun(value.getOrWait)) - case Procedure(proc: (T => Unit)) => - proc(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))) + case Value(v: T) => swap(v) + case Read => self.reply_?(value.get()) + case Function(fun: (T => T)) => swap(fun(value.getOrWait)) + case Procedure(proc: (T => Unit)) => proc(value.getOrElse( + throw new AgentException("Could not read Agent's value; value is null"))) } /** diff --git a/akka-actor/src/main/scala/actor/UntypedActor.scala b/akka-actor/src/main/scala/actor/UntypedActor.scala index a3866fba92..e36a7837b6 100644 --- a/akka-actor/src/main/scala/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/actor/UntypedActor.scala @@ -7,11 +7,11 @@ package akka.actor import akka.dispatch._ import akka.stm.global._ import akka.config.Supervision._ +import akka.japi.Procedure import java.net.InetSocketAddress import scala.reflect.BeanProperty -import akka.japi.Procedure /** * Subclass this abstract class to create a MDB-style untyped actor. @@ -62,6 +62,7 @@ import akka.japi.Procedure * @author Jonas Bonér */ abstract class UntypedActor extends Actor { + def getContext(): ActorRef = self final protected def receive = { @@ -123,6 +124,7 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct * @author Jonas Bonér */ object UntypedActor { + /** * Creates an ActorRef out of the Actor type represented by the class provided. * Example in Java: diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 873c5e9f55..68a3ce4399 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -6,9 +6,10 @@ package akka.dispatch import akka.AkkaException import akka.actor.Actor.spawn +import akka.routing.Dispatcher + import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit -import akka.routing.Dispatcher class FutureTimeoutException(message: String) extends AkkaException(message) @@ -26,12 +27,10 @@ object Futures { dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) (body: => T): Future[T] = { val f = new DefaultCompletableFuture[T](timeout) - spawn({ try { f completeWithResult body } catch { case e => f completeWithException e} })(dispatcher) - f } @@ -45,8 +44,7 @@ object Futures { var future: Option[Future[_]] = None do { future = futures.find(_.isCompleted) - if (sleepMs > 0 && future.isEmpty) - Thread.sleep(sleepMs) + if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs) } while (future.isEmpty) future.get } @@ -89,12 +87,19 @@ object Futures { sealed trait Future[T] { def await : Future[T] + def awaitBlocking : Future[T] + def isCompleted: Boolean + def isExpired: Boolean + def timeoutInNanos: Long + def result: Option[T] + def exception: Option[Throwable] + def map[O](f: (T) => O): Future[O] = { val wrapped = this new Future[O] { @@ -111,12 +116,14 @@ sealed trait Future[T] { trait CompletableFuture[T] extends Future[T] { def completeWithResult(result: T) + def completeWithException(exception: Throwable) } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { import TimeUnit.{MILLISECONDS => TIME_UNIT} + def this() = this(0) val timeoutInNanos = TIME_UNIT.toNanos(timeout) @@ -207,7 +214,9 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) protected def onComplete(result: T) {} + protected def onCompleteException(exception: Throwable) {} + + private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index bc50ebb781..95d07483c5 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -9,13 +9,13 @@ import java.net.InetSocketAddress import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.{Map => JMap} -import akka.actor.{ - Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage} import akka.actor.Actor._ +import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage} import akka.util._ import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.config.Config._ +import akka.config.ConfigurationException import akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ @@ -31,7 +31,6 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.collection.mutable.Map import scala.reflect.BeanProperty -import akka.config.ConfigurationException /** * Use this object if you need a single remote server on a specific node.