move all Ask stuff to akka.pattern
- remove ?(msg, timeout), should always use ?(msg)(timeout) because of Scala’s only Martin-acknowledged design flaw of being able to pass tuples into single-arg methods without adding another pair of parens - put a provider into all actor refs, because they all are created by and associated with one - treat all terminated refs equally: tell(msg) and return broken promise
This commit is contained in:
parent
1daaee98aa
commit
020c6b61da
19 changed files with 134 additions and 75 deletions
|
|
@ -40,11 +40,13 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
|
||||||
val c2 = system.actorOf(p, "c2")
|
val c2 = system.actorOf(p, "c2")
|
||||||
val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration)
|
val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
||||||
val user = system.asInstanceOf[ActorSystemImpl].guardian
|
val sysImpl = system.asInstanceOf[ActorSystemImpl]
|
||||||
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
|
|
||||||
val root = system.asInstanceOf[ActorSystemImpl].lookupRoot
|
|
||||||
|
|
||||||
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match {
|
val user = sysImpl.guardian
|
||||||
|
val syst = sysImpl.systemGuardian
|
||||||
|
val root = sysImpl.lookupRoot
|
||||||
|
|
||||||
|
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match {
|
||||||
case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems
|
case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -288,7 +288,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
||||||
val baos = new ByteArrayOutputStream(8192 * 32)
|
val baos = new ByteArrayOutputStream(8192 * 32)
|
||||||
val out = new ObjectOutputStream(baos)
|
val out = new ObjectOutputStream(baos)
|
||||||
|
|
||||||
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address
|
val sysImpl = system.asInstanceOf[ActorSystemImpl]
|
||||||
|
val addr = sysImpl.provider.rootPath.address
|
||||||
val serialized = SerializedActorRef(addr + "/non-existing")
|
val serialized = SerializedActorRef(addr + "/non-existing")
|
||||||
|
|
||||||
out.writeObject(serialized)
|
out.writeObject(serialized)
|
||||||
|
|
@ -296,9 +297,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
||||||
out.flush
|
out.flush
|
||||||
out.close
|
out.close
|
||||||
|
|
||||||
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
Serialization.currentSystem.withValue(sysImpl) {
|
||||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||||
in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing")
|
in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -359,8 +360,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
||||||
val ffive = (ref ? (5, timeout)).mapTo[String]
|
val ffive = (ref.ask(5)(timeout)).mapTo[String]
|
||||||
val fnull = (ref ? (null, timeout)).mapTo[String]
|
val fnull = (ref.ask(null)(timeout)).mapTo[String]
|
||||||
ref ! PoisonPill
|
ref ! PoisonPill
|
||||||
|
|
||||||
Await.result(ffive, timeout.duration) must be("five")
|
Await.result(ffive, timeout.duration) must be("five")
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
|
||||||
"use explicitly supplied timeout" in {
|
"use explicitly supplied timeout" in {
|
||||||
within(testTimeout - 100.millis, testTimeout + 300.millis) {
|
within(testTimeout - 100.millis, testTimeout + 300.millis) {
|
||||||
val echo = system.actorOf(Props.empty)
|
val echo = system.actorOf(Props.empty)
|
||||||
val f = echo.?("hallo", testTimeout)
|
val f = echo.?("hallo")(testTimeout)
|
||||||
try {
|
try {
|
||||||
intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) }
|
intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) }
|
||||||
} finally { system.stop(echo) }
|
} finally { system.stop(echo) }
|
||||||
|
|
|
||||||
|
|
@ -129,12 +129,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
}
|
}
|
||||||
|
|
||||||
def ping(pingPongActor: ActorRef) = {
|
def ping(pingPongActor: ActorRef) = {
|
||||||
Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
||||||
expectMsg(Timeout, PingMessage)
|
expectMsg(Timeout, PingMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
def kill(pingPongActor: ActorRef) = {
|
def kill(pingPongActor: ActorRef) = {
|
||||||
val result = (pingPongActor ? (DieReply, TimeoutMillis))
|
val result = (pingPongActor.?(DieReply)(TimeoutMillis))
|
||||||
expectMsg(Timeout, ExceptionMessage)
|
expectMsg(Timeout, ExceptionMessage)
|
||||||
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) }
|
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) }
|
||||||
}
|
}
|
||||||
|
|
@ -152,7 +152,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
"not restart temporary actor" in {
|
"not restart temporary actor" in {
|
||||||
val (temporaryActor, _) = temporaryActorAllForOne
|
val (temporaryActor, _) = temporaryActorAllForOne
|
||||||
|
|
||||||
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) }
|
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) }
|
||||||
|
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1 second)
|
||||||
}
|
}
|
||||||
|
|
@ -298,11 +298,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
|
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
|
||||||
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
|
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
|
||||||
intercept[RuntimeException] {
|
intercept[RuntimeException] {
|
||||||
Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis)
|
Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
||||||
|
|
||||||
inits.get must be(3)
|
inits.get must be(3)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -324,7 +324,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] }
|
||||||
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
|
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -334,7 +334,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) }
|
def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) }
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(10000).mapTo[Int] }
|
||||||
Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45)
|
Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -351,7 +351,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] }
|
||||||
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
|
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -383,7 +383,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] }
|
||||||
assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45)
|
assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -400,7 +400,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
val timeout = 10000
|
val timeout = 10000
|
||||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
|
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] }
|
||||||
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
|
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -441,7 +441,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
"shouldHandleThrowables" in {
|
"shouldHandleThrowables" in {
|
||||||
class ThrowableTest(m: String) extends Throwable(m)
|
class ThrowableTest(m: String) extends Throwable(m)
|
||||||
|
|
||||||
filterException[ThrowableTest] {
|
EventFilter[ThrowableTest](occurrences = 4) intercept {
|
||||||
val f1 = Future[Any] { throw new ThrowableTest("test") }
|
val f1 = Future[Any] { throw new ThrowableTest("test") }
|
||||||
intercept[ThrowableTest] { Await.result(f1, timeout.duration) }
|
intercept[ThrowableTest] { Await.result(f1, timeout.duration) }
|
||||||
|
|
||||||
|
|
|
||||||
35
akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala
Normal file
35
akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala
Normal file
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.pattern
|
||||||
|
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
|
class AskSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
"The “ask” pattern" must {
|
||||||
|
|
||||||
|
"return broken promises on DeadLetters" in {
|
||||||
|
val dead = system.actorFor("/system/deadLetters")
|
||||||
|
val f = dead ask 42
|
||||||
|
f.isCompleted must be(true)
|
||||||
|
f.value.get match {
|
||||||
|
case Left(_: AskTimeoutException) ⇒
|
||||||
|
case v ⇒ fail(v + " was not Left(AskTimeoutException)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"return broken promises on EmptyLocalActorRefs" in {
|
||||||
|
val empty = system.actorFor("unknown")
|
||||||
|
implicit val timeout = system.settings.ActorTimeout
|
||||||
|
val f = empty ? 3.14
|
||||||
|
f.isCompleted must be(true)
|
||||||
|
f.value.get match {
|
||||||
|
case Left(_: AskTimeoutException) ⇒
|
||||||
|
case v ⇒ fail(v + " was not Left(AskTimeoutException)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -167,13 +167,6 @@ trait LocalRef extends ActorRefScope {
|
||||||
final def isLocal = true
|
final def isLocal = true
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Trait for matching on ActorRefs which have access to a provider; this is used in akka.pattern.ask.
|
|
||||||
*/
|
|
||||||
trait ActorRefWithProvider { this: InternalActorRef ⇒
|
|
||||||
def provider: ActorRefProvider
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal trait for assembling all the functionality needed internally on
|
* Internal trait for assembling all the functionality needed internally on
|
||||||
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
|
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
|
||||||
|
|
@ -181,12 +174,25 @@ trait ActorRefWithProvider { this: InternalActorRef ⇒
|
||||||
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
|
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
|
||||||
*/
|
*/
|
||||||
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒
|
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒
|
||||||
|
/*
|
||||||
|
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
|
||||||
|
*/
|
||||||
def resume(): Unit
|
def resume(): Unit
|
||||||
def suspend(): Unit
|
def suspend(): Unit
|
||||||
def restart(cause: Throwable): Unit
|
def restart(cause: Throwable): Unit
|
||||||
def stop(): Unit
|
def stop(): Unit
|
||||||
def sendSystemMessage(message: SystemMessage): Unit
|
def sendSystemMessage(message: SystemMessage): Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a reference to the actor ref provider which created this ref.
|
||||||
|
*/
|
||||||
|
def provider: ActorRefProvider
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain parent of this ref; used by getChild for ".." paths.
|
||||||
|
*/
|
||||||
def getParent: InternalActorRef
|
def getParent: InternalActorRef
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
|
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
|
||||||
* some provider-specific location. This method shall return the end result,
|
* some provider-specific location. This method shall return the end result,
|
||||||
|
|
@ -196,6 +202,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
||||||
* exist, return Nobody.
|
* exist, return Nobody.
|
||||||
*/
|
*/
|
||||||
def getChild(name: Iterator[String]): InternalActorRef
|
def getChild(name: Iterator[String]): InternalActorRef
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scope: if this ref points to an actor which resides within the same JVM,
|
* Scope: if this ref points to an actor which resides within the same JVM,
|
||||||
* i.e. whose mailbox is directly reachable etc.
|
* i.e. whose mailbox is directly reachable etc.
|
||||||
|
|
@ -203,8 +210,12 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
||||||
def isLocal: Boolean
|
def isLocal: Boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is an internal look-up failure token, not useful for anything else.
|
||||||
|
*/
|
||||||
private[akka] case object Nobody extends MinimalActorRef {
|
private[akka] case object Nobody extends MinimalActorRef {
|
||||||
val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody")
|
val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody")
|
||||||
|
def provider = throw new UnsupportedOperationException("Nobody does not provide")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -218,7 +229,7 @@ private[akka] class LocalActorRef private[akka] (
|
||||||
val systemService: Boolean = false,
|
val systemService: Boolean = false,
|
||||||
_receiveTimeout: Option[Duration] = None,
|
_receiveTimeout: Option[Duration] = None,
|
||||||
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
|
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
|
||||||
extends InternalActorRef with LocalRef with ActorRefWithProvider {
|
extends InternalActorRef with LocalRef {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* actorCell.start() publishes actorCell & this to the dispatcher, which
|
* actorCell.start() publishes actorCell & this to the dispatcher, which
|
||||||
|
|
@ -371,8 +382,9 @@ trait MinimalActorRef extends InternalActorRef with LocalRef {
|
||||||
}
|
}
|
||||||
|
|
||||||
object MinimalActorRef {
|
object MinimalActorRef {
|
||||||
def apply(_path: ActorPath)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef {
|
def apply(_path: ActorPath, _provider: ActorRefProvider)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef {
|
||||||
def path = _path
|
def path = _path
|
||||||
|
def provider = _provider
|
||||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit =
|
override def !(message: Any)(implicit sender: ActorRef = null): Unit =
|
||||||
if (receive.isDefinedAt(message)) receive(message)
|
if (receive.isDefinedAt(message)) receive(message)
|
||||||
}
|
}
|
||||||
|
|
@ -393,8 +405,6 @@ trait DeadLetterActorRefLike extends MinimalActorRef {
|
||||||
|
|
||||||
def eventStream: EventStream
|
def eventStream: EventStream
|
||||||
|
|
||||||
@volatile
|
|
||||||
private var brokenPromise: Future[Any] = _
|
|
||||||
@volatile
|
@volatile
|
||||||
private var _path: ActorPath = _
|
private var _path: ActorPath = _
|
||||||
def path: ActorPath = {
|
def path: ActorPath = {
|
||||||
|
|
@ -402,9 +412,13 @@ trait DeadLetterActorRefLike extends MinimalActorRef {
|
||||||
_path
|
_path
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) {
|
@volatile
|
||||||
|
private var _provider: ActorRefProvider = _
|
||||||
|
def provider = _provider
|
||||||
|
|
||||||
|
private[akka] def init(provider: ActorRefProvider, path: ActorPath) {
|
||||||
_path = path
|
_path = path
|
||||||
brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher)
|
_provider = provider
|
||||||
}
|
}
|
||||||
|
|
||||||
override def isTerminated(): Boolean = true
|
override def isTerminated(): Boolean = true
|
||||||
|
|
@ -426,16 +440,25 @@ class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRe
|
||||||
* This special dead letter reference has a name: it is that which is returned
|
* This special dead letter reference has a name: it is that which is returned
|
||||||
* by a local look-up which is unsuccessful.
|
* by a local look-up which is unsuccessful.
|
||||||
*/
|
*/
|
||||||
class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath)
|
class EmptyLocalActorRef(
|
||||||
extends DeadLetterActorRefLike {
|
val eventStream: EventStream,
|
||||||
init(_dispatcher, _path)
|
_provider: ActorRefProvider,
|
||||||
|
_dispatcher: MessageDispatcher,
|
||||||
|
_path: ActorPath) extends DeadLetterActorRefLike {
|
||||||
|
|
||||||
|
init(_provider, _path)
|
||||||
|
|
||||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
|
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
|
||||||
case d: DeadLetter ⇒ // do NOT form endless loops
|
case d: DeadLetter ⇒ // do NOT form endless loops
|
||||||
case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
|
case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef {
|
class VirtualPathContainer(
|
||||||
|
val provider: ActorRefProvider,
|
||||||
|
val path: ActorPath,
|
||||||
|
override val getParent: InternalActorRef,
|
||||||
|
val log: LoggingAdapter) extends MinimalActorRef {
|
||||||
|
|
||||||
private val children = new ConcurrentHashMap[String, InternalActorRef]
|
private val children = new ConcurrentHashMap[String, InternalActorRef]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -324,6 +324,8 @@ class LocalActorRefProvider(
|
||||||
|
|
||||||
val path = rootPath / "bubble-walker"
|
val path = rootPath / "bubble-walker"
|
||||||
|
|
||||||
|
def provider: ActorRefProvider = LocalActorRefProvider.this
|
||||||
|
|
||||||
override def stop() = stopped switchOn {
|
override def stop() = stopped switchOn {
|
||||||
terminationFuture.complete(causeOfTermination.toLeft(()))
|
terminationFuture.complete(causeOfTermination.toLeft(()))
|
||||||
}
|
}
|
||||||
|
|
@ -440,7 +442,7 @@ class LocalActorRefProvider(
|
||||||
lazy val systemGuardian: InternalActorRef =
|
lazy val systemGuardian: InternalActorRef =
|
||||||
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None)
|
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None)
|
||||||
|
|
||||||
lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log)
|
lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)
|
||||||
|
|
||||||
def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = {
|
def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = {
|
||||||
assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()")
|
assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()")
|
||||||
|
|
@ -489,7 +491,7 @@ class LocalActorRefProvider(
|
||||||
} else ref.getChild(path.iterator) match {
|
} else ref.getChild(path.iterator) match {
|
||||||
case Nobody ⇒
|
case Nobody ⇒
|
||||||
log.debug("look-up of path sequence '{}' failed", path)
|
log.debug("look-up of path sequence '{}' failed", path)
|
||||||
new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path)
|
new EmptyLocalActorRef(eventStream, system.provider, dispatcher, ref.path / path)
|
||||||
case x ⇒ x
|
case x ⇒ x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -377,10 +377,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
||||||
// the provider is expected to start default loggers, LocalActorRefProvider does this
|
// the provider is expected to start default loggers, LocalActorRefProvider does this
|
||||||
provider.init(this)
|
provider.init(this)
|
||||||
_log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass)
|
_log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass)
|
||||||
deadLetters.init(dispatcher, lookupRoot.path / "deadLetters")
|
deadLetters.init(provider, lookupRoot.path / "deadLetters")
|
||||||
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
|
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
|
||||||
registerOnTermination(stopScheduler())
|
registerOnTermination(stopScheduler())
|
||||||
_locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch)
|
_locker = new Locker(scheduler, ReaperInterval, provider, lookupRoot.path / "locker", deathWatch)
|
||||||
loadExtensions()
|
loadExtensions()
|
||||||
if (LogConfigOnStart) logConfiguration()
|
if (LogConfigOnStart) logConfiguration()
|
||||||
this
|
this
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,12 @@ import akka.util.duration._
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import akka.event.DeathWatch
|
import akka.event.DeathWatch
|
||||||
|
|
||||||
class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef {
|
class Locker(
|
||||||
|
scheduler: Scheduler,
|
||||||
|
period: Duration,
|
||||||
|
val provider: ActorRefProvider,
|
||||||
|
val path: ActorPath,
|
||||||
|
val deathWatch: DeathWatch) extends MinimalActorRef {
|
||||||
|
|
||||||
class DavyJones extends Runnable {
|
class DavyJones extends Runnable {
|
||||||
def run = {
|
def run = {
|
||||||
|
|
|
||||||
|
|
@ -335,15 +335,15 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
MethodCall(method, args) match {
|
MethodCall(method, args) match {
|
||||||
case m if m.isOneWay ⇒ actor ! m; null //Null return value
|
case m if m.isOneWay ⇒ actor ! m; null //Null return value
|
||||||
case m if m.returnsFuture_? ⇒ actor.?(m, timeout)
|
case m if m.returnsFuture_? ⇒ ask(actor, m)(timeout)
|
||||||
case m if m.returnsJOption_? || m.returnsOption_? ⇒
|
case m if m.returnsJOption_? || m.returnsOption_? ⇒
|
||||||
val f = actor.?(m, timeout)
|
val f = ask(actor, m)(timeout)
|
||||||
(try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match {
|
(try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match {
|
||||||
case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None
|
case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None
|
||||||
case Some(Right(joption: AnyRef)) ⇒ joption
|
case Some(Right(joption: AnyRef)) ⇒ joption
|
||||||
case Some(Left(ex)) ⇒ throw ex
|
case Some(Left(ex)) ⇒ throw ex
|
||||||
}
|
}
|
||||||
case m ⇒ Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef]
|
case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -649,6 +649,7 @@ object Logging {
|
||||||
*/
|
*/
|
||||||
class StandardOutLogger extends MinimalActorRef with StdOutLogger {
|
class StandardOutLogger extends MinimalActorRef with StdOutLogger {
|
||||||
val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger")
|
val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger")
|
||||||
|
def provider: ActorRefProvider = throw new UnsupportedOperationException("StandardOutLogger does not provide")
|
||||||
override val toString = "StandardOutLogger"
|
override val toString = "StandardOutLogger"
|
||||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message)
|
override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ object AskSupport {
|
||||||
*
|
*
|
||||||
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
||||||
*/
|
*/
|
||||||
def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
|
def ask(message: Any)(implicit timeout: Timeout = null): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||||
|
|
@ -81,20 +81,7 @@ object AskSupport {
|
||||||
*
|
*
|
||||||
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
||||||
*/
|
*/
|
||||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)
|
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
|
||||||
|
|
||||||
/*
|
|
||||||
* FIXME: I think this should be removed, since it introduces an “ambiguity”
|
|
||||||
* when sending Tuple2, which the compiler resolves unexpectedly to this
|
|
||||||
* method; also overloading is bad, isn’t it? - RK (ticket #1653)
|
|
||||||
*/
|
|
||||||
/**
|
|
||||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
|
||||||
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
|
|
||||||
* implicit timeout
|
|
||||||
*/
|
|
||||||
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -102,6 +89,7 @@ object AskSupport {
|
||||||
* receive the reply to an "ask" operation.
|
* receive the reply to an "ask" operation.
|
||||||
*/
|
*/
|
||||||
private[akka] final class PromiseActorRef(
|
private[akka] final class PromiseActorRef(
|
||||||
|
val provider: ActorRefProvider,
|
||||||
val path: ActorPath,
|
val path: ActorPath,
|
||||||
override val getParent: InternalActorRef,
|
override val getParent: InternalActorRef,
|
||||||
val result: Promise[Any],
|
val result: Promise[Any],
|
||||||
|
|
@ -130,7 +118,7 @@ object AskSupport {
|
||||||
def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
|
def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
|
||||||
val path = provider.tempPath()
|
val path = provider.tempPath()
|
||||||
val result = Promise[Any]()(provider.dispatcher)
|
val result = Promise[Any]()(provider.dispatcher)
|
||||||
val a = new PromiseActorRef(path, provider.tempContainer, result, provider.deathWatch)
|
val a = new PromiseActorRef(provider, path, provider.tempContainer, result, provider.deathWatch)
|
||||||
provider.registerTempActor(a, path)
|
provider.registerTempActor(a, path)
|
||||||
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) }
|
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) }
|
||||||
result onComplete { _ ⇒
|
result onComplete { _ ⇒
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,10 @@
|
||||||
*/
|
*/
|
||||||
package akka
|
package akka
|
||||||
|
|
||||||
|
import akka.actor._
|
||||||
|
import akka.dispatch.{ Future, Promise }
|
||||||
|
import akka.util.{ Timeout, Duration }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* == Commonly Used Patterns With Akka ==
|
* == Commonly Used Patterns With Akka ==
|
||||||
*
|
*
|
||||||
|
|
@ -38,10 +42,6 @@ package akka
|
||||||
*/
|
*/
|
||||||
package object pattern {
|
package object pattern {
|
||||||
|
|
||||||
import akka.actor._
|
|
||||||
import akka.dispatch.{ Future, Promise }
|
|
||||||
import akka.util.{ Timeout, Duration }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Import this implicit conversion to gain `?` and `ask` methods on
|
* Import this implicit conversion to gain `?` and `ask` methods on
|
||||||
* [[akka.actor.ActorRef]], which will defer to the
|
* [[akka.actor.ActorRef]], which will defer to the
|
||||||
|
|
@ -88,12 +88,15 @@ package object pattern {
|
||||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||||
*/
|
*/
|
||||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = actorRef match {
|
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = actorRef match {
|
||||||
case ref: ActorRefWithProvider ⇒
|
case ref: InternalActorRef if ref.isTerminated ⇒
|
||||||
|
actorRef.tell(message)
|
||||||
|
Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher)
|
||||||
|
case ref: InternalActorRef ⇒
|
||||||
val provider = ref.provider
|
val provider = ref.provider
|
||||||
(if (timeout == null) provider.settings.ActorTimeout else timeout) match {
|
(if (timeout == null) provider.settings.ActorTimeout else timeout) match {
|
||||||
case t if t.duration.length <= 0 ⇒
|
case t if t.duration.length <= 0 ⇒
|
||||||
actorRef.tell(message)
|
actorRef.tell(message)
|
||||||
Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(provider.dispatcher)
|
Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher)
|
||||||
case t ⇒
|
case t ⇒
|
||||||
val a = AskSupport.createAsker(provider, t)
|
val a = AskSupport.createAsker(provider, t)
|
||||||
actorRef.tell(message, a)
|
actorRef.tell(message, a)
|
||||||
|
|
@ -132,7 +135,7 @@ package object pattern {
|
||||||
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = {
|
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = {
|
||||||
future onComplete {
|
future onComplete {
|
||||||
case Right(r) ⇒ actorRef ! r
|
case Right(r) ⇒ actorRef ! r
|
||||||
case Left(f) ⇒ actorRef ! akka.actor.Status.Failure(f)
|
case Left(f) ⇒ actorRef ! Status.Failure(f)
|
||||||
}
|
}
|
||||||
future
|
future
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -568,4 +568,3 @@ object Timeout {
|
||||||
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
|
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
|
||||||
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
|
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -124,7 +124,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
|
||||||
* that new state can be obtained within the given timeout.
|
* that new state can be obtained within the given timeout.
|
||||||
*/
|
*/
|
||||||
def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = {
|
def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = {
|
||||||
def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]]
|
def dispatch = ask(updater, Alter(f))(timeout).asInstanceOf[Future[T]]
|
||||||
val txn = Txn.findCurrent
|
val txn = Txn.findCurrent
|
||||||
if (txn.isDefined) {
|
if (txn.isDefined) {
|
||||||
val result = Promise[T]()(system.dispatcher)
|
val result = Promise[T]()(system.dispatcher)
|
||||||
|
|
@ -172,7 +172,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
|
||||||
send((value: T) ⇒ {
|
send((value: T) ⇒ {
|
||||||
suspend()
|
suspend()
|
||||||
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher("akka.agent.alter-off-dispatcher"))
|
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher("akka.agent.alter-off-dispatcher"))
|
||||||
result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]]
|
result completeWith ask(threadBased, Alter(f))(timeout).asInstanceOf[Future[T]]
|
||||||
value
|
value
|
||||||
})
|
})
|
||||||
result
|
result
|
||||||
|
|
|
||||||
|
|
@ -252,7 +252,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
//#using-explicit-timeout
|
//#using-explicit-timeout
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
val future = myActor ? ("hello", timeout = 500 millis)
|
val future = myActor.ask("hello")(500 millis)
|
||||||
//#using-explicit-timeout
|
//#using-explicit-timeout
|
||||||
Await.result(future, 500 millis) must be("hello")
|
Await.result(future, 500 millis) must be("hello")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -124,7 +124,7 @@ case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMs
|
||||||
* It acts as the brain of the remote that responds to system remote events (messages) and undertakes action.
|
* It acts as the brain of the remote that responds to system remote events (messages) and undertakes action.
|
||||||
*/
|
*/
|
||||||
class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
|
class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
|
||||||
extends VirtualPathContainer(_path, _parent, _log) {
|
extends VirtualPathContainer(system.provider, _path, _parent, _log) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find the longest matching path which we know about and return that ref
|
* Find the longest matching path which we know about and return that ref
|
||||||
|
|
|
||||||
|
|
@ -166,7 +166,7 @@ private[akka] class RemoteActorRef private[akka] (
|
||||||
val path: ActorPath,
|
val path: ActorPath,
|
||||||
val getParent: InternalActorRef,
|
val getParent: InternalActorRef,
|
||||||
loader: Option[ClassLoader])
|
loader: Option[ClassLoader])
|
||||||
extends InternalActorRef with RemoteRef with ActorRefWithProvider {
|
extends InternalActorRef with RemoteRef {
|
||||||
|
|
||||||
def getChild(name: Iterator[String]): InternalActorRef = {
|
def getChild(name: Iterator[String]): InternalActorRef = {
|
||||||
val s = name.toStream
|
val s = name.toStream
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue