Merge branch 'wip-1581-patterns-ask'

This commit is contained in:
Roland 2012-01-23 18:35:30 +01:00
commit 2a0c4ca145
126 changed files with 980 additions and 415 deletions

View file

@ -10,6 +10,7 @@ import akka.dispatch.OldFuture
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.migration.AskableActorRef
/** /**
* Migration replacement for `object akka.actor.Actor`. * Migration replacement for `object akka.actor.Actor`.
@ -54,7 +55,6 @@ object OldActor {
@deprecated("OldActor.remote should not be used", "2.0") @deprecated("OldActor.remote should not be used", "2.0")
lazy val remote: OldRemoteSupport = new OldRemoteSupport lazy val remote: OldRemoteSupport = new OldRemoteSupport
} }
@deprecated("use Actor", "2.0") @deprecated("use Actor", "2.0")
@ -66,6 +66,8 @@ abstract class OldActor extends Actor {
implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef) implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef)
implicit def askableActorRef(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
@deprecated("Use context.become instead", "2.0") @deprecated("Use context.become instead", "2.0")
def become(behavior: Receive, discardOld: Boolean = true) = context.become(behavior, discardOld) def become(behavior: Receive, discardOld: Boolean = true) = context.become(behavior, discardOld)

View file

@ -0,0 +1,80 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.migration
import akka.actor.ActorRef
import akka.dispatch.Future
import akka.util.Timeout
/**
* Implementation detail of the ask pattern enrichment of ActorRef
*/
private[akka] final class AskableActorRef(val actorRef: ActorRef) {
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* flow {
* val f = worker.ask(request)(timeout)
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* flow {
* val f = worker ? request
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
/**
* This method is just there to catch 2.0-unsupported usage and print deprecation warnings for it.
*/
@deprecated("use ?(msg)(timeout), this method has dangerous ambiguity", "2.0-migration")
def ?(message: Any, timeout: Timeout)(i: Int = 0): Future[Any] = this.?(message)(timeout)
}

View file

@ -31,4 +31,7 @@ package object migration {
def stop(): Unit = GlobalActorSystem.stop(actorRef) def stop(): Unit = GlobalActorSystem.stop(actorRef)
} }
implicit def ask(actorRef: ActorRef) = new akka.migration.AskableActorRef(actorRef)
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
} }

View file

@ -8,6 +8,7 @@ import akka.testkit._
import org.scalatest.BeforeAndAfterEach import org.scalatest.BeforeAndAfterEach
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object ActorFireForgetRequestReplySpec { object ActorFireForgetRequestReplySpec {

View file

@ -12,6 +12,7 @@ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object ActorLifeCycleSpec { object ActorLifeCycleSpec {

View file

@ -6,6 +6,7 @@ package akka.actor
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object ActorLookupSpec { object ActorLookupSpec {
@ -39,11 +40,13 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val c2 = system.actorOf(p, "c2") val c2 = system.actorOf(p, "c2")
val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration) val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration)
val user = system.asInstanceOf[ActorSystemImpl].guardian val sysImpl = system.asInstanceOf[ActorSystemImpl]
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
val root = system.asInstanceOf[ActorSystemImpl].lookupRoot
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match { val user = sysImpl.guardian
val syst = sysImpl.systemGuardian
val root = sysImpl.lookupRoot
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match {
case RelativeActorPath(elems) system.actorFor("/").path / elems case RelativeActorPath(elems) system.actorFor("/").path / elems
}) })

View file

@ -15,6 +15,7 @@ import akka.util.ReflectiveAccess
import akka.serialization.Serialization import akka.serialization.Serialization
import java.util.concurrent.{ CountDownLatch, TimeUnit } import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.dispatch.{ Await, DefaultPromise, Promise, Future } import akka.dispatch.{ Await, DefaultPromise, Promise, Future }
import akka.pattern.ask
object ActorRefSpec { object ActorRefSpec {
@ -287,7 +288,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val baos = new ByteArrayOutputStream(8192 * 32) val baos = new ByteArrayOutputStream(8192 * 32)
val out = new ObjectOutputStream(baos) val out = new ObjectOutputStream(baos)
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address val sysImpl = system.asInstanceOf[ActorSystemImpl]
val addr = sysImpl.provider.rootPath.address
val serialized = SerializedActorRef(addr + "/non-existing") val serialized = SerializedActorRef(addr + "/non-existing")
out.writeObject(serialized) out.writeObject(serialized)
@ -295,9 +297,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
out.flush out.flush
out.close out.close
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { Serialization.currentSystem.withValue(sysImpl) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing") in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing")
} }
} }
@ -358,8 +360,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
} }
})) }))
val ffive = (ref ? (5, timeout)).mapTo[String] val ffive = (ref.ask(5)(timeout)).mapTo[String]
val fnull = (ref ? (null, timeout)).mapTo[String] val fnull = (ref.ask(null)(timeout)).mapTo[String]
ref ! PoisonPill ref ! PoisonPill
Await.result(ffive, timeout.duration) must be("five") Await.result(ffive, timeout.duration) must be("five")

View file

@ -10,6 +10,7 @@ import akka.testkit.DefaultTimeout
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.Await import akka.dispatch.Await
import akka.util.Timeout import akka.util.Timeout
import akka.pattern.{ ask, AskTimeoutException }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
@ -44,7 +45,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
"use explicitly supplied timeout" in { "use explicitly supplied timeout" in {
within(testTimeout - 100.millis, testTimeout + 300.millis) { within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = system.actorOf(Props.empty) val echo = system.actorOf(Props.empty)
val f = echo.?("hallo", testTimeout) val f = echo.?("hallo")(testTimeout)
try { try {
intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) } intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) }
} finally { system.stop(echo) } } finally { system.stop(echo) }

View file

@ -8,6 +8,7 @@ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec

View file

@ -9,6 +9,7 @@ import akka.util.duration._
import Actor._ import Actor._
import akka.util.Duration import akka.util.Duration
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object ForwardActorSpec { object ForwardActorSpec {
val ExpectedMessage = "FOO" val ExpectedMessage = "FOO"
@ -46,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec {
"forward actor reference when invoking forward on ask" in { "forward actor reference when invoking forward on ask" in {
val chain = createForwardingChain(system) val chain = createForwardingChain(system)
chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage testActor ! ExpectedMessage } chain.ask(ExpectedMessage)(5 seconds) onSuccess { case ExpectedMessage testActor ! ExpectedMessage }
expectMsg(5 seconds, ExpectedMessage) expectMsg(5 seconds, ExpectedMessage)
} }
} }

View file

