From 034d6c6e6a8dce6bb20fbd17890a1eafd3532b02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 19 Jan 2018 18:13:24 +0100 Subject: [PATCH] Actor-to-actor ask for typed, #23770 --- .../typed/javadsl/ActorContextAskTest.java | 69 +++++++++ .../akka/typed/InteractionPatternsTest.java | 69 +++++++++ .../test/scala/akka/actor/typed/AskSpec.scala | 52 ++++++- .../typed/scaladsl/ActorContextAskSpec.scala | 143 ++++++++++++++++++ .../akka/typed/InteractionPatternsSpec.scala | 62 ++++++++ .../typed/internal/ActorContextImpl.scala | 25 ++- .../actor/typed/internal/AskResponse.scala | 19 +++ .../typed/internal/adapter/ActorAdapter.scala | 1 + .../internal/adapter/ActorRefAdapter.scala | 3 +- .../typed/internal/adapter/PropsAdapter.scala | 9 +- .../actor/typed/javadsl/ActorContext.scala | 36 ++++- .../scala/akka/actor/typed/javadsl/Ask.scala | 8 +- .../actor/typed/scaladsl/ActorContext.scala | 28 +++- .../actor/typed/scaladsl/AskPattern.scala | 28 ++-- .../mima-filters/2.5.9.backwards.excludes | 3 + .../main/scala/akka/pattern/AskSupport.scala | 8 +- .../cluster/typed/RemoteContextAskSpec.scala | 131 ++++++++++++++++ akka-docs/src/main/paradox/index-typed.md | 1 + .../paradox/interaction-patterns-typed.md | 120 +++++++++++++++ .../scala/akka/testkit/typed/TestKit.scala | 32 ++-- 20 files changed, 811 insertions(+), 36 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java create mode 100644 akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala create mode 100644 akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala create mode 100644 akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala create mode 100644 akka-docs/src/main/paradox/interaction-patterns-typed.md diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java new file mode 100644 index 0000000000..441352a9ac --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.actor.typed.javadsl; + +import akka.actor.ActorSystem; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import akka.testkit.typed.javadsl.TestProbe; +import akka.util.Timeout; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.util.concurrent.TimeUnit; + +public class ActorContextAskTest extends JUnitSuite { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorSelectionTest", + AkkaSpec.testConf()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + static class Ping { + final ActorRef respondTo; + public Ping(ActorRef respondTo) { + this.respondTo = respondTo; + } + } + static class Pong { } + + @Test + public void provideASafeAsk() { + final Behavior pingPongBehavior = Behaviors.immutable((ActorContext context, Ping message) -> { + message.respondTo.tell(new Pong()); + return Behaviors.same(); + }); + + final ActorRef pingPong = Adapter.spawnAnonymous(system, pingPongBehavior); + + + final TestProbe probe = new TestProbe<>(Adapter.toTyped(system)); + + final Behavior snitch = Behaviors.deferred((ActorContext ctx) -> { + ctx.ask(Pong.class, + pingPong, + new Timeout(3, TimeUnit.SECONDS), + (ActorRef ref) -> new Ping(ref), + (pong, exception) -> { + if (pong != null) return pong; + else return exception; + }); + + return Behaviors.immutable((ActorContext context, Object message) -> { + probe.ref().tell(message); + return Behaviors.same(); + }); + }); + + Adapter.spawnAnonymous(system, snitch); + + probe.expectMsgType(Pong.class); + } + + +} diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java new file mode 100644 index 0000000000..c63d8ae22e --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package jdocs.akka.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.BehaviorBuilder; +import akka.actor.typed.javadsl.Behaviors; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; + +public class InteractionPatternsTest extends JUnitSuite { + + // #fire-and-forget + interface PrinterProtocol {} + class DisableOutput implements PrinterProtocol {} + class EnableOutput implements PrinterProtocol {} + class PrintMe implements PrinterProtocol { + public final String message; + public PrintMe(String message) { + this.message = message; + } + } + + public Behavior enabledPrinterBehavior() { + return BehaviorBuilder.create() + .onMessage(DisableOutput.class, (ctx, disableOutput) -> disabledPrinterBehavior()) + .onMessage(PrintMe.class, (ctx, printMe) -> { + System.out.println(printMe.message); + return Behaviors.same(); + }).build(); + } + + public Behavior disabledPrinterBehavior() { + return BehaviorBuilder.create() + .onMessage(EnableOutput.class, (ctx, enableOutput) -> enabledPrinterBehavior()) + .build(); + } + // #fire-and-forget + + + @Test + public void fireAndForgetSample() throws Exception { + // #fire-and-forget + final ActorSystem system = + ActorSystem.create(enabledPrinterBehavior(), "printer-sample-system"); + + // note that system is also the ActorRef to the guardian actor + final ActorRef ref = system; + + // these are all fire and forget + ref.tell(new PrintMe("message")); + ref.tell(new DisableOutput()); + ref.tell(new PrintMe("message")); + ref.tell(new EnableOutput()); + + // #fire-and-forget + + Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); + } + + +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala index 4462bd04dd..70df741fb1 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala @@ -4,14 +4,16 @@ package akka.actor.typed import akka.actor.typed.internal.adapter.ActorSystemAdapter +import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors._ -import akka.actor.typed.scaladsl.AskPattern._ -import akka.pattern.AskTimeoutException +import akka.actor.typed.scaladsl.adapter._ import akka.testkit.typed.TestKit +import akka.util.Timeout import org.scalatest.concurrent.ScalaFutures -import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, TimeoutException } object AskSpec { sealed trait Msg @@ -40,7 +42,9 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { val ref = spawn(behavior) (ref ? Stop).futureValue val answer = ref ? Foo("bar") - answer.recover { case _: AskTimeoutException ⇒ "ask" }.futureValue should ===("ask") + val result = answer.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should include("had already been terminated.") } "must succeed when the actor is alive" in { @@ -49,6 +53,15 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { response.futureValue should ===("foo") } + "must fail the future if the actor doesn't reply in time" in { + val actor = spawn(Behaviors.empty[Foo]) + implicit val timeout: Timeout = 10.millis + val answer = actor ? Foo("bar") + val result = answer.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should startWith("Ask timed out on") + } + /** See issue #19947 (MatchError with adapted ActorRef) */ "must fail the future if the actor doesn't exist" in { val noSuchActor: ActorRef[Msg] = system match { @@ -60,7 +73,36 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { } val answer = noSuchActor ? Foo("bar") - answer.recover { case _: AskTimeoutException ⇒ "ask" }.futureValue should ===("ask") + val result = answer.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should include("had already been terminated") + } + + "must transform a replied akka.actor.Status.Failure to a failed future" in { + // It's unlikely but possible that this happens, since the recieving actor would + // have to accept a message with an actoref that accepts AnyRef or be doing crazy casting + // For completeness sake though + implicit val untypedSystem = akka.actor.ActorSystem("AskSpec-untyped-1") + try { + case class Ping(respondTo: ActorRef[AnyRef]) + val ex = new RuntimeException("not good!") + + class LegacyActor extends akka.actor.Actor { + def receive = { + case Ping(respondTo) ⇒ respondTo ! akka.actor.Status.Failure(ex) + } + } + + val legacyActor = untypedSystem.actorOf(akka.actor.Props(new LegacyActor)) + + import scaladsl.AskPattern._ + implicit val timeout: Timeout = 3.seconds + implicit val scheduler = untypedSystem.toTyped.scheduler + val typedLegacy: ActorRef[AnyRef] = legacyActor + (typedLegacy ? Ping).failed.futureValue should ===(ex) + } finally { + akka.testkit.TestKit.shutdownActorSystem(untypedSystem) + } } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala new file mode 100644 index 0000000000..d08b411a8e --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala @@ -0,0 +1,143 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.actor.typed.scaladsl + +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.{ ActorRef, PostStop, Props, TypedAkkaSpecWithShutdown } +import akka.testkit.EventFilter +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.reflect.ClassTag +import scala.util.{ Failure, Success } + +object ActorContextAskSpec { + val config = ConfigFactory.parseString( + """ + akka.loggers = ["akka.testkit.TestEventListener"] + ping-pong-dispatcher { + executor = thread-pool-executor + type = PinnedDispatcher + } + snitch-dispatcher { + executor = thread-pool-executor + type = PinnedDispatcher + } + """) +} + +class ActorContextAskSpec extends TestKit(ActorContextAskSpec.config) with TypedAkkaSpecWithShutdown { + + implicit val untyped = system.toUntyped // FIXME no typed event filter yet + + "The Scala DSL ActorContext" must { + + "provide a safe ask" in { + case class Ping(sender: ActorRef[Pong]) + case class Pong(selfName: String, threadName: String) + + val pingPong = spawn(Behaviors.immutable[Ping] { (ctx, msg) ⇒ + msg.sender ! Pong(ctx.self.path.name, Thread.currentThread().getName) + Behaviors.same + }, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) + + val probe = TestProbe[AnyRef]() + + val snitch = Behaviors.deferred[Pong] { (ctx) ⇒ + + // Timeout comes from TypedAkkaSpec + + ctx.ask(pingPong)(Ping) { + case Success(pong) ⇒ Pong(ctx.self.path.name + "1", Thread.currentThread().getName) + case Failure(ex) ⇒ throw ex + } + + Behaviors.immutable { + case (ctx, pong: Pong) ⇒ + probe.ref ! pong + Behaviors.same + } + } + + spawn(snitch, "snitch", Props.empty.withDispatcherFromConfig("snitch-dispatcher")) + + val pong = probe.expectMsgType[Pong] + + pong.selfName should ===("snitch1") + pong.threadName should startWith("ActorContextAskSpec-snitch-dispatcher") + } + + "fail actor when mapping does not match response" in { + val probe = TestProbe[AnyRef]() + + trait Protocol + case class Ping(respondTo: ActorRef[Pong.type]) extends Protocol + case object Pong extends Protocol + + val pingPong = spawn(Behaviors.immutable[Protocol]((_, msg) ⇒ + msg match { + case Ping(respondTo) ⇒ + respondTo ! Pong + Behaviors.same + } + )) + + val snitch = Behaviors.deferred[AnyRef] { (ctx) ⇒ + ctx.ask(pingPong)(Ping) { + // uh oh, missing case for the response, this can never end well + case Failure(x) ⇒ x + } + + Behaviors.immutable[AnyRef] { + case (_, msg) ⇒ + probe.ref ! msg + Behaviors.same + }.onSignal { + + case (_, PostStop) ⇒ + probe.ref ! "stopped" + Behaviors.same + } + } + + EventFilter[MatchError](occurrences = 1, start = "Success(Pong)").intercept { + spawn(snitch) + } + + // no-match should cause failure and subsequent stop of actor + probe.expectMsg("stopped") + } + + "deal with timeouts in ask" in { + val probe = TestProbe[AnyRef]() + val snitch = Behaviors.deferred[AnyRef] { (ctx) ⇒ + + ctx.ask[String, String](system.deadLetters)(ref ⇒ "boo") { + case Success(m) ⇒ m + case Failure(x) ⇒ x + }(20.millis, implicitly[ClassTag[String]]) + + Behaviors.immutable { + case (_, msg) ⇒ + probe.ref ! msg + Behaviors.same + } + } + + EventFilter.warning(occurrences = 1, message = "received dead letter without sender: boo").intercept { + EventFilter.info(occurrences = 1, start = "Message [java.lang.String] without sender").intercept { + spawn(snitch) + } + } + + probe.expectMsgType[TimeoutException] + + } + + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala new file mode 100644 index 0000000000..ba4b3aa74d --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package docs.akka.typed + +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedAkkaSpecWithShutdown } +import akka.actor.typed.scaladsl.Behaviors +import akka.testkit.typed.TestKit + +class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown { + + "The interaction patterns docs" must { + + "contain a sample for fire and forget" in { + // #fire-and-forget + + sealed trait PrinterProtocol + case object DisableOutput extends PrinterProtocol + case object EnableOutput extends PrinterProtocol + case class PrintMe(message: String) extends PrinterProtocol + + // two state behavior + def enabledPrinterBehavior: Behavior[PrinterProtocol] = Behaviors.immutable { + case (_, DisableOutput) ⇒ + disabledPrinterBehavior + + case (_, EnableOutput) ⇒ + Behaviors.ignore + + case (_, PrintMe(message)) ⇒ + println(message) + Behaviors.same + } + + def disabledPrinterBehavior: Behavior[PrinterProtocol] = Behaviors.immutable { + case (_, DisableOutput) ⇒ + enabledPrinterBehavior + + case (_, _) ⇒ + // ignore any message + Behaviors.ignore + } + + val system = ActorSystem(enabledPrinterBehavior, "fire-and-forget-sample") + + // note how the system is also the top level actor ref + val printer: ActorRef[PrinterProtocol] = system + + // these are all fire and forget + printer ! PrintMe("printed") + printer ! DisableOutput + printer ! PrintMe("not printed") + printer ! EnableOutput + + // #fire-and-forget + + system.terminate().futureValue + } + + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 6cd0b5de13..871621a49d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -4,10 +4,15 @@ package akka.actor.typed package internal +import java.util.function.BiFunction +import java.util.{ ArrayList, Optional, function } + import akka.annotation.InternalApi -import java.util.Optional -import java.util.ArrayList +import akka.util.Timeout + import scala.concurrent.ExecutionContextExecutor +import scala.reflect.ClassTag +import scala.util.{ Failure, Success, Try } /** * INTERNAL API @@ -62,6 +67,22 @@ import scala.concurrent.ExecutionContextExecutor override def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.actor.typed.ActorRef[U] = internalSpawnAdapter(f.apply, name) + // Scala API impl + override def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { + import akka.actor.typed.scaladsl.AskPattern._ + (otherActor ? createRequest)(responseTimeout, system.scheduler).onComplete(res ⇒ + self.asInstanceOf[ActorRef[AnyRef]] ! new AskResponse(res, mapResponse) + ) + } + + // Java API impl + def ask[Req, Res](resClass: Class[Res], otherActor: ActorRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = { + this.ask(otherActor)(createRequest.apply) { + case Success(message) ⇒ applyToResponse.apply(message, null) + case Failure(ex) ⇒ applyToResponse.apply(null.asInstanceOf[Res], ex) + }(responseTimeout, ClassTag[Res](resClass)) + } + /** * INTERNAL API: Needed to make Scala 2.12 compiler happy. * Otherwise "ambiguous reference to overloaded definition" because Function is lambda. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala new file mode 100644 index 0000000000..10ab7416da --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.actor.typed.internal + +import akka.annotation.InternalApi + +import scala.util.Try + +/** + * INTERNAL API + * + * Message wrapper used to allow ActorContext.ask to map the response inside the asking actor. + */ +@InternalApi +private[akka] final class AskResponse[T, U](result: Try[T], adapt: Try[T] ⇒ U) { + + def adapted: U = adapt(result) +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 512dde6578..44f38561f6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -35,6 +35,7 @@ import akka.util.OptionVal next(Behavior.interpretSignal(behavior, ctx, msg), msg) case a.ReceiveTimeout ⇒ next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg) + case msg: AskResponse[AnyRef, T] @unchecked ⇒ receive(msg.adapted) case msg: T @unchecked ⇒ next(Behavior.interpretMessage(behavior, ctx, msg), msg) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala index 6c0fd631f3..7a7cf1b0c0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala @@ -32,7 +32,8 @@ private[akka] object ActorRefAdapter { def toUntyped[U](ref: ActorRef[U]): akka.actor.InternalActorRef = ref match { - case adapter: ActorRefAdapter[_] ⇒ adapter.untyped + case adapter: ActorRefAdapter[_] ⇒ adapter.untyped + case system: ActorSystemAdapter[_] ⇒ system.untyped.guardian case _ ⇒ throw new UnsupportedOperationException("only adapted untyped ActorRefs permissible " + s"($ref of class ${ref.getClass.getName})") diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala index 8571312c8e..ba1ab4209d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala @@ -9,6 +9,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.EmptyProps import akka.actor.typed.Props import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts /** * INTERNAL API @@ -16,7 +17,13 @@ import akka.annotation.InternalApi @InternalApi private[akka] object PropsAdapter { def apply[T](behavior: () ⇒ Behavior[T], deploy: Props = Props.empty): akka.actor.Props = { // FIXME use Props, e.g. dispatcher - akka.actor.Props(new ActorAdapter(behavior())) + + val props = akka.actor.Props(new ActorAdapter(behavior())) + + deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match { + case _: DispatcherDefault ⇒ props + case DispatcherFromConfig(name, _) ⇒ props.withDispatcher(name) + } } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 91a865edf4..356d4a7ad3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -3,11 +3,15 @@ */ package akka.actor.typed.javadsl -import java.util.function.{ Function ⇒ JFunction } +import java.util.function.{ BiFunction, Function ⇒ JFunction } + import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange import akka.actor.typed._ import java.util.Optional + +import akka.util.Timeout + import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor @@ -167,4 +171,34 @@ trait ActorContext[T] { */ def spawnAdapter[U](f: JFunction[U, T]): ActorRef[U] + /** + * Perform a single request-response message interaction with another actor, and transform the messages back to + * the protocol of this actor. + * + * The interaction has a timeout (to avoid a resource leak). If the timeout hits without any response it + * will be passed as an [[java.util.concurrent.TimeoutException]] to the `applyToResponse` function. + * + * For other messaging patterns with other actors, see [[spawnAdapter]]. + * + * @param createREquest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that + * the other actor can send a message back through. + * @param applyToResponse Transforms the response from the `otherActor` into a message this actor understands. + * Will be invoked with either the response message or an AskTimeoutException failed or + * potentially another exception if the remote actor is untyped and sent a + * [[akka.actor.Status.Failure]] as response. The returned message of type `T` is then + * fed into this actor as a message. Should be a pure function but is executed inside + * the actor when the response arrives so can safely touch the actor internals. If this + * function throws an exception it is just as if the normal message receiving logic would + * throw. + * + * @tparam Req The request protocol, what the other actor accepts + * @tparam Res The response protocol, what the other actor sends back + */ + def ask[Req, Res]( + resClass: Class[Res], + otherActor: ActorRef[Req], + responseTimeout: Timeout, + createREquest: java.util.function.Function[ActorRef[Res], Req], + applyToResponse: BiFunction[Res, Throwable, T]): Unit + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala index 20fb9c5114..ee4d4d2db6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala @@ -2,11 +2,13 @@ package akka.actor.typed package javadsl import java.util.concurrent.CompletionStage -import scala.compat.java8.FutureConverters -import akka.util.Timeout + import akka.actor.Scheduler -import scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.AskPattern._ import akka.japi.function.Function +import akka.util.Timeout + +import scala.compat.java8.FutureConverters object AskPattern { def ask[T, U](actor: ActorRef[T], message: Function[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] = diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index f9310bc5be..27d675d89a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -5,10 +5,13 @@ package akka.actor.typed.scaladsl import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration - import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.actor.typed._ +import akka.util.Timeout + +import scala.reflect.ClassTag +import scala.util.{ Success, Failure, Try } /** * An Actor is given by the combination of a [[Behavior]] and a context in @@ -154,4 +157,27 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ */ def spawnAdapter[U](f: U ⇒ T): ActorRef[U] + /** + * Perform a single request-response message interaction with another actor, and transform the messages back to + * the protocol of this actor. + * + * The interaction has a timeout (to avoid a resource leak). If the timeout hits without any response it + * will be passed as a `Failure(`[[java.util.concurrent.TimeoutException]]`)` to the `mapResponse` function + * (this is the only "normal" way a `Failure` is passed to the function). + * + * For other messaging patterns with other actors, see [[spawnAdapter]]. + * + * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that + * the other actor can send a message back through. + * @param mapResponse Transforms the response from the `otherActor` into a message this actor understands. + * Should be a pure function but is executed inside the actor when the response arrives + * so can safely touch the actor internals. If this function throws an exception it is + * just as if the normal message receiving logic would throw. + * + * @tparam Req The request protocol, what the other actor accepts + * @tparam Res The response protocol, what the other actor sends back + */ + def ask[Req, Res]( + otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 1b95a9cee9..9b964dc3a6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -3,22 +3,23 @@ */ package akka.actor.typed.scaladsl -import scala.concurrent.{ Future, Promise } -import akka.util.Timeout -import akka.actor.InternalActorRef -import akka.pattern.AskTimeoutException -import akka.pattern.PromiseActorRef -import akka.actor.Scheduler -import akka.actor.RootActorPath -import akka.actor.Address -import akka.annotation.InternalApi +import java.util.concurrent.TimeoutException + +import akka.actor.{ Address, InternalActorRef, RootActorPath, Scheduler } import akka.actor.typed.ActorRef import akka.actor.typed.internal.{ adapter ⇒ adapt } +import akka.pattern.PromiseActorRef +import akka.util.Timeout + +import scala.concurrent.Future /** * The ask-pattern implements the initiator side of a request–reply protocol. * The `?` operator is pronounced as "ask". * + * Note that if you are inside of an actor you should prefer [[ActorContext.ask]] + * as that provides better safety. + * * The party that asks may be within or without an Actor, since the * implementation will fabricate a (hidden) [[ActorRef]] that is bound to a * [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the @@ -42,6 +43,9 @@ object AskPattern { * The ask-pattern implements the initiator side of a request–reply protocol. * The `?` operator is pronounced as "ask". * + * Note that if you are inside of an actor you should prefer [[ActorContext.ask]] + * as that provides better safety. + * * The party that asks may be within or without an Actor, since the * implementation will fabricate a (hidden) [[ActorRef]] that is bound to a * [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the @@ -67,6 +71,8 @@ object AskPattern { } } + private val onTimeout: String ⇒ Throwable = msg ⇒ new TimeoutException(msg) + private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { // Note: _promiseRef mustn't have a type pattern, since it can be null @@ -74,13 +80,13 @@ object AskPattern { if (untyped.isTerminated) ( adapt.ActorRefAdapter[U](untyped.provider.deadLetters), - Future.failed[U](new AskTimeoutException(s"Recipient[$target] had already been terminated.")), null) + Future.failed[U](new TimeoutException(s"Recipient[$target] had already been terminated.")), null) else if (timeout.duration.length <= 0) ( adapt.ActorRefAdapter[U](untyped.provider.deadLetters), Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null) else { - val a = PromiseActorRef(untyped.provider, timeout, target, "unknown") + val a = PromiseActorRef(untyped.provider, timeout, target, "unknown", onTimeout = onTimeout) val b = adapt.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } diff --git a/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes index b581630c37..6da73fdc7b 100644 --- a/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes @@ -1,2 +1,5 @@ # #24330 ActorSystem.getWhenTerminated ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorSystem.getWhenTerminated") + +# #23770 typed actor context ask +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply") diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index a374a5c064..5f2aff7dd3 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -6,6 +6,7 @@ package akka.pattern import java.util.concurrent.TimeoutException import akka.actor._ +import akka.annotation.InternalApi import akka.dispatch.sysmsg._ import akka.util.{ Timeout, Unsafe } @@ -584,21 +585,24 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide /** * INTERNAL API */ +@InternalApi private[akka] object PromiseActorRef { private case object Registering private case object Stopped private final case class StoppedWithPath(path: ActorPath) private val ActorStopResult = Failure(ActorKilledException("Stopped")) + private val defaultOnTimeout: String ⇒ Throwable = str ⇒ new AskTimeoutException(str) - def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String, sender: ActorRef = Actor.noSender): PromiseActorRef = { + def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String, + sender: ActorRef = Actor.noSender, onTimeout: String ⇒ Throwable = defaultOnTimeout): PromiseActorRef = { val result = Promise[Any]() val scheduler = provider.guardian.underlying.system.scheduler val a = new PromiseActorRef(provider, result, messageClassName) implicit val ec = a.internalCallingThreadExecutionContext val f = scheduler.scheduleOnce(timeout.duration) { result tryComplete Failure( - new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}".""")) + onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}".""")) } result.future onComplete { _ ⇒ try a.stop() finally f.cancel() } a diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala new file mode 100644 index 0000000000..a7f797e960 --- /dev/null +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.cluster.typed + +import java.nio.charset.StandardCharsets + +import akka.actor.ExtendedActorSystem +import akka.actor.typed.receptionist.Receptionist.Registered +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, ActorRefResolver, ActorSystem, TypedAkkaSpecWithShutdown } +import akka.serialization.SerializerWithStringManifest +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.scaladsl.adapter._ +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } + +class RemoteContextAskSpecSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + override def identifier = 41 + override def manifest(o: AnyRef) = o match { + case _: RemoteContextAskSpec.Ping ⇒ "a" + case RemoteContextAskSpec.Pong ⇒ "b" + } + override def toBinary(o: AnyRef) = o match { + case RemoteContextAskSpec.Ping(who) ⇒ + ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) + case RemoteContextAskSpec.Pong ⇒ Array.emptyByteArray + } + override def fromBinary(bytes: Array[Byte], manifest: String) = manifest match { + case "a" ⇒ + val str = new String(bytes, StandardCharsets.UTF_8) + val ref = ActorRefResolver(system.toTyped).resolveActorRef[RemoteContextAskSpec.Pong.type](str) + RemoteContextAskSpec.Ping(ref) + case "b" ⇒ RemoteContextAskSpec.Pong + } +} + +object RemoteContextAskSpec { + def config = ConfigFactory.parseString( + s""" + akka { + loglevel = debug + actor { + provider = cluster + warn-about-java-serializer-usage = off + serialize-creators = off + serializers { + test = "akka.cluster.typed.RemoteContextAskSpecSerializer" + } + serialization-bindings { + "akka.cluster.typed.RemoteContextAskSpec$$Ping" = test + "akka.cluster.typed.RemoteContextAskSpec$$Pong$$" = test + } + } + remote.artery { + enabled = on + canonical { + hostname = 127.0.0.1 + port = 0 + } + } + } + """) + + case object Pong + case class Ping(respondTo: ActorRef[Pong.type]) + + def pingPong = Behaviors.immutable[Ping] { (_, msg) ⇒ + msg match { + case Ping(sender) ⇒ + sender ! Pong + Behaviors.same + } + } + + val pingPongKey = ServiceKey[Ping]("ping-pong") + +} + +class RemoteContextAskSpec extends TestKit(RemoteContextAskSpec.config) with TypedAkkaSpecWithShutdown { + + import RemoteContextAskSpec._ + + "Asking another actor through the ActorContext across remoting" must { + + "work" in { + val node1 = Cluster(system) + val node1Probe = TestProbe[AnyRef]()(system) + node1.manager ! Join(node1.selfMember.address) + + Receptionist(system).ref ! Receptionist.Subscribe(pingPongKey, node1Probe.ref) + node1Probe.expectMsgType[Receptionist.Listing[_]] + + val system2 = ActorSystem(pingPong, system.name, system.settings.config) + val node2 = Cluster(system2) + node2.manager ! Join(node1.selfMember.address) + + val node2Probe = TestProbe[AnyRef]()(system2) + Receptionist(system2).ref ! Receptionist.Register(pingPongKey, system2, node2Probe.ref) + node2Probe.expectMsgType[Registered[_]] + + // wait until the service is seen on the first node + val remoteRef = node1Probe.expectMsgType[Receptionist.Listing[Ping]].serviceInstances.head + + spawn(Behaviors.deferred[AnyRef] { (ctx) ⇒ + implicit val timeout: Timeout = 3.seconds + + ctx.ask(remoteRef)(Ping) { + case Success(pong) ⇒ pong + case Failure(ex) ⇒ ex + } + + Behaviors.immutable { (_, msg) ⇒ + node1Probe.ref ! msg + Behaviors.same + } + }) + + node1Probe.expectMsgType[Pong.type] + + } + + } + +} diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md index 160f84f8f5..d7d343fc6f 100644 --- a/akka-docs/src/main/paradox/index-typed.md +++ b/akka-docs/src/main/paradox/index-typed.md @@ -7,6 +7,7 @@ * [actors](actors-typed.md) * [coexisting](coexisting.md) * [actor-lifecycle](actor-lifecycle-typed.md) +* [interaction patterns](interaction-patterns-typed.md) * [fault-tolerance](fault-tolerance-typed.md) * [actor-discovery](actor-discovery-typed.md) * [cluster](cluster-typed.md) diff --git a/akka-docs/src/main/paradox/interaction-patterns-typed.md b/akka-docs/src/main/paradox/interaction-patterns-typed.md new file mode 100644 index 0000000000..3fc924f14a --- /dev/null +++ b/akka-docs/src/main/paradox/interaction-patterns-typed.md @@ -0,0 +1,120 @@ +# Typed Actor Interaction Patterns + +Interacting with an Actor in Akka Typed is done through an @scala[`ActorRef[T]`]@java[`ActorRef`] where `T` is the type of messages the actor accepts, also known as the "protocol". This ensures that only the right kind of messages can be sent to an actor and also ensures no access to the Actor instance internals is available to anyone else but the Actor itself. + +Message exchange with Actors follow a few common patterns, let's go through each one of them. + +## Fire and Forget + +The fundamental way to interact with an actor is through @scala["tell", which is so common that it has a special symbolic method name: `actorRef ! message`]@java[`actorRef.tell(message)`]. Sending a message to an actor like this can be done both from inside another actor and from any logic outside of the `ActorSystem`. + +Tell is asynchronous which means that the method returns right away and that when execution of the statement after it in the code is executed there is no guarantee that the message has been processed by the recipient yet. It also means there is no way to way to know if the processing succeeded or failed without additional interaction with the actor in question. + +Scala +: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #fire-and-forget } + +Java +: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget } + +**Scenarios fire and forget is useful:** + + * When it is not critical to be sure that the message was processed + * When there is no way to act on non successful delivery or processing + * When we want to minimize the number of messages created to get higher throughput + +**Problems with fire and forget:** + + * Consistently higher rates of fire and forget to an actor than it process will make the inbox fill up and can in the worst case cause the JVM crash with an `OutOfMemoryError` + * If the message got lost, we will not notice + +## Same protocol Request-Response + +In many interactions a request is followed by a response back from the actor. In Akka Typed the recipient of responses has to be encoded as a field in the message itself, which the recipient can then use to send a response back. When the response message is already a part of the sending actor protocol we can simply use @scala[`ActorContext.self`]@java[`ActorContext.getSelf()`] when constructing the message. + +TODO sample + +**Scenarios where request response with tell is useful:** + + * Subscribing to an actor that will send many response messages (of the same protocol) back + * When communicating between a parent and its children, where the protocol can be made include the messages for the interaction + * ??? + +**Problems request-response:** + + * Often the response that the other actor wants to send back is not a part of the sending actor's protocol (see adapted request response or ask) + * It is hard to detect and that a message request was not delivered or processed (see ask) + * Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor + +## Adapted Request-Response + +Very often the receiving does not, and should not be made, know of the protocol of the sending actor, and will respond with one or more messages that the sending actor cannot receive. + +TODO sample + +**Scenarios where Adapted Request-Response is useful:** + + * Subscribing to an actor that will send many response messages back + +**Problems with adapted request-response:** + + * It is hard to detect and that a message request was not delivered or processed (see ask) + * Only one adaption can be made per response message type, if a new one is registered the old one is replaced, for example different target actors can't have different adaption if they use the same response types, unless some correlation is encoded in the messages + * Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor + + +## 1:1 Request-Response with ask between two actors + +In an interaction where there is a 1:1 mapping between a request and a response we can use `ask` on the `ActorContext` to interact with another actor. + +The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef`] to put as recipient in the outgoing message. The second step is to transform the `Response` or the failure to produce a response, into a message that is part of the protocol of the sending actor. + +TODO sample + + +**Scenarios where ask is useful:** + + * Single response queries + * When an actor needs to know that the message was processed before continuing + * To allow an actor to resend if a timely response is not produced + * To keep track of outstanding requests and not overwhelm a recipient with messages (simple backpressure) + * When some context should be attached to the interaction but the protocol does not support that (request id, what query the response was for) + +**Problems with ask:** + + * There can only be a single response to one `ask` + * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact + * Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requestor, but at the same time you do not want to have many false positives + + +## 1:1 Request-Response with ask from outside the ActorSystem + +In an interaction where there is a 1:1 mapping between a request and a response we can use @scala[`ActorRef.?` implicitly provided by `akka.actor.typed.scaladsl.AskPattern`]@java[`akka.actor.typed.javadsl.AskPattern.ask`] to send a message to an actor and get a @scala[`Future[Response]`]@java[`CompletionState[Response]`] back. + +TODO sample + +**Scenarios where this ask variant is useful:** + + * Single response queries where the response should be passed on to some other actor + * ??? + +**Problems with ask:** + + * There can only be a single response to one `ask` + * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact + + +## Per session child Actor + +Keeping context for an interaction, or multiple interactions can be done by moving the work for one "session", into a child actor. + +TODO + +**Scenarios where per session child actor is useful:** + + * A single incoming request should result in multiple interactions with other actions before a result can be built + * ??? + +**Problems with ask:** + + * Children have lifecycles that must be managed to not create a resource leak + * ??? \ No newline at end of file diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index 4521fa1ff7..fd0a821f1d 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -2,7 +2,7 @@ package akka.testkit.typed import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.AskPattern._ -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.annotation.ApiMayChange import akka.testkit.typed.TestKit._ import akka.util.Timeout @@ -14,15 +14,15 @@ import scala.concurrent.{ Await, TimeoutException } object TestKit { private[akka] sealed trait TestKitCommand - private[akka] case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]]) extends TestKitCommand - private[akka] case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]]) extends TestKitCommand + private[akka] case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand + private[akka] case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand private val testKitGuardian = Behaviors.immutable[TestKitCommand] { - case (ctx, SpawnActor(name, behavior, reply)) ⇒ - reply ! ctx.spawn(behavior, name) + case (ctx, SpawnActor(name, behavior, reply, props)) ⇒ + reply ! ctx.spawn(behavior, name, props) Behaviors.same - case (ctx, SpawnActorAnonymous(behavior, reply)) ⇒ - reply ! ctx.spawnAnonymous(behavior) + case (ctx, SpawnActorAnonymous(behavior, reply, props)) ⇒ + reply ! ctx.spawnAnonymous(behavior, props) Behaviors.same } @@ -87,14 +87,28 @@ trait TestKitBase { * guardian */ def spawn[T](behavior: Behavior[T]): ActorRef[T] = - Await.result(system ? (SpawnActorAnonymous(behavior, _)), timeoutDuration) + spawn(behavior, Props.empty) + + /** + * Spawn the given behavior. This is created as a child of the test kit + * guardian + */ + def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] = + Await.result(system ? (SpawnActorAnonymous(behavior, _, props)), timeoutDuration) /** * Spawn the given behavior. This is created as a child of the test kit * guardian */ def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = - Await.result(system ? (SpawnActor(name, behavior, _)), timeoutDuration) + spawn(behavior, name, Props.empty) + + /** + * Spawn the given behavior. This is created as a child of the test kit + * guardian + */ + def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] = + Await.result(system ? (SpawnActor(name, behavior, _, props)), timeoutDuration) def systemActor[T](behaviour: Behavior[T], name: String): ActorRef[T] = Await.result(system.systemActorOf(behaviour, name), timeoutDuration)