From e5794c57bbf2e456beb73a793fdf4228d6ff6e22 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 23 Aug 2019 08:30:09 +0200 Subject: [PATCH] doc: stylish interaction-patterns.md, #24717 --- .../akka/typed/InteractionPatternsTest.java | 1152 +++++++++-------- .../java/jdocs/akka/typed/MailboxDocTest.java | 3 +- .../akka/typed/InteractionPatternsSpec.scala | 414 +++--- .../typed/ShardingReplyCompileOnlyTest.java | 89 +- .../typed/ShardingCompileOnlySpec.scala | 209 +-- .../paradox/typed/interaction-patterns.md | 36 +- 6 files changed, 1027 insertions(+), 876 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java index c641468692..f784cf5d37 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -4,16 +4,15 @@ package jdocs.akka.typed; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; -import akka.actor.typed.Props; import akka.actor.typed.javadsl.*; import akka.actor.testkit.typed.javadsl.TestProbe; +import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.Await; -import scala.concurrent.duration.FiniteDuration; import java.net.URI; import java.time.Duration; @@ -21,615 +20,662 @@ import java.util.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import static jdocs.akka.typed.InteractionPatternsTest.Samples.*; + public class InteractionPatternsTest extends JUnitSuite { - // #fire-and-forget-definition - class PrintMe { - public final String message; + interface Samples { - public PrintMe(String message) { - this.message = message; + // #fire-and-forget-definition + public class Printer { + public static class PrintMe { + public final String message; + + public PrintMe(String message) { + this.message = message; + } + } + + public static Behavior create() { + return Behaviors.setup( + context -> + Behaviors.receive(PrintMe.class) + .onMessage( + PrintMe.class, + printMe -> { + context.getLog().info(printMe.message); + return Behaviors.same(); + }) + .build()); + } } - } + // #fire-and-forget-definition - static final Behavior printerBehavior = - Behaviors.setup( - context -> - Behaviors.receive(PrintMe.class) - .onMessage( - PrintMe.class, - printMe -> { - context.getLog().info(printMe.message); - return Behaviors.same(); - }) - .build()); - // #fire-and-forget-definition + public class CookieFabric { + // #request-response-protocol + public static class Request { + public final String query; + public final ActorRef replyTo; - // #request-response-protocol - class Request { - public final String query; - public final ActorRef respondTo; + public Request(String query, ActorRef replyTo) { + this.query = query; + this.replyTo = replyTo; + } + } - public Request(String query, ActorRef respondTo) { - this.query = query; - this.respondTo = respondTo; - } - } + public static class Response { + public final String result; - class Response { - public final String result; + public Response(String result) { + this.result = result; + } + } + // #request-response-protocol - public Response(String result) { - this.result = result; - } - } - // #request-response-protocol + // #request-response-respond + // actor behavior + public static Behavior create() { + return Behaviors.receive(Request.class) + .onMessage(Request.class, CookieFabric::onRequest) + .build(); + } - public void compileOnlyRequestResponse() { + private static Behavior onRequest(Request request) { + // ... process request ... + request.replyTo.tell(new Response("Here's the cookies for " + request.query)); + return Behaviors.same(); + } + // #request-response-respond - // #request-response-respond - // actor behavior - Behaviors.receive(Request.class) - .onMessage( - Request.class, - request -> { - // ... process request ... - request.respondTo.tell(new Response("Here's your response!")); - return Behaviors.same(); - }) - .build(); - // #request-response-respond + void demo() { + ActorRef cookieFabric = null; + ActorContext context = null; - ActorRef otherActor = null; - ActorContext context = null; - - // #request-response-send - otherActor.tell(new Request("give me cookies", context.getSelf())); - // #request-response-send - } - - // #adapted-response - - public static class Backend { - interface Request {} - - public static class StartTranslationJob implements Request { - public final int taskId; - public final URI site; - public final ActorRef replyTo; - - public StartTranslationJob(int taskId, URI site, ActorRef replyTo) { - this.taskId = taskId; - this.site = site; - this.replyTo = replyTo; + // #request-response-send + cookieFabric.tell(new CookieFabric.Request("give me cookies", context.getSelf())); + // #request-response-send } } - interface Response {} + // #adapted-response - public static class JobStarted implements Response { - public final int taskId; + public class Backend { + public interface Request {} - public JobStarted(int taskId) { - this.taskId = taskId; + public static class StartTranslationJob implements Request { + public final int taskId; + public final URI site; + public final ActorRef replyTo; + + public StartTranslationJob(int taskId, URI site, ActorRef replyTo) { + this.taskId = taskId; + this.site = site; + this.replyTo = replyTo; + } + } + + public interface Response {} + + public static class JobStarted implements Response { + public final int taskId; + + public JobStarted(int taskId) { + this.taskId = taskId; + } + } + + public static class JobProgress implements Response { + public final int taskId; + public final double progress; + + public JobProgress(int taskId, double progress) { + this.taskId = taskId; + this.progress = progress; + } + } + + public static class JobCompleted implements Response { + public final int taskId; + public final URI result; + + public JobCompleted(int taskId, URI result) { + this.taskId = taskId; + this.result = result; + } } } - public static class JobProgress implements Response { - public final int taskId; - public final double progress; + public class Frontend { - public JobProgress(int taskId, double progress) { - this.taskId = taskId; - this.progress = progress; + public interface Command {} + + public static class Translate implements Command { + public final URI site; + public final ActorRef replyTo; + + public Translate(URI site, ActorRef replyTo) { + this.site = site; + this.replyTo = replyTo; + } + } + + private static class WrappedBackendResponse implements Command { + final Backend.Response response; + + public WrappedBackendResponse(Backend.Response response) { + this.response = response; + } + } + + public static class Translator extends AbstractBehavior { + private final ActorContext context; + private final ActorRef backend; + private final ActorRef backendResponseAdapter; + + private int taskIdCounter = 0; + private Map> inProgress = new HashMap<>(); + + public Translator(ActorContext context, ActorRef backend) { + this.context = context; + this.backend = backend; + this.backendResponseAdapter = + context.messageAdapter(Backend.Response.class, WrappedBackendResponse::new); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Translate.class, this::onTranslate) + .onMessage(WrappedBackendResponse.class, this::onWrappedBackendResponse) + .build(); + } + + private Behavior onTranslate(Translate cmd) { + taskIdCounter += 1; + inProgress.put(taskIdCounter, cmd.replyTo); + backend.tell( + new Backend.StartTranslationJob(taskIdCounter, cmd.site, backendResponseAdapter)); + return this; + } + + private Behavior onWrappedBackendResponse(WrappedBackendResponse wrapped) { + Backend.Response response = wrapped.response; + if (response instanceof Backend.JobStarted) { + Backend.JobStarted rsp = (Backend.JobStarted) response; + context.getLog().info("Started {}", rsp.taskId); + } else if (response instanceof Backend.JobProgress) { + Backend.JobProgress rsp = (Backend.JobProgress) response; + context.getLog().info("Progress {}", rsp.taskId); + } else if (response instanceof Backend.JobCompleted) { + Backend.JobCompleted rsp = (Backend.JobCompleted) response; + context.getLog().info("Completed {}", rsp.taskId); + inProgress.get(rsp.taskId).tell(rsp.result); + inProgress.remove(rsp.taskId); + } else { + return Behaviors.unhandled(); + } + + return this; + } } } + // #adapted-response - public static class JobCompleted implements Response { - public final int taskId; - public final URI result; + // #timer + public class Buncher { - public JobCompleted(int taskId, URI result) { - this.taskId = taskId; - this.result = result; + public interface Command {} + + public static final class Batch { + private final List messages; + + public Batch(List messages) { + this.messages = Collections.unmodifiableList(messages); + } + + public List getMessages() { + return messages; + } + // #timer + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Batch batch = (Batch) o; + return Objects.equals(messages, batch.messages); + } + + @Override + public int hashCode() { + return Objects.hash(messages); + } // #timer + } + + public static final class ExcitingMessage implements Command { + public final String message; + + public ExcitingMessage(String message) { + this.message = message; + } + } + + private static final Object TIMER_KEY = new Object(); + + private enum Timeout implements Command { + INSTANCE + } + + public static Behavior create(ActorRef target, Duration after, int maxSize) { + return Behaviors.withTimers(timers -> new Buncher(timers, target, after, maxSize).idle()); + } + + private final TimerScheduler timers; + private final ActorRef target; + private final Duration after; + private final int maxSize; + + private Buncher( + TimerScheduler timers, ActorRef target, Duration after, int maxSize) { + this.timers = timers; + this.target = target; + this.after = after; + this.maxSize = maxSize; + } + + private Behavior idle() { + return Behaviors.receive(Command.class) + .onMessage(Command.class, this::onIdleCommand) + .build(); + } + + private Behavior onIdleCommand(Command message) { + timers.startSingleTimer(TIMER_KEY, Timeout.INSTANCE, after); + return new Active(message); + } + + private class Active extends AbstractBehavior { + private final List buffer = new ArrayList<>(); + + Active(Command firstCommand) { + buffer.add(firstCommand); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Timeout.class, message -> onTimeout()) + .onMessage(Command.class, this::onCommand) + .build(); + } + + private Behavior onTimeout() { + target.tell(new Batch(buffer)); + return idle(); // switch to idle + } + + private Behavior onCommand(Command message) { + buffer.add(message); + if (buffer.size() == maxSize) { + timers.cancel(TIMER_KEY); + target.tell(new Batch(buffer)); + return idle(); // switch to idle + } else { + return this; // stay Active + } + } } } - } + // #timer - public static class Frontend { + // #actor-ask + public class Hal extends AbstractBehavior { - interface Command {} + public interface Command {} - public static class Translate implements Command { - public final URI site; - public final ActorRef replyTo; + public static final class OpenThePodBayDoorsPlease implements Command { + public final ActorRef respondTo; - public Translate(URI site, ActorRef replyTo) { - this.site = site; - this.replyTo = replyTo; + public OpenThePodBayDoorsPlease(ActorRef respondTo) { + this.respondTo = respondTo; + } } - } - private static class WrappedJobStarted implements Command { - final Backend.JobStarted response; + public static final class HalResponse { + public final String message; - public WrappedJobStarted(Backend.JobStarted response) { - this.response = response; - } - } - - private static class WrappedJobProgress implements Command { - final Backend.JobProgress response; - - public WrappedJobProgress(Backend.JobProgress response) { - this.response = response; - } - } - - private static class WrappedJobCompleted implements Command { - final Backend.JobCompleted response; - - public WrappedJobCompleted(Backend.JobCompleted response) { - this.response = response; - } - } - - private static class OtherResponse implements Command { - final Backend.Response response; - - public OtherResponse(Backend.Response response) { - this.response = response; - } - } - - public static class Translator extends AbstractBehavior { - private final ActorContext context; - private final ActorRef backend; - private final ActorRef backendResponseAdapter; - - private int taskIdCounter = 0; - private Map> inProgress = new HashMap<>(); - - public Translator(ActorContext context, ActorRef backend) { - this.context = context; - this.backend = backend; - this.backendResponseAdapter = - context.messageAdapter( - Backend.Response.class, - rsp -> { - if (rsp instanceof Backend.JobStarted) - return new WrappedJobStarted((Backend.JobStarted) rsp); - else if (rsp instanceof Backend.JobProgress) - return new WrappedJobProgress((Backend.JobProgress) rsp); - else if (rsp instanceof Backend.JobCompleted) - return new WrappedJobCompleted((Backend.JobCompleted) rsp); - else return new OtherResponse(rsp); - }); + public HalResponse(String message) { + this.message = message; + } } @Override public Receive createReceive() { return newReceiveBuilder() - .onMessage( - Translate.class, - cmd -> { - taskIdCounter += 1; - inProgress.put(taskIdCounter, cmd.replyTo); - backend.tell( - new Backend.StartTranslationJob( - taskIdCounter, cmd.site, backendResponseAdapter)); - return this; - }) - .onMessage( - WrappedJobStarted.class, - wrapped -> { - context.getLog().info("Started {}", wrapped.response.taskId); - return this; - }) - .onMessage( - WrappedJobProgress.class, - wrapped -> { - context - .getLog() - .info("Progress {}: {}", wrapped.response.taskId, wrapped.response.progress); - return this; - }) - .onMessage( - WrappedJobCompleted.class, - wrapped -> { - context - .getLog() - .info("Completed {}: {}", wrapped.response.taskId, wrapped.response.result); - return this; - }) - .onMessage(OtherResponse.class, other -> Behaviors.unhandled()) + .onMessage(OpenThePodBayDoorsPlease.class, this::onOpenThePodBayDoorsPlease) .build(); } + + private Behavior onOpenThePodBayDoorsPlease(OpenThePodBayDoorsPlease message) { + message.respondTo.tell(new HalResponse("I'm sorry, Dave. I'm afraid I can't do that.")); + return this; + } + } + + public class Dave extends AbstractBehavior { + + public interface Command {} + + // this is a part of the protocol that is internal to the actor itself + private static final class AdaptedResponse implements Command { + public final String message; + + public AdaptedResponse(String message) { + this.message = message; + } + } + + public static Behavior create(ActorRef hal) { + return Behaviors.setup(context -> new Dave(context, hal)); + } + + private final ActorContext context; + + private Dave(ActorContext context, ActorRef hal) { + this.context = context; + + // asking someone requires a timeout, if the timeout hits without response + // the ask is failed with a TimeoutException + final Duration timeout = Duration.ofSeconds(3); + + context.ask( + Hal.HalResponse.class, + hal, + timeout, + // construct the outgoing message + (ActorRef ref) -> new Hal.OpenThePodBayDoorsPlease(ref), + // adapt the response (or failure to respond) + (response, throwable) -> { + if (response != null) { + return new AdaptedResponse(response.message); + } else { + return new AdaptedResponse("Request failed"); + } + }); + + // we can also tie in request context into an interaction, it is safe to look at + // actor internal state from the transformation function, but remember that it may have + // changed at the time the response arrives and the transformation is done, best is to + // use immutable state we have closed over like here. + final int requestId = 1; + context.ask( + Hal.HalResponse.class, + hal, + timeout, + // construct the outgoing message + (ActorRef ref) -> new Hal.OpenThePodBayDoorsPlease(ref), + // adapt the response (or failure to respond) + (response, throwable) -> { + if (response != null) { + return new AdaptedResponse(requestId + ": " + response.message); + } else { + return new AdaptedResponse(requestId + ": Request failed"); + } + }); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + // the adapted message ends up being processed like any other + // message sent to the actor + .onMessage(AdaptedResponse.class, this::onAdaptedResponse) + .build(); + } + + private Behavior onAdaptedResponse(AdaptedResponse response) { + context.getLog().info("Got response from HAL: {}", response.message); + return this; + } + } + // #actor-ask + + // #per-session-child + // dummy data types just for this sample + public class Keys {} + + public class Wallet {} + + public class KeyCabinet { + public static class GetKeys { + public final String whoseKeys; + public final ActorRef replyTo; + + public GetKeys(String whoseKeys, ActorRef respondTo) { + this.whoseKeys = whoseKeys; + this.replyTo = respondTo; + } + } + + public static Behavior create() { + return Behaviors.receiveMessage(KeyCabinet::onGetKeys); + } + + private static Behavior onGetKeys(GetKeys message) { + message.replyTo.tell(new Keys()); + return Behaviors.same(); + } + } + + public class Drawer { + + public static class GetWallet { + public final String whoseWallet; + public final ActorRef replyTo; + + public GetWallet(String whoseWallet, ActorRef replyTo) { + this.whoseWallet = whoseWallet; + this.replyTo = replyTo; + } + } + + public static Behavior create() { + return Behaviors.receiveMessage(Drawer::onGetWallet); + } + + private static Behavior onGetWallet(GetWallet message) { + message.replyTo.tell(new Wallet()); + return Behaviors.same(); + } + } + + public class Home { + + public interface Command {} + + public static class LeaveHome implements Command { + public final String who; + public final ActorRef respondTo; + + public LeaveHome(String who, ActorRef respondTo) { + this.who = who; + this.respondTo = respondTo; + } + } + + public static class ReadyToLeaveHome { + public final String who; + public final Keys keys; + public final Wallet wallet; + + public ReadyToLeaveHome(String who, Keys keys, Wallet wallet) { + this.who = who; + this.keys = keys; + this.wallet = wallet; + } + } + + private final ActorContext context; + + private final ActorRef keyCabinet; + private final ActorRef drawer; + + private Home(ActorContext context) { + this.context = context; + this.keyCabinet = context.spawn(KeyCabinet.create(), "key-cabinet"); + this.drawer = context.spawn(Drawer.create(), "drawer"); + } + + private Behavior behavior() { + return Behaviors.receive(Command.class) + .onMessage(LeaveHome.class, this::onLeaveHome) + .build(); + } + + private Behavior onLeaveHome(LeaveHome message) { + context.spawn( + new PrepareToLeaveHome(message.who, message.respondTo, keyCabinet, drawer), + "leaving" + message.who); + return Behaviors.same(); + } + + // actor behavior + public static Behavior create() { + return Behaviors.setup(context -> new Home(context).behavior()); + } + } + + // per session actor behavior + class PrepareToLeaveHome extends AbstractBehavior { + private final String whoIsLeaving; + private final ActorRef replyTo; + private final ActorRef keyCabinet; + private final ActorRef drawer; + private Optional wallet = Optional.empty(); + private Optional keys = Optional.empty(); + + PrepareToLeaveHome( + String whoIsLeaving, + ActorRef replyTo, + ActorRef keyCabinet, + ActorRef drawer) { + this.whoIsLeaving = whoIsLeaving; + this.replyTo = replyTo; + this.keyCabinet = keyCabinet; + this.drawer = drawer; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Wallet.class, this::onWallet) + .onMessage(Keys.class, this::onKeys) + .build(); + } + + private Behavior onWallet(Wallet wallet) { + this.wallet = Optional.of(wallet); + return completeOrContinue(); + } + + private Behavior onKeys(Keys keys) { + this.keys = Optional.of(keys); + return completeOrContinue(); + } + + private Behavior completeOrContinue() { + if (wallet.isPresent() && keys.isPresent()) { + replyTo.tell(new Home.ReadyToLeaveHome(whoIsLeaving, keys.get(), wallet.get())); + return Behaviors.stopped(); + } else { + return this; + } + } + } + // #per-session-child + + } + + interface StandaloneAskSample { + // #standalone-ask + public class CookieFabric { + + interface Command {} + + public static class GiveMeCookies implements Command { + public final ActorRef cookies; + + public GiveMeCookies(ActorRef cookies) { + this.cookies = cookies; + } + } + + public static class Cookies { + public final int count; + + public Cookies(int count) { + this.count = count; + } + } + } + // #standalone-ask + + class NotShown { + + // #standalone-ask + + public void askAndPrint( + ActorSystem system, ActorRef cookieFabric) { + CompletionStage result = + AskPattern.ask( + cookieFabric, + CookieFabric.GiveMeCookies::new, + // asking someone requires a timeout and a scheduler, if the timeout hits without + // response + // the ask is failed with a TimeoutException + Duration.ofSeconds(3), + system.scheduler()); + + result.whenComplete( + (cookies, failure) -> { + if (cookies != null) System.out.println("Yay, cookies!"); + else System.out.println("Boo! didn't get cookies in time."); + }); + } + // #standalone-ask } } - // #adapted-response + + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(); @Test public void fireAndForgetSample() throws Exception { // #fire-and-forget-doit - final ActorSystem system = - ActorSystem.create(printerBehavior, "printer-sample-system"); + final ActorSystem system = + ActorSystem.create(Printer.create(), "printer-sample-system"); // note that system is also the ActorRef to the guardian actor - final ActorRef ref = system; + final ActorRef ref = system; // these are all fire and forget - ref.tell(new PrintMe("message 1")); - ref.tell(new PrintMe("message 2")); + ref.tell(new Printer.PrintMe("message 1")); + ref.tell(new Printer.PrintMe("message 2")); // #fire-and-forget-doit system.terminate(); system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); } - // #timer - interface Msg {} - - public static final class Batch { - private final List messages; - - public Batch(List messages) { - this.messages = Collections.unmodifiableList(messages); - } - - public List getMessages() { - return messages; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Batch batch = (Batch) o; - return Objects.equals(messages, batch.messages); - } - - @Override - public int hashCode() { - return Objects.hash(messages); - } - } - - public static final class ExcitingMessage implements Msg { - private final String message; - - public ExcitingMessage(String message) { - this.message = message; - } - } - - private static final Object TIMER_KEY = new Object(); - - private static class TimeoutMsg implements Msg {} - - public static Behavior behavior(ActorRef target, Duration after, int maxSize) { - return Behaviors.withTimers(timers -> idle(timers, target, after, maxSize)); - } - - private static Behavior idle( - TimerScheduler timers, ActorRef target, Duration after, int maxSize) { - return Behaviors.receive(Msg.class) - .onMessage( - Msg.class, - message -> { - timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after); - List buffer = new ArrayList<>(); - buffer.add(message); - return active(buffer, timers, target, after, maxSize); - }) - .build(); - } - - private static Behavior active( - List buffer, - TimerScheduler timers, - ActorRef target, - Duration after, - int maxSize) { - return Behaviors.receive(Msg.class) - .onMessage( - TimeoutMsg.class, - message -> { - target.tell(new Batch(buffer)); - return idle(timers, target, after, maxSize); - }) - .onMessage( - Msg.class, - message -> { - buffer.add(message); - if (buffer.size() == maxSize) { - timers.cancel(TIMER_KEY); - target.tell(new Batch(buffer)); - return idle(timers, target, after, maxSize); - } else { - return active(buffer, timers, target, after, maxSize); - } - }) - .build(); - } - // #timer - @Test public void timers() throws Exception { - final ActorSystem system = ActorSystem.create(Behaviors.empty(), "timers-sample"); - TestProbe probe = TestProbe.create("batcher", system); - ActorRef bufferer = - Await.result( - system.systemActorOf( - behavior(probe.ref(), Duration.ofSeconds(1), 10), - "batcher", - Props.empty(), - akka.util.Timeout.create(Duration.ofSeconds(1))), - FiniteDuration.create(3, TimeUnit.SECONDS)); + TestProbe probe = testKit.createTestProbe(Buncher.Batch.class); + ActorRef buncher = + testKit.spawn(Buncher.create(probe.ref(), Duration.ofSeconds(1), 10), "batcher"); - ExcitingMessage msgOne = new ExcitingMessage("one"); - ExcitingMessage msgTwo = new ExcitingMessage("two"); - bufferer.tell(msgOne); - bufferer.tell(msgTwo); - probe.expectNoMessage(Duration.ofMillis(1)); - probe.expectMessage(Duration.ofSeconds(2), new Batch(Arrays.asList(msgOne, msgTwo))); - - system.terminate(); - system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + Buncher.ExcitingMessage msgOne = new Buncher.ExcitingMessage("one"); + Buncher.ExcitingMessage msgTwo = new Buncher.ExcitingMessage("two"); + buncher.tell(msgOne); + buncher.tell(msgTwo); + probe.expectNoMessage(); + probe.expectMessage(Duration.ofSeconds(2), new Buncher.Batch(Arrays.asList(msgOne, msgTwo))); } - - // #actor-ask - interface HalCommand {} - - static final class OpenThePodBayDoorsPlease implements HalCommand { - public final ActorRef respondTo; - - OpenThePodBayDoorsPlease(ActorRef respondTo) { - this.respondTo = respondTo; - } - } - - static final class HalResponse { - public final String message; - - HalResponse(String message) { - this.message = message; - } - } - - static final Behavior halBehavior = - Behaviors.receive(HalCommand.class) - .onMessage( - OpenThePodBayDoorsPlease.class, - message -> { - message.respondTo.tell( - new HalResponse("I'm sorry, Dave. I'm afraid I can't do that.")); - return Behaviors.same(); - }) - .build(); - - interface DaveProtocol {} - - // this is a part of the protocol that is internal to the actor itself - private static final class AdaptedResponse implements DaveProtocol { - public final String message; - - public AdaptedResponse(String message) { - this.message = message; - } - } - - public static Behavior daveBehavior(final ActorRef hal) { - return Behaviors.setup( - (ActorContext context) -> { - - // asking someone requires a timeout, if the timeout hits without response - // the ask is failed with a TimeoutException - final Duration timeout = Duration.ofSeconds(3); - - context.ask( - HalResponse.class, - hal, - timeout, - // construct the outgoing message - (ActorRef ref) -> new OpenThePodBayDoorsPlease(ref), - // adapt the response (or failure to respond) - (response, throwable) -> { - if (response != null) { - return new AdaptedResponse(response.message); - } else { - return new AdaptedResponse("Request failed"); - } - }); - - // we can also tie in request context into an interaction, it is safe to look at - // actor internal state from the transformation function, but remember that it may have - // changed at the time the response arrives and the transformation is done, best is to - // use immutable state we have closed over like here. - final int requestId = 1; - context.ask( - HalResponse.class, - hal, - timeout, - // construct the outgoing message - (ActorRef ref) -> new OpenThePodBayDoorsPlease(ref), - // adapt the response (or failure to respond) - (response, throwable) -> { - if (response != null) { - return new AdaptedResponse(requestId + ": " + response.message); - } else { - return new AdaptedResponse(requestId + ": Request failed"); - } - }); - - return Behaviors.receive(DaveProtocol.class) - // the adapted message ends up being processed like any other - // message sent to the actor - .onMessage( - AdaptedResponse.class, - response -> { - context.getLog().info("Got response from HAL: {}", response.message); - return Behaviors.same(); - }) - .build(); - }); - } - // #actor-ask - - // #standalone-ask - interface CookieCommand {} - - static class GiveMeCookies implements CookieCommand { - public final ActorRef cookies; - - GiveMeCookies(ActorRef cookies) { - this.cookies = cookies; - } - }; - - static class Cookies {} - - public void askAndPrint(ActorSystem system, ActorRef cookieActorRef) { - CompletionStage result = - AskPattern.ask( - cookieActorRef, - GiveMeCookies::new, - // asking someone requires a timeout and a scheduler, if the timeout hits without - // response - // the ask is failed with a TimeoutException - Duration.ofSeconds(3), - system.scheduler()); - - result.whenComplete( - (cookies, failure) -> { - if (cookies != null) System.out.println("Yay, cookies!"); - else System.out.println("Boo! didn't get cookies in time."); - }); - } - // #standalone-ask - - // #per-session-child - // dummy data types just for this sample - interface Keys {} - - interface Wallet {} - // #per-session-child - - static final Behavior keyCabinetBehavior = null; - static final Behavior drawerBehavior = null; - // #per-session-child - // messages for the two services we interact with - class GetKeys { - public final String whoseKeys; - public final ActorRef respondTo; - - public GetKeys(String whoseKeys, ActorRef respondTo) { - this.whoseKeys = whoseKeys; - this.respondTo = respondTo; - } - } - - class GetWallet { - public final String whoseWallet; - public final ActorRef respondTo; - - public GetWallet(String whoseWallet, ActorRef respondTo) { - this.whoseWallet = whoseWallet; - this.respondTo = respondTo; - } - } - - interface HomeCommand {} - - class LeaveHome implements HomeCommand { - public final String who; - public final ActorRef respondTo; - - public LeaveHome(String who, ActorRef respondTo) { - this.who = who; - this.respondTo = respondTo; - } - } - - class ReadyToLeaveHome { - public final String who; - public final Keys keys; - public final Wallet wallet; - - public ReadyToLeaveHome(String who, Keys keys, Wallet wallet) { - this.who = who; - this.keys = keys; - this.wallet = wallet; - } - } - - // actor behavior - public Behavior homeBehavior() { - return Behaviors.setup( - (context) -> { - final ActorRef keyCabinet = context.spawn(keyCabinetBehavior, "key-cabinet"); - final ActorRef drawer = context.spawn(drawerBehavior, "drawer"); - - return Behaviors.receive(HomeCommand.class) - .onMessage( - LeaveHome.class, - message -> { - context.spawn( - new PrepareToLeaveHome(message.who, message.respondTo, keyCabinet, drawer), - "leaving" + message.who); - return Behaviors.same(); - }) - .build(); - }); - } - - // per session actor behavior - class PrepareToLeaveHome extends AbstractBehavior { - private final String whoIsLeaving; - private final ActorRef respondTo; - private final ActorRef keyCabinet; - private final ActorRef drawer; - private Optional wallet = Optional.empty(); - private Optional keys = Optional.empty(); - - public PrepareToLeaveHome( - String whoIsLeaving, - ActorRef respondTo, - ActorRef keyCabinet, - ActorRef drawer) { - this.whoIsLeaving = whoIsLeaving; - this.respondTo = respondTo; - this.keyCabinet = keyCabinet; - this.drawer = drawer; - } - - @Override - public Receive createReceive() { - return newReceiveBuilder() - .onMessage( - Wallet.class, - (wallet) -> { - this.wallet = Optional.of(wallet); - return completeOrContinue(); - }) - .onMessage( - Keys.class, - (keys) -> { - this.keys = Optional.of(keys); - return completeOrContinue(); - }) - .build(); - } - - private Behavior completeOrContinue() { - if (wallet.isPresent() && keys.isPresent()) { - respondTo.tell(new ReadyToLeaveHome(whoIsLeaving, keys.get(), wallet.get())); - return Behaviors.stopped(); - } else { - return this; - } - } - } - // #per-session-child - } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java index 47b6ba77a2..6d37e5ddcb 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java @@ -14,8 +14,9 @@ import akka.actor.typed.javadsl.Behaviors; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; import org.junit.Test; +import org.scalatest.junit.JUnitSuite; -public class MailboxDocTest { +public class MailboxDocTest extends JUnitSuite { @ClassRule public static final TestKitJunitResource testKit = diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala index 66f9fd4f8b..8716d7638e 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -9,8 +9,6 @@ import java.net.URI import akka.NotUsed import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } -import akka.actor.testkit.typed.scaladsl.TestProbe -import akka.util.Timeout import scala.concurrent.Future import scala.concurrent.duration._ @@ -24,24 +22,28 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik "contain a sample for fire and forget" in { // #fire-and-forget-definition - case class PrintMe(message: String) + object Printer { - val printerBehavior: Behavior[PrintMe] = Behaviors.receive { - case (context, PrintMe(message)) => - context.log.info(message) - Behaviors.same + case class PrintMe(message: String) + + def apply(): Behavior[PrintMe] = + Behaviors.receive { + case (context, PrintMe(message)) => + context.log.info(message) + Behaviors.same + } } // #fire-and-forget-definition // #fire-and-forget-doit - val system = ActorSystem(printerBehavior, "fire-and-forget-sample") + val system = ActorSystem(Printer(), "fire-and-forget-sample") // note how the system is also the top level actor ref - val printer: ActorRef[PrintMe] = system + val printer: ActorRef[Printer.PrintMe] = system // these are all fire and forget - printer ! PrintMe("message 1") - printer ! PrintMe("not message 2") + printer ! Printer.PrintMe("message 1") + printer ! Printer.PrintMe("not message 2") // #fire-and-forget-doit system.terminate() @@ -50,29 +52,32 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik "contain a sample for request response" in { - // #request-response-protocol - case class Request(query: String, respondTo: ActorRef[Response]) - case class Response(result: String) - // #request-response-protocol + object CookieFabric { + // #request-response-protocol + case class Request(query: String, replyTo: ActorRef[Response]) + case class Response(result: String) + // #request-response-protocol - // #request-response-respond - val otherBehavior = Behaviors.receiveMessage[Request] { - case Request(query, respondTo) => - // ... process query ... - respondTo ! Response(s"Here's the cookies for [$query]!") - Behaviors.same + // #request-response-respond + def apply(): Behaviors.Receive[Request] = + Behaviors.receiveMessage[Request] { + case Request(query, replyTo) => + // ... process query ... + replyTo ! Response(s"Here's the cookies for [$query]!") + Behaviors.same + } + // #request-response-respond } - // #request-response-respond - val otherActor: ActorRef[Request] = spawn(otherBehavior) - val probe = TestProbe[Response]() + val cookieFabric: ActorRef[CookieFabric.Request] = spawn(CookieFabric()) + val probe = createTestProbe[CookieFabric.Response]() // shhh, don't tell anyone import scala.language.reflectiveCalls val context = new { def self = probe.ref } // #request-response-send - otherActor ! Request("give me cookies", context.self) + cookieFabric ! CookieFabric.Request("give me cookies", context.self) // #request-response-send probe.receiveMessage() @@ -97,7 +102,7 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik final case class Translate(site: URI, replyTo: ActorRef[URI]) extends Command private final case class WrappedBackendResponse(response: Backend.Response) extends Command - def translator(backend: ActorRef[Backend.Request]): Behavior[Command] = + def apply(backend: ActorRef[Backend.Request]): Behavior[Command] = Behaviors.setup[Command] { context => val backendResponseMapper: ActorRef[Backend.Response] = context.messageAdapter(rsp => WrappedBackendResponse(rsp)) @@ -131,7 +136,7 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik // #adapted-response val backend = spawn(Behaviors.receiveMessage[Backend.Request] { - case Backend.StartTranslationJob(taskId, site @ _, replyTo) => + case Backend.StartTranslationJob(taskId, _, replyTo) => replyTo ! Backend.JobStarted(taskId) replyTo ! Backend.JobProgress(taskId, 0.25) replyTo ! Backend.JobProgress(taskId, 0.50) @@ -140,8 +145,8 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik Behaviors.same }) - val frontend = spawn(Frontend.translator(backend)) - val probe = TestProbe[URI]() + val frontend = spawn(Frontend(backend)) + val probe = createTestProbe[URI]() frontend ! Frontend.Translate(new URI("https://akka.io/docs/"), probe.ref) probe.expectMessage(new URI("https://akka.io/docs/sv/")) } @@ -151,114 +156,126 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik "contain a sample for scheduling messages to self" in { //#timer - case object TimerKey + object Buncher { - trait Msg - case class ExcitingMessage(message: String) extends Msg - final case class Batch(messages: Vector[Msg]) - case object Timeout extends Msg + sealed trait Command + final case class ExcitingMessage(message: String) extends Command + final case class Batch(messages: Vector[Command]) + private case object Timeout extends Command + private case object TimerKey - def behavior(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Msg] = { - Behaviors.withTimers(timers => idle(timers, target, after, maxSize)) - } - - def idle( - timers: TimerScheduler[Msg], - target: ActorRef[Batch], - after: FiniteDuration, - maxSize: Int): Behavior[Msg] = { - Behaviors.receiveMessage[Msg] { message => - timers.startSingleTimer(TimerKey, Timeout, after) - active(Vector(message), timers, target, after, maxSize) + def apply(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Command] = { + Behaviors.withTimers(timers => new Buncher(timers, target, after, maxSize).idle()) } } - def active( - buffer: Vector[Msg], - timers: TimerScheduler[Msg], - target: ActorRef[Batch], + class Buncher( + timers: TimerScheduler[Buncher.Command], + target: ActorRef[Buncher.Batch], after: FiniteDuration, - maxSize: Int): Behavior[Msg] = { - Behaviors.receiveMessage[Msg] { - case Timeout => - target ! Batch(buffer) - idle(timers, target, after, maxSize) - case m => - val newBuffer = buffer :+ m - if (newBuffer.size == maxSize) { - timers.cancel(TimerKey) - target ! Batch(newBuffer) - idle(timers, target, after, maxSize) - } else - active(newBuffer, timers, target, after, maxSize) + maxSize: Int) { + import Buncher._ + + private def idle(): Behavior[Command] = { + Behaviors.receiveMessage[Command] { message => + timers.startSingleTimer(TimerKey, Timeout, after) + active(Vector(message)) + } + } + + def active(buffer: Vector[Command]): Behavior[Command] = { + Behaviors.receiveMessage[Command] { + case Timeout => + target ! Batch(buffer) + idle() + case m => + val newBuffer = buffer :+ m + if (newBuffer.size == maxSize) { + timers.cancel(TimerKey) + target ! Batch(newBuffer) + idle() + } else + active(newBuffer) + } } } //#timer - val probe: TestProbe[Batch] = TestProbe[Batch]() - val bufferer: ActorRef[Msg] = spawn(behavior(probe.ref, 1.second, 10)) - bufferer ! ExcitingMessage("one") - bufferer ! ExcitingMessage("two") - probe.expectNoMessage(1.millisecond) - probe.expectMessage(2.seconds, Batch(Vector[Msg](ExcitingMessage("one"), ExcitingMessage("two")))) + val probe = createTestProbe[Buncher.Batch]() + val buncher: ActorRef[Buncher.Command] = spawn(Buncher(probe.ref, 1.second, 10)) + buncher ! Buncher.ExcitingMessage("one") + buncher ! Buncher.ExcitingMessage("two") + probe.expectNoMessage() + probe.expectMessage( + 2.seconds, + Buncher.Batch(Vector[Buncher.Command](Buncher.ExcitingMessage("one"), Buncher.ExcitingMessage("two")))) } "contain a sample for ask" in { - // #actor-ask - sealed trait HalCommand - case class OpenThePodBayDoorsPlease(respondTo: ActorRef[HalResponse]) extends HalCommand - case class HalResponse(message: String) + import akka.util.Timeout - val halBehavior = Behaviors.receiveMessage[HalCommand] { - case OpenThePodBayDoorsPlease(respondTo) => - respondTo ! HalResponse("I'm sorry, Dave. I'm afraid I can't do that.") - Behaviors.same + // #actor-ask + object Hal { + sealed trait Command + case class OpenThePodBayDoorsPlease(replyTo: ActorRef[Response]) extends Command + case class Response(message: String) + + def apply(): Behaviors.Receive[Hal.Command] = + Behaviors.receiveMessage[Command] { + case OpenThePodBayDoorsPlease(replyTo) => + replyTo ! Response("I'm sorry, Dave. I'm afraid I can't do that.") + Behaviors.same + } } - sealed trait DaveMessage - // this is a part of the protocol that is internal to the actor itself - case class AdaptedResponse(message: String) extends DaveMessage + object Dave { - def daveBehavior(hal: ActorRef[HalCommand]) = Behaviors.setup[DaveMessage] { context => - // asking someone requires a timeout, if the timeout hits without response - // the ask is failed with a TimeoutException - implicit val timeout: Timeout = 3.seconds + sealed trait Command + // this is a part of the protocol that is internal to the actor itself + private case class AdaptedResponse(message: String) extends Command - // Note: The second parameter list takes a function `ActorRef[T] => Message`, - // as OpenThePodBayDoorsPlease is a case class it has a factory apply method - // that is what we are passing as the second parameter here it could also be written - // as `ref => OpenThePodBayDoorsPlease(ref)` - context.ask(hal)(OpenThePodBayDoorsPlease) { - case Success(HalResponse(message)) => AdaptedResponse(message) - case Failure(_) => AdaptedResponse("Request failed") - } + def apply(hal: ActorRef[Hal.Command]): Behavior[Dave.Command] = + Behaviors.setup[Command] { context => + // asking someone requires a timeout, if the timeout hits without response + // the ask is failed with a TimeoutException + implicit val timeout: Timeout = 3.seconds - // we can also tie in request context into an interaction, it is safe to look at - // actor internal state from the transformation function, but remember that it may have - // changed at the time the response arrives and the transformation is done, best is to - // use immutable state we have closed over like here. - val requestId = 1 - context.ask(hal)(OpenThePodBayDoorsPlease) { - case Success(HalResponse(message)) => AdaptedResponse(s"$requestId: $message") - case Failure(_) => AdaptedResponse(s"$requestId: Request failed") - } + // Note: The second parameter list takes a function `ActorRef[T] => Message`, + // as OpenThePodBayDoorsPlease is a case class it has a factory apply method + // that is what we are passing as the second parameter here it could also be written + // as `ref => OpenThePodBayDoorsPlease(ref)` + context.ask(hal)(Hal.OpenThePodBayDoorsPlease) { + case Success(Hal.Response(message)) => AdaptedResponse(message) + case Failure(_) => AdaptedResponse("Request failed") + } - Behaviors.receiveMessage { - // the adapted message ends up being processed like any other - // message sent to the actor - case AdaptedResponse(message) => - context.log.info("Got response from hal: {}", message) - Behaviors.same - } + // we can also tie in request context into an interaction, it is safe to look at + // actor internal state from the transformation function, but remember that it may have + // changed at the time the response arrives and the transformation is done, best is to + // use immutable state we have closed over like here. + val requestId = 1 + context.ask(hal)(Hal.OpenThePodBayDoorsPlease) { + case Success(Hal.Response(message)) => AdaptedResponse(s"$requestId: $message") + case Failure(_) => AdaptedResponse(s"$requestId: Request failed") + } + + Behaviors.receiveMessage { + // the adapted message ends up being processed like any other + // message sent to the actor + case AdaptedResponse(message) => + context.log.info("Got response from hal: {}", message) + Behaviors.same + } + } } // #actor-ask // somewhat modified behavior to let us know we saw the two requests - val monitor = TestProbe[HalCommand]() - val hal = spawn(Behaviors.monitor(monitor.ref, halBehavior)) - spawn(daveBehavior(hal)) - monitor.expectMessageType[OpenThePodBayDoorsPlease] - monitor.expectMessageType[OpenThePodBayDoorsPlease] + val monitor = createTestProbe[Hal.Command]() + val hal = spawn(Behaviors.monitor(monitor.ref, Hal())) + spawn(Dave(hal)) + monitor.expectMessageType[Hal.OpenThePodBayDoorsPlease] + monitor.expectMessageType[Hal.OpenThePodBayDoorsPlease] } "contain a sample for per session child" in { @@ -269,110 +286,129 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik // #per-session-child - val keyCabinetBehavior: Behavior[GetKeys] = Behaviors.receiveMessage { - case GetKeys(_, respondTo) => - respondTo ! Keys() - Behaviors.same + object KeyCabinet { + case class GetKeys(whoseKeys: String, replyTo: ActorRef[Keys]) + + def apply(): Behavior[GetKeys] = + Behaviors.receiveMessage { + case GetKeys(_, replyTo) => + replyTo ! Keys() + Behaviors.same + } } - val drawerBehavior: Behavior[GetWallet] = Behaviors.receiveMessage { - case GetWallet(_, respondTo) => - respondTo ! Wallet() - Behaviors.same + + object Drawer { + case class GetWallet(whoseWallet: String, replyTo: ActorRef[Wallet]) + + def apply(): Behavior[GetWallet] = + Behaviors.receiveMessage { + case GetWallet(_, replyTo) => + replyTo ! Wallet() + Behaviors.same + } } // #per-session-child - // messages for the two services we interact with - trait HomeCommand - case class LeaveHome(who: String, respondTo: ActorRef[ReadyToLeaveHome]) extends HomeCommand - case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet) - case class GetKeys(whoseKeys: String, respondTo: ActorRef[Keys]) - case class GetWallet(whoseWallet: String, respondTo: ActorRef[Wallet]) + object Home { + sealed trait Command + case class LeaveHome(who: String, replyTo: ActorRef[ReadyToLeaveHome]) extends Command + case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet) - def homeBehavior = Behaviors.receive[HomeCommand] { (context, message) => - val keyCabinet: ActorRef[GetKeys] = context.spawn(keyCabinetBehavior, "key-cabinet") - val drawer: ActorRef[GetWallet] = context.spawn(drawerBehavior, "drawer") + def apply(): Behavior[Command] = { + Behaviors.setup[Command] { context => + val keyCabinet: ActorRef[KeyCabinet.GetKeys] = context.spawn(KeyCabinet(), "key-cabinet") + val drawer: ActorRef[Drawer.GetWallet] = context.spawn(Drawer(), "drawer") - message match { - case LeaveHome(who, respondTo) => - context.spawn(prepareToLeaveHome(who, respondTo, keyCabinet, drawer), s"leaving-$who") - Behaviors.same - } - } - - // per session actor behavior - def prepareToLeaveHome( - whoIsLeaving: String, - respondTo: ActorRef[ReadyToLeaveHome], - keyCabinet: ActorRef[GetKeys], - drawer: ActorRef[GetWallet]): Behavior[NotUsed] = - // we don't _really_ care about the actor protocol here as nobody will send us - // messages except for responses to our queries, so we just accept any kind of message - // but narrow that to more limited types then we interact - Behaviors - .setup[AnyRef] { context => - var wallet: Option[Wallet] = None - var keys: Option[Keys] = None - - // we narrow the ActorRef type to any subtype of the actual type we accept - keyCabinet ! GetKeys(whoIsLeaving, context.self.narrow[Keys]) - drawer ! GetWallet(whoIsLeaving, context.self.narrow[Wallet]) - - def nextBehavior: Behavior[AnyRef] = - (keys, wallet) match { - case (Some(w), Some(k)) => - // we got both, "session" is completed! - respondTo ! ReadyToLeaveHome(whoIsLeaving, w, k) - Behaviors.stopped - - case _ => - Behaviors.same - } - - Behaviors.receiveMessage { - case w: Wallet => - wallet = Some(w) - nextBehavior - case k: Keys => - keys = Some(k) - nextBehavior - case _ => - Behaviors.unhandled + Behaviors.receiveMessage[Command] { + case LeaveHome(who, replyTo) => + context.spawn(prepareToLeaveHome(who, replyTo, keyCabinet, drawer), s"leaving-$who") + Behaviors.same } } - .narrow[NotUsed] // we don't let anyone else know we accept anything + } + + // per session actor behavior + def prepareToLeaveHome( + whoIsLeaving: String, + replyTo: ActorRef[ReadyToLeaveHome], + keyCabinet: ActorRef[KeyCabinet.GetKeys], + drawer: ActorRef[Drawer.GetWallet]): Behavior[NotUsed] = { + // we don't _really_ care about the actor protocol here as nobody will send us + // messages except for responses to our queries, so we just accept any kind of message + // but narrow that to more limited types when we interact + Behaviors + .setup[AnyRef] { context => + var wallet: Option[Wallet] = None + var keys: Option[Keys] = None + + // we narrow the ActorRef type to any subtype of the actual type we accept + keyCabinet ! KeyCabinet.GetKeys(whoIsLeaving, context.self.narrow[Keys]) + drawer ! Drawer.GetWallet(whoIsLeaving, context.self.narrow[Wallet]) + + def nextBehavior(): Behavior[AnyRef] = + (keys, wallet) match { + case (Some(w), Some(k)) => + // we got both, "session" is completed! + replyTo ! ReadyToLeaveHome(whoIsLeaving, w, k) + Behaviors.stopped + + case _ => + Behaviors.same + } + + Behaviors.receiveMessage { + case w: Wallet => + wallet = Some(w) + nextBehavior() + case k: Keys => + keys = Some(k) + nextBehavior() + case _ => + Behaviors.unhandled + } + } + .narrow[NotUsed] // we don't let anyone else know we accept anything + } + } // #per-session-child - val requestor = TestProbe[ReadyToLeaveHome]() + val requestor = createTestProbe[Home.ReadyToLeaveHome]() - val home = spawn(homeBehavior, "home") - home ! LeaveHome("Bobby", requestor.ref) - requestor.expectMessage(ReadyToLeaveHome("Bobby", Keys(), Wallet())) + val home = spawn(Home(), "home") + home ! Home.LeaveHome("Bobby", requestor.ref) + requestor.expectMessage(Home.ReadyToLeaveHome("Bobby", Keys(), Wallet())) } "contain a sample for ask from outside the actor system" in { // #standalone-ask - trait CookieCommand {} - case class GiveMeCookies(replyTo: ActorRef[Cookies]) extends CookieCommand - case class Cookies(count: Int) + object CookieFabric { + sealed trait Command {} + case class GiveMeCookies(replyTo: ActorRef[Cookies]) extends Command + case class Cookies(count: Int) + + def apply(): Behaviors.Receive[CookieFabric.GiveMeCookies] = + Behaviors.receiveMessage { message => + message.replyTo ! Cookies(5) + Behaviors.same + } + } // #standalone-ask // keep this out of the sample as it uses the testkit spawn - val cookieActorRef = spawn(Behaviors.receiveMessage[GiveMeCookies] { message => - message.replyTo ! Cookies(5) - Behaviors.same - }) + val cookieFabric = spawn(CookieFabric()) // #standalone-ask import akka.actor.typed.scaladsl.AskPattern._ + import akka.util.Timeout // asking someone requires a timeout and a scheduler, if the timeout hits without response // the ask is failed with a TimeoutException implicit val timeout: Timeout = 3.seconds implicit val scheduler = system.scheduler - val result: Future[Cookies] = cookieActorRef.ask(ref => GiveMeCookies(ref)) + val result: Future[CookieFabric.Cookies] = cookieFabric.ask(ref => CookieFabric.GiveMeCookies(ref)) // the response callback will be executed on this execution context implicit val ec = system.executionContext @@ -383,6 +419,6 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik } // #standalone-ask - result.futureValue shouldEqual Cookies(5) + result.futureValue shouldEqual CookieFabric.Cookies(5) } } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java index eb5c3a6136..0f1cf6a835 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java @@ -4,56 +4,83 @@ package jdocs.akka.cluster.sharding.typed; -import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; -public class ShardingReplyCompileOnlyTest { - - ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); - ClusterSharding sharding = ClusterSharding.get(system); +interface ShardingReplyCompileOnlyTest { // #sharded-response // a sharded actor that needs counter updates - EntityTypeKey typeKey = EntityTypeKey.create(Command.class, "example-sharded-response"); + public class CounterConsumer { + public static EntityTypeKey typeKey = + EntityTypeKey.create(Command.class, "example-sharded-response"); - public interface Command {} + public interface Command {} - class NewCount implements Command { - public final long value; + public static class NewCount implements Command { + public final long value; - NewCount(long value) { - this.value = value; + public NewCount(long value) { + this.value = value; + } } } // a sharded counter that sends responses to another sharded actor - interface CounterCommand {} + public class Counter extends AbstractBehavior { + public static EntityTypeKey typeKey = + EntityTypeKey.create(Command.class, "example-sharded-counter"); - enum Increment implements CounterCommand { - INSTANCE - } + public interface Command {} - class GetValue implements CounterCommand { - public final String replyToEntityId; - - GetValue(String replyToEntityId) { - this.replyToEntityId = replyToEntityId; + public enum Increment implements Command { + INSTANCE } - } - public Behavior counter(int value) { - return Behaviors.receive(CounterCommand.class) - .onMessage(Increment.class, msg -> counter(value + 1)) - .onMessage( - GetValue.class, - msg -> { - sharding.entityRefFor(typeKey, msg.replyToEntityId).tell(new NewCount(value)); - return Behaviors.same(); - }) - .build(); + public static class GetValue implements Command { + public final String replyToEntityId; + + public GetValue(String replyToEntityId) { + this.replyToEntityId = replyToEntityId; + } + } + + public Behavior create() { + return Behaviors.setup(Counter::new); + } + + private final ClusterSharding sharding; + private int value = 0; + + private Counter(ActorContext context) { + this.sharding = ClusterSharding.get(context.getSystem()); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Increment.class, msg -> onIncrement()) + .onMessage(GetValue.class, this::onGetValue) + .build(); + } + + private Behavior onIncrement() { + value++; + return this; + } + + private Behavior onGetValue(GetValue msg) { + EntityRef entityRef = + sharding.entityRefFor(CounterConsumer.typeKey, msg.replyToEntityId); + entityRef.tell(new CounterConsumer.NewCount(value)); + return this; + } } // #sharded-response } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index 6eba97e940..d9e77e7759 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -17,113 +17,154 @@ object ShardingCompileOnlySpec { val system = ActorSystem(Behaviors.empty, "Sharding") - //#sharding-extension - import akka.cluster.sharding.typed.ShardingEnvelope - import akka.cluster.sharding.typed.scaladsl.ClusterSharding - import akka.cluster.sharding.typed.scaladsl.EntityTypeKey - import akka.cluster.sharding.typed.scaladsl.EntityRef + object Basics { - val sharding = ClusterSharding(system) - //#sharding-extension + //#sharding-extension + import akka.cluster.sharding.typed.ShardingEnvelope + import akka.cluster.sharding.typed.scaladsl.ClusterSharding + import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + import akka.cluster.sharding.typed.scaladsl.EntityRef - //#counter-messages - trait CounterCommand - case object Increment extends CounterCommand - final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand - //#counter-messages + val sharding = ClusterSharding(system) + //#sharding-extension - //#counter + //#counter + //#counter-messages + object Counter { + //#counter + sealed trait Command + case object Increment extends Command + final case class GetValue(replyTo: ActorRef[Int]) extends Command + //#counter-messages - def counter(entityId: String, value: Int): Behavior[CounterCommand] = - Behaviors.receiveMessage[CounterCommand] { - case Increment => - counter(entityId, value + 1) - case GetValue(replyTo) => - replyTo ! value - Behaviors.same - } - //#counter + //#counter - //#init - val TypeKey = EntityTypeKey[CounterCommand]("Counter") + def apply(entityId: String): Behavior[Command] = + counter(entityId, 0) - val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = - sharding.init(Entity(typeKey = TypeKey, createBehavior = ctx => counter(ctx.entityId, 0))) - //#init - - //#send - // With an EntityRef - val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1") - counterOne ! Increment - - // Entity id is specified via an `ShardingEnvelope` - shardRegion ! ShardingEnvelope("counter-1", Increment) - //#send - - import BlogPostExample.behavior - //#persistence - val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost") - - ClusterSharding(system).init(Entity(typeKey = BlogTypeKey, createBehavior = ctx => behavior(ctx.entityId))) - //#persistence - - //#counter-passivate - - case object Idle extends CounterCommand - case object GoodByeCounter extends CounterCommand - - def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = { - Behaviors.setup { ctx => - def become(value: Int): Behavior[CounterCommand] = - Behaviors.receiveMessage[CounterCommand] { + private def counter(entityId: String, value: Int): Behavior[Command] = + Behaviors.receiveMessage[Command] { case Increment => - become(value + 1) + counter(entityId, value + 1) case GetValue(replyTo) => replyTo ! value Behaviors.same - case Idle => - // after receive timeout - shard ! ClusterSharding.Passivate(ctx.self) - Behaviors.same - case GoodByeCounter => - // the stopMessage, used for rebalance and passivate - Behaviors.stopped } - ctx.setReceiveTimeout(30.seconds, Idle) - become(0) + //#counter-messages } + //#counter-messages + //#counter + + //#init + val TypeKey = EntityTypeKey[Counter.Command]("Counter") + + val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] = + sharding.init(Entity(typeKey = TypeKey, createBehavior = ctx => Counter(ctx.entityId))) + //#init + + //#send + // With an EntityRef + val counterOne: EntityRef[Counter.Command] = sharding.entityRefFor(TypeKey, "counter-1") + counterOne ! Counter.Increment + + // Entity id is specified via an `ShardingEnvelope` + shardRegion ! ShardingEnvelope("counter-1", Counter.Increment) + //#send + + import BlogPostExample.behavior + + //#persistence + val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost") + + ClusterSharding(system).init(Entity(typeKey = BlogTypeKey, createBehavior = ctx => behavior(ctx.entityId))) + //#persistence + } - sharding.init( - Entity(typeKey = TypeKey, createBehavior = ctx => counter2(ctx.shard, ctx.entityId)) - .withStopMessage(GoodByeCounter)) - //#counter-passivate + object CounterWithPassivate { + import akka.cluster.sharding.typed.scaladsl.ClusterSharding + import akka.cluster.sharding.typed.scaladsl.EntityTypeKey - def counterWithResponseToShardedActor(): Unit = { + //#counter-passivate + + object Counter { + sealed trait Command + case object Increment extends Command + final case class GetValue(replyTo: ActorRef[Int]) extends Command + private case object Idle extends Command + case object GoodByeCounter extends Command + + def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[Command] = { + Behaviors.setup { ctx => + def become(value: Int): Behavior[Command] = + Behaviors.receiveMessage[Command] { + case Increment => + become(value + 1) + case GetValue(replyTo) => + replyTo ! value + Behaviors.same + case Idle => + // after receive timeout + shard ! ClusterSharding.Passivate(ctx.self) + Behaviors.same + case GoodByeCounter => + // the stopMessage, used for rebalance and passivate + Behaviors.stopped + } + + ctx.setReceiveTimeout(30.seconds, Idle) + become(0) + } + } + } + + val TypeKey = EntityTypeKey[Counter.Command]("Counter") + + ClusterSharding(system).init( + Entity(typeKey = TypeKey, createBehavior = ctx => Counter(ctx.shard, ctx.entityId)) + .withStopMessage(Counter.GoodByeCounter)) + //#counter-passivate + + } + + object CounterWithResponseToShardedActor { + + import akka.cluster.sharding.typed.scaladsl.ClusterSharding + import akka.cluster.sharding.typed.scaladsl.EntityTypeKey //#sharded-response // a sharded actor that needs counter updates - trait Command - final case class NewCount(count: Long) extends Command - val entityTypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response") + object CounterConsumer { + sealed trait Command + final case class NewCount(count: Long) extends Command + val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response") + } // a sharded counter that sends responses to another sharded actor - trait CounterCommand - case object Increment extends CounterCommand - final case class GetValue(replyToEntityId: String) extends CounterCommand + object Counter { + trait Command + case object Increment extends Command + final case class GetValue(replyToEntityId: String) extends Command + val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-counter") - def counter(value: Long): Behavior[CounterCommand] = - Behaviors.receiveMessage[CounterCommand] { - case Increment => - counter(value + 1) - case GetValue(replyToEntityId) => - sharding.entityRefFor(entityTypeKey, replyToEntityId) ! NewCount(value) - Behaviors.same - } + private def apply(): Behavior[Command] = + Behaviors.setup { context => + counter(ClusterSharding(context.system), 0) + } + + private def counter(sharding: ClusterSharding, value: Long): Behavior[Command] = + Behaviors.receiveMessage { + case Increment => + counter(sharding, value + 1) + case GetValue(replyToEntityId) => + val replyToEntityRef = sharding.entityRefFor(CounterConsumer.TypeKey, replyToEntityId) + replyToEntityRef ! CounterConsumer.NewCount(value) + Behaviors.same + } + + } //#sharded-response - - counter(1) } } diff --git a/akka-docs/src/main/paradox/typed/interaction-patterns.md b/akka-docs/src/main/paradox/typed/interaction-patterns.md index dd77101edd..f3d460b225 100644 --- a/akka-docs/src/main/paradox/typed/interaction-patterns.md +++ b/akka-docs/src/main/paradox/typed/interaction-patterns.md @@ -245,24 +245,6 @@ This can be used with any type of `Behavior`, including `receive`, `receiveMessa * The `TimerScheduler` is bound to the lifecycle of the actor that owns it and it's cancelled automatically when the actor is stopped. * `Behaviors.withTimers` can also be used inside `Behaviors.supervise` and it will automatically cancel the started timers correctly when the actor is restarted, so that the new incarnation will not receive scheduled messages from previous incarnation. -## Responding to a sharded actor - -The normal pattern for expecting a reply is to include an @apidoc[akka.actor.typed.ActorRef] in the message, typically a message adapter. This can be used -for a sharded actor but if @scala[`ctx.self`]@java[`ctx.getSelf()`] is sent and the sharded actor is moved or passivated then the reply -will sent to dead letters. - -An alternative is to send the `entityId` in the message and have the reply sent via sharding: - -Scala -: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharded-response } - -Java -: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java) { #sharded-response } - -A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey` -could be included in the message if it is not known statically. - - ### Schedule periodically Scheduling of recurring messages can have two different characteristics: @@ -304,4 +286,22 @@ which may in worst case cause undesired load on the system. `scheduleWithFixedDe @@@ +## Responding to a sharded actor + +The normal pattern for expecting a reply is to include an @apidoc[akka.actor.typed.ActorRef] in the message, typically a message adapter. This can be used +for a sharded actor but if @scala[`ctx.self`]@java[`ctx.getSelf()`] is sent and the sharded actor is moved or passivated then the reply +will sent to dead letters. + +An alternative is to send the `entityId` in the message and have the reply sent via sharding: + +Scala +: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharded-response } + +Java +: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java) { #sharded-response } + +A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey` +could be included in the message if it is not known statically. + +