@ -9,6 +9,7 @@ import akka.util.duration._
import scala.util.continuations._ import scala.util.continuations._
import akka.testkit._ import akka.testkit._
import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher } import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher }
import akka.pattern.ask
object IOActorSpec { object IOActorSpec {

View file

@ -14,6 +14,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch import akka.testkit.TestLatch
import akka.util.duration._ import akka.util.duration._
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RestartStrategySpec extends AkkaSpec with DefaultTimeout { class RestartStrategySpec extends AkkaSpec with DefaultTimeout {

View file

@ -5,6 +5,7 @@ import akka.util.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit._ import akka.testkit._
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -8,6 +8,7 @@ import akka.testkit._
import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object SupervisorHierarchySpec { object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg) class FireWorkerException(msg: String) extends Exception(msg)

View file

@ -8,6 +8,7 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await }
import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.pattern.ask
object SupervisorMiscSpec { object SupervisorMiscSpec {
val config = """ val config = """

View file

@ -11,6 +11,7 @@ import akka.testkit.TestEvent._
import akka.testkit._ import akka.testkit._
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object SupervisorSpec { object SupervisorSpec {
val Timeout = 5 seconds val Timeout = 5 seconds
@ -130,12 +131,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
} }
def ping(pingPongActor: ActorRef) = { def ping(pingPongActor: ActorRef) = {
Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage
expectMsg(Timeout, PingMessage) expectMsg(Timeout, PingMessage)
} }
def kill(pingPongActor: ActorRef) = { def kill(pingPongActor: ActorRef) = {
val result = (pingPongActor ? (DieReply, TimeoutMillis)) val result = (pingPongActor.?(DieReply)(TimeoutMillis))
expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, ExceptionMessage)
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) }
} }
@ -153,7 +154,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"not restart temporary actor" in { "not restart temporary actor" in {
val (temporaryActor, _) = temporaryActorAllForOne val (temporaryActor, _) = temporaryActorAllForOne
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) }
expectNoMsg(1 second) expectNoMsg(1 second)
} }
@ -299,11 +300,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
intercept[RuntimeException] { intercept[RuntimeException] {
Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis)
} }
} }
Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage
inits.get must be(3) inits.get must be(3)

View file

