Removing ActorRef.isDefinedAt and Future.empty and moving Future.channel to Promise, renaming future to promise for the channel

This commit is contained in:
Viktor Klang 2011-06-04 12:42:06 -07:00
parent 07eaf0ba48
commit f9d0b188af
8 changed files with 29 additions and 96 deletions

View file

@ -166,38 +166,6 @@ class RoutingSpec extends WordSpec with MustMatchers {
for (a List(broadcast, a1, a2, a3)) a.stop()
}
"be defined at" in {
import akka.actor.ActorRef
val Yes = "yes"
val No = "no"
def testActor() = actorOf(new Actor() {
def receive = {
case Yes "yes"
}
}).start()
val t1 = testActor()
val t2 = testActor()
val t3 = testActor()
val t4 = testActor()
val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil))
t1.isDefinedAt(Yes) must be(true)
t1.isDefinedAt(No) must be(false)
t2.isDefinedAt(Yes) must be(true)
t2.isDefinedAt(No) must be(false)
d1.isDefinedAt(Yes) must be(true)
d1.isDefinedAt(No) must be(false)
d2.isDefinedAt(Yes) must be(true)
d2.isDefinedAt(No) must be(false)
for (a List(t1, t2, d1, d2)) a.stop()
}
}
"Actor Pool" must {

View file

@ -619,21 +619,6 @@ trait Actor {
throw new UnhandledMessageException(msg, self)
}
/**
* Is the actor able to handle the message passed in as arguments?
*/
def isDefinedAt(message: Any): Boolean = {
val behaviorStack = self.hotswap
message match { //Same logic as apply(msg) but without the unhandled catch-all
case l: AutoReceivedMessage true
case msg if behaviorStack.nonEmpty &&
behaviorStack.head.isDefinedAt(msg) true
case msg if behaviorStack.isEmpty &&
processingBehavior.isDefinedAt(msg) true
case _ false
}
}
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Puts the behavior on top of the hotswap stack.

View file

@ -233,12 +233,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
*/
def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED
/**
* Is the actor able to handle the message passed in as arguments?
*/
@deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1")
def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message)
/**
* Only for internal use. UUID is effectively final.
*/
@ -801,11 +795,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
}
def tooManyRestarts() {
_supervisor.foreach { sup
// can supervisor handle the notification?
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
}
notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason))
stop()
}

View file

@ -217,20 +217,6 @@ object Future {
def apply[T](body: T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] =
dispatcher.dispatchFuture(() body, timeout)
/**
* Construct a completable channel
*/
def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
val future = empty[Any](timeout)
def !(msg: Any) = future completeWithResult msg
}
/**
* Create an empty Future with default timeout
*/
@deprecated("Superceded by Promise.apply", "1.2")
def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout)
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
@ -273,9 +259,11 @@ object Future {
*/
def flow[A](body: A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = {
val future = Promise[A](timeout)
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f
val opte = f.exception
if (opte.isDefined) future completeWithException (opte.get)
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete {
_.exception match {
case Some(e) future completeWithException e
case None
}
}
future
}
@ -616,6 +604,14 @@ object Promise {
def apply[A](): Promise[A] = apply(Actor.TIMEOUT)
/**
* Construct a completable channel
*/
def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
val promise = Promise[Any](timeout)
def !(msg: Any) = promise completeWithResult msg
}
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() Unit]]]() {
override def initialValue = None
}
@ -732,11 +728,16 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
def complete(value: Either[Throwable, T]): DefaultPromise[T] = {
_lock.lock
val notifyTheseListeners = try {
if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired
_value = Some(value)
val existingListeners = _listeners
_listeners = Nil
existingListeners
if (_value.isEmpty) { //Only complete if we aren't expired
if (!isExpired) {
_value = Some(value)
val existingListeners = _listeners
_listeners = Nil
existingListeners
} else {
_listeners = Nil
Nil
}
} else Nil
} finally {
_signal.signalAll

View file

@ -90,8 +90,6 @@ trait LoadBalancer extends Router { self: Actor ⇒
}
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
/**
@ -106,8 +104,6 @@ abstract class UntypedLoadBalancer extends UntypedRouter {
else null
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
object Routing {

View file

@ -40,7 +40,7 @@ trait EmbeddedAppServer extends Bootable {
super.onLoad
if (isRestEnabled) {
val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(error("microkernel-server.xml not found!")))
val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(sys.error("microkernel-server.xml not found!")))
System.setProperty("jetty.port", REST_PORT.toString)
System.setProperty("jetty.host", REST_HOSTNAME)

View file

@ -24,11 +24,6 @@ class TestActorRef[T <: Actor](factory: () ⇒ T, address: String) extends Local
dispatcher = CallingThreadDispatcher.global
receiveTimeout = None
/**
* Query actor's current receive behavior.
*/
override def isDefinedAt(o: Any) = actor.isDefinedAt(o)
/**
* Directly inject messages into actor receive behavior. Any exceptions
* thrown will be available to you, while still being able to use

View file

@ -8,7 +8,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.config.Supervision.OneForOneStrategy
import akka.event.EventHandler
import akka.dispatch.Future
import akka.dispatch.{ Future, Promise }
/**
* Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
@ -234,14 +234,12 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
EventHandler.removeListener(log)
}
"proxy isDefinedAt/apply for the underlying actor" in {
"proxy apply for the underlying actor" in {
val ref = TestActorRef[WorkerActor].start()
ref.isDefinedAt("work") must be(true)
ref.isDefinedAt("sleep") must be(false)
intercept[IllegalActorStateException] { ref("work") }
val ch = Future.channel()
val ch = Promise.channel()
ref ! ch
val f = ch.future
val f = ch.promise
f must be('completed)
f.get must be("complexReply")
}