Actor-to-actor ask for typed, #23770

This commit is contained in:
Johan Andrén 2018-01-19 18:13:24 +01:00 committed by Patrik Nordwall
parent 49aee62f30
commit 034d6c6e6a
20 changed files with 811 additions and 36 deletions

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.javadsl;
import akka.actor.ActorSystem;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import akka.testkit.typed.javadsl.TestProbe;
import akka.util.Timeout;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.TimeUnit;
public class ActorContextAskTest extends JUnitSuite {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorSelectionTest",
AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
static class Ping {
final ActorRef<Pong> respondTo;
public Ping(ActorRef<Pong> respondTo) {
this.respondTo = respondTo;
}
}
static class Pong { }
@Test
public void provideASafeAsk() {
final Behavior<Ping> pingPongBehavior = Behaviors.immutable((ActorContext<Ping> context, Ping message) -> {
message.respondTo.tell(new Pong());
return Behaviors.same();
});
final ActorRef<Ping> pingPong = Adapter.spawnAnonymous(system, pingPongBehavior);
final TestProbe<Object> probe = new TestProbe<>(Adapter.toTyped(system));
final Behavior<Object> snitch = Behaviors.deferred((ActorContext<Object> ctx) -> {
ctx.ask(Pong.class,
pingPong,
new Timeout(3, TimeUnit.SECONDS),
(ActorRef<Pong> ref) -> new Ping(ref),
(pong, exception) -> {
if (pong != null) return pong;
else return exception;
});
return Behaviors.immutable((ActorContext<Object> context, Object message) -> {
probe.ref().tell(message);
return Behaviors.same();
});
});
Adapter.spawnAnonymous(system, snitch);
probe.expectMsgType(Pong.class);
}
}

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.BehaviorBuilder;
import akka.actor.typed.javadsl.Behaviors;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class InteractionPatternsTest extends JUnitSuite {
// #fire-and-forget
interface PrinterProtocol {}
class DisableOutput implements PrinterProtocol {}
class EnableOutput implements PrinterProtocol {}
class PrintMe implements PrinterProtocol {
public final String message;
public PrintMe(String message) {
this.message = message;
}
}
public Behavior<PrinterProtocol> enabledPrinterBehavior() {
return BehaviorBuilder.<PrinterProtocol>create()
.onMessage(DisableOutput.class, (ctx, disableOutput) -> disabledPrinterBehavior())
.onMessage(PrintMe.class, (ctx, printMe) -> {
System.out.println(printMe.message);
return Behaviors.same();
}).build();
}
public Behavior<PrinterProtocol> disabledPrinterBehavior() {
return BehaviorBuilder.<PrinterProtocol>create()
.onMessage(EnableOutput.class, (ctx, enableOutput) -> enabledPrinterBehavior())
.build();
}
// #fire-and-forget
@Test
public void fireAndForgetSample() throws Exception {
// #fire-and-forget
final ActorSystem<PrinterProtocol> system =
ActorSystem.create(enabledPrinterBehavior(), "printer-sample-system");
// note that system is also the ActorRef to the guardian actor
final ActorRef<PrinterProtocol> ref = system;
// these are all fire and forget
ref.tell(new PrintMe("message"));
ref.tell(new DisableOutput());
ref.tell(new PrintMe("message"));
ref.tell(new EnableOutput());
// #fire-and-forget
Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS));
}
}

View file