@ -12,6 +12,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, Dispatchers } import akka.dispatch.{ Await, Dispatchers }
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {

View file

@ -11,6 +11,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {

View file

@ -18,6 +18,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.japi.{ Creator, Option JOption } import akka.japi.{ Creator, Option JOption }
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, Dispatchers, Future, Promise } import akka.dispatch.{ Await, Dispatchers, Future, Promise }
import akka.pattern.ask
object TypedActorSpec { object TypedActorSpec {

View file

@ -20,6 +20,7 @@ import akka.util.duration._
import akka.event.Logging.Error import akka.event.Logging.Error
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.Duration import akka.util.Duration
import akka.pattern.ask
object ActorModelSpec { object ActorModelSpec {

View file

@ -8,6 +8,7 @@ import akka.util.Duration
import akka.util.duration._ import akka.util.duration._
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher } import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher }
import akka.pattern.ask
object DispatcherActorSpec { object DispatcherActorSpec {
val config = """ val config = """

View file

@ -7,6 +7,7 @@ import akka.actor.{ Props, Actor }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import org.scalatest.BeforeAndAfterEach import org.scalatest.BeforeAndAfterEach
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers } import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers }
import akka.pattern.ask
object PinnedActorSpec { object PinnedActorSpec {
val config = """ val config = """

View file

@ -9,6 +9,7 @@ import akka.actor.future2actor
import akka.util.duration._ import akka.util.duration._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.pattern.ask
class Future2ActorSpec extends AkkaSpec with DefaultTimeout { class Future2ActorSpec extends AkkaSpec with DefaultTimeout {

View file

@ -15,6 +15,7 @@ import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch import akka.testkit.TestLatch
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
import scala.runtime.NonLocalReturnControl import scala.runtime.NonLocalReturnControl
import akka.pattern.ask
import java.lang.{ IllegalStateException, ArithmeticException } import java.lang.{ IllegalStateException, ArithmeticException }
object FutureSpec { object FutureSpec {
@ -323,7 +324,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})) }))
} }
val timeout = 10000 val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200))(timeout).mapTo[Int] }
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
} }
@ -351,7 +352,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); sender.tell(add) } def receive = { case (add: Int, wait: Int) Thread.sleep(wait); sender.tell(add) }
})) }))
} }
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200))(10000).mapTo[Int] }
Await.result(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(45) Await.result(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(45)
} }
@ -368,7 +369,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})) }))
} }
val timeout = 10000 val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100))(timeout).mapTo[Int] }
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected") intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
} }
} }
@ -400,7 +401,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})) }))
} }
val timeout = 10000 val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200))(timeout).mapTo[Int] }
assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45) assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45)
} }
@ -417,7 +418,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})) }))
} }
val timeout = 10000 val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100))(timeout).mapTo[Int] }
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected" intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
} }
} }
@ -458,7 +459,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"shouldHandleThrowables" in { "shouldHandleThrowables" in {
class ThrowableTest(m: String) extends Throwable(m) class ThrowableTest(m: String) extends Throwable(m)
filterException[ThrowableTest] { EventFilter[ThrowableTest](occurrences = 4) intercept {
val f1 = Future[Any] { throw new ThrowableTest("test") } val f1 = Future[Any] { throw new ThrowableTest("test") }
intercept[ThrowableTest] { Await.result(f1, timeout.duration) } intercept[ThrowableTest] { Await.result(f1, timeout.duration) }

View file

@ -2,7 +2,7 @@ package akka.dispatch
import akka.actor.{ Props, LocalActorRef, Actor } import akka.actor.{ Props, LocalActorRef, Actor }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.util.Duration import akka.pattern.ask
import akka.util.duration._ import akka.util.duration._
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import com.typesafe.config.Config import com.typesafe.config.Config

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.testkit.AkkaSpec
import akka.util.duration._
class AskSpec extends AkkaSpec {
"The “ask” pattern" must {
"return broken promises on DeadLetters" in {
val dead = system.actorFor("/system/deadLetters")
val f = dead.ask(42)(1 second)
f.isCompleted must be(true)
f.value.get match {
case Left(_: AskTimeoutException)
case v fail(v + " was not Left(AskTimeoutException)")
}
}
"return broken promises on EmptyLocalActorRefs" in {
val empty = system.actorFor("unknown")
implicit val timeout = system.settings.ActorTimeout
val f = empty ? 3.14
f.isCompleted must be(true)
f.value.get match {
case Left(_: AskTimeoutException)
case v fail(v + " was not Left(AskTimeoutException)")
}
}
}
}

View file

@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask
object ConfiguredLocalRoutingSpec { object ConfiguredLocalRoutingSpec {
val config = """ val config = """

View file

@ -10,6 +10,7 @@ import akka.dispatch.Await
import akka.util.duration._ import akka.util.duration._
import akka.actor.ActorRef import akka.actor.ActorRef
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask
object ResizerSpec { object ResizerSpec {

View file

@ -12,6 +12,7 @@ import akka.dispatch.Await
import akka.util.Duration import akka.util.Duration
import akka.config.ConfigurationException import akka.config.ConfigurationException
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.pattern.ask
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config import com.typesafe.config.Config

View file

@ -13,6 +13,7 @@ import akka.util.Timeout
import akka.util.duration._ import akka.util.duration._
import scala.reflect.BeanInfo import scala.reflect.BeanInfo
import com.google.protobuf.Message import com.google.protobuf.Message
import akka.pattern.ask
class ProtobufSerializer extends Serializer { class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config; package com.typesafe.config;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com> * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;

View file

@ -104,33 +104,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/ */
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender) final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
/**
* Akka Java API.
*
* Sends a message asynchronously returns a future holding the eventual reply message.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'tell' together with the sender
* parameter to implement non-blocking request/response message exchanges.
*
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
* otherwise the sender will block until the timeout expires.
*
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors reference, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*/
def ask(message: Any, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
def ask(message: Any, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
/** /**
* Forwards the message and passes the original sender actor as the sender. * Forwards the message and passes the original sender actor as the sender.
* <p/> * <p/>
@ -179,35 +152,6 @@ trait ScalaActorRef { ref: ActorRef ⇒
*/ */
def !(message: Any)(implicit sender: ActorRef = null): Unit def !(message: Any)(implicit sender: ActorRef = null): Unit
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
* sender parameter to implement non-blocking request/response message exchanges.
*
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
* otherwise the sender will block until the timeout expires.
*
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors reference, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any]
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
* implicit timeout
*/
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
} }
/** /**
@ -230,12 +174,25 @@ trait LocalRef extends ActorRefScope {
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA! * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
*/ */
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope
/*
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
*/
def resume(): Unit def resume(): Unit
def suspend(): Unit def suspend(): Unit
def restart(cause: Throwable): Unit def restart(cause: Throwable): Unit
def stop(): Unit def stop(): Unit
def sendSystemMessage(message: SystemMessage): Unit def sendSystemMessage(message: SystemMessage): Unit
/**
* Get a reference to the actor ref provider which created this ref.
*/
def provider: ActorRefProvider
/**
* Obtain parent of this ref; used by getChild for ".." paths.
*/
def getParent: InternalActorRef def getParent: InternalActorRef
/** /**
* Obtain ActorRef by possibly traversing the actor tree or looking it up at * Obtain ActorRef by possibly traversing the actor tree or looking it up at
* some provider-specific location. This method shall return the end result, * some provider-specific location. This method shall return the end result,
@ -245,6 +202,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
* exist, return Nobody. * exist, return Nobody.
*/ */
def getChild(name: Iterator[String]): InternalActorRef def getChild(name: Iterator[String]): InternalActorRef
/** /**
* Scope: if this ref points to an actor which resides within the same JVM, * Scope: if this ref points to an actor which resides within the same JVM,
* i.e. whose mailbox is directly reachable etc. * i.e. whose mailbox is directly reachable etc.
@ -252,8 +210,12 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
def isLocal: Boolean def isLocal: Boolean
} }
/**
* This is an internal look-up failure token, not useful for anything else.
*/
private[akka] case object Nobody extends MinimalActorRef { private[akka] case object Nobody extends MinimalActorRef {
val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody") val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody")
def provider = throw new UnsupportedOperationException("Nobody does not provide")
} }
/** /**
@ -321,6 +283,8 @@ private[akka] class LocalActorRef private[akka] (
def getParent: InternalActorRef = actorCell.parent def getParent: InternalActorRef = actorCell.parent
def provider = actorCell.provider
/** /**
* Method for looking up a single child beneath this actor. Override in order * Method for looking up a single child beneath this actor. Override in order
* to inject synthetic actor paths like /temp. * to inject synthetic actor paths like /temp.
@ -365,17 +329,6 @@ private[akka] class LocalActorRef private[akka] (
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
actorCell.provider.ask(timeout) match {
case Some(a)
this.!(message)(a)
a.result
case None
this.!(message)(null)
Promise[Any]()(actorCell.system.dispatcher)
}
}
def restart(cause: Throwable): Unit = actorCell.restart(cause) def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
@ -402,9 +355,10 @@ case class SerializedActorRef(path: String) {
/** /**
* Trait for ActorRef implementations where all methods contain default stubs. * Trait for ActorRef implementations where all methods contain default stubs.
*/ */
trait MinimalActorRef extends InternalActorRef with LocalRef { private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
def getParent: InternalActorRef = Nobody def getParent: InternalActorRef = Nobody
def getChild(names: Iterator[String]): InternalActorRef = { def getChild(names: Iterator[String]): InternalActorRef = {
val dropped = names.dropWhile(_.isEmpty) val dropped = names.dropWhile(_.isEmpty)
if (dropped.isEmpty) this if (dropped.isEmpty) this
@ -420,9 +374,6 @@ trait MinimalActorRef extends InternalActorRef with LocalRef {
def !(message: Any)(implicit sender: ActorRef = null): Unit = () def !(message: Any)(implicit sender: ActorRef = null): Unit = ()
def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
def sendSystemMessage(message: SystemMessage): Unit = () def sendSystemMessage(message: SystemMessage): Unit = ()
def restart(cause: Throwable): Unit = () def restart(cause: Throwable): Unit = ()
@ -430,9 +381,10 @@ trait MinimalActorRef extends InternalActorRef with LocalRef {
protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
} }
object MinimalActorRef { private[akka] object MinimalActorRef {
def apply(_path: ActorPath)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef { def apply(_path: ActorPath, _provider: ActorRefProvider)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef {
def path = _path def path = _path
def provider = _provider
override def !(message: Any)(implicit sender: ActorRef = null): Unit = override def !(message: Any)(implicit sender: ActorRef = null): Unit =
if (receive.isDefinedAt(message)) receive(message) if (receive.isDefinedAt(message)) receive(message)
} }
@ -440,7 +392,7 @@ object MinimalActorRef {
case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
object DeadLetterActorRef { private[akka] object DeadLetterActorRef {
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters
@ -449,12 +401,10 @@ object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef val serialized = new SerializedDeadLetterActorRef
} }
trait DeadLetterActorRefLike extends MinimalActorRef { private[akka] trait DeadLetterActorRefLike extends MinimalActorRef {
def eventStream: EventStream def eventStream: EventStream
@volatile
private var brokenPromise: Future[Any] = _
@volatile @volatile
private var _path: ActorPath = _ private var _path: ActorPath = _
def path: ActorPath = { def path: ActorPath = {
@ -462,9 +412,13 @@ trait DeadLetterActorRefLike extends MinimalActorRef {
_path _path
} }
private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) { @volatile
private var _provider: ActorRefProvider = _
def provider = _provider
private[akka] def init(provider: ActorRefProvider, path: ActorPath) {
_path = path _path = path
brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher) _provider = provider
} }
override def isTerminated(): Boolean = true override def isTerminated(): Boolean = true
@ -473,16 +427,9 @@ trait DeadLetterActorRefLike extends MinimalActorRef {
case d: DeadLetter eventStream.publish(d) case d: DeadLetter eventStream.publish(d)
case _ eventStream.publish(DeadLetter(message, sender, this)) case _ eventStream.publish(DeadLetter(message, sender, this))
} }
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
eventStream.publish(DeadLetter(message, this, this))
// leave this in: guard with good visibility against really stupid/weird errors
assert(brokenPromise != null)
brokenPromise
}
} }
class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike { private[akka] class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike {
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
} }
@ -491,16 +438,28 @@ class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRe
* This special dead letter reference has a name: it is that which is returned * This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful. * by a local look-up which is unsuccessful.
*/ */
class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) private[akka] class EmptyLocalActorRef(
extends DeadLetterActorRefLike { val eventStream: EventStream,
init(_dispatcher, _path) _provider: ActorRefProvider,
_dispatcher: MessageDispatcher,
_path: ActorPath) extends DeadLetterActorRefLike {
init(_provider, _path)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops case d: DeadLetter // do NOT form endless loops
case _ eventStream.publish(DeadLetter(message, sender, this)) case _ eventStream.publish(DeadLetter(message, sender, this))
} }
} }
class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef { /**
* Internal implementation detail used for paths like /temp
*/
private[akka] class VirtualPathContainer(
val provider: ActorRefProvider,
val path: ActorPath,
override val getParent: InternalActorRef,
val log: LoggingAdapter) extends MinimalActorRef {
private val children = new ConcurrentHashMap[String, InternalActorRef] private val children = new ConcurrentHashMap[String, InternalActorRef]
@ -534,41 +493,3 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal
} }
} }
} }
/**
* This is what is used to complete a Future that is returned from an ask/? call,
* when it times out.
*/
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
def this(message: String) = this(message, null: Throwable)
}
class AskActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,
val dispatcher: MessageDispatcher,
val deathWatch: DeathWatch) extends MinimalActorRef {
final val running = new AtomicBoolean(true)
final val result = Promise[Any]()(dispatcher)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match {
case Status.Success(r) result.success(r)
case Status.Failure(f) result.failure(f)
case other result.success(other)
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case _
}
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
Promise.failed(new UnsupportedOperationException("Ask/? is not supported for [%s]".format(getClass.getName)))(dispatcher)
override def isTerminated = result.isCompleted
override def stop(): Unit = if (running.getAndSet(false)) {
deathWatch.publish(Terminated(this))
}
}

