From 744bbc4ee2a3654fa7d78560078f0caef651fde9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 18 Jan 2018 08:06:49 +0100 Subject: [PATCH] Message transformer, without risk of resource leak, #23770 * Replacement of spawnAdapter, we could could keep spawnAdapter as an internal feature (we know about the resource leak risk) * The new ActorContext.messageAdapter can be used in the same way as spawnAdapter but without the risk of unbounded resource leak. * Only one message transformer per class is supported to avoid unbounded growth of registered transformers in case they are registered repeatedly. That is also a way to replace transformers, if that would be needed. * It's still encouraged to register the transformers in a top level deferred, but other usages are possible. It would still be wrong to capture incoming message specific context in the transformer function. * The transformation function is run inside the actor. * One single spawnAdapter ref is used for all transformers. It wraps the message in an internal Transform message. * rename to spawnMessageAdapter and only kept it as internal in scaladsl * update chat room example to use child actors * docs --- .../actor/typed/javadsl/ActorCompile.java | 4 +- .../akka/typed/InteractionPatternsTest.java | 148 +++++++++++ .../test/java/jdocs/akka/typed/IntroTest.java | 68 +++-- .../jdocs/akka/typed/MutableIntroTest.java | 70 ++++-- .../akka/actor/typed/ActorContextSpec.scala | 20 +- .../akka/actor/typed/TypedAkkaSpec.scala | 4 +- .../typed/scaladsl/MessageAdapterSpec.scala | 235 ++++++++++++++++++ .../akka/typed/InteractionPatternsSpec.scala | 79 ++++++ .../scala/docs/akka/typed/IntroSpec.scala | 56 +++-- .../docs/akka/typed/MutableIntroSpec.scala | 61 +++-- .../testing/sync/BasicSyncTestingSpec.scala | 2 +- .../typed/internal/ActorContextImpl.scala | 70 ++++-- .../actor/typed/internal/AskResponse.scala | 4 +- .../typed/internal/InternalMessage.scala | 23 ++ .../typed/internal/adapter/ActorAdapter.scala | 37 ++- .../adapter/ActorContextAdapter.scala | 5 +- .../actor/typed/javadsl/ActorContext.scala | 43 ++-- .../actor/typed/scaladsl/ActorContext.scala | 57 ++++- .../typed/internal/ReplicatorBehavior.scala | 2 +- .../receptionist/ClusterReceptionist.scala | 16 +- .../ddata/typed/javadsl/ReplicatorTest.java | 12 +- .../ddata/typed/scaladsl/ReplicatorSpec.scala | 8 +- .../typed/ReceptionistExampleSpec.scala | 2 +- akka-docs/src/main/paradox/actors-typed.md | 32 ++- .../paradox/interaction-patterns-typed.md | 74 ++++-- .../PersistentActorCompileOnlyTest.scala | 5 +- .../akka/testkit/typed/BehaviourTestkit.scala | 8 +- .../testkit/typed/StubbedActorContext.scala | 2 +- .../testkit/typed/BehaviorTestkitSpec.scala | 6 +- 29 files changed, 939 insertions(+), 214 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index 8ad57f35ce..4af5520eb5 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -54,7 +54,7 @@ public class ActorCompile { if (msg2 instanceof MyMsgB) { ((MyMsgA) msg).replyTo.tell(((MyMsgB) msg2).greeting); - ActorRef adapter = ctx2.spawnAdapter(s -> new MyMsgB(s.toUpperCase())); + ActorRef adapter = ctx2.messageAdapter(String.class, s -> new MyMsgB(s.toUpperCase())); } return same(); }); @@ -79,7 +79,7 @@ public class ActorCompile { @Override public Behavior receiveMessage(ActorContext ctx, MyMsg msg) throws Exception { - ActorRef adapter = ctx.asJava().spawnAdapter(s -> new MyMsgB(s.toUpperCase())); + ActorRef adapter = ctx.asJava().messageAdapter(String.class, s -> new MyMsgB(s.toUpperCase())); return this; } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java index c63d8ae22e..04f6c5495d 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -6,6 +6,7 @@ package jdocs.akka.typed; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.BehaviorBuilder; import akka.actor.typed.javadsl.Behaviors; import org.junit.Test; @@ -13,6 +14,9 @@ import org.scalatest.junit.JUnitSuite; import scala.concurrent.Await; import scala.concurrent.duration.Duration; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; public class InteractionPatternsTest extends JUnitSuite { @@ -44,6 +48,150 @@ public class InteractionPatternsTest extends JUnitSuite { } // #fire-and-forget + // #adapted-response + + public static class Backend { + interface Request {} + public static class StartTranslationJob implements Request { + public final int taskId; + public final URI site; + public final ActorRef replyTo; + + public StartTranslationJob(int taskId, URI site, ActorRef replyTo) { + this.taskId = taskId; + this.site = site; + this.replyTo = replyTo; + } + } + + interface Response {} + public static class JobStarted implements Response { + public final int taskId; + + public JobStarted(int taskId) { + this.taskId = taskId; + } + } + + public static class JobProgress implements Response { + public final int taskId; + public final double progress; + + public JobProgress(int taskId, double progress) { + this.taskId = taskId; + this.progress = progress; + } + } + + public static class JobCompleted implements Response { + public final int taskId; + public final URI result; + + public JobCompleted(int taskId, URI result) { + this.taskId = taskId; + this.result = result; + } + } + + } + + public static class Frontend { + + interface Command {} + public static class Translate implements Command { + public final URI site; + public final ActorRef replyTo; + + public Translate(URI site, ActorRef replyTo) { + this.site = site; + this.replyTo = replyTo; + } + } + + private static class WrappedJobStarted implements Command { + final Backend.JobStarted response; + + public WrappedJobStarted(Backend.JobStarted response) { + this.response = response; + } + } + + private static class WrappedJobProgress implements Command { + final Backend.JobProgress response; + + public WrappedJobProgress(Backend.JobProgress response) { + this.response = response; + } + } + + private static class WrappedJobCompleted implements Command { + final Backend.JobCompleted response; + + public WrappedJobCompleted(Backend.JobCompleted response) { + this.response = response; + } + } + + private static class OtherResponse implements Command { + final Backend.Response response; + + public OtherResponse(Backend.Response response) { + this.response = response; + } + } + + public static class Translator extends Behaviors.MutableBehavior { + private final ActorContext ctx; + private final ActorRef backend; + private final ActorRef backendResponseAdapter; + + private int taskIdCounter = 0; + private Map> inProgress = new HashMap<>(); + + public Translator(ActorContext ctx, ActorRef backend) { + this.ctx = ctx; + this.backend = backend; + this.backendResponseAdapter = + ctx.messageAdapter(Backend.Response.class, rsp -> { + if (rsp instanceof Backend.JobStarted) + return new WrappedJobStarted((Backend.JobStarted) rsp); + else if (rsp instanceof Backend.JobProgress) + return new WrappedJobProgress((Backend.JobProgress) rsp); + else if (rsp instanceof Backend.JobCompleted) + return new WrappedJobCompleted((Backend.JobCompleted) rsp); + else return new OtherResponse(rsp); + }); + } + + @Override + public Behaviors.Receive createReceive() { + return receiveBuilder() + .onMessage(Translate.class, cmd -> { + taskIdCounter += 1; + inProgress.put(taskIdCounter, cmd.replyTo); + backend.tell(new Backend.StartTranslationJob( + taskIdCounter, cmd.site, backendResponseAdapter)); + return this; + }) + .onMessage(WrappedJobStarted.class, wrapped -> { + System.out.println("Started " + wrapped.response.taskId); + return this; + }) + .onMessage(WrappedJobProgress.class, wrapped -> { + System.out.println("Progress " + wrapped.response.taskId + ": " + wrapped.response.progress); + return this; + }) + .onMessage(WrappedJobCompleted.class, wrapped -> { + System.out.println("Completed " + wrapped.response.taskId + ": " + wrapped.response.result); + return this; + }) + .onMessage(OtherResponse.class, other -> Behaviors.unhandled()) + .build(); + } + } + } + // #adapted-response + @Test public void fireAndForgetSample() throws Exception { diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java index ccbd8e258b..ee3af3e274 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java @@ -13,6 +13,8 @@ import akka.actor.typed.javadsl.AskPattern; import akka.util.Timeout; //#imports +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; @@ -75,8 +77,8 @@ public class IntroTest { //#chatroom-actor public static class ChatRoom { //#chatroom-protocol - static interface Command {} - public static final class GetSession implements Command { + static interface RoomCommand {} + public static final class GetSession implements RoomCommand { public final String screenName; public final ActorRef replyTo; public GetSession(String screenName, ActorRef replyTo) { @@ -86,10 +88,10 @@ public class IntroTest { } //#chatroom-protocol //#chatroom-behavior - private static final class PostSessionMessage implements Command { + private static final class PublishSessionMessage implements RoomCommand { public final String screenName; public final String message; - public PostSessionMessage(String screenName, String message) { + public PublishSessionMessage(String screenName, String message) { this.screenName = screenName; this.message = message; } @@ -119,37 +121,65 @@ public class IntroTest { } } - public static final class PostMessage { + static interface SessionCommand {} + public static final class PostMessage implements SessionCommand { public final String message; public PostMessage(String message) { this.message = message; } } + private static final class NotifyClient implements SessionCommand { + final MessagePosted message; + NotifyClient(MessagePosted message) { + this.message = message; + } + } //#chatroom-protocol //#chatroom-behavior - public static Behavior behavior() { - return chatRoom(new ArrayList>()); + public static Behavior behavior() { + return chatRoom(new ArrayList>()); } - private static Behavior chatRoom(List> sessions) { - return Behaviors.immutable(Command.class) + private static Behavior chatRoom(List> sessions) { + return Behaviors.immutable(RoomCommand.class) .onMessage(GetSession.class, (ctx, getSession) -> { - ActorRef wrapper = ctx.spawnAdapter(p -> - new PostSessionMessage(getSession.screenName, p.message)); - getSession.replyTo.tell(new SessionGranted(wrapper)); - List> newSessions = - new ArrayList>(sessions); - newSessions.add(getSession.replyTo); + ActorRef client = getSession.replyTo; + ActorRef ses = ctx.spawn( + session(ctx.getSelf(), getSession.screenName, client), + URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name())); + // narrow to only expose PostMessage + client.tell(new SessionGranted(ses.narrow())); + List> newSessions = new ArrayList<>(sessions); + newSessions.add(ses); return chatRoom(newSessions); }) - .onMessage(PostSessionMessage.class, (ctx, post) -> { - MessagePosted mp = new MessagePosted(post.screenName, post.message); - sessions.forEach(s -> s.tell(mp)); + .onMessage(PublishSessionMessage.class, (ctx, pub) -> { + NotifyClient notification = + new NotifyClient((new MessagePosted(pub.screenName, pub.message))); + sessions.forEach(s -> s.tell(notification)); return Behaviors.same(); }) .build(); } + + public static Behavior session( + ActorRef room, + String screenName, + ActorRef client) { + return Behaviors.immutable(ChatRoom.SessionCommand.class) + .onMessage(PostMessage.class, (ctx, post) -> { + // from client, publish to others via the room + room.tell(new PublishSessionMessage(screenName, post.message)); + return Behaviors.same(); + }) + .onMessage(NotifyClient.class, (ctx, notification) -> { + // published from the room + client.tell(notification.message); + return Behaviors.same(); + }) + .build(); + } //#chatroom-behavior } @@ -185,7 +215,7 @@ public class IntroTest { //#chatroom-main Behavior main = Behaviors.deferred(ctx -> { - ActorRef chatRoom = + ActorRef chatRoom = ctx.spawn(ChatRoom.behavior(), "chatRoom"); ActorRef gabbler = ctx.spawn(Gabbler.behavior(), "gabbler"); diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MutableIntroTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MutableIntroTest.java index 8bb6da2e1d..1bb071e5e6 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MutableIntroTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MutableIntroTest.java @@ -1,7 +1,7 @@ /** * Copyright (C) 2017-2018 Lightbend Inc. */ -package jdocs.akka.actor.typed; +package jdocs.akka.typed; //#imports import akka.actor.typed.ActorRef; @@ -10,6 +10,8 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors.Receive; import akka.actor.typed.javadsl.ActorContext; //#imports +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -18,8 +20,8 @@ public class MutableIntroTest { //#chatroom-actor public static class ChatRoom { //#chatroom-protocol - static interface Command {} - public static final class GetSession implements Command { + static interface RoomCommand {} + public static final class GetSession implements RoomCommand { public final String screenName; public final ActorRef replyTo; public GetSession(String screenName, ActorRef replyTo) { @@ -29,10 +31,10 @@ public class MutableIntroTest { } //#chatroom-protocol //#chatroom-behavior - private static final class PostSessionMessage implements Command { + private static final class PublishSessionMessage implements RoomCommand { public final String screenName; public final String message; - public PostSessionMessage(String screenName, String message) { + public PublishSessionMessage(String screenName, String message) { this.screenName = screenName; this.message = message; } @@ -62,46 +64,74 @@ public class MutableIntroTest { } } - public static final class PostMessage { + static interface SessionCommand {} + public static final class PostMessage implements SessionCommand { public final String message; public PostMessage(String message) { this.message = message; } } + private static final class NotifyClient implements SessionCommand { + final MessagePosted message; + NotifyClient(MessagePosted message) { + this.message = message; + } + } //#chatroom-protocol //#chatroom-behavior - public static Behavior behavior() { + public static Behavior behavior() { return Behaviors.mutable(ChatRoomBehavior::new); } - public static class ChatRoomBehavior extends Behaviors.MutableBehavior { - final ActorContext ctx; - final List> sessions = new ArrayList>(); + public static class ChatRoomBehavior extends Behaviors.MutableBehavior { + final ActorContext ctx; + final List> sessions = new ArrayList<>(); - public ChatRoomBehavior(ActorContext ctx) { + public ChatRoomBehavior(ActorContext ctx) { this.ctx = ctx; } @Override - public Receive createReceive() { + public Receive createReceive() { return receiveBuilder() .onMessage(GetSession.class, getSession -> { - ActorRef wrapper = ctx.spawnAdapter(p -> - new PostSessionMessage(getSession.screenName, p.message)); - getSession.replyTo.tell(new SessionGranted(wrapper)); - sessions.add(getSession.replyTo); - return Behaviors.same(); + ActorRef client = getSession.replyTo; + ActorRef ses = ctx.spawn( + session(ctx.getSelf(), getSession.screenName, client), + URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name())); + // narrow to only expose PostMessage + client.tell(new SessionGranted(ses.narrow())); + sessions.add(ses); + return this; }) - .onMessage(PostSessionMessage.class, post -> { - MessagePosted mp = new MessagePosted(post.screenName, post.message); - sessions.forEach(s -> s.tell(mp)); + .onMessage(PublishSessionMessage.class, pub -> { + NotifyClient notification = + new NotifyClient((new MessagePosted(pub.screenName, pub.message))); + sessions.forEach(s -> s.tell(notification)); return this; }) .build(); } } + public static Behavior session( + ActorRef room, + String screenName, + ActorRef client) { + return Behaviors.immutable(ChatRoom.SessionCommand.class) + .onMessage(PostMessage.class, (ctx, post) -> { + // from client, publish to others via the room + room.tell(new PublishSessionMessage(screenName, post.message)); + return Behaviors.same(); + }) + .onMessage(NotifyClient.class, (ctx, notification) -> { + // published from the room + client.tell(notification.message); + return Behaviors.same(); + }) + .build(); + } //#chatroom-behavior } //#chatroom-actor diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index e72a987526..f1dafbac5a 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -164,7 +164,7 @@ object ActorContextSpec { Behaviors.same } case GetAdapter(replyTo, name) ⇒ - replyTo ! Adapter(ctx.spawnAdapter(identity, name)) + replyTo ! Adapter(ctx.spawnMessageAdapter(identity, name)) Behaviors.same } } onSignal { @@ -252,7 +252,7 @@ object ActorContextSpec { Behaviors.same } case GetAdapter(replyTo, name) ⇒ - replyTo ! Adapter(ctx.spawnAdapter(identity, name)) + replyTo ! Adapter(ctx.spawnMessageAdapter(identity, name)) Behaviors.same } } onSignal { @@ -510,7 +510,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { sync(setup("ctx03") { (ctx, startWith) ⇒ val self = ctx.self val ex = new Exception("KABOOM2") - startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) { + startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self) { case (subj, child) ⇒ val log = muteExpectedException[Exception]("KABOOM2", occurrences = 1) child ! Throw(ex) @@ -541,7 +541,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { "stop a child actor" in { sync(setup("ctx04") { (ctx, startWith) ⇒ val self = ctx.self - startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self, inert = true) { + startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self, inert = true) { case (subj, child) ⇒ subj ! Kill(child, self) child @@ -602,7 +602,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { "not stop non-child actor" in { sync(setup("ctx08") { (ctx, startWith) ⇒ val self = ctx.self - startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) { + startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self) { case (subj, child) ⇒ val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A") subj ! Kill(other, ctx.self) @@ -616,7 +616,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { "watch a child actor before its termination" in { sync(setup("ctx10") { (ctx, startWith) ⇒ val self = ctx.self - startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) { + startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self) { case (subj, child) ⇒ subj ! Watch(child, self) child @@ -632,7 +632,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { "watch a child actor after its termination" in { sync(setup("ctx11") { (ctx, startWith) ⇒ val self = ctx.self - startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep { + startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep { case (subj, child) ⇒ ctx.watch(child) child ! Stop @@ -650,7 +650,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { "unwatch a child actor before its termination" in { sync(setup("ctx12") { (ctx, startWith) ⇒ val self = ctx.self - startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep { + startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep { case (subj, child) ⇒ subj ! Watch(child, self) }.expectMessageKeep(expectTimeout) { @@ -672,7 +672,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { "terminate upon not handling Terminated" in { sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith) ⇒ val self = ctx.self - startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep { + startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep { case (subj, child) ⇒ muteExpectedException[DeathPactException]() subj ! Watch(child, self) @@ -711,7 +711,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec { sync(setup("ctx21") { (ctx, startWith) ⇒ val self = ctx.self startWith - .mkChild(Some("B"), ctx.spawnAdapter(ChildEvent), self) + .mkChild(Some("B"), ctx.spawnMessageAdapter(ChildEvent), self) .stimulate(_._1 ! GetChild("A", self), _ ⇒ Child(None)) .stimulate(_._1 ! GetChild("B", self), x ⇒ Child(Some(x._2))) .stimulate(_._1 ! GetChildren(self), x ⇒ Children(Set(x._2))) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala index a3fdc822b8..818def6739 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala @@ -6,8 +6,8 @@ import org.scalactic.TypeCheckedTripleEquals import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.Span import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } - import scala.concurrent.duration._ +import scala.util.control.NoStackTrace /** * Helper trait to include standard traits for typed tests @@ -29,3 +29,5 @@ trait TypedAkkaSpecWithShutdown extends TypedAkkaSpec { self: TestKit ⇒ override protected def afterAll(): Unit = shutdown() } + +class TestException(msg: String) extends RuntimeException(msg) with NoStackTrace diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala new file mode 100644 index 0000000000..eff51d0ffc --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala @@ -0,0 +1,235 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.actor.typed.scaladsl + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.reflect.ClassTag +import scala.util.Failure +import scala.util.Success + +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.ActorRef +import akka.actor.typed.PostStop +import akka.actor.typed.Props +import akka.actor.typed.TestException +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.testkit.EventFilter +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory + +object MessageAdapterSpec { + val config = ConfigFactory.parseString( + """ + akka.loggers = ["akka.testkit.TestEventListener"] + akka.log-dead-letters = off + ping-pong-dispatcher { + executor = thread-pool-executor + type = PinnedDispatcher + } + snitch-dispatcher { + executor = thread-pool-executor + type = PinnedDispatcher + } + """) +} + +class MessageAdapterSpec extends TestKit(MessageAdapterSpec.config) with TypedAkkaSpecWithShutdown { + + implicit val untyped = system.toUntyped // FIXME no typed event filter yet + + "Message adapters" must { + + "map messages inside the actor" in { + case class Ping(sender: ActorRef[Response]) + trait Response + case class Pong(selfName: String, threadName: String) extends Response + + case class AnotherPong(selfName: String, threadName: String) + + val pingPong = spawn(Behaviors.immutable[Ping] { (ctx, msg) ⇒ + msg.sender ! Pong(ctx.self.path.name, Thread.currentThread().getName) + Behaviors.same + }, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) + + val probe = TestProbe[AnotherPong]() + + val snitch = Behaviors.deferred[AnotherPong] { (ctx) ⇒ + + val replyTo = ctx.messageAdapter[Response](_ ⇒ + AnotherPong(ctx.self.path.name, Thread.currentThread().getName)) + pingPong ! Ping(replyTo) + + // also verify the internal spawnMessageAdapter + val replyTo2: ActorRef[Response] = ctx.spawnMessageAdapter(_ ⇒ + AnotherPong(ctx.self.path.name, Thread.currentThread().getName)) + pingPong ! Ping(replyTo2) + + Behaviors.immutable { + case (_, anotherPong: AnotherPong) ⇒ + probe.ref ! anotherPong + Behaviors.same + } + } + + spawn(snitch, "snitch", Props.empty.withDispatcherFromConfig("snitch-dispatcher")) + + val response1 = probe.expectMsgType[AnotherPong] + response1.selfName should ===("snitch") + response1.threadName should startWith("MessageAdapterSpec-snitch-dispatcher") + + // and from the spawnMessageAdapter + val response2 = probe.expectMsgType[AnotherPong] + response2.selfName should ===("snitch") + response2.threadName should startWith("MessageAdapterSpec-snitch-dispatcher") + } + + "use the right adapter" in { + trait Ping + case class Ping1(sender: ActorRef[Pong1]) extends Ping + case class Ping2(sender: ActorRef[Pong2]) extends Ping + trait Response + case class Pong1(greeting: String) extends Response + case class Pong2(greeting: String) extends Response + + case class Wrapped(qualifier: String, response: Response) + + val pingPong = spawn(Behaviors.immutable[Ping] { (_, msg) ⇒ + msg match { + case Ping1(sender) ⇒ + sender ! Pong1("hello-1") + Behaviors.same + case Ping2(sender) ⇒ + sender ! Pong2("hello-2") + Behaviors.same + } + }) + + val probe = TestProbe[Wrapped]() + + val snitch = Behaviors.deferred[Wrapped] { (ctx) ⇒ + + ctx.messageAdapter[Response](pong ⇒ Wrapped(qualifier = "wrong", pong)) // this is replaced + val replyTo1: ActorRef[Response] = ctx.messageAdapter(pong ⇒ Wrapped(qualifier = "1", pong)) + val replyTo2 = ctx.messageAdapter[Pong2](pong ⇒ Wrapped(qualifier = "2", pong)) + pingPong ! Ping1(replyTo1) + pingPong ! Ping2(replyTo2) + + Behaviors.immutable { + case (_, wrapped) ⇒ + probe.ref ! wrapped + Behaviors.same + } + } + + spawn(snitch) + + probe.expectMsg(Wrapped("1", Pong1("hello-1"))) + probe.expectMsg(Wrapped("2", Pong2("hello-2"))) + } + + "not break if wrong/unknown response type" in { + trait Ping + case class Ping1(sender: ActorRef[Pong1]) extends Ping + case class Ping2(sender: ActorRef[Pong2]) extends Ping + trait Response + case class Pong1(greeting: String) extends Response + case class Pong2(greeting: String) extends Response + + case class Wrapped(qualifier: String, response: Response) + + val pingPong = spawn(Behaviors.immutable[Ping] { (_, msg) ⇒ + msg match { + case Ping1(sender) ⇒ + sender ! Pong1("hello-1") + Behaviors.same + case Ping2(sender) ⇒ + // doing something terribly wrong + sender ! Pong2("hello-2") + Behaviors.same + } + }) + + val probe = TestProbe[Wrapped]() + + val snitch = Behaviors.deferred[Wrapped] { (ctx) ⇒ + + val replyTo1 = ctx.messageAdapter[Pong1](pong ⇒ Wrapped(qualifier = "1", pong)) + pingPong ! Ping1(replyTo1) + // doing something terribly wrong + // Pong2 message adapter not registered + pingPong ! Ping2(replyTo1.asInstanceOf[ActorRef[Pong2]]) + pingPong ! Ping1(replyTo1) + + Behaviors.immutable { + case (_, wrapped) ⇒ + probe.ref ! wrapped + Behaviors.same + } + } + + EventFilter.warning(start = "unhandled message", occurrences = 1).intercept { + spawn(snitch) + } + + probe.expectMsg(Wrapped("1", Pong1("hello-1"))) + // hello-2 discarded because it was wrong type + probe.expectMsg(Wrapped("1", Pong1("hello-1"))) + } + + "stop when exception from adapter" in { + case class Ping(sender: ActorRef[Pong]) + case class Pong(greeting: String) + case class Wrapped(count: Int, response: Pong) + + val pingPong = spawn(Behaviors.immutable[Ping] { (_, ping) ⇒ + ping.sender ! Pong("hello") + Behaviors.same + }) + + val probe = TestProbe[Any]() + + val snitch = Behaviors.deferred[Wrapped] { (ctx) ⇒ + + var count = 0 + val replyTo = ctx.messageAdapter[Pong] { pong ⇒ + count += 1 + if (count == 3) throw new TestException("boom") + else Wrapped(count, pong) + } + (1 to 4).foreach { _ ⇒ + pingPong ! Ping(replyTo) + } + + Behaviors.immutable[Wrapped] { + case (_, wrapped) ⇒ + probe.ref ! wrapped + Behaviors.same + }.onSignal { + case (_, PostStop) ⇒ + probe.ref ! "stopped" + Behaviors.same + } + } + + EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept { + EventFilter[TestException](occurrences = 1).intercept { + spawn(snitch) + } + } + + probe.expectMsg(Wrapped(1, Pong("hello"))) + probe.expectMsg(Wrapped(2, Pong("hello"))) + // exception was thrown for 3 + + // FIXME One thing to be aware of is that the supervision strategy of the Behavior is not + // used for exceptions from adapters. Should we instead catch, log, unhandled, and resume? + // It's kind of "before" the message arrives. + probe.expectMsg("stopped") + } + + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala index ba4b3aa74d..7622bfd49d 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -3,9 +3,12 @@ */ package docs.akka.typed +import java.net.URI + import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedAkkaSpecWithShutdown } import akka.actor.typed.scaladsl.Behaviors import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown { @@ -57,6 +60,82 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown { system.terminate().futureValue } + "contain a sample for adapted response" in { + // #adapted-response + + object Backend { + sealed trait Request + final case class StartTranslationJob(taskId: Int, site: URI, replyTo: ActorRef[Response]) extends Request + + sealed trait Response + final case class JobStarted(taskId: Int) extends Response + final case class JobProgress(taskId: Int, progress: Double) extends Response + final case class JobCompleted(taskId: Int, result: URI) extends Response + } + + object Frontend { + + sealed trait Command + final case class Translate(site: URI, replyTo: ActorRef[URI]) extends Command + private final case class WrappedBackendResponse(response: Backend.Response) extends Command + + def translator(backend: ActorRef[Backend.Request]): Behavior[Command] = + Behaviors.deferred[Command] { ctx ⇒ + val backendResponseMapper: ActorRef[Backend.Response] = + ctx.messageAdapter(rsp ⇒ WrappedBackendResponse(rsp)) + + def active( + inProgress: Map[Int, ActorRef[URI]], + count: Int): Behavior[Command] = { + Behaviors.immutable[Command] { (_, msg) ⇒ + msg match { + case Translate(site, replyTo) ⇒ + val taskId = count + 1 + backend ! Backend.StartTranslationJob(taskId, site, backendResponseMapper) + active(inProgress.updated(taskId, replyTo), taskId) + + case wrapped: WrappedBackendResponse ⇒ wrapped.response match { + case Backend.JobStarted(taskId) ⇒ + println(s"Started $taskId") + Behaviors.same + case Backend.JobProgress(taskId, progress) ⇒ + println(s"Progress $taskId: $progress") + Behaviors.same + case Backend.JobCompleted(taskId, result) ⇒ + println(s"Completed $taskId: $result") + inProgress(taskId) ! result + active(inProgress - taskId, count) + } + } + } + } + + active(inProgress = Map.empty, count = 0) + } + } + // #adapted-response + + val backend = spawn(Behaviors.immutable[Backend.Request] { (_, msg) ⇒ + msg match { + case Backend.StartTranslationJob(taskId, site, replyTo) ⇒ + replyTo ! Backend.JobStarted(taskId) + replyTo ! Backend.JobProgress(taskId, 0.25) + replyTo ! Backend.JobProgress(taskId, 0.50) + replyTo ! Backend.JobProgress(taskId, 0.75) + replyTo ! Backend.JobCompleted(taskId, new URI("https://akka.io/docs/sv/")) + Behaviors.same + } + + } + ) + + val frontend = spawn(Frontend.translator(backend)) + val probe = TestProbe[URI]() + frontend ! Frontend.Translate(new URI("https://akka.io/docs/"), probe.ref) + probe.expectMsg(new URI("https://akka.io/docs/sv/")) + + } + } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala index b3d8866b16..79508df7d8 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala @@ -4,6 +4,9 @@ package docs.akka.typed //#imports +import java.net.URLEncoder +import java.nio.charset.StandardCharsets + import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -11,7 +14,6 @@ import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.testkit.typed.TestKit - import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ @@ -37,13 +39,13 @@ object IntroSpec { //#chatroom-actor object ChatRoom { //#chatroom-protocol - sealed trait Command + sealed trait RoomCommand final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) - extends Command + extends RoomCommand //#chatroom-protocol //#chatroom-behavior - private final case class PostSessionMessage(screenName: String, message: String) - extends Command + private final case class PublishSessionMessage(screenName: String, message: String) + extends RoomCommand //#chatroom-behavior //#chatroom-protocol @@ -52,25 +54,45 @@ object IntroSpec { final case class SessionDenied(reason: String) extends SessionEvent final case class MessagePosted(screenName: String, message: String) extends SessionEvent - final case class PostMessage(message: String) + trait SessionCommand + final case class PostMessage(message: String) extends SessionCommand + private final case class NotifyClient(message: MessagePosted) extends SessionCommand //#chatroom-protocol //#chatroom-behavior - val behavior: Behavior[Command] = + val behavior: Behavior[RoomCommand] = chatRoom(List.empty) - private def chatRoom(sessions: List[ActorRef[SessionEvent]]): Behavior[Command] = - Behaviors.immutable[Command] { (ctx, msg) ⇒ + private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] = + Behaviors.immutable[RoomCommand] { (ctx, msg) ⇒ msg match { case GetSession(screenName, client) ⇒ - val wrapper = ctx.spawnAdapter { - p: PostMessage ⇒ PostSessionMessage(screenName, p.message) - } - client ! SessionGranted(wrapper) - chatRoom(client :: sessions) - case PostSessionMessage(screenName, message) ⇒ - val mp = MessagePosted(screenName, message) - sessions foreach (_ ! mp) + // create a child actor for further interaction with the client + val ses = ctx.spawn( + session(ctx.self, screenName, client), + name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name)) + client ! SessionGranted(ses) + chatRoom(ses :: sessions) + case PublishSessionMessage(screenName, message) ⇒ + val notification = NotifyClient(MessagePosted(screenName, message)) + sessions foreach (_ ! notification) + Behaviors.same + } + } + + private def session( + room: ActorRef[PublishSessionMessage], + screenName: String, + client: ActorRef[SessionEvent]): Behavior[SessionCommand] = + Behaviors.immutable { (ctx, msg) ⇒ + msg match { + case PostMessage(message) ⇒ + // from client, publish to others via the room + room ! PublishSessionMessage(screenName, message) + Behaviors.same + case NotifyClient(message) ⇒ + // published from the room + client ! message Behaviors.same } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala index b18b5fbc17..0a32a0386e 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala @@ -4,11 +4,13 @@ package docs.akka.typed //#imports +import java.net.URLEncoder +import java.nio.charset.StandardCharsets + import akka.actor.typed._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.ActorContext import akka.testkit.typed.TestKit - import scala.concurrent.duration._ import scala.concurrent.Await //#imports @@ -18,13 +20,13 @@ object MutableIntroSpec { //#chatroom-actor object ChatRoom { //#chatroom-protocol - sealed trait Command + sealed trait RoomCommand final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) - extends Command + extends RoomCommand //#chatroom-protocol //#chatroom-behavior - private final case class PostSessionMessage(screenName: String, message: String) - extends Command + private final case class PublishSessionMessage(screenName: String, message: String) + extends RoomCommand //#chatroom-behavior //#chatroom-protocol @@ -33,33 +35,52 @@ object MutableIntroSpec { final case class SessionDenied(reason: String) extends SessionEvent final case class MessagePosted(screenName: String, message: String) extends SessionEvent - final case class PostMessage(message: String) + trait SessionCommand + final case class PostMessage(message: String) extends SessionCommand + private final case class NotifyClient(message: MessagePosted) extends SessionCommand //#chatroom-protocol //#chatroom-behavior - def behavior(): Behavior[Command] = - Behaviors.mutable[Command](ctx ⇒ new ChatRoomBehavior(ctx)) + def behavior(): Behavior[RoomCommand] = + Behaviors.mutable[RoomCommand](ctx ⇒ new ChatRoomBehavior(ctx)) - class ChatRoomBehavior(ctx: ActorContext[Command]) extends Behaviors.MutableBehavior[Command] { - private var sessions: List[ActorRef[SessionEvent]] = List.empty + class ChatRoomBehavior(ctx: ActorContext[RoomCommand]) extends Behaviors.MutableBehavior[RoomCommand] { + private var sessions: List[ActorRef[SessionCommand]] = List.empty - override def onMessage(msg: Command): Behavior[Command] = { + override def onMessage(msg: RoomCommand): Behavior[RoomCommand] = { msg match { case GetSession(screenName, client) ⇒ - val wrapper = ctx.spawnAdapter { - p: PostMessage ⇒ PostSessionMessage(screenName, p.message) - } - client ! SessionGranted(wrapper) - sessions = client :: sessions + // create a child actor for further interaction with the client + val ses = ctx.spawn( + session(ctx.self, screenName, client), + name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name)) + client ! SessionGranted(ses) + sessions = ses :: sessions this - case PostSessionMessage(screenName, message) ⇒ - val mp = MessagePosted(screenName, message) - sessions foreach (_ ! mp) + case PublishSessionMessage(screenName, message) ⇒ + val notification = NotifyClient(MessagePosted(screenName, message)) + sessions foreach (_ ! notification) this } } - } + + private def session( + room: ActorRef[PublishSessionMessage], + screenName: String, + client: ActorRef[SessionEvent]): Behavior[SessionCommand] = + Behaviors.immutable { (ctx, msg) ⇒ + msg match { + case PostMessage(message) ⇒ + // from client, publish to others via the room + room ! PublishSessionMessage(screenName, message) + Behaviors.same + case NotifyClient(message) ⇒ + // published from the room + client ! message + Behaviors.same + } + } //#chatroom-behavior } //#chatroom-actor diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/sync/BasicSyncTestingSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/sync/BasicSyncTestingSpec.scala index 9fc6000846..bc213c300c 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/sync/BasicSyncTestingSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/sync/BasicSyncTestingSpec.scala @@ -91,7 +91,7 @@ class BasicSyncTestingSpec extends WordSpec with Matchers { val testKit = BehaviorTestkit(myBehaviour) testKit.run(SayHelloToAnonymousChild) // Anonymous actors are created as: $a $b etc - val childInbox = testKit.childInbox[String]("$a") + val childInbox = testKit.childInbox[String](s"$$a") childInbox.expectMsg("hello stranger") //#test-child-message-anonymous } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 871621a49d..dcef4a0185 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -4,21 +4,30 @@ package akka.actor.typed package internal +import java.util.function.{ Function ⇒ JFunction } +import java.util.ArrayList +import java.util.Optional +import java.util.function import java.util.function.BiFunction -import java.util.{ ArrayList, Optional, function } - -import akka.annotation.InternalApi -import akka.util.Timeout import scala.concurrent.ExecutionContextExecutor import scala.reflect.ClassTag -import scala.util.{ Failure, Success, Try } +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import akka.annotation.InternalApi +import akka.util.OptionVal +import akka.util.Timeout /** * INTERNAL API */ @InternalApi private[akka] trait ActorContextImpl[T] extends ActorContext[T] with javadsl.ActorContext[T] with scaladsl.ActorContext[T] { + private var messageAdapterRef: OptionVal[ActorRef[Any]] = OptionVal.None + private var _messageAdapters: List[(Class[_], Any ⇒ T)] = Nil + override def asJava: javadsl.ActorContext[T] = this override def asScala: scaladsl.ActorContext[T] = this @@ -55,18 +64,6 @@ import scala.util.{ Failure, Success, Try } override def spawnAnonymous[U](behavior: akka.actor.typed.Behavior[U]): akka.actor.typed.ActorRef[U] = spawnAnonymous(behavior, Props.empty) - override def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = - internalSpawnAdapter(f, name) - - override def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = - internalSpawnAdapter(f, "") - - override def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.actor.typed.ActorRef[U] = - internalSpawnAdapter(f.apply, "") - - override def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.actor.typed.ActorRef[U] = - internalSpawnAdapter(f.apply, name) - // Scala API impl override def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { import akka.actor.typed.scaladsl.AskPattern._ @@ -83,10 +80,45 @@ import scala.util.{ Failure, Success, Try } }(responseTimeout, ClassTag[Res](resClass)) } + private[akka] override def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = + internalSpawnMessageAdapter(f, name) + + private[akka] override def spawnMessageAdapter[U](f: U ⇒ T): ActorRef[U] = + internalSpawnMessageAdapter(f, name = "") + /** - * INTERNAL API: Needed to make Scala 2.12 compiler happy. + * INTERNAL API: Needed to make Scala 2.12 compiler happy if spawnMessageAdapter is overloaded for scaladsl/javadsl. * Otherwise "ambiguous reference to overloaded definition" because Function is lambda. */ - @InternalApi private[akka] def internalSpawnAdapter[U](f: U ⇒ T, _name: String): ActorRef[U] + @InternalApi private[akka] def internalSpawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] + + override def messageAdapter[U: ClassTag](f: U ⇒ T): ActorRef[U] = { + val messageClass = implicitly[ClassTag[U]].runtimeClass.asInstanceOf[Class[U]] + internalMessageAdapter(messageClass, f) + } + + override def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U] = + internalMessageAdapter(messageClass, f.apply) + + private def internalMessageAdapter[U](messageClass: Class[U], f: U ⇒ T): ActorRef[U] = { + // replace existing adapter for same class, only one per class is supported to avoid unbounded growth + // in case "same" adapter is added repeatedly + _messageAdapters = (messageClass, f.asInstanceOf[Any ⇒ T]) :: + _messageAdapters.filterNot { case (cls, _) ⇒ cls == messageClass } + val ref = messageAdapterRef match { + case OptionVal.Some(ref) ⇒ ref.asInstanceOf[ActorRef[U]] + case OptionVal.None ⇒ + // AdaptMessage is not really a T, but that is erased + val ref = internalSpawnMessageAdapter[Any](msg ⇒ AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter") + messageAdapterRef = OptionVal.Some(ref) + ref + } + ref.asInstanceOf[ActorRef[U]] + } + + /** + * INTERNAL API + */ + @InternalApi private[akka] def messageAdapters: List[(Class[_], Any ⇒ T)] = _messageAdapters } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala index 10ab7416da..7daf93a3d5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala @@ -13,7 +13,7 @@ import scala.util.Try * Message wrapper used to allow ActorContext.ask to map the response inside the asking actor. */ @InternalApi -private[akka] final class AskResponse[T, U](result: Try[T], adapt: Try[T] ⇒ U) { +private[akka] final class AskResponse[U, T](result: Try[U], adapt: Try[U] ⇒ T) { - def adapted: U = adapt(result) + def adapted: T = adapt(result) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala new file mode 100644 index 0000000000..f545b29984 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.actor.typed.internal + +import akka.annotation.InternalApi + +/** + * INTERNAL API: Wrapping of messages that should be adapted by + * adapters registered with `ActorContext.messageAdapter`. + */ +@InternalApi private[akka] final case class AdaptWithRegisteredMessageAdapter[U](msg: U) + +/** + * INTERNAL API: Wrapping of messages that should be adapted by the included + * function. Used by `ActorContext.spawnMessageAdapter` so that the function is + * applied in the "parent" actor (for better thread safetey).. + */ +@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapt: U ⇒ T) { + def adapted: T = adapt(msg) +} + +// FIXME move AskResponse in other PR diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 44f38561f6..555eb2362a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -5,6 +5,8 @@ package akka.actor.typed package internal package adapter +import scala.annotation.tailrec + import akka.{ actor ⇒ a } import akka.annotation.InternalApi import akka.util.OptionVal @@ -35,9 +37,23 @@ import akka.util.OptionVal next(Behavior.interpretSignal(behavior, ctx, msg), msg) case a.ReceiveTimeout ⇒ next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg) - case msg: AskResponse[AnyRef, T] @unchecked ⇒ receive(msg.adapted) + case wrapped: AskResponse[Any, T] @unchecked ⇒ + handleMessage(wrapped.adapted) + case wrapped: AdaptMessage[Any, T] @unchecked ⇒ + wrapped.adapted match { + case AdaptWithRegisteredMessageAdapter(msg) ⇒ + adaptAndHandle(msg) + case msg: T @unchecked ⇒ + handleMessage(msg) + } + case AdaptWithRegisteredMessageAdapter(msg) ⇒ + adaptAndHandle(msg) case msg: T @unchecked ⇒ - next(Behavior.interpretMessage(behavior, ctx, msg), msg) + handleMessage(msg) + } + + private def handleMessage(msg: T): Unit = { + next(Behavior.interpretMessage(behavior, ctx, msg), msg) } private def next(b: Behavior[T], msg: Any): Unit = { @@ -63,6 +79,23 @@ import akka.util.OptionVal } } + private def adaptAndHandle(msg: Any): Unit = { + @tailrec def handle(adapters: List[(Class[_], Any ⇒ T)]): Unit = { + adapters match { + case Nil ⇒ + // no adapter function registered for message class + unhandled(msg) + case (clazz, f) :: tail ⇒ + if (clazz.isAssignableFrom(msg.getClass)) { + val adaptedMsg = f(msg) + handleMessage(adaptedMsg) + } else + handle(tail) // recursive + } + } + handle(ctx.messageAdapters) + } + override def unhandled(msg: Any): Unit = msg match { case Terminated(ref) ⇒ throw a.DeathPactException(toUntyped(ref)) case msg: Signal ⇒ // that's ok diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index da08aa1276..cdd2fb9b00 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -58,9 +58,10 @@ import akka.actor.typed.Behavior.UntypedBehavior import untyped.dispatcher untyped.system.scheduler.scheduleOnce(delay, toUntyped(target), msg) } - override private[akka] def internalSpawnAdapter[U](f: U ⇒ T, _name: String): ActorRef[U] = { + override private[akka] def internalSpawnMessageAdapter[U](f: U ⇒ T, _name: String): ActorRef[U] = { val cell = untyped.asInstanceOf[akka.actor.ActorCell] - val ref = cell.addFunctionRef((_, msg) ⇒ untyped.self ! f(msg.asInstanceOf[U]), _name) + // apply the function inside the actor by wrapping the msg and f, handled by ActorAdapter + val ref = cell.addFunctionRef((_, msg) ⇒ untyped.self ! AdaptMessage[U, T](msg.asInstanceOf[U], f), _name) ActorRefAdapter[U](ref) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 356d4a7ad3..9cdabd470f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -152,24 +152,29 @@ trait ActorContext[T] { def getExecutionContext: ExecutionContextExecutor /** - * Create a child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. + * Create a message adapter that will convert or wrap messages such that other Actor’s + * protocols can be ingested by this Actor. * - * The name of the child actor will be composed of a unique identifier - * starting with a dollar sign to which the given `name` argument is - * appended, with an inserted hyphen between these two parts. Therefore - * the given `name` argument does not need to be unique within the scope - * of the parent actor. + * You can register several message adapters for different message classes. + * It's only possible to have one message adapter per message class to make sure + * that the number of adapters are not growing unbounded if registered repeatedly. + * That also means that a registered adapter will replace an existing adapter for + * the same message class. + * + * A message adapter will be used if the message class matches the given class or + * is a subclass thereof. The registered adapters are tried in reverse order of + * their registration order, i.e. the last registered first. + * + * A message adapter (and the returned `ActorRef`) has the same lifecycle as + * this actor. It's recommended to register the adapters in a top level + * `Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to + * register them later also if needed. Message adapters don't have to be stopped since + * they consume no resources other than an entry in an internal `Map` and the number + * of adapters are bounded since it's only possible to have one per message class. + * * + * The function is running in this actor and can safely access state of it. */ - def spawnAdapter[U](f: JFunction[U, T], name: String): ActorRef[U] - - /** - * Create an anonymous child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. - */ - def spawnAdapter[U](f: JFunction[U, T]): ActorRef[U] + def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U] /** * Perform a single request-response message interaction with another actor, and transform the messages back to @@ -178,9 +183,9 @@ trait ActorContext[T] { * The interaction has a timeout (to avoid a resource leak). If the timeout hits without any response it * will be passed as an [[java.util.concurrent.TimeoutException]] to the `applyToResponse` function. * - * For other messaging patterns with other actors, see [[spawnAdapter]]. + * For other messaging patterns with other actors, see [[ActorContext#messageAdapter]]. * - * @param createREquest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that + * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that * the other actor can send a message back through. * @param applyToResponse Transforms the response from the `otherActor` into a message this actor understands. * Will be invoked with either the response message or an AskTimeoutException failed or @@ -198,7 +203,7 @@ trait ActorContext[T] { resClass: Class[Res], otherActor: ActorRef[Req], responseTimeout: Timeout, - createREquest: java.util.function.Function[ActorRef[Res], Req], + createRequest: java.util.function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 27d675d89a..9785a11821 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -5,14 +5,15 @@ package akka.actor.typed.scaladsl import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag +import scala.util.Try + +import akka.actor.typed._ import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit -import akka.actor.typed._ +import akka.annotation.InternalApi import akka.util.Timeout -import scala.reflect.ClassTag -import scala.util.{ Success, Failure, Try } - /** * An Actor is given by the combination of a [[Behavior]] and a context in * which this behavior is executed. As per the Actor Model an Actor can perform @@ -138,24 +139,54 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ implicit def executionContext: ExecutionContextExecutor /** - * Create a child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. + * INTERNAL API: It is currently internal because it's too easy to create + * resource leaks by spawning adapters without stopping them. `messageAdapter` + * is the public API. + * + * Create a "lightweight" child actor that will convert or wrap messages such that + * other Actor’s protocols can be ingested by this Actor. You are strongly advised + * to cache these ActorRefs or to stop them when no longer needed. * * The name of the child actor will be composed of a unique identifier * starting with a dollar sign to which the given `name` argument is * appended, with an inserted hyphen between these two parts. Therefore * the given `name` argument does not need to be unique within the scope * of the parent actor. + * + * The function is applied inside the "parent" actor and can safely access + * state of the "parent". */ - def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] + @InternalApi private[akka] def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] /** - * Create an anonymous child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. + * INTERNAL API: See `spawnMessageAdapter` with name parameter */ - def spawnAdapter[U](f: U ⇒ T): ActorRef[U] + @InternalApi private[akka] def spawnMessageAdapter[U](f: U ⇒ T): ActorRef[U] + + /** + * Create a message adapter that will convert or wrap messages such that other Actor’s + * protocols can be ingested by this Actor. + * + * You can register several message adapters for different message classes. + * It's only possible to have one message adapter per message class to make sure + * that the number of adapters are not growing unbounded if registered repeatedly. + * That also means that a registered adapter will replace an existing adapter for + * the same message class. + * + * A message adapter will be used if the message class matches the given class or + * is a subclass thereof. The registered adapters are tried in reverse order of + * their registration order, i.e. the last registered first. + * + * A message adapter (and the returned `ActorRef`) has the same lifecycle as + * this actor. It's recommended to register the adapters in a top level + * `Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to + * register them later also if needed. Message adapters don't have to be stopped since + * they consume no resources other than an entry in an internal `Map` and the number + * of adapters are bounded since it's only possible to have one per message class. + * * + * The function is running in this actor and can safely access state of it. + */ + def messageAdapter[U: ClassTag](f: U ⇒ T): ActorRef[U] /** * Perform a single request-response message interaction with another actor, and transform the messages back to @@ -165,7 +196,7 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ * will be passed as a `Failure(`[[java.util.concurrent.TimeoutException]]`)` to the `mapResponse` function * (this is the only "normal" way a `Failure` is passed to the function). * - * For other messaging patterns with other actors, see [[spawnAdapter]]. + * For other messaging patterns with other actors, see [[ActorContext#messageAdapter]]. * * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that * the other actor can send a message back through. diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala index efd070e71a..e931dd1f16 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala @@ -120,7 +120,7 @@ import akka.actor.typed.Terminated // For the Java API the Changed messages must be mapped to the JReplicator.Changed class. // That is done with an adapter, and we have to keep track of the lifecycle of the original // subscriber and stop the adapter when the original subscriber is stopped. - val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnAdapter { + val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnMessageAdapter { chg ⇒ InternalChanged(chg, cmd.subscriber) } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index d2cf817511..599e884a3f 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -107,15 +107,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { } val adapter: ActorRef[Replicator.ReplicatorMessage] = - ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage) ⇒ - x match { - case changed @ Replicator.Changed(ReceptionistKey) ⇒ - val value = changed.get(ReceptionistKey) - val oldState = state - val newState = ServiceRegistry(value) - val changes = diff(oldState, newState) - externalInterface.RegistrationsChangedExternally(changes, newState) - } + ctx.messageAdapter[Replicator.ReplicatorMessage] { + case changed @ Replicator.Changed(ReceptionistKey) ⇒ + val value = changed.get(ReceptionistKey) + val oldState = state + val newState = ServiceRegistry(value) + val changes = diff(oldState, newState) + externalInterface.RegistrationsChangedExternally(changes, newState) } replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped) diff --git a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java index d7edcda3f7..cb4dafb21f 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java @@ -95,11 +95,17 @@ public class ReplicatorTest extends JUnitSuite { this.replicator = replicator; this.node = node; - updateResponseAdapter = ctx.spawnAdapter(InternalUpdateResponse::new); + updateResponseAdapter = ctx.messageAdapter( + (Class>) (Object) Replicator.UpdateResponse.class, + msg -> new InternalUpdateResponse(msg)); - getResponseAdapter = ctx.spawnAdapter(InternalGetResponse::new); + getResponseAdapter = ctx.messageAdapter( + (Class>) (Object) Replicator.GetResponse.class, + msg -> new InternalGetResponse(msg)); - changedAdapter = ctx.spawnAdapter(InternalChanged::new); + changedAdapter = ctx.messageAdapter( + (Class>) (Object) Replicator.Changed.class, + msg -> new InternalChanged(msg)); replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter)); } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index 9df8fe86c7..c4d156d610 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -24,6 +24,7 @@ object ReplicatorSpec { val config = ConfigFactory.parseString( """ + akka.loglevel = DEBUG akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 @@ -43,14 +44,15 @@ object ReplicatorSpec { def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] = Behaviors.deferred[ClientCommand] { ctx ⇒ + val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] = - ctx.spawnAdapter(InternalUpdateResponse.apply) + ctx.messageAdapter(InternalUpdateResponse.apply) val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] = - ctx.spawnAdapter(InternalGetResponse.apply) + ctx.messageAdapter(InternalGetResponse.apply) val changedAdapter: ActorRef[Replicator.Changed[GCounter]] = - ctx.spawnAdapter(InternalChanged.apply) + ctx.messageAdapter(InternalChanged.apply) replicator ! Replicator.Subscribe(Key, changedAdapter) diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala index 8d28b70be3..a28b45c67c 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala @@ -51,7 +51,7 @@ object RandomRouter { val cluster = Cluster(ctx.system) // typically you have to map such external messages into this // actor's protocol with a message adapter - val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.spawnAdapter(WrappedReachabilityEvent.apply) + val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(WrappedReachabilityEvent.apply) cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent]) def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] = diff --git a/akka-docs/src/main/paradox/actors-typed.md b/akka-docs/src/main/paradox/actors-typed.md index 270579fbc4..0ab221efcd 100644 --- a/akka-docs/src/main/paradox/actors-typed.md +++ b/akka-docs/src/main/paradox/actors-typed.md @@ -140,7 +140,7 @@ In the next section we demonstrate this on a more realistic example. The next example demonstrates some important patterns: * Using a sealed trait and case class/objects to represent multiple messages an actor can receive -* Handle incoming messages of different types by using `adapter`s +* Handle sessions by using child actors * Handling state by changing behavior * Using multiple typed actors to represent different parts of a protocol in a type safe way @@ -182,33 +182,31 @@ When a new `GetSession` command comes in we add that client to the list that is in the returned behavior. Then we also need to create the session’s `ActorRef` that will be used to post messages. In this case we want to create a very simple Actor that just repackages the `PostMessage` -command into a `PostSessionMessage` command which also includes the -screen name. Such a wrapper Actor can be created by using the -`spawnAdapter` method on the `ActorContext`, so that we can then -go on to reply to the client with the `SessionGranted` result. +command into a `PublishSessionMessage` command which also includes the +screen name. -The behavior that we declare here can handle both subtypes of `Command`. +The behavior that we declare here can handle both subtypes of `RoomCommand`. `GetSession` has been explained already and the -`PostSessionMessage` commands coming from the wrapper Actors will +`PublishSessionMessage` commands coming from the session Actors will trigger the dissemination of the contained chat room message to all connected clients. But we do not want to give the ability to send -`PostSessionMessage` commands to arbitrary clients, we reserve that -right to the wrappers we create—otherwise clients could pose as completely +`PublishSessionMessage` commands to arbitrary clients, we reserve that +right to the internal session actors we create—otherwise clients could pose as completely different screen names (imagine the `GetSession` protocol to include -authentication information to further secure this). Therefore `PostSessionMessage` -has `private` visibility and can't be created outside the actor. +authentication information to further secure this). Therefore `PublishSessionMessage` +has `private` visibility and can't be created outside the `ChatRoom` @scala[object]@java[class]. If we did not care about securing the correspondence between a session and a screen name then we could change the protocol such that `PostMessage` is -removed and all clients just get an @scala[`ActorRef[PostSessionMessage]`]@java[`ActorRef`] to -send to. In this case no wrapper would be needed and we could just use +removed and all clients just get an @scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef`] to +send to. In this case no session actor would be needed and we could just use @scala[`ctx.self`]@java[`ctx.getSelf()`]. The type-checks work out in that case because @scala[`ActorRef[-T]`]@java[`ActorRef`] is contravariant in its type parameter, meaning that we -can use a @scala[`ActorRef[Command]`]@java[`ActorRef`] wherever an -@scala[`ActorRef[PostSessionMessage]`]@java[`ActorRef`] is needed—this makes sense because the +can use a @scala[`ActorRef[RoomCommand]`]@java[`ActorRef`] wherever an +@scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef`] is needed—this makes sense because the former simply speaks more languages than the latter. The opposite would be -problematic, so passing an @scala[`ActorRef[PostSessionMessage]`]@java[`ActorRef`] where -@scala[`ActorRef[Command]`]@java[`ActorRef`] is required will lead to a type error. +problematic, so passing an @scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef`] where +@scala[`ActorRef[RoomCommand]`]@java[`ActorRef`] is required will lead to a type error. ### Trying it out diff --git a/akka-docs/src/main/paradox/interaction-patterns-typed.md b/akka-docs/src/main/paradox/interaction-patterns-typed.md index 3fc924f14a..7490935ea1 100644 --- a/akka-docs/src/main/paradox/interaction-patterns-typed.md +++ b/akka-docs/src/main/paradox/interaction-patterns-typed.md @@ -1,4 +1,4 @@ -# Typed Actor Interaction Patterns +# Interaction Patterns Interacting with an Actor in Akka Typed is done through an @scala[`ActorRef[T]`]@java[`ActorRef`] where `T` is the type of messages the actor accepts, also known as the "protocol". This ensures that only the right kind of messages can be sent to an actor and also ensures no access to the Actor instance internals is available to anyone else but the Actor itself. @@ -16,13 +16,13 @@ Scala Java : @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget } -**Scenarios fire and forget is useful:** +**Useful when:** * When it is not critical to be sure that the message was processed * When there is no way to act on non successful delivery or processing * When we want to minimize the number of messages created to get higher throughput -**Problems with fire and forget:** +**Problems:** * Consistently higher rates of fire and forget to an actor than it process will make the inbox fill up and can in the worst case cause the JVM crash with an `OutOfMemoryError` * If the message got lost, we will not notice @@ -33,33 +33,60 @@ In many interactions a request is followed by a response back from the actor. In TODO sample -**Scenarios where request response with tell is useful:** +**Useful when:** * Subscribing to an actor that will send many response messages (of the same protocol) back * When communicating between a parent and its children, where the protocol can be made include the messages for the interaction * ??? -**Problems request-response:** +**Problems:** * Often the response that the other actor wants to send back is not a part of the sending actor's protocol (see adapted request response or ask) * It is hard to detect and that a message request was not delivered or processed (see ask) * Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor -## Adapted Request-Response +## Adapted Response -Very often the receiving does not, and should not be made, know of the protocol of the sending actor, and will respond with one or more messages that the sending actor cannot receive. +Very often the receiving actor does not, and should, know of the protocol of the sending actor, and +will respond with one or more messages that the sending actor cannot receive. -TODO sample +Scala +: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #adapted-response } -**Scenarios where Adapted Request-Response is useful:** +Java +: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #adapted-response } - * Subscribing to an actor that will send many response messages back +You can register several message adapters for different message classes. +It's only possible to have one message adapter per message class to make sure +that the number of adapters are not growing unbounded if registered repeatedly. +That also means that a registered adapter will replace an existing adapter for +the same message class. + +A message adapter will be used if the message class matches the given class or +is a subclass thereof. The registered adapters are tried in reverse order of +their registration order, i.e. the last registered first. + +A message adapter (and the returned `ActorRef`) has the same lifecycle as +the receiving actor. It's recommended to register the adapters in a top level +`Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to +register them later also if needed. + +The function is running in the receiving actor and can safely access state of it. + +**Useful when:** + + * Subscribing to an actor that will send many response messages back + * Translating between different actor message protocols -**Problems with adapted request-response:** +**Problems:** - * It is hard to detect and that a message request was not delivered or processed (see ask) - * Only one adaption can be made per response message type, if a new one is registered the old one is replaced, for example different target actors can't have different adaption if they use the same response types, unless some correlation is encoded in the messages - * Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor + * It is hard to detect that a message request was not delivered or processed (see ask) + * Only one adaption can be made per response message type, if a new one is registered the old one is replaced, + for example different target actors can't have different adaption if they use the same response types, unless some + correlation is encoded in the messages + * Unless the protocol already includes a way to provide context, for example a request id that is also sent in the + response, it is not possible to tie an interaction to some specific context without introducing a new, + separate, actor ## 1:1 Request-Response with ask between two actors @@ -70,8 +97,9 @@ The interaction has two steps, first we need to construct the outgoing message, TODO sample +The function is running in the receiving actor and can safely access state of it. -**Scenarios where ask is useful:** +**Useful when:** * Single response queries * When an actor needs to know that the message was processed before continuing @@ -79,7 +107,7 @@ TODO sample * To keep track of outstanding requests and not overwhelm a recipient with messages (simple backpressure) * When some context should be attached to the interaction but the protocol does not support that (request id, what query the response was for) -**Problems with ask:** +**Problems:** * There can only be a single response to one `ask` * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact @@ -92,12 +120,12 @@ In an interaction where there is a 1:1 mapping between a request and a response TODO sample -**Scenarios where this ask variant is useful:** +**Useful when:** * Single response queries where the response should be passed on to some other actor * ??? -**Problems with ask:** +**Problems:** * There can only be a single response to one `ask` * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact @@ -109,12 +137,14 @@ Keeping context for an interaction, or multiple interactions can be done by movi TODO -**Scenarios where per session child actor is useful:** +**Useful when:** - * A single incoming request should result in multiple interactions with other actions before a result can be built + * A single incoming request should result in multiple interactions with other actors before a result can be built, + for example aggregation of several results + * Handle acknowledgement and retry messages for at-least-once delivery * ??? -**Problems with ask:** +**Problems:** * Children have lifecycles that must be managed to not create a resource leak - * ??? \ No newline at end of file + * ??? diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 0a9f91c7a8..ee8edc13e3 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -273,7 +273,7 @@ object PersistentActorCompileOnlyTest { var basket = Basket(Nil) var stash: Seq[Command] = Nil - val adapt = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m)) + val adapt = ctx.messageAdapter((m: MetaData) ⇒ GotMetaData(m)) def addItem(id: Id, self: ActorRef[Command]) = Effect @@ -316,8 +316,7 @@ object PersistentActorCompileOnlyTest { case ItemAdded(id) ⇒ id +: state case ItemRemoved(id) ⇒ state.filter(_ != id) }).onRecoveryCompleted((ctx, state) ⇒ { - val ad = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m)) - state.foreach(id ⇒ metadataRegistry ! GetMetaData(id, ad)) + state.foreach(id ⇒ metadataRegistry ! GetMetaData(id, adapt)) }) } } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala index c250191199..6432d69360 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala @@ -59,12 +59,12 @@ object Effect { ref } - override def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = { - spawnAdapter(f, "") + override def spawnMessageAdapter[U](f: U ⇒ T): ActorRef[U] = { + spawnMessageAdapter(f, "") } - override def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = { - val ref = super.spawnAdapter(f, name) + override def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = { + val ref = super.spawnMessageAdapter(f, name) effectQueue.offer(SpawnedAdapter) ref } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala index b7285382bb..113bbdce8d 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala @@ -104,7 +104,7 @@ private[akka] final class FunctionRef[-T]( /** * INTERNAL API */ - @InternalApi private[akka] def internalSpawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = { + @InternalApi private[akka] def internalSpawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = { val n = if (name != "") s"${childName.next()}-$name" else childName.next() val i = TestInbox[U](n) diff --git a/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala b/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala index 37d5fb98de..2cb06a56f2 100644 --- a/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala +++ b/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala @@ -50,12 +50,12 @@ object BehaviorTestkitSpec { } Behaviors.same case SpawnAdapter ⇒ - ctx.spawnAdapter { + ctx.spawnMessageAdapter { r: Reproduce ⇒ SpawnAnonymous(r.times) } Behaviors.same case SpawnAdapterWithName(name) ⇒ - ctx.spawnAdapter({ + ctx.spawnMessageAdapter({ r: Reproduce ⇒ SpawnAnonymous(r.times) }, name) Behaviors.same @@ -115,7 +115,7 @@ class BehaviorTestkitSpec extends WordSpec with Matchers { } } - "BehaviorTestkit's spawnAdapter" must { + "BehaviorTestkit's spawnMessageAdapter" must { "create adapters without name and record effects" in { val testkit = BehaviorTestkit[Father.Command](Father.init()) testkit.run(SpawnAdapter)