From bb0b845607d16aba425b5efd0505ba78e4c799e1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 21 Oct 2011 15:51:38 +0200 Subject: [PATCH] Preparing to remove channels and ActorPromise etc --- .../src/main/scala/akka/AkkaApplication.scala | 4 +- .../src/main/scala/akka/actor/ActorRef.scala | 67 +++++++++++++------ .../scala/akka/actor/ActorRefProvider.scala | 15 +++++ .../src/main/scala/akka/dispatch/Future.scala | 3 +- .../akka/remote/RemoteActorRefProvider.scala | 2 + 5 files changed, 65 insertions(+), 26 deletions(-) diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index c2e8450325..955622b16a 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -5,15 +5,13 @@ package akka import akka.config._ import akka.actor._ +import dispatch._ import event._ import java.net.InetAddress import com.eaio.uuid.UUID -import dispatch.{ Dispatcher, Dispatchers } import akka.util.Duration import util.ReflectiveAccess import java.util.concurrent.TimeUnit -import akka.dispatch.BoundedMailbox -import akka.dispatch.UnboundedMailbox import akka.routing.Routing import remote.RemoteSupport import akka.serialization.Serialization diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 15667ee282..a178c7724c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -41,14 +41,17 @@ import akka.event.ActorEventBus * * @author Jonas Bonér */ -abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable { +abstract class ActorRef extends UntypedChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef ⇒ // Only mutable for RemoteServer in order to maintain identity across nodes - private[akka] def uuid: Uuid - + /** + * Returns the address for the actor. + */ def address: String + private[akka] def uuid: Uuid //TODO FIXME REMOVE THIS + /** * Comparison only takes address into account. */ @@ -240,30 +243,12 @@ class LocalActorRef private[akka] ( } } -/** - * This trait represents the common (external) methods for all ActorRefs - * Needed because implicit conversions aren't applied when instance imports are used - * - * i.e. - * var self: ScalaActorRef = ... - * import self._ - * //can't call ActorRef methods here unless they are declared in a common - * //superclass, which ActorRefShared is. - */ -trait ActorRefShared { - - /** - * Returns the address for the actor. - */ - def address: String -} - /** * This trait represents the Scala Actor API * There are implicit conversions in ../actor/Implicits.scala * from ActorRef -> ScalaActorRef and back */ -trait ScalaActorRef extends ActorRefShared with ReplyChannel[Any] { ref: ActorRef ⇒ +trait ScalaActorRef extends ReplyChannel[Any] { ref: ActorRef ⇒ /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. @@ -366,3 +351,41 @@ class DeadLetterActorRef(app: AkkaApplication) extends UnsupportedActorRef { timeout: Timeout, channel: UntypedChannel): Future[Any] = { app.eventHandler.notify(DeadLetter(message, channel)); brokenPromise } } + +abstract class AskActorRef(promise: Promise[Any], app: AkkaApplication) extends ActorRef with ScalaActorRef { + private[akka] final val uuid: akka.actor.Uuid = newUuid() + final val address: String = uuid.toString + + promise onComplete { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this, new ActorKilledException("Stopped"))); whenDone() } + promise onTimeout { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this, new FutureTimeoutException("Timed out"))); whenDone() } + + protected def whenDone(): Unit + + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = message match { + case akka.actor.Status.Success(r) ⇒ promise.completeWithResult(r) + case akka.actor.Status.Failure(f) ⇒ promise.completeWithException(f) + case other ⇒ promise.completeWithResult(other) + } + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( + message: Any, + timeout: Timeout, + channel: UntypedChannel): Future[Any] = { + postMessageToMailbox(message, channel) + promise + } + + def isShutdown = promise.isCompleted || promise.isExpired + + def stop(): Unit = if (!isShutdown) promise.completeWithException(new ActorKilledException("Stopped")) + + def resume(): Unit = () + + def suspend(): Unit = () + + def restart(t: Throwable): Unit = () + + def startsMonitoring(subject: ActorRef): ActorRef = subject + + def stopsMonitoring(subject: ActorRef): ActorRef = subject +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 0e62699243..9d863eb196 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -32,6 +32,8 @@ trait ActorRefProvider { private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] private[akka] def createDeathWatch(): DeathWatch + + private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] } /** @@ -172,6 +174,19 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch + + private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { + import akka.dispatch.{ Future, Promise, DefaultPromise } + (if (within == null) app.AkkaConfig.ActorTimeout else within) match { + case t if t.duration.length <= 0 ⇒ new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout + case other ⇒ + val result = new DefaultPromise[Any](other)(app.dispatcher) + val a = new AskActorRef(result, app) { def whenDone() = actors.remove(this) } + assert(actors.putIfAbsent(a.address, a) eq null) //If this fails, we're in deep trouble + recipient.tell(message, a) + result + } + } } class LocalDeathWatch extends DeathWatch with ActorClassification { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index dc302c5af9..2b15b0668a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -952,7 +952,8 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } } } - dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + val timeoutFuture = dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + onComplete(_ ⇒ timeoutFuture.cancel(true)) false } else true } else false diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a0375b8064..3a774e77fb 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -215,6 +215,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider } private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch + + private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) } /**