View file

@ -47,6 +47,8 @@ trait ActorRefProvider {
def settings: ActorSystem.Settings def settings: ActorSystem.Settings
def dispatcher: MessageDispatcher
/** /**
* Initialization of an ActorRefProvider happens in two steps: first * Initialization of an ActorRefProvider happens in two steps: first
* construction of the object with settings, eventStream, scheduler, etc. * construction of the object with settings, eventStream, scheduler, etc.
@ -59,6 +61,26 @@ trait ActorRefProvider {
def scheduler: Scheduler def scheduler: Scheduler
/**
* Generates and returns a unique actor path below /temp.
*/
def tempPath(): ActorPath
/**
* Returns the actor reference representing the /temp path.
*/
def tempContainer: InternalActorRef
/**
* Registers an actorRef at a path returned by tempPath(); do NOT pass in any other path.
*/
def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit
/**
* Unregister a temporary actor from the /temp path (i.e. obtained from tempPath()); do NOT pass in any other path.
*/
def unregisterTempActor(path: ActorPath): Unit
/** /**
* Actor factory with create-only semantics: will create an actor as * Actor factory with create-only semantics: will create an actor as
* described by props with the given supervisor and path (may be different * described by props with the given supervisor and path (may be different
@ -89,12 +111,6 @@ trait ActorRefProvider {
*/ */
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
/**
* Create AskActorRef and register it properly so it can be serialized/deserialized;
* caller needs to send the message.
*/
def ask(within: Timeout): Option[AskActorRef]
/** /**
* This Future is completed upon termination of this ActorRefProvider, which * This Future is completed upon termination of this ActorRefProvider, which
* is usually initiated by stopping the guardian via ActorSystem.stop(). * is usually initiated by stopping the guardian via ActorSystem.stop().
@ -308,6 +324,8 @@ class LocalActorRefProvider(
val path = rootPath / "bubble-walker" val path = rootPath / "bubble-walker"
def provider: ActorRefProvider = LocalActorRefProvider.this
override def stop() = stopped switchOn { override def stop() = stopped switchOn {
terminationFuture.complete(causeOfTermination.toLeft(())) terminationFuture.complete(causeOfTermination.toLeft(()))
} }
@ -426,7 +444,17 @@ class LocalActorRefProvider(
lazy val systemGuardian: InternalActorRef = lazy val systemGuardian: InternalActorRef =
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None) actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None)
lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log) lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)
def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = {
assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()")
tempContainer.addChild(path.name, actorRef)
}
def unregisterTempActor(path: ActorPath): Unit = {
assert(path.parent eq tempNode, "cannot unregisterTempActor() with anything not obtained from tempPath()")
tempContainer.removeChild(path.name)
}
val deathWatch = new LocalDeathWatch(1024) //TODO make configrable val deathWatch = new LocalDeathWatch(1024) //TODO make configrable
@ -465,7 +493,7 @@ class LocalActorRefProvider(
} else ref.getChild(path.iterator) match { } else ref.getChild(path.iterator) match {
case Nobody case Nobody
log.debug("look-up of path sequence '{}' failed", path) log.debug("look-up of path sequence '{}' failed", path)
new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path) new EmptyLocalActorRef(eventStream, system.provider, dispatcher, ref.path / path)
case x x case x x
} }
@ -480,25 +508,6 @@ class LocalActorRefProvider(
new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path) new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path)
} }
} }
def ask(within: Timeout): Option[AskActorRef] = {
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0 None
case t
val path = tempPath()
val name = path.name
val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch)
tempContainer.addChild(name, a)
val result = a.result
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
result onComplete { _
try { a.stop(); f.cancel() }
finally { tempContainer.removeChild(name) }
}
Some(a)
}
}
} }
class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification { class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification {

View file

@ -6,6 +6,7 @@ package akka.actor
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.event._ import akka.event._
import akka.dispatch._ import akka.dispatch._
import akka.pattern.ask
import akka.util.duration._ import akka.util.duration._
import akka.util.Timeout._ import akka.util.Timeout._
import org.jboss.netty.akka.util.HashedWheelTimer import org.jboss.netty.akka.util.HashedWheelTimer
@ -396,10 +397,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// the provider is expected to start default loggers, LocalActorRefProvider does this // the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this) provider.init(this)
_log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass) _log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass)
deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") deadLetters.init(provider, lookupRoot.path / "deadLetters")
registerOnTermination(stopScheduler()) registerOnTermination(stopScheduler())
// this starts the reaper actor and the user-configured logging subscribers, which are also actors // this starts the reaper actor and the user-configured logging subscribers, which are also actors
_locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) _locker = new Locker(scheduler, ReaperInterval, provider, lookupRoot.path / "locker", deathWatch)
loadExtensions() loadExtensions()
if (LogConfigOnStart) logConfiguration() if (LogConfigOnStart) logConfiguration()
this this

View file

@ -9,7 +9,15 @@ import akka.util.duration._
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.event.DeathWatch import akka.event.DeathWatch
class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { /**
* Internal implementation detail for disposing of orphaned actors.
*/
private[akka] class Locker(
scheduler: Scheduler,
period: Duration,
val provider: ActorRefProvider,
val path: ActorPath,
val deathWatch: DeathWatch) extends MinimalActorRef {
class DavyJones extends Runnable { class DavyJones extends Runnable {
def run = { def run = {

View file

@ -32,8 +32,8 @@ trait Scheduler {
/** /**
* Schedules a message to be sent repeatedly with an initial delay and * Schedules a message to be sent repeatedly with an initial delay and
* frequency. E.g. if you would like a message to be sent immediately and * frequency. E.g. if you would like a message to be sent immediately and
* thereafter every 500ms you would set delay = Duration.Zero and frequency * thereafter every 500ms you would set delay=Duration.Zero and
* = Duration(500, TimeUnit.MILLISECONDS) * frequency=Duration(500, TimeUnit.MILLISECONDS)
* *
* Java & Scala API * Java & Scala API
*/ */
@ -260,4 +260,4 @@ class DefaultCancellable(val timeout: HWTimeout) extends Cancellable {
def isCancelled: Boolean = { def isCancelled: Boolean = {
timeout.isCancelled timeout.isCancelled
} }
} }

View file

@ -348,17 +348,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "equals" (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "hashCode" actor.hashCode.asInstanceOf[AnyRef] case "hashCode" actor.hashCode.asInstanceOf[AnyRef]
case _ case _
import akka.pattern.ask
MethodCall(method, args) match { MethodCall(method, args) match {
case m if m.isOneWay actor ! m; null //Null return value case m if m.isOneWay actor ! m; null //Null return value
case m if m.returnsFuture_? actor.?(m, timeout) case m if m.returnsFuture_? ask(actor, m)(timeout)
case m if m.returnsJOption_? || m.returnsOption_? case m if m.returnsJOption_? || m.returnsOption_?
val f = actor.?(m, timeout) val f = ask(actor, m)(timeout)
(try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException None }) match { (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException None }) match {
case None | Some(Right(null)) if (m.returnsJOption_?) JOption.none[Any] else None case None | Some(Right(null)) if (m.returnsJOption_?) JOption.none[Any] else None
case Some(Right(joption: AnyRef)) joption case Some(Right(joption: AnyRef)) joption
case Some(Left(ex)) throw ex case Some(Left(ex)) throw ex
} }
case m Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] case m Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef]
} }
} }
} }

