Merge pull request #24344 from akka/wip-23770-response2-patriknw

Message adapter, without risk of resource leak, #23770
This commit is contained in:
Patrik Nordwall 2018-01-23 10:49:13 +01:00 committed by GitHub
commit 37484476df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 939 additions and 214 deletions

View file

@ -54,7 +54,7 @@ public class ActorCompile {
if (msg2 instanceof MyMsgB) {
((MyMsgA) msg).replyTo.tell(((MyMsgB) msg2).greeting);
ActorRef<String> adapter = ctx2.spawnAdapter(s -> new MyMsgB(s.toUpperCase()));
ActorRef<String> adapter = ctx2.messageAdapter(String.class, s -> new MyMsgB(s.toUpperCase()));
}
return same();
});
@ -79,7 +79,7 @@ public class ActorCompile {
@Override
public Behavior<MyMsg> receiveMessage(ActorContext<MyMsg> ctx, MyMsg msg) throws Exception {
ActorRef<String> adapter = ctx.asJava().spawnAdapter(s -> new MyMsgB(s.toUpperCase()));
ActorRef<String> adapter = ctx.asJava().messageAdapter(String.class, s -> new MyMsgB(s.toUpperCase()));
return this;
}

View file

@ -6,6 +6,7 @@ package jdocs.akka.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.BehaviorBuilder;
import akka.actor.typed.javadsl.Behaviors;
import org.junit.Test;
@ -13,6 +14,9 @@ import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class InteractionPatternsTest extends JUnitSuite {
@ -44,6 +48,150 @@ public class InteractionPatternsTest extends JUnitSuite {
}
// #fire-and-forget
// #adapted-response
public static class Backend {
interface Request {}
public static class StartTranslationJob implements Request {
public final int taskId;
public final URI site;
public final ActorRef<Response> replyTo;
public StartTranslationJob(int taskId, URI site, ActorRef<Response> replyTo) {
this.taskId = taskId;
this.site = site;
this.replyTo = replyTo;
}
}
interface Response {}
public static class JobStarted implements Response {
public final int taskId;
public JobStarted(int taskId) {
this.taskId = taskId;
}
}
public static class JobProgress implements Response {
public final int taskId;
public final double progress;
public JobProgress(int taskId, double progress) {
this.taskId = taskId;
this.progress = progress;
}
}
public static class JobCompleted implements Response {
public final int taskId;
public final URI result;
public JobCompleted(int taskId, URI result) {
this.taskId = taskId;
this.result = result;
}
}
}
public static class Frontend {
interface Command {}
public static class Translate implements Command {
public final URI site;
public final ActorRef<URI> replyTo;
public Translate(URI site, ActorRef<URI> replyTo) {
this.site = site;
this.replyTo = replyTo;
}
}
private static class WrappedJobStarted implements Command {
final Backend.JobStarted response;
public WrappedJobStarted(Backend.JobStarted response) {
this.response = response;
}
}
private static class WrappedJobProgress implements Command {
final Backend.JobProgress response;
public WrappedJobProgress(Backend.JobProgress response) {
this.response = response;
}
}
private static class WrappedJobCompleted implements Command {
final Backend.JobCompleted response;
public WrappedJobCompleted(Backend.JobCompleted response) {
this.response = response;
}
}
private static class OtherResponse implements Command {
final Backend.Response response;
public OtherResponse(Backend.Response response) {
this.response = response;
}
}
public static class Translator extends Behaviors.MutableBehavior<Command> {
private final ActorContext<Command> ctx;
private final ActorRef<Backend.Request> backend;
private final ActorRef<Backend.Response> backendResponseAdapter;
private int taskIdCounter = 0;
private Map<Integer, ActorRef<URI>> inProgress = new HashMap<>();
public Translator(ActorContext<Command> ctx, ActorRef<Backend.Request> backend) {
this.ctx = ctx;
this.backend = backend;
this.backendResponseAdapter =
ctx.messageAdapter(Backend.Response.class, rsp -> {
if (rsp instanceof Backend.JobStarted)
return new WrappedJobStarted((Backend.JobStarted) rsp);
else if (rsp instanceof Backend.JobProgress)
return new WrappedJobProgress((Backend.JobProgress) rsp);
else if (rsp instanceof Backend.JobCompleted)
return new WrappedJobCompleted((Backend.JobCompleted) rsp);
else return new OtherResponse(rsp);
});
}
@Override
public Behaviors.Receive<Command> createReceive() {
return receiveBuilder()
.onMessage(Translate.class, cmd -> {
taskIdCounter += 1;
inProgress.put(taskIdCounter, cmd.replyTo);
backend.tell(new Backend.StartTranslationJob(
taskIdCounter, cmd.site, backendResponseAdapter));
return this;
})
.onMessage(WrappedJobStarted.class, wrapped -> {
System.out.println("Started " + wrapped.response.taskId);
return this;
})
.onMessage(WrappedJobProgress.class, wrapped -> {
System.out.println("Progress " + wrapped.response.taskId + ": " + wrapped.response.progress);
return this;
})
.onMessage(WrappedJobCompleted.class, wrapped -> {
System.out.println("Completed " + wrapped.response.taskId + ": " + wrapped.response.result);
return this;
})
.onMessage(OtherResponse.class, other -> Behaviors.unhandled())
.build();
}
}
}
// #adapted-response
@Test
public void fireAndForgetSample() throws Exception {

View file

@ -13,6 +13,8 @@ import akka.actor.typed.javadsl.AskPattern;
import akka.util.Timeout;
//#imports
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
@ -75,8 +77,8 @@ public class IntroTest {
//#chatroom-actor
public static class ChatRoom {
//#chatroom-protocol
static interface Command {}
public static final class GetSession implements Command {
static interface RoomCommand {}
public static final class GetSession implements RoomCommand {
public final String screenName;
public final ActorRef<SessionEvent> replyTo;
public GetSession(String screenName, ActorRef<SessionEvent> replyTo) {
@ -86,10 +88,10 @@ public class IntroTest {
}
//#chatroom-protocol
//#chatroom-behavior
private static final class PostSessionMessage implements Command {
private static final class PublishSessionMessage implements RoomCommand {
public final String screenName;
public final String message;
public PostSessionMessage(String screenName, String message) {
public PublishSessionMessage(String screenName, String message) {
this.screenName = screenName;
this.message = message;
}
@ -119,37 +121,65 @@ public class IntroTest {
}
}
public static final class PostMessage {
static interface SessionCommand {}
public static final class PostMessage implements SessionCommand {
public final String message;
public PostMessage(String message) {
this.message = message;
}
}
private static final class NotifyClient implements SessionCommand {
final MessagePosted message;
NotifyClient(MessagePosted message) {
this.message = message;
}
}
//#chatroom-protocol
//#chatroom-behavior
public static Behavior<Command> behavior() {
return chatRoom(new ArrayList<ActorRef<SessionEvent>>());
public static Behavior<RoomCommand> behavior() {
return chatRoom(new ArrayList<ActorRef<SessionCommand>>());
}
private static Behavior<Command> chatRoom(List<ActorRef<SessionEvent>> sessions) {
return Behaviors.immutable(Command.class)
private static Behavior<RoomCommand> chatRoom(List<ActorRef<SessionCommand>> sessions) {
return Behaviors.immutable(RoomCommand.class)
.onMessage(GetSession.class, (ctx, getSession) -> {
ActorRef<PostMessage> wrapper = ctx.spawnAdapter(p ->
new PostSessionMessage(getSession.screenName, p.message));
getSession.replyTo.tell(new SessionGranted(wrapper));
List<ActorRef<SessionEvent>> newSessions =
new ArrayList<ActorRef<SessionEvent>>(sessions);
newSessions.add(getSession.replyTo);
ActorRef<SessionEvent> client = getSession.replyTo;
ActorRef<SessionCommand> ses = ctx.spawn(
session(ctx.getSelf(), getSession.screenName, client),
URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name()));
// narrow to only expose PostMessage
client.tell(new SessionGranted(ses.narrow()));
List<ActorRef<SessionCommand>> newSessions = new ArrayList<>(sessions);
newSessions.add(ses);
return chatRoom(newSessions);
})
.onMessage(PostSessionMessage.class, (ctx, post) -> {
MessagePosted mp = new MessagePosted(post.screenName, post.message);
sessions.forEach(s -> s.tell(mp));
.onMessage(PublishSessionMessage.class, (ctx, pub) -> {
NotifyClient notification =
new NotifyClient((new MessagePosted(pub.screenName, pub.message)));
sessions.forEach(s -> s.tell(notification));
return Behaviors.same();
})
.build();
}
public static Behavior<ChatRoom.SessionCommand> session(
ActorRef<RoomCommand> room,
String screenName,
ActorRef<SessionEvent> client) {
return Behaviors.immutable(ChatRoom.SessionCommand.class)
.onMessage(PostMessage.class, (ctx, post) -> {
// from client, publish to others via the room
room.tell(new PublishSessionMessage(screenName, post.message));
return Behaviors.same();
})
.onMessage(NotifyClient.class, (ctx, notification) -> {
// published from the room
client.tell(notification.message);
return Behaviors.same();
})
.build();
}
//#chatroom-behavior
}
@ -185,7 +215,7 @@ public class IntroTest {
//#chatroom-main
Behavior<Void> main = Behaviors.deferred(ctx -> {
ActorRef<ChatRoom.Command> chatRoom =
ActorRef<ChatRoom.RoomCommand> chatRoom =
ctx.spawn(ChatRoom.behavior(), "chatRoom");
ActorRef<ChatRoom.SessionEvent> gabbler =
ctx.spawn(Gabbler.behavior(), "gabbler");

View file

@ -1,7 +1,7 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.actor.typed;
package jdocs.akka.typed;
//#imports
import akka.actor.typed.ActorRef;
@ -10,6 +10,8 @@ import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Behaviors.Receive;
import akka.actor.typed.javadsl.ActorContext;
//#imports
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -18,8 +20,8 @@ public class MutableIntroTest {
//#chatroom-actor
public static class ChatRoom {
//#chatroom-protocol
static interface Command {}
public static final class GetSession implements Command {
static interface RoomCommand {}
public static final class GetSession implements RoomCommand {
public final String screenName;
public final ActorRef<SessionEvent> replyTo;
public GetSession(String screenName, ActorRef<SessionEvent> replyTo) {
@ -29,10 +31,10 @@ public class MutableIntroTest {
}
//#chatroom-protocol
//#chatroom-behavior
private static final class PostSessionMessage implements Command {
private static final class PublishSessionMessage implements RoomCommand {
public final String screenName;
public final String message;
public PostSessionMessage(String screenName, String message) {
public PublishSessionMessage(String screenName, String message) {
this.screenName = screenName;
this.message = message;
}
@ -62,46 +64,74 @@ public class MutableIntroTest {
}
}
public static final class PostMessage {
static interface SessionCommand {}
public static final class PostMessage implements SessionCommand {
public final String message;
public PostMessage(String message) {
this.message = message;
}
}
private static final class NotifyClient implements SessionCommand {
final MessagePosted message;
NotifyClient(MessagePosted message) {
this.message = message;
}
}
//#chatroom-protocol
//#chatroom-behavior
public static Behavior<Command> behavior() {
public static Behavior<RoomCommand> behavior() {
return Behaviors.mutable(ChatRoomBehavior::new);
}
public static class ChatRoomBehavior extends Behaviors.MutableBehavior<Command> {
final ActorContext<Command> ctx;
final List<ActorRef<SessionEvent>> sessions = new ArrayList<ActorRef<SessionEvent>>();
public static class ChatRoomBehavior extends Behaviors.MutableBehavior<RoomCommand> {
final ActorContext<RoomCommand> ctx;
final List<ActorRef<SessionCommand>> sessions = new ArrayList<>();
public ChatRoomBehavior(ActorContext<Command> ctx) {
public ChatRoomBehavior(ActorContext<RoomCommand> ctx) {
this.ctx = ctx;
}
@Override
public Receive<Command> createReceive() {
public Receive<RoomCommand> createReceive() {
return receiveBuilder()
.onMessage(GetSession.class, getSession -> {
ActorRef<PostMessage> wrapper = ctx.spawnAdapter(p ->
new PostSessionMessage(getSession.screenName, p.message));
getSession.replyTo.tell(new SessionGranted(wrapper));
sessions.add(getSession.replyTo);
return Behaviors.same();
ActorRef<SessionEvent> client = getSession.replyTo;
ActorRef<SessionCommand> ses = ctx.spawn(
session(ctx.getSelf(), getSession.screenName, client),
URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name()));
// narrow to only expose PostMessage
client.tell(new SessionGranted(ses.narrow()));
sessions.add(ses);
return this;
})
.onMessage(PostSessionMessage.class, post -> {
MessagePosted mp = new MessagePosted(post.screenName, post.message);
sessions.forEach(s -> s.tell(mp));
.onMessage(PublishSessionMessage.class, pub -> {
NotifyClient notification =
new NotifyClient((new MessagePosted(pub.screenName, pub.message)));
sessions.forEach(s -> s.tell(notification));
return this;
})
.build();
}
}
public static Behavior<ChatRoom.SessionCommand> session(
ActorRef<RoomCommand> room,
String screenName,
ActorRef<SessionEvent> client) {
return Behaviors.immutable(ChatRoom.SessionCommand.class)
.onMessage(PostMessage.class, (ctx, post) -> {
// from client, publish to others via the room
room.tell(new PublishSessionMessage(screenName, post.message));
return Behaviors.same();
})
.onMessage(NotifyClient.class, (ctx, notification) -> {
// published from the room
client.tell(notification.message);
return Behaviors.same();
})
.build();
}
//#chatroom-behavior
}
//#chatroom-actor

View file

@ -164,7 +164,7 @@ object ActorContextSpec {
Behaviors.same
}
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
replyTo ! Adapter(ctx.spawnMessageAdapter(identity, name))
Behaviors.same
}
} onSignal {
@ -252,7 +252,7 @@ object ActorContextSpec {
Behaviors.same
}
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
replyTo ! Adapter(ctx.spawnMessageAdapter(identity, name))
Behaviors.same
}
} onSignal {
@ -510,7 +510,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
sync(setup("ctx03") { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM2")
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) {
startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self) {
case (subj, child)
val log = muteExpectedException[Exception]("KABOOM2", occurrences = 1)
child ! Throw(ex)
@ -541,7 +541,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
"stop a child actor" in {
sync(setup("ctx04") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self, inert = true) {
startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self, inert = true) {
case (subj, child)
subj ! Kill(child, self)
child
@ -602,7 +602,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
"not stop non-child actor" in {
sync(setup("ctx08") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) {
startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self) {
case (subj, child)
val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A")
subj ! Kill(other, ctx.self)
@ -616,7 +616,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
"watch a child actor before its termination" in {
sync(setup("ctx10") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) {
startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self) {
case (subj, child)
subj ! Watch(child, self)
child
@ -632,7 +632,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
"watch a child actor after its termination" in {
sync(setup("ctx11") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep {
case (subj, child)
ctx.watch(child)
child ! Stop
@ -650,7 +650,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
"unwatch a child actor before its termination" in {
sync(setup("ctx12") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep {
case (subj, child)
subj ! Watch(child, self)
}.expectMessageKeep(expectTimeout) {
@ -672,7 +672,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
"terminate upon not handling Terminated" in {
sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep {
case (subj, child)
muteExpectedException[DeathPactException]()
subj ! Watch(child, self)
@ -711,7 +711,7 @@ abstract class ActorContextSpec extends TypedAkkaSpec {
sync(setup("ctx21") { (ctx, startWith)
val self = ctx.self
startWith
.mkChild(Some("B"), ctx.spawnAdapter(ChildEvent), self)
.mkChild(Some("B"), ctx.spawnMessageAdapter(ChildEvent), self)
.stimulate(_._1 ! GetChild("A", self), _ Child(None))
.stimulate(_._1 ! GetChild("B", self), x Child(Some(x._2)))
.stimulate(_._1 ! GetChildren(self), x Children(Set(x._2)))

View file

@ -6,8 +6,8 @@ import org.scalactic.TypeCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.Span
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
/**
* Helper trait to include standard traits for typed tests
@ -29,3 +29,5 @@ trait TypedAkkaSpecWithShutdown extends TypedAkkaSpec {
self: TestKit
override protected def afterAll(): Unit = shutdown()
}
class TestException(msg: String) extends RuntimeException(msg) with NoStackTrace

View file

@ -0,0 +1,235 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.Failure
import scala.util.Success
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorRef
import akka.actor.typed.PostStop
import akka.actor.typed.Props
import akka.actor.typed.TestException
import akka.actor.typed.TypedAkkaSpecWithShutdown
import akka.testkit.EventFilter
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
object MessageAdapterSpec {
val config = ConfigFactory.parseString(
"""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.log-dead-letters = off
ping-pong-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher
}
snitch-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher
}
""")
}
class MessageAdapterSpec extends TestKit(MessageAdapterSpec.config) with TypedAkkaSpecWithShutdown {
implicit val untyped = system.toUntyped // FIXME no typed event filter yet
"Message adapters" must {
"map messages inside the actor" in {
case class Ping(sender: ActorRef[Response])
trait Response
case class Pong(selfName: String, threadName: String) extends Response
case class AnotherPong(selfName: String, threadName: String)
val pingPong = spawn(Behaviors.immutable[Ping] { (ctx, msg)
msg.sender ! Pong(ctx.self.path.name, Thread.currentThread().getName)
Behaviors.same
}, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val probe = TestProbe[AnotherPong]()
val snitch = Behaviors.deferred[AnotherPong] { (ctx)
val replyTo = ctx.messageAdapter[Response](_
AnotherPong(ctx.self.path.name, Thread.currentThread().getName))
pingPong ! Ping(replyTo)
// also verify the internal spawnMessageAdapter
val replyTo2: ActorRef[Response] = ctx.spawnMessageAdapter(_
AnotherPong(ctx.self.path.name, Thread.currentThread().getName))
pingPong ! Ping(replyTo2)
Behaviors.immutable {
case (_, anotherPong: AnotherPong)
probe.ref ! anotherPong
Behaviors.same
}
}
spawn(snitch, "snitch", Props.empty.withDispatcherFromConfig("snitch-dispatcher"))
val response1 = probe.expectMsgType[AnotherPong]
response1.selfName should ===("snitch")
response1.threadName should startWith("MessageAdapterSpec-snitch-dispatcher")
// and from the spawnMessageAdapter
val response2 = probe.expectMsgType[AnotherPong]
response2.selfName should ===("snitch")
response2.threadName should startWith("MessageAdapterSpec-snitch-dispatcher")
}
"use the right adapter" in {
trait Ping
case class Ping1(sender: ActorRef[Pong1]) extends Ping
case class Ping2(sender: ActorRef[Pong2]) extends Ping
trait Response
case class Pong1(greeting: String) extends Response
case class Pong2(greeting: String) extends Response
case class Wrapped(qualifier: String, response: Response)
val pingPong = spawn(Behaviors.immutable[Ping] { (_, msg)
msg match {
case Ping1(sender)
sender ! Pong1("hello-1")
Behaviors.same
case Ping2(sender)
sender ! Pong2("hello-2")
Behaviors.same
}
})
val probe = TestProbe[Wrapped]()
val snitch = Behaviors.deferred[Wrapped] { (ctx)
ctx.messageAdapter[Response](pong Wrapped(qualifier = "wrong", pong)) // this is replaced
val replyTo1: ActorRef[Response] = ctx.messageAdapter(pong Wrapped(qualifier = "1", pong))
val replyTo2 = ctx.messageAdapter[Pong2](pong Wrapped(qualifier = "2", pong))
pingPong ! Ping1(replyTo1)
pingPong ! Ping2(replyTo2)
Behaviors.immutable {
case (_, wrapped)
probe.ref ! wrapped
Behaviors.same
}
}
spawn(snitch)
probe.expectMsg(Wrapped("1", Pong1("hello-1")))
probe.expectMsg(Wrapped("2", Pong2("hello-2")))
}
"not break if wrong/unknown response type" in {
trait Ping
case class Ping1(sender: ActorRef[Pong1]) extends Ping
case class Ping2(sender: ActorRef[Pong2]) extends Ping
trait Response
case class Pong1(greeting: String) extends Response
case class Pong2(greeting: String) extends Response
case class Wrapped(qualifier: String, response: Response)
val pingPong = spawn(Behaviors.immutable[Ping] { (_, msg)
msg match {
case Ping1(sender)
sender ! Pong1("hello-1")
Behaviors.same
case Ping2(sender)
// doing something terribly wrong
sender ! Pong2("hello-2")
Behaviors.same
}
})
val probe = TestProbe[Wrapped]()
val snitch = Behaviors.deferred[Wrapped] { (ctx)
val replyTo1 = ctx.messageAdapter[Pong1](pong Wrapped(qualifier = "1", pong))
pingPong ! Ping1(replyTo1)
// doing something terribly wrong
// Pong2 message adapter not registered
pingPong ! Ping2(replyTo1.asInstanceOf[ActorRef[Pong2]])
pingPong ! Ping1(replyTo1)
Behaviors.immutable {
case (_, wrapped)
probe.ref ! wrapped
Behaviors.same
}
}
EventFilter.warning(start = "unhandled message", occurrences = 1).intercept {
spawn(snitch)
}
probe.expectMsg(Wrapped("1", Pong1("hello-1")))
// hello-2 discarded because it was wrong type
probe.expectMsg(Wrapped("1", Pong1("hello-1")))
}
"stop when exception from adapter" in {
case class Ping(sender: ActorRef[Pong])
case class Pong(greeting: String)
case class Wrapped(count: Int, response: Pong)
val pingPong = spawn(Behaviors.immutable[Ping] { (_, ping)
ping.sender ! Pong("hello")
Behaviors.same
})
val probe = TestProbe[Any]()
val snitch = Behaviors.deferred[Wrapped] { (ctx)
var count = 0
val replyTo = ctx.messageAdapter[Pong] { pong
count += 1
if (count == 3) throw new TestException("boom")
else Wrapped(count, pong)
}
(1 to 4).foreach { _
pingPong ! Ping(replyTo)
}
Behaviors.immutable[Wrapped] {
case (_, wrapped)
probe.ref ! wrapped
Behaviors.same
}.onSignal {
case (_, PostStop)
probe.ref ! "stopped"
Behaviors.same
}
}
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
EventFilter[TestException](occurrences = 1).intercept {
spawn(snitch)
}
}
probe.expectMsg(Wrapped(1, Pong("hello")))
probe.expectMsg(Wrapped(2, Pong("hello")))
// exception was thrown for 3
// FIXME One thing to be aware of is that the supervision strategy of the Behavior is not
// used for exceptions from adapters. Should we instead catch, log, unhandled, and resume?
// It's kind of "before" the message arrives.
probe.expectMsg("stopped")
}
}
}

View file

@ -3,9 +3,12 @@
*/
package docs.akka.typed
import java.net.URI
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedAkkaSpecWithShutdown }
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
@ -57,6 +60,82 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
system.terminate().futureValue
}
"contain a sample for adapted response" in {
// #adapted-response
object Backend {
sealed trait Request
final case class StartTranslationJob(taskId: Int, site: URI, replyTo: ActorRef[Response]) extends Request
sealed trait Response
final case class JobStarted(taskId: Int) extends Response
final case class JobProgress(taskId: Int, progress: Double) extends Response
final case class JobCompleted(taskId: Int, result: URI) extends Response
}
object Frontend {
sealed trait Command
final case class Translate(site: URI, replyTo: ActorRef[URI]) extends Command
private final case class WrappedBackendResponse(response: Backend.Response) extends Command
def translator(backend: ActorRef[Backend.Request]): Behavior[Command] =
Behaviors.deferred[Command] { ctx
val backendResponseMapper: ActorRef[Backend.Response] =
ctx.messageAdapter(rsp WrappedBackendResponse(rsp))
def active(
inProgress: Map[Int, ActorRef[URI]],
count: Int): Behavior[Command] = {
Behaviors.immutable[Command] { (_, msg)
msg match {
case Translate(site, replyTo)
val taskId = count + 1
backend ! Backend.StartTranslationJob(taskId, site, backendResponseMapper)
active(inProgress.updated(taskId, replyTo), taskId)
case wrapped: WrappedBackendResponse wrapped.response match {
case Backend.JobStarted(taskId)
println(s"Started $taskId")
Behaviors.same
case Backend.JobProgress(taskId, progress)
println(s"Progress $taskId: $progress")
Behaviors.same
case Backend.JobCompleted(taskId, result)
println(s"Completed $taskId: $result")
inProgress(taskId) ! result
active(inProgress - taskId, count)
}
}
}
}
active(inProgress = Map.empty, count = 0)
}
}
// #adapted-response
val backend = spawn(Behaviors.immutable[Backend.Request] { (_, msg)
msg match {
case Backend.StartTranslationJob(taskId, site, replyTo)
replyTo ! Backend.JobStarted(taskId)
replyTo ! Backend.JobProgress(taskId, 0.25)
replyTo ! Backend.JobProgress(taskId, 0.50)
replyTo ! Backend.JobProgress(taskId, 0.75)
replyTo ! Backend.JobCompleted(taskId, new URI("https://akka.io/docs/sv/"))
Behaviors.same
}
}
)
val frontend = spawn(Frontend.translator(backend))
val probe = TestProbe[URI]()
frontend ! Frontend.Translate(new URI("https://akka.io/docs/"), probe.ref)
probe.expectMsg(new URI("https://akka.io/docs/sv/"))
}
}
}

View file

@ -4,6 +4,9 @@
package docs.akka.typed
//#imports
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
@ -11,7 +14,6 @@ import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestKit
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
@ -37,13 +39,13 @@ object IntroSpec {
//#chatroom-actor
object ChatRoom {
//#chatroom-protocol
sealed trait Command
sealed trait RoomCommand
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent])
extends Command
extends RoomCommand
//#chatroom-protocol
//#chatroom-behavior
private final case class PostSessionMessage(screenName: String, message: String)
extends Command
private final case class PublishSessionMessage(screenName: String, message: String)
extends RoomCommand
//#chatroom-behavior
//#chatroom-protocol
@ -52,25 +54,45 @@ object IntroSpec {
final case class SessionDenied(reason: String) extends SessionEvent
final case class MessagePosted(screenName: String, message: String) extends SessionEvent
final case class PostMessage(message: String)
trait SessionCommand
final case class PostMessage(message: String) extends SessionCommand
private final case class NotifyClient(message: MessagePosted) extends SessionCommand
//#chatroom-protocol
//#chatroom-behavior
val behavior: Behavior[Command] =
val behavior: Behavior[RoomCommand] =
chatRoom(List.empty)
private def chatRoom(sessions: List[ActorRef[SessionEvent]]): Behavior[Command] =
Behaviors.immutable[Command] { (ctx, msg)
private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] =
Behaviors.immutable[RoomCommand] { (ctx, msg)
msg match {
case GetSession(screenName, client)
val wrapper = ctx.spawnAdapter {
p: PostMessage PostSessionMessage(screenName, p.message)
}
client ! SessionGranted(wrapper)
chatRoom(client :: sessions)
case PostSessionMessage(screenName, message)
val mp = MessagePosted(screenName, message)
sessions foreach (_ ! mp)
// create a child actor for further interaction with the client
val ses = ctx.spawn(
session(ctx.self, screenName, client),
name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))
client ! SessionGranted(ses)
chatRoom(ses :: sessions)
case PublishSessionMessage(screenName, message)
val notification = NotifyClient(MessagePosted(screenName, message))
sessions foreach (_ ! notification)
Behaviors.same
}
}
private def session(
room: ActorRef[PublishSessionMessage],
screenName: String,
client: ActorRef[SessionEvent]): Behavior[SessionCommand] =
Behaviors.immutable { (ctx, msg)
msg match {
case PostMessage(message)
// from client, publish to others via the room
room ! PublishSessionMessage(screenName, message)
Behaviors.same
case NotifyClient(message)
// published from the room
client ! message
Behaviors.same
}
}

View file

@ -4,11 +4,13 @@
package docs.akka.typed
//#imports
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import akka.actor.typed._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.ActorContext
import akka.testkit.typed.TestKit
import scala.concurrent.duration._
import scala.concurrent.Await
//#imports
@ -18,13 +20,13 @@ object MutableIntroSpec {
//#chatroom-actor
object ChatRoom {
//#chatroom-protocol
sealed trait Command
sealed trait RoomCommand
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent])
extends Command
extends RoomCommand
//#chatroom-protocol
//#chatroom-behavior
private final case class PostSessionMessage(screenName: String, message: String)
extends Command
private final case class PublishSessionMessage(screenName: String, message: String)
extends RoomCommand
//#chatroom-behavior
//#chatroom-protocol
@ -33,33 +35,52 @@ object MutableIntroSpec {
final case class SessionDenied(reason: String) extends SessionEvent
final case class MessagePosted(screenName: String, message: String) extends SessionEvent
final case class PostMessage(message: String)
trait SessionCommand
final case class PostMessage(message: String) extends SessionCommand
private final case class NotifyClient(message: MessagePosted) extends SessionCommand
//#chatroom-protocol
//#chatroom-behavior
def behavior(): Behavior[Command] =
Behaviors.mutable[Command](ctx new ChatRoomBehavior(ctx))
def behavior(): Behavior[RoomCommand] =
Behaviors.mutable[RoomCommand](ctx new ChatRoomBehavior(ctx))
class ChatRoomBehavior(ctx: ActorContext[Command]) extends Behaviors.MutableBehavior[Command] {
private var sessions: List[ActorRef[SessionEvent]] = List.empty
class ChatRoomBehavior(ctx: ActorContext[RoomCommand]) extends Behaviors.MutableBehavior[RoomCommand] {
private var sessions: List[ActorRef[SessionCommand]] = List.empty
override def onMessage(msg: Command): Behavior[Command] = {
override def onMessage(msg: RoomCommand): Behavior[RoomCommand] = {
msg match {
case GetSession(screenName, client)
val wrapper = ctx.spawnAdapter {
p: PostMessage PostSessionMessage(screenName, p.message)
}
client ! SessionGranted(wrapper)
sessions = client :: sessions
// create a child actor for further interaction with the client
val ses = ctx.spawn(
session(ctx.self, screenName, client),
name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))
client ! SessionGranted(ses)
sessions = ses :: sessions
this
case PostSessionMessage(screenName, message)
val mp = MessagePosted(screenName, message)
sessions foreach (_ ! mp)
case PublishSessionMessage(screenName, message)
val notification = NotifyClient(MessagePosted(screenName, message))
sessions foreach (_ ! notification)
this
}
}
}
private def session(
room: ActorRef[PublishSessionMessage],
screenName: String,
client: ActorRef[SessionEvent]): Behavior[SessionCommand] =
Behaviors.immutable { (ctx, msg)
msg match {
case PostMessage(message)
// from client, publish to others via the room
room ! PublishSessionMessage(screenName, message)
Behaviors.same
case NotifyClient(message)
// published from the room
client ! message
Behaviors.same
}
}
//#chatroom-behavior
}
//#chatroom-actor

View file

@ -91,7 +91,7 @@ class BasicSyncTestingSpec extends WordSpec with Matchers {
val testKit = BehaviorTestkit(myBehaviour)
testKit.run(SayHelloToAnonymousChild)
// Anonymous actors are created as: $a $b etc
val childInbox = testKit.childInbox[String]("$a")
val childInbox = testKit.childInbox[String](s"$$a")
childInbox.expectMsg("hello stranger")
//#test-child-message-anonymous
}

View file

@ -4,21 +4,30 @@
package akka.actor.typed
package internal
import java.util.function.{ Function JFunction }
import java.util.ArrayList
import java.util.Optional
import java.util.function
import java.util.function.BiFunction
import java.util.{ ArrayList, Optional, function }
import akka.annotation.InternalApi
import akka.util.Timeout
import scala.concurrent.ExecutionContextExecutor
import scala.reflect.ClassTag
import scala.util.{ Failure, Success, Try }
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.annotation.InternalApi
import akka.util.OptionVal
import akka.util.Timeout
/**
* INTERNAL API
*/
@InternalApi private[akka] trait ActorContextImpl[T] extends ActorContext[T] with javadsl.ActorContext[T] with scaladsl.ActorContext[T] {
private var messageAdapterRef: OptionVal[ActorRef[Any]] = OptionVal.None
private var _messageAdapters: List[(Class[_], Any T)] = Nil
override def asJava: javadsl.ActorContext[T] = this
override def asScala: scaladsl.ActorContext[T] = this
@ -55,18 +64,6 @@ import scala.util.{ Failure, Success, Try }
override def spawnAnonymous[U](behavior: akka.actor.typed.Behavior[U]): akka.actor.typed.ActorRef[U] =
spawnAnonymous(behavior, Props.empty)
override def spawnAdapter[U](f: U T, name: String): ActorRef[U] =
internalSpawnAdapter(f, name)
override def spawnAdapter[U](f: U T): ActorRef[U] =
internalSpawnAdapter(f, "")
override def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.actor.typed.ActorRef[U] =
internalSpawnAdapter(f.apply, "")
override def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.actor.typed.ActorRef[U] =
internalSpawnAdapter(f.apply, name)
// Scala API impl
override def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._
@ -83,10 +80,45 @@ import scala.util.{ Failure, Success, Try }
}(responseTimeout, ClassTag[Res](resClass))
}
private[akka] override def spawnMessageAdapter[U](f: U T, name: String): ActorRef[U] =
internalSpawnMessageAdapter(f, name)
private[akka] override def spawnMessageAdapter[U](f: U T): ActorRef[U] =
internalSpawnMessageAdapter(f, name = "")
/**
* INTERNAL API: Needed to make Scala 2.12 compiler happy.
* INTERNAL API: Needed to make Scala 2.12 compiler happy if spawnMessageAdapter is overloaded for scaladsl/javadsl.
* Otherwise "ambiguous reference to overloaded definition" because Function is lambda.
*/
@InternalApi private[akka] def internalSpawnAdapter[U](f: U T, _name: String): ActorRef[U]
@InternalApi private[akka] def internalSpawnMessageAdapter[U](f: U T, name: String): ActorRef[U]
override def messageAdapter[U: ClassTag](f: U T): ActorRef[U] = {
val messageClass = implicitly[ClassTag[U]].runtimeClass.asInstanceOf[Class[U]]
internalMessageAdapter(messageClass, f)
}
override def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U] =
internalMessageAdapter(messageClass, f.apply)
private def internalMessageAdapter[U](messageClass: Class[U], f: U T): ActorRef[U] = {
// replace existing adapter for same class, only one per class is supported to avoid unbounded growth
// in case "same" adapter is added repeatedly
_messageAdapters = (messageClass, f.asInstanceOf[Any T]) ::
_messageAdapters.filterNot { case (cls, _) cls == messageClass }
val ref = messageAdapterRef match {
case OptionVal.Some(ref) ref.asInstanceOf[ActorRef[U]]
case OptionVal.None
// AdaptMessage is not really a T, but that is erased
val ref = internalSpawnMessageAdapter[Any](msg AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter")
messageAdapterRef = OptionVal.Some(ref)
ref
}
ref.asInstanceOf[ActorRef[U]]
}
/**
* INTERNAL API
*/
@InternalApi private[akka] def messageAdapters: List[(Class[_], Any T)] = _messageAdapters
}

View file

@ -13,7 +13,7 @@ import scala.util.Try
* Message wrapper used to allow ActorContext.ask to map the response inside the asking actor.
*/
@InternalApi
private[akka] final class AskResponse[T, U](result: Try[T], adapt: Try[T] U) {
private[akka] final class AskResponse[U, T](result: Try[U], adapt: Try[U] T) {
def adapted: U = adapt(result)
def adapted: T = adapt(result)
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.internal
import akka.annotation.InternalApi
/**
* INTERNAL API: Wrapping of messages that should be adapted by
* adapters registered with `ActorContext.messageAdapter`.
*/
@InternalApi private[akka] final case class AdaptWithRegisteredMessageAdapter[U](msg: U)
/**
* INTERNAL API: Wrapping of messages that should be adapted by the included
* function. Used by `ActorContext.spawnMessageAdapter` so that the function is
* applied in the "parent" actor (for better thread safetey)..
*/
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapt: U T) {
def adapted: T = adapt(msg)
}
// FIXME move AskResponse in other PR

View file

@ -5,6 +5,8 @@ package akka.actor.typed
package internal
package adapter
import scala.annotation.tailrec
import akka.{ actor a }
import akka.annotation.InternalApi
import akka.util.OptionVal
@ -35,9 +37,23 @@ import akka.util.OptionVal
next(Behavior.interpretSignal(behavior, ctx, msg), msg)
case a.ReceiveTimeout
next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg)
case msg: AskResponse[AnyRef, T] @unchecked receive(msg.adapted)
case wrapped: AskResponse[Any, T] @unchecked
handleMessage(wrapped.adapted)
case wrapped: AdaptMessage[Any, T] @unchecked
wrapped.adapted match {
case AdaptWithRegisteredMessageAdapter(msg)
adaptAndHandle(msg)
case msg: T @unchecked
handleMessage(msg)
}
case AdaptWithRegisteredMessageAdapter(msg)
adaptAndHandle(msg)
case msg: T @unchecked
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
handleMessage(msg)
}
private def handleMessage(msg: T): Unit = {
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
}
private def next(b: Behavior[T], msg: Any): Unit = {
@ -63,6 +79,23 @@ import akka.util.OptionVal
}
}
private def adaptAndHandle(msg: Any): Unit = {
@tailrec def handle(adapters: List[(Class[_], Any T)]): Unit = {
adapters match {
case Nil
// no adapter function registered for message class
unhandled(msg)
case (clazz, f) :: tail
if (clazz.isAssignableFrom(msg.getClass)) {
val adaptedMsg = f(msg)
handleMessage(adaptedMsg)
} else
handle(tail) // recursive
}
}
handle(ctx.messageAdapters)
}
override def unhandled(msg: Any): Unit = msg match {
case Terminated(ref) throw a.DeathPactException(toUntyped(ref))
case msg: Signal // that's ok

View file

@ -58,9 +58,10 @@ import akka.actor.typed.Behavior.UntypedBehavior
import untyped.dispatcher
untyped.system.scheduler.scheduleOnce(delay, toUntyped(target), msg)
}
override private[akka] def internalSpawnAdapter[U](f: U T, _name: String): ActorRef[U] = {
override private[akka] def internalSpawnMessageAdapter[U](f: U T, _name: String): ActorRef[U] = {
val cell = untyped.asInstanceOf[akka.actor.ActorCell]
val ref = cell.addFunctionRef((_, msg) untyped.self ! f(msg.asInstanceOf[U]), _name)
// apply the function inside the actor by wrapping the msg and f, handled by ActorAdapter
val ref = cell.addFunctionRef((_, msg) untyped.self ! AdaptMessage[U, T](msg.asInstanceOf[U], f), _name)
ActorRefAdapter[U](ref)
}
}

View file

@ -152,24 +152,29 @@ trait ActorContext[T] {
def getExecutionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
* Create a message adapter that will convert or wrap messages such that other Actors
* protocols can be ingested by this Actor.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
* You can register several message adapters for different message classes.
* It's only possible to have one message adapter per message class to make sure
* that the number of adapters are not growing unbounded if registered repeatedly.
* That also means that a registered adapter will replace an existing adapter for
* the same message class.
*
* A message adapter will be used if the message class matches the given class or
* is a subclass thereof. The registered adapters are tried in reverse order of
* their registration order, i.e. the last registered first.
*
* A message adapter (and the returned `ActorRef`) has the same lifecycle as
* this actor. It's recommended to register the adapters in a top level
* `Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to
* register them later also if needed. Message adapters don't have to be stopped since
* they consume no resources other than an entry in an internal `Map` and the number
* of adapters are bounded since it's only possible to have one per message class.
* *
* The function is running in this actor and can safely access state of it.
*/
def spawnAdapter[U](f: JFunction[U, T], name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: JFunction[U, T]): ActorRef[U]
def messageAdapter[U](messageClass: Class[U], f: JFunction[U, T]): ActorRef[U]
/**
* Perform a single request-response message interaction with another actor, and transform the messages back to
@ -178,9 +183,9 @@ trait ActorContext[T] {
* The interaction has a timeout (to avoid a resource leak). If the timeout hits without any response it
* will be passed as an [[java.util.concurrent.TimeoutException]] to the `applyToResponse` function.
*
* For other messaging patterns with other actors, see [[spawnAdapter]].
* For other messaging patterns with other actors, see [[ActorContext#messageAdapter]].
*
* @param createREquest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* the other actor can send a message back through.
* @param applyToResponse Transforms the response from the `otherActor` into a message this actor understands.
* Will be invoked with either the response message or an AskTimeoutException failed or
@ -198,7 +203,7 @@ trait ActorContext[T] {
resClass: Class[Res],
otherActor: ActorRef[Req],
responseTimeout: Timeout,
createREquest: java.util.function.Function[ActorRef[Res], Req],
createRequest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit
}

View file

@ -5,14 +5,15 @@ package akka.actor.typed.scaladsl
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.Try
import akka.actor.typed._
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.actor.typed._
import akka.annotation.InternalApi
import akka.util.Timeout
import scala.reflect.ClassTag
import scala.util.{ Success, Failure, Try }
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
@ -138,24 +139,54 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
* INTERNAL API: It is currently internal because it's too easy to create
* resource leaks by spawning adapters without stopping them. `messageAdapter`
* is the public API.
*
* Create a "lightweight" child actor that will convert or wrap messages such that
* other Actors protocols can be ingested by this Actor. You are strongly advised
* to cache these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*
* The function is applied inside the "parent" actor and can safely access
* state of the "parent".
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
@InternalApi private[akka] def spawnMessageAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
* INTERNAL API: See `spawnMessageAdapter` with name parameter
*/
def spawnAdapter[U](f: U T): ActorRef[U]
@InternalApi private[akka] def spawnMessageAdapter[U](f: U T): ActorRef[U]
/**
* Create a message adapter that will convert or wrap messages such that other Actors
* protocols can be ingested by this Actor.
*
* You can register several message adapters for different message classes.
* It's only possible to have one message adapter per message class to make sure
* that the number of adapters are not growing unbounded if registered repeatedly.
* That also means that a registered adapter will replace an existing adapter for
* the same message class.
*
* A message adapter will be used if the message class matches the given class or
* is a subclass thereof. The registered adapters are tried in reverse order of
* their registration order, i.e. the last registered first.
*
* A message adapter (and the returned `ActorRef`) has the same lifecycle as
* this actor. It's recommended to register the adapters in a top level
* `Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to
* register them later also if needed. Message adapters don't have to be stopped since
* they consume no resources other than an entry in an internal `Map` and the number
* of adapters are bounded since it's only possible to have one per message class.
* *
* The function is running in this actor and can safely access state of it.
*/
def messageAdapter[U: ClassTag](f: U T): ActorRef[U]
/**
* Perform a single request-response message interaction with another actor, and transform the messages back to
@ -165,7 +196,7 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
* will be passed as a `Failure(`[[java.util.concurrent.TimeoutException]]`)` to the `mapResponse` function
* (this is the only "normal" way a `Failure` is passed to the function).
*
* For other messaging patterns with other actors, see [[spawnAdapter]].
* For other messaging patterns with other actors, see [[ActorContext#messageAdapter]].
*
* @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* the other actor can send a message back through.

View file

@ -120,7 +120,7 @@ import akka.actor.typed.Terminated
// For the Java API the Changed messages must be mapped to the JReplicator.Changed class.
// That is done with an adapter, and we have to keep track of the lifecycle of the original
// subscriber and stop the adapter when the original subscriber is stopped.
val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnAdapter {
val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnMessageAdapter {
chg InternalChanged(chg, cmd.subscriber)
}

View file

@ -107,15 +107,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
val adapter: ActorRef[Replicator.ReplicatorMessage] =
ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage)
x match {
case changed @ Replicator.Changed(ReceptionistKey)
val value = changed.get(ReceptionistKey)
val oldState = state
val newState = ServiceRegistry(value)
val changes = diff(oldState, newState)
externalInterface.RegistrationsChangedExternally(changes, newState)
}
ctx.messageAdapter[Replicator.ReplicatorMessage] {
case changed @ Replicator.Changed(ReceptionistKey)
val value = changed.get(ReceptionistKey)
val oldState = state
val newState = ServiceRegistry(value)
val changes = diff(oldState, newState)
externalInterface.RegistrationsChangedExternally(changes, newState)
}
replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped)

View file

@ -95,11 +95,17 @@ public class ReplicatorTest extends JUnitSuite {
this.replicator = replicator;
this.node = node;
updateResponseAdapter = ctx.spawnAdapter(InternalUpdateResponse::new);
updateResponseAdapter = ctx.messageAdapter(
(Class<Replicator.UpdateResponse<GCounter>>) (Object) Replicator.UpdateResponse.class,
msg -> new InternalUpdateResponse(msg));
getResponseAdapter = ctx.spawnAdapter(InternalGetResponse::new);
getResponseAdapter = ctx.messageAdapter(
(Class<Replicator.GetResponse<GCounter>>) (Object) Replicator.GetResponse.class,
msg -> new InternalGetResponse(msg));
changedAdapter = ctx.spawnAdapter(InternalChanged::new);
changedAdapter = ctx.messageAdapter(
(Class<Replicator.Changed<GCounter>>) (Object) Replicator.Changed.class,
msg -> new InternalChanged(msg));
replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
}

View file

@ -24,6 +24,7 @@ object ReplicatorSpec {
val config = ConfigFactory.parseString(
"""
akka.loglevel = DEBUG
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
@ -43,14 +44,15 @@ object ReplicatorSpec {
def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] =
Behaviors.deferred[ClientCommand] { ctx
val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
ctx.spawnAdapter(InternalUpdateResponse.apply)
ctx.messageAdapter(InternalUpdateResponse.apply)
val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
ctx.spawnAdapter(InternalGetResponse.apply)
ctx.messageAdapter(InternalGetResponse.apply)
val changedAdapter: ActorRef[Replicator.Changed[GCounter]] =
ctx.spawnAdapter(InternalChanged.apply)
ctx.messageAdapter(InternalChanged.apply)
replicator ! Replicator.Subscribe(Key, changedAdapter)

View file

@ -51,7 +51,7 @@ object RandomRouter {
val cluster = Cluster(ctx.system)
// typically you have to map such external messages into this
// actor's protocol with a message adapter
val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.spawnAdapter(WrappedReachabilityEvent.apply)
val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(WrappedReachabilityEvent.apply)
cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])
def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =

View file

@ -140,7 +140,7 @@ In the next section we demonstrate this on a more realistic example.
The next example demonstrates some important patterns:
* Using a sealed trait and case class/objects to represent multiple messages an actor can receive
* Handle incoming messages of different types by using `adapter`s
* Handle sessions by using child actors
* Handling state by changing behavior
* Using multiple typed actors to represent different parts of a protocol in a type safe way
@ -182,33 +182,31 @@ When a new `GetSession` command comes in we add that client to the
list that is in the returned behavior. Then we also need to create the sessions
`ActorRef` that will be used to post messages. In this case we want to
create a very simple Actor that just repackages the `PostMessage`
command into a `PostSessionMessage` command which also includes the
screen name. Such a wrapper Actor can be created by using the
`spawnAdapter` method on the `ActorContext`, so that we can then
go on to reply to the client with the `SessionGranted` result.
command into a `PublishSessionMessage` command which also includes the
screen name.
The behavior that we declare here can handle both subtypes of `Command`.
The behavior that we declare here can handle both subtypes of `RoomCommand`.
`GetSession` has been explained already and the
`PostSessionMessage` commands coming from the wrapper Actors will
`PublishSessionMessage` commands coming from the session Actors will
trigger the dissemination of the contained chat room message to all connected
clients. But we do not want to give the ability to send
`PostSessionMessage` commands to arbitrary clients, we reserve that
right to the wrappers we create—otherwise clients could pose as completely
`PublishSessionMessage` commands to arbitrary clients, we reserve that
right to the internal session actors we create—otherwise clients could pose as completely
different screen names (imagine the `GetSession` protocol to include
authentication information to further secure this). Therefore `PostSessionMessage`
has `private` visibility and can't be created outside the actor.
authentication information to further secure this). Therefore `PublishSessionMessage`
has `private` visibility and can't be created outside the `ChatRoom` @scala[object]@java[class].
If we did not care about securing the correspondence between a session and a
screen name then we could change the protocol such that `PostMessage` is
removed and all clients just get an @scala[`ActorRef[PostSessionMessage]`]@java[`ActorRef<PostSessionMessage>`] to
send to. In this case no wrapper would be needed and we could just use
removed and all clients just get an @scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef<PublishSessionMessage>`] to
send to. In this case no session actor would be needed and we could just use
@scala[`ctx.self`]@java[`ctx.getSelf()`]. The type-checks work out in that case because
@scala[`ActorRef[-T]`]@java[`ActorRef<T>`] is contravariant in its type parameter, meaning that we
can use a @scala[`ActorRef[Command]`]@java[`ActorRef<Command>`] wherever an
@scala[`ActorRef[PostSessionMessage]`]@java[`ActorRef<PostSessionMessage>`] is needed—this makes sense because the
can use a @scala[`ActorRef[RoomCommand]`]@java[`ActorRef<RoomCommand>`] wherever an
@scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef<PublishSessionMessage>`] is needed—this makes sense because the
former simply speaks more languages than the latter. The opposite would be
problematic, so passing an @scala[`ActorRef[PostSessionMessage]`]@java[`ActorRef<PostSessionMessage>`] where
@scala[`ActorRef[Command]`]@java[`ActorRef<Command>`] is required will lead to a type error.
problematic, so passing an @scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef<PublishSessionMessage>`] where
@scala[`ActorRef[RoomCommand]`]@java[`ActorRef<RoomCommand>`] is required will lead to a type error.
### Trying it out

View file

@ -1,4 +1,4 @@
# Typed Actor Interaction Patterns
# Interaction Patterns
Interacting with an Actor in Akka Typed is done through an @scala[`ActorRef[T]`]@java[`ActorRef<T>`] where `T` is the type of messages the actor accepts, also known as the "protocol". This ensures that only the right kind of messages can be sent to an actor and also ensures no access to the Actor instance internals is available to anyone else but the Actor itself.
@ -16,13 +16,13 @@ Scala
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget }
**Scenarios fire and forget is useful:**
**Useful when:**
* When it is not critical to be sure that the message was processed
* When there is no way to act on non successful delivery or processing
* When we want to minimize the number of messages created to get higher throughput
**Problems with fire and forget:**
**Problems:**
* Consistently higher rates of fire and forget to an actor than it process will make the inbox fill up and can in the worst case cause the JVM crash with an `OutOfMemoryError`
* If the message got lost, we will not notice
@ -33,33 +33,60 @@ In many interactions a request is followed by a response back from the actor. In
TODO sample
**Scenarios where request response with tell is useful:**
**Useful when:**
* Subscribing to an actor that will send many response messages (of the same protocol) back
* When communicating between a parent and its children, where the protocol can be made include the messages for the interaction
* ???
**Problems request-response:**
**Problems:**
* Often the response that the other actor wants to send back is not a part of the sending actor's protocol (see adapted request response or ask)
* It is hard to detect and that a message request was not delivered or processed (see ask)
* Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor
## Adapted Request-Response
## Adapted Response
Very often the receiving does not, and should not be made, know of the protocol of the sending actor, and will respond with one or more messages that the sending actor cannot receive.
Very often the receiving actor does not, and should, know of the protocol of the sending actor, and
will respond with one or more messages that the sending actor cannot receive.
TODO sample
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #adapted-response }
**Scenarios where Adapted Request-Response is useful:**
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #adapted-response }
* Subscribing to an actor that will send many response messages back
You can register several message adapters for different message classes.
It's only possible to have one message adapter per message class to make sure
that the number of adapters are not growing unbounded if registered repeatedly.
That also means that a registered adapter will replace an existing adapter for
the same message class.
A message adapter will be used if the message class matches the given class or
is a subclass thereof. The registered adapters are tried in reverse order of
their registration order, i.e. the last registered first.
A message adapter (and the returned `ActorRef`) has the same lifecycle as
the receiving actor. It's recommended to register the adapters in a top level
`Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to
register them later also if needed.
The function is running in the receiving actor and can safely access state of it.
**Useful when:**
* Subscribing to an actor that will send many response messages back
* Translating between different actor message protocols
**Problems with adapted request-response:**
**Problems:**
* It is hard to detect and that a message request was not delivered or processed (see ask)
* Only one adaption can be made per response message type, if a new one is registered the old one is replaced, for example different target actors can't have different adaption if they use the same response types, unless some correlation is encoded in the messages
* Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor
* It is hard to detect that a message request was not delivered or processed (see ask)
* Only one adaption can be made per response message type, if a new one is registered the old one is replaced,
for example different target actors can't have different adaption if they use the same response types, unless some
correlation is encoded in the messages
* Unless the protocol already includes a way to provide context, for example a request id that is also sent in the
response, it is not possible to tie an interaction to some specific context without introducing a new,
separate, actor
## 1:1 Request-Response with ask between two actors
@ -70,8 +97,9 @@ The interaction has two steps, first we need to construct the outgoing message,
TODO sample
The function is running in the receiving actor and can safely access state of it.
**Scenarios where ask is useful:**
**Useful when:**
* Single response queries
* When an actor needs to know that the message was processed before continuing
@ -79,7 +107,7 @@ TODO sample
* To keep track of outstanding requests and not overwhelm a recipient with messages (simple backpressure)
* When some context should be attached to the interaction but the protocol does not support that (request id, what query the response was for)
**Problems with ask:**
**Problems:**
* There can only be a single response to one `ask`
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
@ -92,12 +120,12 @@ In an interaction where there is a 1:1 mapping between a request and a response
TODO sample
**Scenarios where this ask variant is useful:**
**Useful when:**
* Single response queries where the response should be passed on to some other actor
* ???
**Problems with ask:**
**Problems:**
* There can only be a single response to one `ask`
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
@ -109,12 +137,14 @@ Keeping context for an interaction, or multiple interactions can be done by movi
TODO
**Scenarios where per session child actor is useful:**
**Useful when:**
* A single incoming request should result in multiple interactions with other actions before a result can be built
* A single incoming request should result in multiple interactions with other actors before a result can be built,
for example aggregation of several results
* Handle acknowledgement and retry messages for at-least-once delivery
* ???
**Problems with ask:**
**Problems:**
* Children have lifecycles that must be managed to not create a resource leak
* ???
* ???

View file

@ -273,7 +273,7 @@ object PersistentActorCompileOnlyTest {
var basket = Basket(Nil)
var stash: Seq[Command] = Nil
val adapt = ctx.spawnAdapter((m: MetaData) GotMetaData(m))
val adapt = ctx.messageAdapter((m: MetaData) GotMetaData(m))
def addItem(id: Id, self: ActorRef[Command]) =
Effect
@ -316,8 +316,7 @@ object PersistentActorCompileOnlyTest {
case ItemAdded(id) id +: state
case ItemRemoved(id) state.filter(_ != id)
}).onRecoveryCompleted((ctx, state) {
val ad = ctx.spawnAdapter((m: MetaData) GotMetaData(m))
state.foreach(id metadataRegistry ! GetMetaData(id, ad))
state.foreach(id metadataRegistry ! GetMetaData(id, adapt))
})
}
}

View file

@ -59,12 +59,12 @@ object Effect {
ref
}
override def spawnAdapter[U](f: U T): ActorRef[U] = {
spawnAdapter(f, "")
override def spawnMessageAdapter[U](f: U T): ActorRef[U] = {
spawnMessageAdapter(f, "")
}
override def spawnAdapter[U](f: U T, name: String): ActorRef[U] = {
val ref = super.spawnAdapter(f, name)
override def spawnMessageAdapter[U](f: U T, name: String): ActorRef[U] = {
val ref = super.spawnMessageAdapter(f, name)
effectQueue.offer(SpawnedAdapter)
ref
}

View file

@ -104,7 +104,7 @@ private[akka] final class FunctionRef[-T](
/**
* INTERNAL API
*/
@InternalApi private[akka] def internalSpawnAdapter[U](f: U T, name: String): ActorRef[U] = {
@InternalApi private[akka] def internalSpawnMessageAdapter[U](f: U T, name: String): ActorRef[U] = {
val n = if (name != "") s"${childName.next()}-$name" else childName.next()
val i = TestInbox[U](n)

View file

@ -50,12 +50,12 @@ object BehaviorTestkitSpec {
}
Behaviors.same
case SpawnAdapter
ctx.spawnAdapter {
ctx.spawnMessageAdapter {
r: Reproduce SpawnAnonymous(r.times)
}
Behaviors.same
case SpawnAdapterWithName(name)
ctx.spawnAdapter({
ctx.spawnMessageAdapter({
r: Reproduce SpawnAnonymous(r.times)
}, name)
Behaviors.same
@ -115,7 +115,7 @@ class BehaviorTestkitSpec extends WordSpec with Matchers {
}
}
"BehaviorTestkit's spawnAdapter" must {
"BehaviorTestkit's spawnMessageAdapter" must {
"create adapters without name and record effects" in {
val testkit = BehaviorTestkit[Father.Command](Father.init())
testkit.run(SpawnAdapter)