@ -4,14 +4,16 @@
package akka.actor.typed
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors._
import akka.actor.typed.scaladsl.AskPattern._
import akka.pattern.AskTimeoutException
import akka.actor.typed.scaladsl.adapter._
import akka.testkit.typed.TestKit
import akka.util.Timeout
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, TimeoutException }
object AskSpec {
sealed trait Msg
@ -40,7 +42,9 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures {
val ref = spawn(behavior)
(ref ? Stop).futureValue
val answer = ref ? Foo("bar")
answer.recover { case _: AskTimeoutException "ask" }.futureValue should ===("ask")
val result = answer.failed.futureValue
result shouldBe a[TimeoutException]
result.getMessage should include("had already been terminated.")
}
"must succeed when the actor is alive" in {
@ -49,6 +53,15 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures {
response.futureValue should ===("foo")
}
"must fail the future if the actor doesn't reply in time" in {
val actor = spawn(Behaviors.empty[Foo])
implicit val timeout: Timeout = 10.millis
val answer = actor ? Foo("bar")
val result = answer.failed.futureValue
result shouldBe a[TimeoutException]
result.getMessage should startWith("Ask timed out on")
}
/** See issue #19947 (MatchError with adapted ActorRef) */
"must fail the future if the actor doesn't exist" in {
val noSuchActor: ActorRef[Msg] = system match {
@ -60,7 +73,36 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures {
}
val answer = noSuchActor ? Foo("bar")
answer.recover { case _: AskTimeoutException "ask" }.futureValue should ===("ask")
val result = answer.failed.futureValue
result shouldBe a[TimeoutException]
result.getMessage should include("had already been terminated")
}
"must transform a replied akka.actor.Status.Failure to a failed future" in {
// It's unlikely but possible that this happens, since the recieving actor would
// have to accept a message with an actoref that accepts AnyRef or be doing crazy casting
// For completeness sake though
implicit val untypedSystem = akka.actor.ActorSystem("AskSpec-untyped-1")
try {
case class Ping(respondTo: ActorRef[AnyRef])
val ex = new RuntimeException("not good!")
class LegacyActor extends akka.actor.Actor {
def receive = {
case Ping(respondTo) respondTo ! akka.actor.Status.Failure(ex)
}
}
val legacyActor = untypedSystem.actorOf(akka.actor.Props(new LegacyActor))
import scaladsl.AskPattern._
implicit val timeout: Timeout = 3.seconds
implicit val scheduler = untypedSystem.toTyped.scheduler
val typedLegacy: ActorRef[AnyRef] = legacyActor
(typedLegacy ? Ping).failed.futureValue should ===(ex)
} finally {
akka.testkit.TestKit.shutdownActorSystem(untypedSystem)
}
}
}
}

View file

@ -0,0 +1,143 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorRef, PostStop, Props, TypedAkkaSpecWithShutdown }
import akka.testkit.EventFilter
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.{ Failure, Success }
object ActorContextAskSpec {
val config = ConfigFactory.parseString(
"""
akka.loggers = ["akka.testkit.TestEventListener"]
ping-pong-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher
}
snitch-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher
}
""")
}
class ActorContextAskSpec extends TestKit(ActorContextAskSpec.config) with TypedAkkaSpecWithShutdown {
implicit val untyped = system.toUntyped // FIXME no typed event filter yet
"The Scala DSL ActorContext" must {
"provide a safe ask" in {
case class Ping(sender: ActorRef[Pong])
case class Pong(selfName: String, threadName: String)
val pingPong = spawn(Behaviors.immutable[Ping] { (ctx, msg)
msg.sender ! Pong(ctx.self.path.name, Thread.currentThread().getName)
Behaviors.same
}, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val probe = TestProbe[AnyRef]()
val snitch = Behaviors.deferred[Pong] { (ctx)
// Timeout comes from TypedAkkaSpec
ctx.ask(pingPong)(Ping) {
case Success(pong) Pong(ctx.self.path.name + "1", Thread.currentThread().getName)
case Failure(ex) throw ex
}
Behaviors.immutable {
case (ctx, pong: Pong)
probe.ref ! pong
Behaviors.same
}
}
spawn(snitch, "snitch", Props.empty.withDispatcherFromConfig("snitch-dispatcher"))
val pong = probe.expectMsgType[Pong]
pong.selfName should ===("snitch1")
pong.threadName should startWith("ActorContextAskSpec-snitch-dispatcher")
}
"fail actor when mapping does not match response" in {
val probe = TestProbe[AnyRef]()
trait Protocol
case class Ping(respondTo: ActorRef[Pong.type]) extends Protocol
case object Pong extends Protocol
val pingPong = spawn(Behaviors.immutable[Protocol]((_, msg)
msg match {
case Ping(respondTo)
respondTo ! Pong
Behaviors.same
}
))
val snitch = Behaviors.deferred[AnyRef] { (ctx)
ctx.ask(pingPong)(Ping) {
// uh oh, missing case for the response, this can never end well
case Failure(x) x
}
Behaviors.immutable[AnyRef] {
case (_, msg)
probe.ref ! msg
Behaviors.same
}.onSignal {
case (_, PostStop)
probe.ref ! "stopped"
Behaviors.same
}
}
EventFilter[MatchError](occurrences = 1, start = "Success(Pong)").intercept {
spawn(snitch)
}
// no-match should cause failure and subsequent stop of actor
probe.expectMsg("stopped")
}
"deal with timeouts in ask" in {
val probe = TestProbe[AnyRef]()
val snitch = Behaviors.deferred[AnyRef] { (ctx)
ctx.ask[String, String](system.deadLetters)(ref "boo") {
case Success(m) m
case Failure(x) x
}(20.millis, implicitly[ClassTag[String]])
Behaviors.immutable {
case (_, msg)
probe.ref ! msg
Behaviors.same
}
}
EventFilter.warning(occurrences = 1, message = "received dead letter without sender: boo").intercept {
EventFilter.info(occurrences = 1, start = "Message [java.lang.String] without sender").intercept {
spawn(snitch)
}
}
probe.expectMsgType[TimeoutException]
}
}
}

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.akka.typed
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedAkkaSpecWithShutdown }
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestKit
class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
"The interaction patterns docs" must {
"contain a sample for fire and forget" in {
// #fire-and-forget
sealed trait PrinterProtocol
case object DisableOutput extends PrinterProtocol
case object EnableOutput extends PrinterProtocol
case class PrintMe(message: String) extends PrinterProtocol
// two state behavior
def enabledPrinterBehavior: Behavior[PrinterProtocol] = Behaviors.immutable {
case (_, DisableOutput)
disabledPrinterBehavior
case (_, EnableOutput)
Behaviors.ignore
case (_, PrintMe(message))
println(message)
Behaviors.same
}
def disabledPrinterBehavior: Behavior[PrinterProtocol] = Behaviors.immutable {
case (_, DisableOutput)
enabledPrinterBehavior
case (_, _)
// ignore any message
Behaviors.ignore
}
val system = ActorSystem(enabledPrinterBehavior, "fire-and-forget-sample")
// note how the system is also the top level actor ref
val printer: ActorRef[PrinterProtocol] = system
// these are all fire and forget
printer ! PrintMe("printed")
printer ! DisableOutput
printer ! PrintMe("not printed")
printer ! EnableOutput
// #fire-and-forget
system.terminate().futureValue
}
}
}