View file

@ -155,6 +155,7 @@ trait LoggingBus extends ActorEventBus {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name) val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds) implicit val timeout = Timeout(3 seconds)
import akka.pattern.ask
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
case _: TimeoutException case _: TimeoutException
publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
@ -648,6 +649,7 @@ object Logging {
*/ */
class StandardOutLogger extends MinimalActorRef with StdOutLogger { class StandardOutLogger extends MinimalActorRef with StdOutLogger {
val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger") val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger")
def provider: ActorRefProvider = throw new UnsupportedOperationException("StandardOutLogger does not provide")
override val toString = "StandardOutLogger" override val toString = "StandardOutLogger"
override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message) override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message)
} }

View file

@ -0,0 +1,136 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeoutException
import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath }
import akka.dispatch.{ Promise, Terminate, SystemMessage, Future }
import akka.event.DeathWatch
import akka.actor.ActorRefProvider
import akka.util.Timeout
/**
* This is what is used to complete a Future that is returned from an ask/? call,
* when it times out.
*/
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
def this(message: String) = this(message, null: Throwable)
}
/**
* This object contains implementation details of the ask pattern.
*/
object AskSupport {
/**
* Implementation detail of the ask pattern enrichment of ActorRef
*/
private[akka] final class AskableActorRef(val actorRef: ActorRef) {
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* flow {
* val f = worker.ask(request)(timeout)
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* flow {
* val f = worker ? request
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
}
/**
* Akka private optimized representation of the temporary actor spawned to
* receive the reply to an "ask" operation.
*/
private[akka] final class PromiseActorRef(
val provider: ActorRefProvider,
val path: ActorPath,
override val getParent: InternalActorRef,
val result: Promise[Any],
val deathWatch: DeathWatch) extends MinimalActorRef {
final val running = new AtomicBoolean(true)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match {
case Status.Success(r) result.success(r)
case Status.Failure(f) result.failure(f)
case other result.success(other)
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case _
}
override def isTerminated = result.isCompleted
override def stop(): Unit = if (running.getAndSet(false)) {
deathWatch.publish(Terminated(this))
}
}
def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
val path = provider.tempPath()
val result = Promise[Any]()(provider.dispatcher)
val a = new PromiseActorRef(provider, path, provider.tempContainer, result, provider.deathWatch)
provider.registerTempActor(a, path)
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) }
result onComplete { _
try { a.stop(); f.cancel() }
finally { provider.unregisterTempActor(path) }
}
a
}
}

View file

