Preparing to remove channels and ActorPromise etc

This commit is contained in:
Viktor Klang 2011-10-21 15:51:38 +02:00
parent 9dd0385301
commit bb0b845607
5 changed files with 65 additions and 26 deletions

View file

@ -5,15 +5,13 @@ package akka
import akka.config._
import akka.actor._
import dispatch._
import event._
import java.net.InetAddress
import com.eaio.uuid.UUID
import dispatch.{ Dispatcher, Dispatchers }
import akka.util.Duration
import util.ReflectiveAccess
import java.util.concurrent.TimeUnit
import akka.dispatch.BoundedMailbox
import akka.dispatch.UnboundedMailbox
import akka.routing.Routing
import remote.RemoteSupport
import akka.serialization.Serialization

View file

@ -41,14 +41,17 @@ import akka.event.ActorEventBus
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable {
abstract class ActorRef extends UntypedChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] def uuid: Uuid
/**
* Returns the address for the actor.
*/
def address: String
private[akka] def uuid: Uuid //TODO FIXME REMOVE THIS
/**
* Comparison only takes address into account.
*/
@ -240,30 +243,12 @@ class LocalActorRef private[akka] (
}
}
/**
* This trait represents the common (external) methods for all ActorRefs
* Needed because implicit conversions aren't applied when instance imports are used
*
* i.e.
* var self: ScalaActorRef = ...
* import self._
* //can't call ActorRef methods here unless they are declared in a common
* //superclass, which ActorRefShared is.
*/
trait ActorRefShared {
/**
* Returns the address for the actor.
*/
def address: String
}
/**
* This trait represents the Scala Actor API
* There are implicit conversions in ../actor/Implicits.scala
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef extends ActorRefShared with ReplyChannel[Any] { ref: ActorRef
trait ScalaActorRef extends ReplyChannel[Any] { ref: ActorRef
/**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
@ -366,3 +351,41 @@ class DeadLetterActorRef(app: AkkaApplication) extends UnsupportedActorRef {
timeout: Timeout,
channel: UntypedChannel): Future[Any] = { app.eventHandler.notify(DeadLetter(message, channel)); brokenPromise }
}
abstract class AskActorRef(promise: Promise[Any], app: AkkaApplication) extends ActorRef with ScalaActorRef {
private[akka] final val uuid: akka.actor.Uuid = newUuid()
final val address: String = uuid.toString
promise onComplete { _ app.deathWatch.publish(Terminated(AskActorRef.this, new ActorKilledException("Stopped"))); whenDone() }
promise onTimeout { _ app.deathWatch.publish(Terminated(AskActorRef.this, new FutureTimeoutException("Timed out"))); whenDone() }
protected def whenDone(): Unit
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = message match {
case akka.actor.Status.Success(r) promise.completeWithResult(r)
case akka.actor.Status.Failure(f) promise.completeWithException(f)
case other promise.completeWithResult(other)
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Timeout,
channel: UntypedChannel): Future[Any] = {
postMessageToMailbox(message, channel)
promise
}
def isShutdown = promise.isCompleted || promise.isExpired
def stop(): Unit = if (!isShutdown) promise.completeWithException(new ActorKilledException("Stopped"))
def resume(): Unit = ()
def suspend(): Unit = ()
def restart(t: Throwable): Unit = ()
def startsMonitoring(subject: ActorRef): ActorRef = subject
def stopsMonitoring(subject: ActorRef): ActorRef = subject
}

View file

@ -32,6 +32,8 @@ trait ActorRefProvider {
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef]
private[akka] def createDeathWatch(): DeathWatch
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
}
/**
@ -172,6 +174,19 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address)
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
import akka.dispatch.{ Future, Promise, DefaultPromise }
(if (within == null) app.AkkaConfig.ActorTimeout else within) match {
case t if t.duration.length <= 0 new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout
case other
val result = new DefaultPromise[Any](other)(app.dispatcher)
val a = new AskActorRef(result, app) { def whenDone() = actors.remove(this) }
assert(actors.putIfAbsent(a.address, a) eq null) //If this fails, we're in deep trouble
recipient.tell(message, a)
result
}
}
}
class LocalDeathWatch extends DeathWatch with ActorClassification {

View file

@ -952,7 +952,8 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
}
}
}
dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
val timeoutFuture = dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
onComplete(_ timeoutFuture.cancel(true))
false
} else true
} else false

View file

@ -215,6 +215,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
}
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
}
/**