View file

@ -4,10 +4,15 @@
package akka.actor.typed
package internal
import java.util.function.BiFunction
import java.util.{ ArrayList, Optional, function }
import akka.annotation.InternalApi
import java.util.Optional
import java.util.ArrayList
import akka.util.Timeout
import scala.concurrent.ExecutionContextExecutor
import scala.reflect.ClassTag
import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API
@ -62,6 +67,22 @@ import scala.concurrent.ExecutionContextExecutor
override def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.actor.typed.ActorRef[U] =
internalSpawnAdapter(f.apply, name)
// Scala API impl
override def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._
(otherActor ? createRequest)(responseTimeout, system.scheduler).onComplete(res
self.asInstanceOf[ActorRef[AnyRef]] ! new AskResponse(res, mapResponse)
)
}
// Java API impl
def ask[Req, Res](resClass: Class[Res], otherActor: ActorRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
this.ask(otherActor)(createRequest.apply) {
case Success(message) applyToResponse.apply(message, null)
case Failure(ex) applyToResponse.apply(null.asInstanceOf[Res], ex)
}(responseTimeout, ClassTag[Res](resClass))
}
/**
* INTERNAL API: Needed to make Scala 2.12 compiler happy.
* Otherwise "ambiguous reference to overloaded definition" because Function is lambda.

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.internal
import akka.annotation.InternalApi
import scala.util.Try
/**
* INTERNAL API
*
* Message wrapper used to allow ActorContext.ask to map the response inside the asking actor.
*/
@InternalApi
private[akka] final class AskResponse[T, U](result: Try[T], adapt: Try[T] U) {
def adapted: U = adapt(result)
}

View file

@ -35,6 +35,7 @@ import akka.util.OptionVal
next(Behavior.interpretSignal(behavior, ctx, msg), msg)
case a.ReceiveTimeout
next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg)
case msg: AskResponse[AnyRef, T] @unchecked receive(msg.adapted)
case msg: T @unchecked
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
}

View file