@ -3,16 +3,90 @@
*/ */
package akka.pattern package akka.pattern
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Future
import akka.util.Duration
/**
* Patterns is the Java API for the Akka patterns that provide solutions
* to commonly occurring problems.
*/
object Patterns { object Patterns {
import akka.actor.{ ActorRef, ActorSystem }
import akka.dispatch.Future
import akka.pattern.{ ask scalaAsk }
import akka.util.{ Timeout, Duration }
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final Future<Object> f = Patterns.ask(worker, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* }}}
*/
def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]]
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final Future<Object> f = Patterns.ask(worker, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* }}}
*/
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]]
/**
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
* the result to the given actor reference. Returns the original Future to
* allow method chaining.
*
* <b>Recommended usage example:</b>
*
* {{{
* final Future<Object> f = Patterns.ask(worker, request, timeout);
* // apply some transformation (i.e. enrich with request info)
* final Future<Object> transformed = f.map(new akka.japi.Function<Object, Object>() { ... });
* // send it on to the next stage
* Patterns.pipeTo(transformed, nextActor);
* }}}
*/
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef)
/** /**
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
@ -27,4 +101,4 @@ object Patterns {
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = { def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = {
akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
} }
} }

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.actor.ActorRef
import akka.dispatch.Future
object PipeToSupport {
class PipeableFuture[T](val future: Future[T]) {
def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef)
}
}

View file

@ -3,23 +3,142 @@
*/ */
package akka package akka
import akka.actor.Actor import akka.actor._
import akka.actor.ActorRef import akka.dispatch.{ Future, Promise }
import akka.actor.ActorSystem import akka.util.{ Timeout, Duration }
import akka.actor.ActorTimeoutException
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.dispatch.Future
import akka.dispatch.Promise
import akka.util.Duration
/** /**
* Akka patterns that provide solutions to commonly occurring problems. * == Commonly Used Patterns With Akka ==
*
* This package is used as a collection point for usage patterns which involve
* actors, futures, etc. but are loosely enough coupled to (multiple of) them
* to present them separately from the core implementation. Currently supported
* are:
*
* <ul>
* <li><b>ask:</b> create a temporary one-off actor for receiving a reply to a
* message and complete a [[akka.dispatch.Future]] with it; returns said
* Future.</li>
* <li><b>pipeTo:</b> feed eventually computed value of a future to an actor as
* a message.</li>
* </ul>
*
* In Scala the recommended usage is to import the pattern from the package
* object:
* {{{
* import akka.pattern.ask
*
* ask(actor, message) // use it directly
* actor ask message // use it by implicit conversion
* }}}
*
* For Java the patterns are available as static methods of the [[akka.pattern.Patterns]]
* class:
* {{{
* import static akka.pattern.Patterns.ask;
*
* ask(actor, message);
* }}}
*/ */
package object pattern { package object pattern {
/**
* Import this implicit conversion to gain `?` and `ask` methods on
* [[akka.actor.ActorRef]], which will defer to the
* `ask(actorRef, message)(timeout)` method defined here.
*
* {{{
* import akka.pattern.ask
*
* val future = actor ? message // => ask(actor, message)
* val future = actor ask message // => ask(actor, message)
* val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)
* }}}
*
* All of the above use an implicit [[akka.actor.Timeout]].
*/
implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = ask(worker, request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see [[akka.dispatch.Future]] for a description of `flow`]
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated
actorRef.tell(message)
Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher)
case ref: InternalActorRef
val provider = ref.provider
if (timeout.duration.length <= 0) {
actorRef.tell(message)
Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher)
} else {
val a = AskSupport.createAsker(provider, timeout)
actorRef.tell(message, a)
a.result
}
case _ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
}
/**
* Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]:
*
* {{{
* import akka.pattern.pipeTo
*
* Future { doExpensiveCalc() } pipeTo nextActor
* }}}
*/
implicit def pipeTo[T](future: Future[T]): PipeToSupport.PipeableFuture[T] = new PipeToSupport.PipeableFuture(future)
/**
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
* the result to the given actor reference. Returns the original Future to
* allow method chaining.
*
* <b>Recommended usage example:</b>
*
* {{{
* val f = ask(worker, request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see [[akka.dispatch.Future]] for a description of `flow`]
*/
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = {
future onComplete {
case Right(r) actorRef ! r
case Left(f) actorRef ! Status.Failure(f)
}
future
}
/** /**
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been * existing messages of the target actor has been processed and the actor has been

View file

@ -4,7 +4,7 @@
package akka.routing package akka.routing
import akka.actor._ import akka.actor._
import akka.dispatch.Future import akka.dispatch.{ Future, Promise }
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -12,6 +12,7 @@ import akka.util.{ Duration, Timeout }
import akka.util.duration._ import akka.util.duration._
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.pattern.AskSupport
import scala.collection.JavaConversions.iterableAsScalaIterable import scala.collection.JavaConversions.iterableAsScalaIterable
/** /**
@ -95,11 +96,6 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
} }
} }
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
resize()
super.?(message)(timeout)
}
def resize() { def resize() {
for (r _props.routerConfig.resizer) { for (r _props.routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) {
@ -699,10 +695,7 @@ trait BroadcastLike { this: RouterConfig ⇒
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
{ {
case (sender, message) case (sender, message) toAll(sender, routeeProvider.routees)
message match {
case _ toAll(sender, routeeProvider.routees)
}
} }
} }
} }
@ -774,12 +767,10 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
{ {
case (sender, message) case (sender, message)
// FIXME avoid this cast val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider
val asker = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get val asker = AskSupport.createAsker(provider, within)
asker.result.pipeTo(sender) asker.result.pipeTo(sender)
message match { toAll(asker, routeeProvider.routees)
case _ toAll(asker, routeeProvider.routees)
}
} }
} }
} }

View file

@ -568,4 +568,3 @@ object Timeout {
implicit def intToTimeout(timeout: Int) = new Timeout(timeout) implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout) implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
} }

View file

@ -7,6 +7,7 @@ package akka.agent
import akka.actor._ import akka.actor._
import akka.japi.{ Function JFunc, Procedure JProc } import akka.japi.{ Function JFunc, Procedure JProc }
import akka.dispatch._ import akka.dispatch._
import akka.pattern.ask
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.stm._ import scala.concurrent.stm._
@ -123,7 +124,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* that new state can be obtained within the given timeout. * that new state can be obtained within the given timeout.
*/ */
def alter(f: T T)(timeout: Timeout): Future[T] = { def alter(f: T T)(timeout: Timeout): Future[T] = {
def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]] def dispatch = ask(updater, Alter(f))(timeout).asInstanceOf[Future[T]]
val txn = Txn.findCurrent val txn = Txn.findCurrent
if (txn.isDefined) { if (txn.isDefined) {
val result = Promise[T]()(system.dispatcher) val result = Promise[T]()(system.dispatcher)
@ -171,7 +172,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
send((value: T) { send((value: T) {
suspend() suspend()
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher("akka.agent.alter-off-dispatcher")) val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher("akka.agent.alter-off-dispatcher"))
result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]] result completeWith ask(threadBased, Alter(f))(timeout).asInstanceOf[Future[T]]
value value
}) })
result result

View file