@ -33,6 +33,7 @@ private[akka] object ActorRefAdapter {
def toUntyped[U](ref: ActorRef[U]): akka.actor.InternalActorRef =
ref match {
case adapter: ActorRefAdapter[_] adapter.untyped
case system: ActorSystemAdapter[_] system.untyped.guardian
case _
throw new UnsupportedOperationException("only adapted untyped ActorRefs permissible " +
s"($ref of class ${ref.getClass.getName})")

View file

@ -9,6 +9,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.EmptyProps
import akka.actor.typed.Props
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
/**
* INTERNAL API
@ -16,7 +17,13 @@ import akka.annotation.InternalApi
@InternalApi private[akka] object PropsAdapter {
def apply[T](behavior: () Behavior[T], deploy: Props = Props.empty): akka.actor.Props = {
// FIXME use Props, e.g. dispatcher
akka.actor.Props(new ActorAdapter(behavior()))
val props = akka.actor.Props(new ActorAdapter(behavior()))
deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match {
case _: DispatcherDefault props
case DispatcherFromConfig(name, _) props.withDispatcher(name)
}
}
}

View file

@ -3,11 +3,15 @@
*/
package akka.actor.typed.javadsl
import java.util.function.{ Function JFunction }
import java.util.function.{ BiFunction, Function JFunction }
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.actor.typed._
import java.util.Optional
import akka.util.Timeout
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
@ -167,4 +171,34 @@ trait ActorContext[T] {
*/
def spawnAdapter[U](f: JFunction[U, T]): ActorRef[U]
/**
* Perform a single request-response message interaction with another actor, and transform the messages back to
* the protocol of this actor.
*
* The interaction has a timeout (to avoid a resource leak). If the timeout hits without any response it
* will be passed as an [[java.util.concurrent.TimeoutException]] to the `applyToResponse` function.
*
* For other messaging patterns with other actors, see [[spawnAdapter]].
*
* @param createREquest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* the other actor can send a message back through.
* @param applyToResponse Transforms the response from the `otherActor` into a message this actor understands.
* Will be invoked with either the response message or an AskTimeoutException failed or
* potentially another exception if the remote actor is untyped and sent a
* [[akka.actor.Status.Failure]] as response. The returned message of type `T` is then
* fed into this actor as a message. Should be a pure function but is executed inside
* the actor when the response arrives so can safely touch the actor internals. If this
* function throws an exception it is just as if the normal message receiving logic would
* throw.
*
* @tparam Req The request protocol, what the other actor accepts
* @tparam Res The response protocol, what the other actor sends back
*/
def ask[Req, Res](
resClass: Class[Res],
otherActor: ActorRef[Req],
responseTimeout: Timeout,
createREquest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit
}

View file

@ -2,11 +2,13 @@ package akka.actor.typed
package javadsl
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters
import akka.util.Timeout
import akka.actor.Scheduler
import scaladsl.AskPattern._
import akka.actor.typed.scaladsl.AskPattern._
import akka.japi.function.Function
import akka.util.Timeout
import scala.compat.java8.FutureConverters
object AskPattern {
def ask[T, U](actor: ActorRef[T], message: Function[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] =

View file

@ -5,10 +5,13 @@ package akka.actor.typed.scaladsl
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.actor.typed._
import akka.util.Timeout
import scala.reflect.ClassTag
import scala.util.{ Success, Failure, Try }
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
@ -154,4 +157,27 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
*/
def spawnAdapter[U](f: U T): ActorRef[U]
/**
* Perform a single request-response message interaction with another actor, and transform the messages back to
* the protocol of this actor.
*
* The interaction has a timeout (to avoid a resource leak). If the timeout hits without any response it
* will be passed as a `Failure(`[[java.util.concurrent.TimeoutException]]`)` to the `mapResponse` function
* (this is the only "normal" way a `Failure` is passed to the function).
*
* For other messaging patterns with other actors, see [[spawnAdapter]].
*
* @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* the other actor can send a message back through.
* @param mapResponse Transforms the response from the `otherActor` into a message this actor understands.
* Should be a pure function but is executed inside the actor when the response arrives
* so can safely touch the actor internals. If this function throws an exception it is
* just as if the normal message receiving logic would throw.
*
* @tparam Req The request protocol, what the other actor accepts
* @tparam Res The response protocol, what the other actor sends back
*/
def ask[Req, Res](
otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
}

View file

@ -3,22 +3,23 @@
*/
package akka.actor.typed.scaladsl
import scala.concurrent.{ Future, Promise }
import akka.util.Timeout
import akka.actor.InternalActorRef
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.actor.Scheduler
import akka.actor.RootActorPath
import akka.actor.Address
import akka.annotation.InternalApi
import java.util.concurrent.TimeoutException
import akka.actor.{ Address, InternalActorRef, RootActorPath, Scheduler }
import akka.actor.typed.ActorRef
import akka.actor.typed.internal.{ adapter adapt }
import akka.pattern.PromiseActorRef
import akka.util.Timeout
import scala.concurrent.Future
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask".
*
* Note that if you are inside of an actor you should prefer [[ActorContext.ask]]
* as that provides better safety.
*
* The party that asks may be within or without an Actor, since the
* implementation will fabricate a (hidden) [[ActorRef]] that is bound to a
* [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the
@ -42,6 +43,9 @@ object AskPattern {
* The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask".
*
* Note that if you are inside of an actor you should prefer [[ActorContext.ask]]
* as that provides better safety.
*
* The party that asks may be within or without an Actor, since the
* implementation will fabricate a (hidden) [[ActorRef]] that is bound to a
* [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the
@ -67,6 +71,8 @@ object AskPattern {
}
}
private val onTimeout: String Throwable = msg new TimeoutException(msg)
private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) {
// Note: _promiseRef mustn't have a type pattern, since it can be null
@ -74,13 +80,13 @@ object AskPattern {
if (untyped.isTerminated)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new AskTimeoutException(s"Recipient[$target] had already been terminated.")), null)
Future.failed[U](new TimeoutException(s"Recipient[$target] had already been terminated.")), null)
else if (timeout.duration.length <= 0)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null)
else {
val a = PromiseActorRef(untyped.provider, timeout, target, "unknown")
val a = PromiseActorRef(untyped.provider, timeout, target, "unknown", onTimeout = onTimeout)
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}

View file

@ -1,2 +1,5 @@
# #24330 ActorSystem.getWhenTerminated
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorSystem.getWhenTerminated")
# #23770 typed actor context ask
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply")

View file

@ -6,6 +6,7 @@ package akka.pattern
import java.util.concurrent.TimeoutException
import akka.actor._
import akka.annotation.InternalApi
import akka.dispatch.sysmsg._
import akka.util.{ Timeout, Unsafe }
@ -584,21 +585,24 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
/**
* INTERNAL API
*/
@InternalApi
private[akka] object PromiseActorRef {
private case object Registering
private case object Stopped
private final case class StoppedWithPath(path: ActorPath)
private val ActorStopResult = Failure(ActorKilledException("Stopped"))
private val defaultOnTimeout: String Throwable = str new AskTimeoutException(str)
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String, sender: ActorRef = Actor.noSender): PromiseActorRef = {
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String,
sender: ActorRef = Actor.noSender, onTimeout: String Throwable = defaultOnTimeout): PromiseActorRef = {
val result = Promise[Any]()
val scheduler = provider.guardian.underlying.system.scheduler
val a = new PromiseActorRef(provider, result, messageClassName)
implicit val ec = a.internalCallingThreadExecutionContext
val f = scheduler.scheduleOnce(timeout.duration) {
result tryComplete Failure(
new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
}
result.future onComplete { _ try a.stop() finally f.cancel() }
a

View file

@ -0,0 +1,131 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.typed
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.actor.typed.receptionist.Receptionist.Registered
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorRefResolver, ActorSystem, TypedAkkaSpecWithShutdown }
import akka.serialization.SerializerWithStringManifest
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.scaladsl.adapter._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
class RemoteContextAskSpecSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
override def identifier = 41
override def manifest(o: AnyRef) = o match {
case _: RemoteContextAskSpec.Ping "a"
case RemoteContextAskSpec.Pong "b"
}
override def toBinary(o: AnyRef) = o match {
case RemoteContextAskSpec.Ping(who)
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
case RemoteContextAskSpec.Pong Array.emptyByteArray
}
override def fromBinary(bytes: Array[Byte], manifest: String) = manifest match {
case "a"
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = ActorRefResolver(system.toTyped).resolveActorRef[RemoteContextAskSpec.Pong.type](str)
RemoteContextAskSpec.Ping(ref)
case "b" RemoteContextAskSpec.Pong
}
}
object RemoteContextAskSpec {
def config = ConfigFactory.parseString(
s"""
akka {
loglevel = debug
actor {
provider = cluster
warn-about-java-serializer-usage = off
serialize-creators = off
serializers {
test = "akka.cluster.typed.RemoteContextAskSpecSerializer"
}
serialization-bindings {
"akka.cluster.typed.RemoteContextAskSpec$$Ping" = test
"akka.cluster.typed.RemoteContextAskSpec$$Pong$$" = test
}
}
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0
}
}
}
""")
case object Pong
case class Ping(respondTo: ActorRef[Pong.type])
def pingPong = Behaviors.immutable[Ping] { (_, msg)
msg match {
case Ping(sender)
sender ! Pong
Behaviors.same
}
}
val pingPongKey = ServiceKey[Ping]("ping-pong")
}
class RemoteContextAskSpec extends TestKit(RemoteContextAskSpec.config) with TypedAkkaSpecWithShutdown {
import RemoteContextAskSpec._
"Asking another actor through the ActorContext across remoting" must {
"work" in {
val node1 = Cluster(system)
val node1Probe = TestProbe[AnyRef]()(system)
node1.manager ! Join(node1.selfMember.address)
Receptionist(system).ref ! Receptionist.Subscribe(pingPongKey, node1Probe.ref)
node1Probe.expectMsgType[Receptionist.Listing[_]]
val system2 = ActorSystem(pingPong, system.name, system.settings.config)
val node2 = Cluster(system2)
node2.manager ! Join(node1.selfMember.address)
val node2Probe = TestProbe[AnyRef]()(system2)
Receptionist(system2).ref ! Receptionist.Register(pingPongKey, system2, node2Probe.ref)
node2Probe.expectMsgType[Registered[_]]
// wait until the service is seen on the first node
val remoteRef = node1Probe.expectMsgType[Receptionist.Listing[Ping]].serviceInstances.head
spawn(Behaviors.deferred[AnyRef] { (ctx)
implicit val timeout: Timeout = 3.seconds
ctx.ask(remoteRef)(Ping) {
case Success(pong) pong
case Failure(ex) ex
}
Behaviors.immutable { (_, msg)
node1Probe.ref ! msg
Behaviors.same
}
})
node1Probe.expectMsgType[Pong.type]
}
}
}

View file

@ -7,6 +7,7 @@
* [actors](actors-typed.md)
* [coexisting](coexisting.md)
* [actor-lifecycle](actor-lifecycle-typed.md)
* [interaction patterns](interaction-patterns-typed.md)
* [fault-tolerance](fault-tolerance-typed.md)
* [actor-discovery](actor-discovery-typed.md)
* [cluster](cluster-typed.md)

View file

@ -0,0 +1,120 @@
# Typed Actor Interaction Patterns
Interacting with an Actor in Akka Typed is done through an @scala[`ActorRef[T]`]@java[`ActorRef<T>`] where `T` is the type of messages the actor accepts, also known as the "protocol". This ensures that only the right kind of messages can be sent to an actor and also ensures no access to the Actor instance internals is available to anyone else but the Actor itself.
Message exchange with Actors follow a few common patterns, let's go through each one of them.
## Fire and Forget
The fundamental way to interact with an actor is through @scala["tell", which is so common that it has a special symbolic method name: `actorRef ! message`]@java[`actorRef.tell(message)`]. Sending a message to an actor like this can be done both from inside another actor and from any logic outside of the `ActorSystem`.
Tell is asynchronous which means that the method returns right away and that when execution of the statement after it in the code is executed there is no guarantee that the message has been processed by the recipient yet. It also means there is no way to way to know if the processing succeeded or failed without additional interaction with the actor in question.
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #fire-and-forget }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget }
**Scenarios fire and forget is useful:**
* When it is not critical to be sure that the message was processed
* When there is no way to act on non successful delivery or processing
* When we want to minimize the number of messages created to get higher throughput
**Problems with fire and forget:**
* Consistently higher rates of fire and forget to an actor than it process will make the inbox fill up and can in the worst case cause the JVM crash with an `OutOfMemoryError`
* If the message got lost, we will not notice
## Same protocol Request-Response
In many interactions a request is followed by a response back from the actor. In Akka Typed the recipient of responses has to be encoded as a field in the message itself, which the recipient can then use to send a response back. When the response message is already a part of the sending actor protocol we can simply use @scala[`ActorContext.self`]@java[`ActorContext.getSelf()`] when constructing the message.
TODO sample
**Scenarios where request response with tell is useful:**
* Subscribing to an actor that will send many response messages (of the same protocol) back
* When communicating between a parent and its children, where the protocol can be made include the messages for the interaction
* ???
**Problems request-response:**
* Often the response that the other actor wants to send back is not a part of the sending actor's protocol (see adapted request response or ask)
* It is hard to detect and that a message request was not delivered or processed (see ask)
* Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor
## Adapted Request-Response
Very often the receiving does not, and should not be made, know of the protocol of the sending actor, and will respond with one or more messages that the sending actor cannot receive.
TODO sample
**Scenarios where Adapted Request-Response is useful:**
* Subscribing to an actor that will send many response messages back
**Problems with adapted request-response:**
* It is hard to detect and that a message request was not delivered or processed (see ask)
* Only one adaption can be made per response message type, if a new one is registered the old one is replaced, for example different target actors can't have different adaption if they use the same response types, unless some correlation is encoded in the messages
* Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor
## 1:1 Request-Response with ask between two actors
In an interaction where there is a 1:1 mapping between a request and a response we can use `ask` on the `ActorContext` to interact with another actor.
The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`] to put as recipient in the outgoing message. The second step is to transform the `Response` or the failure to produce a response, into a message that is part of the protocol of the sending actor.
TODO sample
**Scenarios where ask is useful:**
* Single response queries
* When an actor needs to know that the message was processed before continuing
* To allow an actor to resend if a timely response is not produced
* To keep track of outstanding requests and not overwhelm a recipient with messages (simple backpressure)
* When some context should be attached to the interaction but the protocol does not support that (request id, what query the response was for)
**Problems with ask:**
* There can only be a single response to one `ask`
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
* Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requestor, but at the same time you do not want to have many false positives
## 1:1 Request-Response with ask from outside the ActorSystem
In an interaction where there is a 1:1 mapping between a request and a response we can use @scala[`ActorRef.?` implicitly provided by `akka.actor.typed.scaladsl.AskPattern`]@java[`akka.actor.typed.javadsl.AskPattern.ask`] to send a message to an actor and get a @scala[`Future[Response]`]@java[`CompletionState[Response]`] back.
TODO sample
**Scenarios where this ask variant is useful:**
* Single response queries where the response should be passed on to some other actor
* ???
**Problems with ask:**
* There can only be a single response to one `ask`
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
## Per session child Actor
Keeping context for an interaction, or multiple interactions can be done by moving the work for one "session", into a child actor.
TODO
**Scenarios where per session child actor is useful:**
* A single incoming request should result in multiple interactions with other actions before a result can be built
* ???
**Problems with ask:**
* Children have lifecycles that must be managed to not create a resource leak
* ???

View file

@ -2,7 +2,7 @@ package akka.testkit.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.annotation.ApiMayChange
import akka.testkit.typed.TestKit._
import akka.util.Timeout
@ -14,15 +14,15 @@ import scala.concurrent.{ Await, TimeoutException }
object TestKit {
private[akka] sealed trait TestKitCommand
private[akka] case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]]) extends TestKitCommand
private[akka] case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]]) extends TestKitCommand
private[akka] case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand
private[akka] case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand
private val testKitGuardian = Behaviors.immutable[TestKitCommand] {
case (ctx, SpawnActor(name, behavior, reply))
reply ! ctx.spawn(behavior, name)
case (ctx, SpawnActor(name, behavior, reply, props))
reply ! ctx.spawn(behavior, name, props)
Behaviors.same
case (ctx, SpawnActorAnonymous(behavior, reply))
reply ! ctx.spawnAnonymous(behavior)
case (ctx, SpawnActorAnonymous(behavior, reply, props))
reply ! ctx.spawnAnonymous(behavior, props)
Behaviors.same
}
@ -87,14 +87,28 @@ trait TestKitBase {
* guardian
*/
def spawn[T](behavior: Behavior[T]): ActorRef[T] =
Await.result(system ? (SpawnActorAnonymous(behavior, _)), timeoutDuration)
spawn(behavior, Props.empty)
/**
* Spawn the given behavior. This is created as a child of the test kit
* guardian
*/
def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] =
Await.result(system ? (SpawnActorAnonymous(behavior, _, props)), timeoutDuration)
/**
* Spawn the given behavior. This is created as a child of the test kit
* guardian
*/
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] =
Await.result(system ? (SpawnActor(name, behavior, _)), timeoutDuration)
spawn(behavior, name, Props.empty)
/**
* Spawn the given behavior. This is created as a child of the test kit
* guardian
*/
def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
Await.result(system ? (SpawnActor(name, behavior, _, props)), timeoutDuration)
def systemActor[T](behaviour: Behavior[T], name: String): ActorRef[T] =
Await.result(system.systemActorOf(behaviour, name), timeoutDuration)