@ -13,6 +13,7 @@ import akka.actor.Props;
import akka.actor.Terminated; import akka.actor.Terminated;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.dispatch.Await; import akka.dispatch.Await;
import static akka.pattern.Patterns.ask;
import akka.util.Duration; import akka.util.Duration;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe; import akka.testkit.TestProbe;
@ -160,19 +161,19 @@ public class FaultHandlingTestBase {
//#create //#create
Props superprops = new Props(Supervisor.class); Props superprops = new Props(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor"); ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); ActorRef child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
//#create //#create
//#resume //#resume
child.tell(42); child.tell(42);
assert Await.result(child.ask("get", 5000), timeout).equals(42); assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException()); child.tell(new ArithmeticException());
assert Await.result(child.ask("get", 5000), timeout).equals(42); assert Await.result(ask(child, "get", 5000), timeout).equals(42);
//#resume //#resume
//#restart //#restart
child.tell(new NullPointerException()); child.tell(new NullPointerException());
assert Await.result(child.ask("get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#restart //#restart
//#stop //#stop
@ -183,9 +184,9 @@ public class FaultHandlingTestBase {
//#stop //#stop
//#escalate-kill //#escalate-kill
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
probe.watch(child); probe.watch(child);
assert Await.result(child.ask("get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception()); child.tell(new Exception());
probe.expectMsg(new Terminated(child)); probe.expectMsg(new Terminated(child));
//#escalate-kill //#escalate-kill
@ -193,11 +194,11 @@ public class FaultHandlingTestBase {
//#escalate-restart //#escalate-restart
superprops = new Props(Supervisor2.class); superprops = new Props(Supervisor2.class);
supervisor = system.actorOf(superprops, "supervisor2"); supervisor = system.actorOf(superprops, "supervisor2");
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
child.tell(23); child.tell(23);
assert Await.result(child.ask("get", 5000), timeout).equals(23); assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception()); child.tell(new Exception());
assert Await.result(child.ask("get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#escalate-restart //#escalate-restart
//#testkit //#testkit
} }

View file

@ -11,6 +11,7 @@ import akka.actor.Props;
//#import-future //#import-future
import akka.dispatch.Future; import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.dispatch.Await; import akka.dispatch.Await;
import akka.util.Duration; import akka.util.Duration;
import akka.util.Timeout; import akka.util.Timeout;
@ -36,6 +37,17 @@ import akka.util.Duration;
import akka.actor.ActorTimeoutException; import akka.actor.ActorTimeoutException;
//#import-gracefulStop //#import-gracefulStop
//#import-askPipeTo
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipeTo;
import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.util.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
//#import-askPipeTo
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
@ -44,7 +56,10 @@ import akka.dispatch.MessageDispatcher;
import org.junit.Test; import org.junit.Test;
import scala.Option; import scala.Option;
import java.lang.Object; import java.lang.Object;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import akka.pattern.Patterns;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -123,7 +138,7 @@ public class UntypedActorDocTestBase {
}), "myactor"); }), "myactor");
//#using-ask //#using-ask
Future<Object> future = myActor.ask("Hello", 1000); Future<Object> future = Patterns.ask(myActor, "Hello", 1000);
Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS)); Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
//#using-ask //#using-ask
system.shutdown(); system.shutdown();
@ -175,7 +190,7 @@ public class UntypedActorDocTestBase {
public void useWatch() { public void useWatch() {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(WatchActor.class)); ActorRef myActor = system.actorOf(new Props(WatchActor.class));
Future<Object> future = myActor.ask("kill", 1000); Future<Object> future = Patterns.ask(myActor, "kill", 1000);
assert Await.result(future, Duration.parse("1 second")).equals("finished"); assert Await.result(future, Duration.parse("1 second")).equals("finished");
system.shutdown(); system.shutdown();
} }
@ -196,6 +211,43 @@ public class UntypedActorDocTestBase {
//#gracefulStop //#gracefulStop
system.shutdown(); system.shutdown();
} }
class Result {
final int x;
final String s;
public Result(int x, String s) {
this.x = x;
this.s = s;
}
}
@Test
public void usePatternsAskPipeTo() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class));
//#ask-pipeTo
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout
futures.add(ask(actorB, "reqeest", t)); // using timeout from above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher());
final Future<Result> transformed = aggregate.map(new akka.japi.Function<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String s = (String) it.next();
final int x = (Integer) it.next();
return new Result(x, s);
}
});
pipeTo(transformed, actorC);
//#ask-pipeTo
}
public static class MyActor extends UntypedActor { public static class MyActor extends UntypedActor {

View file

@ -53,9 +53,9 @@ import akka.actor.Status.Failure;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.docs.actor.MyUntypedActor;
import akka.actor.Props; import akka.actor.Props;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.pattern.Patterns;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -79,7 +79,7 @@ public class FutureDocTestBase {
String msg = "hello"; String msg = "hello";
//#ask-blocking //#ask-blocking
Timeout timeout = system.settings().ActorTimeout(); Timeout timeout = system.settings().ActorTimeout();
Future<Object> future = actor.ask(msg, timeout); Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration()); String result = (String) Await.result(future, timeout.duration());
//#ask-blocking //#ask-blocking
assertEquals("HELLO", result); assertEquals("HELLO", result);

View file

@ -19,6 +19,7 @@ import akka.dispatch.Await;
import akka.dispatch.Future; import akka.dispatch.Future;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import static akka.pattern.Patterns.ask;
import static akka.docs.jrouting.CustomRouterDocTestBase.DemocratActor; import static akka.docs.jrouting.CustomRouterDocTestBase.DemocratActor;
import static akka.docs.jrouting.CustomRouterDocTestBase.RepublicanActor; import static akka.docs.jrouting.CustomRouterDocTestBase.RepublicanActor;
@ -48,8 +49,8 @@ public class CustomRouterDocTestBase {
routedActor.tell(DemocratVote); routedActor.tell(DemocratVote);
routedActor.tell(RepublicanVote); routedActor.tell(RepublicanVote);
Timeout timeout = new Timeout(Duration.parse("1 seconds")); Timeout timeout = new Timeout(Duration.parse("1 seconds"));
Future<Object> democratsResult = routedActor.ask(DemocratCountResult, timeout); Future<Object> democratsResult = ask(routedActor, DemocratCountResult, timeout);
Future<Object> republicansResult = routedActor.ask(RepublicanCountResult, timeout); Future<Object> republicansResult = ask(routedActor, RepublicanCountResult, timeout);
assertEquals(3, Await.result(democratsResult, timeout.duration())); assertEquals(3, Await.result(democratsResult, timeout.duration()));
assertEquals(2, Await.result(republicansResult, timeout.duration())); assertEquals(2, Await.result(republicansResult, timeout.duration()));

View file

@ -55,8 +55,8 @@ public class ParentActor extends UntypedActor {
new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration
.parse("2 seconds"))), "router"); .parse("2 seconds"))), "router");
Timeout timeout = getContext().system().settings().ActorTimeout(); Timeout timeout = getContext().system().settings().ActorTimeout();
Future<Object> futureResult = scatterGatherFirstCompletedRouter.ask(new FibonacciActor.FibonacciNumber(10), Future<Object> futureResult = akka.pattern.Patterns.ask(
timeout); scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout);
int result = (Integer) Await.result(futureResult, timeout.duration()); int result = (Integer) Await.result(futureResult, timeout.duration());
//#scatterGatherFirstCompletedRouter //#scatterGatherFirstCompletedRouter
System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result)); System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result));

View file

@ -10,6 +10,7 @@ import org.junit.Test;
//#imports //#imports
import akka.actor.*; import akka.actor.*;
import akka.dispatch.Await; import akka.dispatch.Await;
import static akka.pattern.Patterns.ask;
import akka.transactor.Coordinated; import akka.transactor.Coordinated;
import akka.util.Duration; import akka.util.Duration;
import akka.util.Timeout; import akka.util.Timeout;
@ -30,7 +31,7 @@ public class TransactorDocTest {
counter1.tell(new Coordinated(new Increment(counter2), timeout)); counter1.tell(new Coordinated(new Increment(counter2), timeout));
Integer count = (Integer) Await.result(counter1.ask("GetCount", timeout), timeout.duration()); Integer count = (Integer) Await.result(ask(counter1, "GetCount", timeout), timeout.duration());
//#coordinated-example //#coordinated-example
assertEquals(count, new Integer(1)); assertEquals(count, new Integer(1));
@ -71,7 +72,7 @@ public class TransactorDocTest {
counter.tell(coordinated.coordinate(new Increment())); counter.tell(coordinated.coordinate(new Increment()));
coordinated.await(); coordinated.await();
Integer count = (Integer) Await.result(counter.ask("GetCount", timeout), timeout.duration()); Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration());
assertEquals(count, new Integer(1)); assertEquals(count, new Integer(1));
system.shutdown(); system.shutdown();
@ -88,10 +89,10 @@ public class TransactorDocTest {
friendlyCounter.tell(coordinated.coordinate(new Increment(friend))); friendlyCounter.tell(coordinated.coordinate(new Increment(friend)));
coordinated.await(); coordinated.await();
Integer count1 = (Integer) Await.result(friendlyCounter.ask("GetCount", timeout), timeout.duration()); Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration());
assertEquals(count1, new Integer(1)); assertEquals(count1, new Integer(1));
Integer count2 = (Integer) Await.result(friend.ask("GetCount", timeout), timeout.duration()); Integer count2 = (Integer) Await.result(ask(friend, "GetCount", timeout), timeout.duration());
assertEquals(count2, new Integer(1)); assertEquals(count2, new Integer(1));
system.shutdown(); system.shutdown();

View file

@ -316,26 +316,37 @@ If invoked without the sender parameter the sender will be
Ask: Send-And-Receive-Future Ask: Send-And-Receive-Future
---------------------------- ----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and The ``ask`` pattern involves actors as well as futures, hence it is offered as
will immediately return a :class:`Future` which will be completed with a use pattern rather than a method on :class:`ActorRef`:
an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: java .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipeTo
long timeoutMillis = 1000; .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipeTo
Future future = actorRef.ask("Hello", timeoutMillis);
The receiving actor should reply to this message, which will complete the This example demonstrates ``ask`` together with the ``pipeTo`` pattern on
future with the reply message as value; ``getSender.tell(result)``. futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: ``ask`` produces
a :class:`Future`, two of which are composed into a new future using the
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipeTo`` installs
an ``onComplete``-handler on the future to effect the submission of the
aggregated :class:`Result` to another actor.
Using ``ask`` will send a message to the receiving Actor as with ``tell``, and
the receiving actor must reply with ``getSender().tell(reply)`` in order to
complete the returned :class:`Future` with a value. The ``ask`` operation
involves creating an internal actor for handling this reply, which needs to
have a timeout after which it is destroyed in order not to leak resources; see
more below.
To complete the future with an exception you need send a Failure message to the sender. To complete the future with an exception you need send a Failure message to the sender.
This is not done automatically when an actor throws an exception while processing a This is *not done automatically* when an actor throws an exception while processing a
message. message.
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#reply-exception .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#reply-exception
If the actor does not complete the future, it will expire after the timeout period, If the actor does not complete the future, it will expire after the timeout period,
specified as parameter to the ``ask`` method. specified as parameter to the ``ask`` method; this will complete the
:class:`Future` with an :class:`AskTimeoutException`.
See :ref:`futures-java` for more information on how to await or query a See :ref:`futures-java` for more information on how to await or query a
future. future.
@ -354,15 +365,6 @@ Gives you a way to avoid blocking.
there is not yet a way to detect these illegal accesses at compile time. See also: there is not yet a way to detect these illegal accesses at compile time. See also:
:ref:`jmm-shared-state` :ref:`jmm-shared-state`
The future returned from the ``ask`` method can conveniently be passed around or
chained with further processing steps, but sometimes you just need the value,
even if that entails waiting for it (but keep in mind that waiting inside an
actor is prone to dead-locks, e.g. if obtaining the result depends on
processing another message on this actor).
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java
:include: import-future,using-ask
Forward message Forward message
--------------- ---------------

View file

@ -22,8 +22,8 @@ anything is able to run again. Therefore we provide a migration kit that
makes it possible to do the migration changes in smaller steps. makes it possible to do the migration changes in smaller steps.
The migration kit only covers the most common usage of Akka. It is not intended The migration kit only covers the most common usage of Akka. It is not intended
as a final solution. The whole migration kit is deprecated and will be removed in as a final solution. The whole migration kit is marked as deprecated and will
Akka 2.1. be removed in Akka 2.1.
The migration kit is provided in separate jar files. Add the following dependency:: The migration kit is provided in separate jar files. Add the following dependency::
@ -136,7 +136,8 @@ v1.3::
v2.0:: v2.0::
system.shutdown() system.shutdown() // from outside of this system
context.system.shutdown() // from inside any actor
Documentation: Documentation:
@ -149,7 +150,11 @@ Identifying Actors
In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``. In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``.
The ``ActorRegistry`` has been replaced by actor paths and lookup with The ``ActorRegistry`` has been replaced by actor paths and lookup with
``actorFor`` in ``ActorRefProvider`` (``ActorSystem`` or ``ActorContext``). ``actorFor`` in ``ActorRefProvider`` (``ActorSystem`` or ``ActorContext``). It
is no longer possible to obtain references to all actors being implemented by a
certain class (the reason being that this property is not known yet when an
:class:`ActorRef` is created because instantiation of the actor itself is
asynchronous).
v1.3:: v1.3::
@ -170,7 +175,9 @@ Reply to messages
^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^
``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala) ``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala)
or ``getSender()`` (Java). This works for both tell (!) and ask (?). or ``getSender()`` (Java). This works for both tell (!) and ask (?). Sending to
an actor reference never throws an exception, hence :meth:`tryTell` and
:meth:`tryReply` are removed.
v1.3:: v1.3::
@ -200,11 +207,61 @@ reply to be received; it is independent of the timeout applied when awaiting
completion of the :class:`Future`, however, the actor will complete the completion of the :class:`Future`, however, the actor will complete the
:class:`Future` with an :class:`AskTimeoutException` when it stops itself. :class:`Future` with an :class:`AskTimeoutException` when it stops itself.
Since there is no good library default value for the ask-timeout, specification
of a timeout is required for all usages as shown below.
Also, since the ``ask`` feature is coupling futures and actors, it is no longer
offered on the :class:`ActorRef` itself, but instead as a use pattern to be
imported. While Scalas implicit conversions enable transparent replacement,
Java code will have to be changed by more than just adding an import statement.
v1.3::
actorRef ? message // Scala
actorRef.ask(message, timeout); // Java
v2.0 (Scala)::
import akka.pattern.ask
implicit val timeout: Timeout = ...
actorRef ? message // uses implicit timeout
actorRef ask message // uses implicit timeout
actorRef.ask(message)(timeout) // uses explicit timeout
ask(actorRef, message) // uses implicit timeout
ask(actorRef, message)(timeout) // uses explicit timeout
v2.0 (Java)::
import akka.pattern.Patterns;
Patterns.ask(actorRef, message, timeout)
Documentation: Documentation:
* :ref:`actors-scala` * :ref:`actors-scala`
* :ref:`untyped-actors-java` * :ref:`untyped-actors-java`
``ActorRef.?(msg, timeout)``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This method has a dangerous overlap with ``ActorRef.?(msg)(implicit timeout)``
due to the fact that Scala allows to pass a :class:`Tuple` in place of the
message without requiring extra parentheses::
actor ? (1, "hallo") // will send a tuple
actor ? (1, Timeout()) // will send 1 with an explicit timeout
To remove this ambiguity, the latter variant is removed in version 2.0. If you
were using it before, it will now send tuples where that is not desired. In
order to correct all places in the code where this happens, simply import
``akka.migration.ask`` instead of ``akka.pattern.ask`` to obtain a variant
which will give deprecation warnings where the old method signature is used::
import akka.migration.ask
actor ? (1, Timeout(2 seconds)) // will give deprecation warning
ActorPool ActorPool
^^^^^^^^^ ^^^^^^^^^
@ -305,7 +362,8 @@ v2.0::
import akka.event.Logging import akka.event.Logging
val log = Logging(context.system, this) val log = Logging(context.system, this) // will include system name in message source
val log = Logging(system.eventStream, getClass.getName) // will not include system name
log.error(exception, message) log.error(exception, message)
log.warning(message) log.warning(message)
log.info(message) log.info(message)
@ -485,17 +543,25 @@ Documentation:
Spawn Spawn
^^^^^ ^^^^^
``spawn`` has been removed and can be implemented like this, if needed. Be careful to not ``spawn`` has been removed and should be replaced by creating a :class:`Future`. Be careful to not
access any shared mutable state closed over by the body. access any shared mutable state closed over by the body.
:: Scala::
def spawn(body: ⇒ Unit) { Future { doSomething() } // will be executed asynchronously
system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) })) ! "go"
} Java::
Futures.future<String>(new Callable<String>() {
public String call() {
doSomething();
}
}, executionContext);
Documentation: Documentation:
* :ref:`futures-scala`
* :ref:`futures-java`
* :ref:`jmm` * :ref:`jmm`
HotSwap HotSwap
@ -505,7 +571,10 @@ In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e.
The special ``HotSwap`` and ``RevertHotswap`` messages in v1.3 has been removed. Similar can be The special ``HotSwap`` and ``RevertHotswap`` messages in v1.3 has been removed. Similar can be
implemented with your own message and using ``context.become`` and ``context.unbecome`` implemented with your own message and using ``context.become`` and ``context.unbecome``
in the actor receiving the message. in the actor receiving the message. The rationale is that being able to replace
any actors behavior generically is not a good idea because actor implementors
would have no way to defend against that; hence the change to lay it into the
hands of the actor itself.
* :ref:`actors-scala` * :ref:`actors-scala`
* :ref:`untyped-actors-java` * :ref:`untyped-actors-java`

Some files were not shown because too many files have changed in this diff Show more