Actor interaction patterns docs for typed (#24404)

* Request response and actor-to-actor-ask

* Outside of actor ask

* Review feedback plus simplification of tell sample

* ask and mention symbolic

* Final sample and some proof reading feedback adressed

* Tests should pass in InteractionPatternSpec

* Incorrect comment removed

* Made sure all scala samples execute, to protect from regression, added forward refereces in text

* Use context logger

* Some minor rewording

* Feedback applied

* Rebase fix

* expectMessageType
This commit is contained in:
Johan Andrén 2018-02-02 17:49:03 +01:00 committed by Patrik Nordwall
parent 7feca6638b
commit 06b36db458
3 changed files with 638 additions and 127 deletions

View file

@ -3,15 +3,14 @@
*/ */
package jdocs.akka.typed; package jdocs.akka.typed;
import akka.NotUsed;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem; import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior; import akka.actor.typed.Behavior;
import akka.actor.typed.Props; import akka.actor.typed.Props;
import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.*;
import akka.actor.typed.javadsl.BehaviorBuilder;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.TimerScheduler;
import akka.testkit.typed.scaladsl.TestProbe; import akka.testkit.typed.scaladsl.TestProbe;
import akka.util.Timeout;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await; import scala.concurrent.Await;
@ -20,36 +19,64 @@ import scala.concurrent.duration.FiniteDuration;
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class InteractionPatternsTest extends JUnitSuite { public class InteractionPatternsTest extends JUnitSuite {
// #fire-and-forget // #fire-and-forget-definition
interface PrinterProtocol { } class PrintMe {
class DisableOutput implements PrinterProtocol { }
class EnableOutput implements PrinterProtocol { }
class PrintMe implements PrinterProtocol {
public final String message; public final String message;
public PrintMe(String message) { public PrintMe(String message) {
this.message = message; this.message = message;
} }
} }
public Behavior<PrinterProtocol> enabledPrinterBehavior() { static final Behavior<PrintMe> printerBehavior = Behaviors.immutable(PrintMe.class)
return BehaviorBuilder.<PrinterProtocol>create() .onMessage(PrintMe.class, (ctx, printMe) -> {
.onMessage(DisableOutput.class, (ctx, disableOutput) -> disabledPrinterBehavior()) ctx.getLog().info(printMe.message);
.onMessage(PrintMe.class, (ctx, printMe) -> { return Behaviors.same();
System.out.println(printMe.message); }).build();
return Behaviors.same(); // #fire-and-forget-definition
}).build();
// #request-response-protocol
class Request {
public final String query;
public final ActorRef<Response> respondTo;
public Request(String query, ActorRef<Response> respondTo) {
this.query = query;
this.respondTo = respondTo;
}
} }
public Behavior<PrinterProtocol> disabledPrinterBehavior() { class Response {
return BehaviorBuilder.<PrinterProtocol>create() public final String result;
.onMessage(EnableOutput.class, (ctx, enableOutput) -> enabledPrinterBehavior()) public Response(String result) {
.build(); this.result = result;
}
} }
// #fire-and-forget // #request-response-protocol
public void compileOnlyRequestResponse() {
// #request-response-respond
// actor behavior
Behaviors.immutable(Request.class)
.onMessage(Request.class, (ctx, request) -> {
// ... process request ...
request.respondTo.tell(new Response("Here's your response!"));
return Behaviors.same();
}).build();
// #request-response-respond
ActorRef<Request> otherActor = null;
ActorContext<Response> ctx = null;
// #request-response-send
otherActor.tell(new Request("give me cookies", ctx.getSelf()));
// #request-response-send
}
// #adapted-response // #adapted-response
@ -101,7 +128,7 @@ public class InteractionPatternsTest extends JUnitSuite {
public static class Frontend { public static class Frontend {
interface Command {} interface Command {}
public static class Translate implements Command { public static class Translate implements Command {
public final URI site; public final URI site;
public final ActorRef<URI> replyTo; public final ActorRef<URI> replyTo;
@ -177,15 +204,17 @@ public class InteractionPatternsTest extends JUnitSuite {
return this; return this;
}) })
.onMessage(WrappedJobStarted.class, wrapped -> { .onMessage(WrappedJobStarted.class, wrapped -> {
System.out.println("Started " + wrapped.response.taskId); ctx.getLog().info("Started {}", wrapped.response.taskId);
return this; return this;
}) })
.onMessage(WrappedJobProgress.class, wrapped -> { .onMessage(WrappedJobProgress.class, wrapped -> {
System.out.println("Progress " + wrapped.response.taskId + ": " + wrapped.response.progress); ctx.getLog().info("Progress {}: {}", wrapped.response.taskId,
wrapped.response.progress);
return this; return this;
}) })
.onMessage(WrappedJobCompleted.class, wrapped -> { .onMessage(WrappedJobCompleted.class, wrapped -> {
System.out.println("Completed " + wrapped.response.taskId + ": " + wrapped.response.result); ctx.getLog().info("Completed {}: {}", wrapped.response.taskId,
wrapped.response.result);
return this; return this;
}) })
.onMessage(OtherResponse.class, other -> Behaviors.unhandled()) .onMessage(OtherResponse.class, other -> Behaviors.unhandled())
@ -198,20 +227,17 @@ public class InteractionPatternsTest extends JUnitSuite {
@Test @Test
public void fireAndForgetSample() throws Exception { public void fireAndForgetSample() throws Exception {
// #fire-and-forget // #fire-and-forget-doit
final ActorSystem<PrinterProtocol> system = final ActorSystem<PrintMe> system =
ActorSystem.create(enabledPrinterBehavior(), "printer-sample-system"); ActorSystem.create(printerBehavior, "printer-sample-system");
// note that system is also the ActorRef to the guardian actor // note that system is also the ActorRef to the guardian actor
final ActorRef<PrinterProtocol> ref = system; final ActorRef<PrintMe> ref = system;
// these are all fire and forget // these are all fire and forget
ref.tell(new PrintMe("message")); ref.tell(new PrintMe("message 1"));
ref.tell(new DisableOutput()); ref.tell(new PrintMe("message 2"));
ref.tell(new PrintMe("message")); // #fire-and-forget-doit
ref.tell(new EnableOutput());
// #fire-and-forget
Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS));
} }
@ -255,7 +281,7 @@ public class InteractionPatternsTest extends JUnitSuite {
private static final Object TIMER_KEY = new Object(); private static final Object TIMER_KEY = new Object();
private static class Timeout implements Msg { private static class TimeoutMsg implements Msg {
} }
public static Behavior<Msg> behavior(ActorRef<Batch> target, FiniteDuration after, int maxSize) { public static Behavior<Msg> behavior(ActorRef<Batch> target, FiniteDuration after, int maxSize) {
@ -266,7 +292,7 @@ public class InteractionPatternsTest extends JUnitSuite {
FiniteDuration after, int maxSize) { FiniteDuration after, int maxSize) {
return Behaviors.immutable(Msg.class) return Behaviors.immutable(Msg.class)
.onMessage(Msg.class, (ctx, msg) -> { .onMessage(Msg.class, (ctx, msg) -> {
timers.startSingleTimer(TIMER_KEY, new Timeout(), after); timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after);
List<Msg> buffer = new ArrayList<>(); List<Msg> buffer = new ArrayList<>();
buffer.add(msg); buffer.add(msg);
return active(buffer, timers, target, after, maxSize); return active(buffer, timers, target, after, maxSize);
@ -277,7 +303,7 @@ public class InteractionPatternsTest extends JUnitSuite {
private static Behavior<Msg> active(List<Msg> buffer, TimerScheduler<Msg> timers, private static Behavior<Msg> active(List<Msg> buffer, TimerScheduler<Msg> timers,
ActorRef<Batch> target, FiniteDuration after, int maxSize) { ActorRef<Batch> target, FiniteDuration after, int maxSize) {
return Behaviors.immutable(Msg.class) return Behaviors.immutable(Msg.class)
.onMessage(Timeout.class, (ctx, msg) -> { .onMessage(TimeoutMsg.class, (ctx, msg) -> {
target.tell(new Batch(buffer)); target.tell(new Batch(buffer));
return idle(timers, target, after, maxSize); return idle(timers, target, after, maxSize);
}) })
@ -301,7 +327,8 @@ public class InteractionPatternsTest extends JUnitSuite {
TestProbe<Batch> probe = new TestProbe<>("batcher", system); TestProbe<Batch> probe = new TestProbe<>("batcher", system);
ActorRef<Msg> bufferer = Await.result(system.systemActorOf( ActorRef<Msg> bufferer = Await.result(system.systemActorOf(
behavior(probe.ref(), new FiniteDuration(1, TimeUnit.SECONDS), 10), behavior(probe.ref(), new FiniteDuration(1, TimeUnit.SECONDS), 10),
"batcher", Props.empty(), akka.util.Timeout.apply(1, TimeUnit.SECONDS)), new FiniteDuration(1, TimeUnit.SECONDS)); "batcher", Props.empty(), akka.util.Timeout.apply(1, TimeUnit.SECONDS)),
new FiniteDuration(1, TimeUnit.SECONDS));
ExcitingMessage msgOne = new ExcitingMessage("one"); ExcitingMessage msgOne = new ExcitingMessage("one");
ExcitingMessage msgTwo = new ExcitingMessage("two"); ExcitingMessage msgTwo = new ExcitingMessage("two");
@ -314,6 +341,223 @@ public class InteractionPatternsTest extends JUnitSuite {
Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS));
} }
// #actor-ask
interface HalCommand {}
static final class OpenThePodBayDoorsPlease implements HalCommand {
public final ActorRef<HalResponse> respondTo;
OpenThePodBayDoorsPlease(ActorRef<HalResponse> respondTo) {
this.respondTo = respondTo;
}
}
static final class HalResponse {
public final String message;
HalResponse(String message) {
this.message = message;
}
}
static final Behavior<HalCommand> halBehavior =
Behaviors.immutable(HalCommand.class)
.onMessage(OpenThePodBayDoorsPlease.class, (ctx, msg) -> {
msg.respondTo.tell(new HalResponse("I'm sorry, Dave. I'm afraid I can't do that."));
return Behaviors.same();
}).build();
interface DaveProtocol {}
// this is a part of the protocol that is internal to the actor itself
private static final class AdaptedResponse implements DaveProtocol {
public final String message;
public AdaptedResponse(String message) {
this.message = message;
}
}
public static Behavior<DaveProtocol> daveBehavior(final ActorRef<HalCommand> hal) {
return Behaviors.deferred((ActorContext<DaveProtocol> ctx) -> {
// asking someone requires a timeout, if the timeout hits without response
// the ask is failed with a TimeoutException
final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS);
ctx.ask(
HalResponse.class,
hal,
timeout,
// construct the outgoing message
(ActorRef<HalResponse> ref) -> new OpenThePodBayDoorsPlease(ref),
// adapt the response (or failure to respond)
(response, throwable) -> {
if (response != null) {
return new AdaptedResponse(response.message);
} else {
return new AdaptedResponse("Request failed");
}
});
// we can also tie in request context into an interaction, it is safe to look at
// actor internal state from the transformation function, but remember that it may have
// changed at the time the response arrives and the transformation is done, best is to
// use immutable state we have closed over like here.
final int requestId = 1;
ctx.ask(
HalResponse.class,
hal,
timeout,
// construct the outgoing message
(ActorRef<HalResponse> ref) -> new OpenThePodBayDoorsPlease(ref),
// adapt the response (or failure to respond)
(response, throwable) -> {
if (response != null) {
return new AdaptedResponse(requestId + ": " + response.message);
} else {
return new AdaptedResponse(requestId + ": Request failed");
}
});
return Behaviors.immutable(DaveProtocol.class)
// the adapted message ends up being processed like any other
// message sent to the actor
.onMessage(AdaptedResponse.class, (innerCtx, response) -> {
innerCtx.getLog().info("Got response from HAL: {}", response.message);
return Behaviors.same();
}).build();
});
}
// #actor-ask
// #standalone-ask
interface CookieCommand {}
static class GiveMeCookies implements CookieCommand {
public final ActorRef<Cookies> cookies;
GiveMeCookies(ActorRef<Cookies> cookies) {
this.cookies = cookies;
}
};
static class Cookies {}
public void askAndPrint(ActorSystem<Object> system, ActorRef<CookieCommand> cookieActorRef) {
CompletionStage<Cookies> result = AskPattern.ask(
cookieActorRef,
GiveMeCookies::new,
Timeout.apply(3, TimeUnit.SECONDS),
system.scheduler());
result.whenComplete((cookies, failure) -> {
if (cookies != null) System.out.println("Yay, cookies!");
else System.out.println("Boo! didn't get cookies in time.");
});
}
// #standalone-ask
// #per-session-child
// dummy data types just for this sample
interface Keys {}
interface Wallet {}
// #per-session-child
static final Behavior<GetKeys> keyCabinetBehavior = null;
static final Behavior<GetWallet> drawerBehavior = null;
// #per-session-child
// messages for the two services we interact with
class GetKeys {
public final String whoseKeys;
public final ActorRef<Keys> respondTo;
public GetKeys(String whoseKeys, ActorRef<Keys> respondTo) {
this.whoseKeys = whoseKeys;
this.respondTo = respondTo;
}
}
class GetWallet {
public final String whoseWallet;
public final ActorRef<Wallet> respondTo;
public GetWallet(String whoseWallet, ActorRef<Wallet> respondTo) {
this.whoseWallet = whoseWallet;
this.respondTo = respondTo;
}
}
interface HomeCommand {}
class LeaveHome implements HomeCommand {
public final String who;
public final ActorRef<ReadyToLeaveHome> respondTo;
public LeaveHome(String who, ActorRef<ReadyToLeaveHome> respondTo) {
this.who = who;
this.respondTo = respondTo;
}
}
class ReadyToLeaveHome {
public final String who;
public final Keys keys;
public final Wallet wallet;
public ReadyToLeaveHome(String who, Keys keys, Wallet wallet) {
this.who = who;
this.keys = keys;
this.wallet = wallet;
}
}
// actor behavior
public Behavior<HomeCommand> homeBehavior() {
return Behaviors.deferred((ctx) -> {
final ActorRef<GetKeys> keyCabinet = ctx.spawn(keyCabinetBehavior, "key-cabinet");
final ActorRef<GetWallet> drawer = ctx.spawn(drawerBehavior, "drawer");
return Behaviors.immutable(HomeCommand.class)
.onMessage(LeaveHome.class, (innerCtx, msg) -> {
ctx.spawn(new PrepareToLeaveHome(msg.who, msg.respondTo, keyCabinet, drawer), "leaving" + msg.who);
return Behavior.same();
}).build();
});
}
// per session actor behavior
class PrepareToLeaveHome extends Behaviors.MutableBehavior<Object> {
private final String whoIsLeaving;
private final ActorRef<ReadyToLeaveHome> respondTo;
private final ActorRef<GetKeys> keyCabinet;
private final ActorRef<GetWallet> drawer;
private Optional<Wallet> wallet = Optional.empty();
private Optional<Keys> keys = Optional.empty();
public PrepareToLeaveHome(String whoIsLeaving, ActorRef<ReadyToLeaveHome> respondTo, ActorRef<GetKeys> keyCabinet, ActorRef<GetWallet> drawer) {
this.whoIsLeaving = whoIsLeaving;
this.respondTo = respondTo;
this.keyCabinet = keyCabinet;
this.drawer = drawer;
}
@Override
public Behaviors.Receive<Object> createReceive() {
return receiveBuilder()
.onMessage(Wallet.class, (wallet) -> {
this.wallet = Optional.of(wallet);
return completeOrContinue();
}).onMessage(Keys.class, (keys) -> {
this.keys = Optional.of(keys);
return completeOrContinue();
}).build();
}
private Behavior<Object> completeOrContinue() {
if (wallet.isPresent() && keys.isPresent()) {
respondTo.tell(new ReadyToLeaveHome(whoIsLeaving, keys.get(), wallet.get()));
return Behaviors.stopped();
} else {
return this;
}
}
}
// #per-session-child
} }

View file

@ -5,69 +5,88 @@ package docs.akka.typed
import java.net.URI import java.net.URI
import akka.NotUsed
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedAkkaSpecWithShutdown } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedAkkaSpecWithShutdown }
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.testkit.typed.TestKit import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe import akka.testkit.typed.scaladsl.TestProbe
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{ Failure, Success }
class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown { class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
"The interaction patterns docs" must { "The interaction patterns docs" must {
"contain a sample for fire and forget" in { "contain a sample for fire and forget" in {
// #fire-and-forget // #fire-and-forget-definition
case class PrintMe(message: String)
sealed trait PrinterProtocol val printerBehavior: Behavior[PrintMe] = Behaviors.immutable {
case object DisableOutput extends PrinterProtocol case (ctx, PrintMe(message))
case object EnableOutput extends PrinterProtocol ctx.log.info(message)
case class PrintMe(message: String) extends PrinterProtocol
// two state behavior
def enabledPrinterBehavior: Behavior[PrinterProtocol] = Behaviors.immutable {
case (_, DisableOutput)
disabledPrinterBehavior
case (_, EnableOutput)
Behaviors.ignore
case (_, PrintMe(message))
println(message)
Behaviors.same Behaviors.same
} }
// #fire-and-forget-definition
def disabledPrinterBehavior: Behavior[PrinterProtocol] = Behaviors.immutable { // #fire-and-forget-doit
case (_, DisableOutput) val system = ActorSystem(printerBehavior, "fire-and-forget-sample")
enabledPrinterBehavior
case (_, _)
// ignore any message
Behaviors.ignore
}
val system = ActorSystem(enabledPrinterBehavior, "fire-and-forget-sample")
// note how the system is also the top level actor ref // note how the system is also the top level actor ref
val printer: ActorRef[PrinterProtocol] = system val printer: ActorRef[PrintMe] = system
// these are all fire and forget // these are all fire and forget
printer ! PrintMe("printed") printer ! PrintMe("message 1")
printer ! DisableOutput printer ! PrintMe("not message 2")
printer ! PrintMe("not printed") // #fire-and-forget-doit
printer ! EnableOutput
// #fire-and-forget
system.terminate().futureValue system.terminate().futureValue
} }
"contain a sample for request response" in {
// #request-response-protocol
case class Request(query: String, respondTo: ActorRef[Response])
case class Response(result: String)
// #request-response-protocol
// #request-response-respond
val otherBehavior = Behaviors.immutable[Request] { (ctx, msg)
msg match {
case Request(query, respondTo)
// ... process query ...
respondTo ! Response("Here's your cookies!")
Behaviors.same
}
}
// #request-response-respond
val otherActor: ActorRef[Request] = spawn(otherBehavior)
val probe = TestProbe[Response]()
// shhh, don't tell anyone
import scala.language.reflectiveCalls
val ctx = new {
def self = probe.ref
}
// #request-response-send
otherActor ! Request("give me cookies", ctx.self)
// #request-response-send
probe.expectMessageType[Response]
}
"contain a sample for adapted response" in { "contain a sample for adapted response" in {
// #adapted-response // #adapted-response
object Backend { object Backend {
sealed trait Request sealed trait Request
final case class StartTranslationJob(taskId: Int, site: URI, replyTo: ActorRef[Response]) extends Request final case class StartTranslationJob(
taskId: Int,
site: URI,
replyTo: ActorRef[Response]
) extends Request
sealed trait Response sealed trait Response
final case class JobStarted(taskId: Int) extends Response final case class JobStarted(taskId: Int) extends Response
@ -98,13 +117,13 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
case wrapped: WrappedBackendResponse wrapped.response match { case wrapped: WrappedBackendResponse wrapped.response match {
case Backend.JobStarted(taskId) case Backend.JobStarted(taskId)
println(s"Started $taskId") ctx.log.info("Started {}", taskId)
Behaviors.same Behaviors.same
case Backend.JobProgress(taskId, progress) case Backend.JobProgress(taskId, progress)
println(s"Progress $taskId: $progress") ctx.log.info("Progress {}: {}", taskId, progress)
Behaviors.same Behaviors.same
case Backend.JobCompleted(taskId, result) case Backend.JobCompleted(taskId, result)
println(s"Completed $taskId: $result") ctx.log.info("Completed {}: {}", taskId, result)
inProgress(taskId) ! result inProgress(taskId) ! result
active(inProgress - taskId, count) active(inProgress - taskId, count)
} }
@ -127,15 +146,12 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
replyTo ! Backend.JobCompleted(taskId, new URI("https://akka.io/docs/sv/")) replyTo ! Backend.JobCompleted(taskId, new URI("https://akka.io/docs/sv/"))
Behaviors.same Behaviors.same
} }
})
}
)
val frontend = spawn(Frontend.translator(backend)) val frontend = spawn(Frontend.translator(backend))
val probe = TestProbe[URI]() val probe = TestProbe[URI]()
frontend ! Frontend.Translate(new URI("https://akka.io/docs/"), probe.ref) frontend ! Frontend.Translate(new URI("https://akka.io/docs/"), probe.ref)
probe.expectMessage(new URI("https://akka.io/docs/sv/")) probe.expectMessage(new URI("https://akka.io/docs/sv/"))
} }
} }
@ -189,4 +205,194 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
probe.expectNoMessage(1.millisecond) probe.expectNoMessage(1.millisecond)
probe.expectMessage(2.seconds, Batch(Vector[Msg](ExcitingMessage("one"), ExcitingMessage("two")))) probe.expectMessage(2.seconds, Batch(Vector[Msg](ExcitingMessage("one"), ExcitingMessage("two"))))
} }
"contain a sample for ask" in {
// #actor-ask
sealed trait HalCommand
case class OpenThePodBayDoorsPlease(respondTo: ActorRef[HalResponse]) extends HalCommand
case class HalResponse(message: String)
val halBehavior = Behaviors.immutable[HalCommand] { (ctx, msg)
msg match {
case OpenThePodBayDoorsPlease(respondTo)
respondTo ! HalResponse("I'm sorry, Dave. I'm afraid I can't do that.")
Behaviors.same
}
}
sealed trait DaveMessage
// this is a part of the protocol that is internal to the actor itself
case class AdaptedResponse(message: String) extends DaveMessage
def daveBehavior(hal: ActorRef[HalCommand]) = Behaviors.deferred[DaveMessage] { ctx
// asking someone requires a timeout, if the timeout hits without response
// the ask is failed with a TimeoutException
implicit val timeout: Timeout = 3.seconds
// Note: The second parameter list takes a function `ActorRef[T] => Message`,
// as OpenThePodBayDoorsPlease is a case class it has a factory apply method
// that is what we are passing as the second parameter here it could also be written
// as `ref => OpenThePodBayDoorsPlease(ref)`
ctx.ask(hal)(OpenThePodBayDoorsPlease) {
case Success(HalResponse(message)) AdaptedResponse(message)
case Failure(ex) AdaptedResponse("Request failed")
}
// we can also tie in request context into an interaction, it is safe to look at
// actor internal state from the transformation function, but remember that it may have
// changed at the time the response arrives and the transformation is done, best is to
// use immutable state we have closed over like here.
val requestId = 1
ctx.ask(hal)(OpenThePodBayDoorsPlease) {
case Success(HalResponse(message)) AdaptedResponse(s"$requestId: $message")
case Failure(ex) AdaptedResponse(s"$requestId: Request failed")
}
Behaviors.immutable { (ctx, msg)
msg match {
// the adapted message ends up being processed like any other
// message sent to the actor
case AdaptedResponse(msg)
ctx.log.info("Got response from hal: {}", msg)
Behaviors.same
}
}
}
// #actor-ask
// somewhat modified behavior to let us know we saw the two requests
val monitor = TestProbe[HalCommand]()
val hal = spawn(Behaviors.monitor(monitor.ref, halBehavior))
spawn(daveBehavior(hal))
monitor.expectMessageType[OpenThePodBayDoorsPlease]
monitor.expectMessageType[OpenThePodBayDoorsPlease]
}
"contain a sample for per session child" in {
// #per-session-child
// dummy data types just for this sample
case class Keys()
case class Wallet()
// #per-session-child
val keyCabinetBehavior: Behavior[GetKeys] = Behaviors.immutable { (ctx, msg)
msg match {
case GetKeys(_, respondTo)
respondTo ! Keys()
Behaviors.same
}
}
val drawerBehavior: Behavior[GetWallet] = Behaviors.immutable { (ctx, msg)
msg match {
case GetWallet(_, respondTo)
respondTo ! Wallet()
Behaviors.same
}
}
// #per-session-child
// messages for the two services we interact with
trait HomeCommand
case class LeaveHome(who: String, respondTo: ActorRef[ReadyToLeaveHome]) extends HomeCommand
case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet)
case class GetKeys(whoseKeys: String, respondTo: ActorRef[Keys])
case class GetWallet(whoseWallet: String, respondTo: ActorRef[Wallet])
def homeBehavior = Behaviors.immutable[HomeCommand] { (ctx, msg)
val keyCabinet: ActorRef[GetKeys] = ctx.spawn(keyCabinetBehavior, "key-cabinet")
val drawer: ActorRef[GetWallet] = ctx.spawn(drawerBehavior, "drawer")
msg match {
case LeaveHome(who, respondTo)
ctx.spawn(prepareToLeaveHome(who, respondTo, keyCabinet, drawer), s"leaving-$who")
Behavior.same
}
}
// per session actor behavior
def prepareToLeaveHome(
whoIsLeaving: String,
respondTo: ActorRef[ReadyToLeaveHome],
keyCabinet: ActorRef[GetKeys],
drawer: ActorRef[GetWallet]): Behavior[NotUsed] =
// we don't _really_ care about the actor protocol here as nobody will send us
// messages except for responses to our queries, so we just accept any kind of message
// but narrow that to more limited types then we interact
Behaviors.deferred[AnyRef] { ctx
var wallet: Option[Wallet] = None
var keys: Option[Keys] = None
// we narrow the ActorRef type to any subtype of the actual type we accept
keyCabinet ! GetKeys(whoIsLeaving, ctx.self.narrow[Keys])
drawer ! GetWallet(whoIsLeaving, ctx.self.narrow[Wallet])
def nextBehavior: Behavior[AnyRef] =
(keys, wallet) match {
case (Some(w), Some(k))
// we got both, "session" is completed!
respondTo ! ReadyToLeaveHome(whoIsLeaving, w, k)
Behavior.stopped
case _
Behavior.same
}
Behaviors.immutable((ctx, msg) {
msg match {
case w: Wallet
wallet = Some(w)
nextBehavior
case k: Keys
keys = Some(k)
nextBehavior
case _
Behaviors.unhandled
}
})
}.narrow[NotUsed] // we don't let anyone else know we accept anything
// #per-session-child
val requestor = TestProbe[ReadyToLeaveHome]()
val home = spawn(homeBehavior, "home")
home ! LeaveHome("Bobby", requestor.ref)
requestor.expectMessage(ReadyToLeaveHome("Bobby", Keys(), Wallet()))
}
"contain a sample for ask from outside the actor system" in {
// #standalone-ask
trait CookieCommand {}
case class GiveMeCookies(replyTo: ActorRef[Cookies]) extends CookieCommand
case class Cookies(count: Int)
// #standalone-ask
// keep this out of the sample as it uses the testkit spawn
val cookieActorRef = spawn(Behaviors.immutable[GiveMeCookies] { (ctx, msg)
msg.replyTo ! Cookies(5)
Behaviors.same
})
// #standalone-ask
import akka.actor.typed.scaladsl.AskPattern._
// asking someone requires a timeout, if the timeout hits without response
// the ask is failed with a TimeoutException
implicit val timeout: Timeout = 3.seconds
// the response callback will be executed on this execution context
import system.executionContext
val result: Future[Cookies] = cookieActorRef ? (ref GiveMeCookies(ref))
result.onComplete {
case Success(cookies) println("Yay, cookies!")
case Failure(ex) println("Boo! didn't get cookies in time.")
}
// #standalone-ask
result.futureValue shouldEqual Cookies(5)
}
} }

View file

@ -1,54 +1,93 @@
# Interaction Patterns # Interaction Patterns
Interacting with an Actor in Akka Typed is done through an @scala[`ActorRef[T]`]@java[`ActorRef<T>`] where `T` is the type of messages the actor accepts, also known as the "protocol". This ensures that only the right kind of messages can be sent to an actor and also ensures no access to the Actor instance internals is available to anyone else but the Actor itself. Interacting with an Actor in Akka Typed is done through an @scala[`ActorRef[T]`]@java[`ActorRef<T>`] where `T` is the type of messages the actor accepts, also known as the "protocol". This ensures that only the right kind of messages can be sent to an actor and also that no one else but the Actor itself can access the Actor instance internals.
Message exchange with Actors follow a few common patterns, let's go through each one of them. Message exchange with Actors follow a few common patterns, let's go through each one of them.
## Fire and Forget ## Fire and Forget
The fundamental way to interact with an actor is through @scala["tell", which is so common that it has a special symbolic method name: `actorRef ! message`]@java[`actorRef.tell(message)`]. Sending a message to an actor like this can be done both from inside another actor and from any logic outside of the `ActorSystem`. The fundamental way to interact with an actor is through @scala["tell", which is so common that it has a special symbolic method name: `actorRef ! message`]@java[`actorRef.tell(message)`]. Sending a message with tell can safely be done from any thread.
Tell is asynchronous which means that the method returns right away and that when execution of the statement after it in the code is executed there is no guarantee that the message has been processed by the recipient yet. It also means there is no way to way to know if the processing succeeded or failed without additional interaction with the actor in question. Tell is asynchronous which means that the method returns right away, when the statement after it is executed there is no guarantee that the message has been processed by the recipient yet. It also means there is no way to know if the message was received, the processing succeeded or failed.
With the given protocol and actor behavior:
Scala Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #fire-and-forget } : @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #fire-and-forget-definition }
Java Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget } : @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget-definition }
Fire and forget looks like this:
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #fire-and-forget-doit }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #fire-and-forget-doit }
**Useful when:** **Useful when:**
* When it is not critical to be sure that the message was processed * It is not critical to be sure that the message was processed
* When there is no way to act on non successful delivery or processing * There is no way to act on non successful delivery or processing
* When we want to minimize the number of messages created to get higher throughput * We want to minimize the number of messages created to get higher throughput (sending a response would require creating twice the number of messages)
**Problems:** **Problems:**
* Consistently higher rates of fire and forget to an actor than it process will make the inbox fill up and can in the worst case cause the JVM crash with an `OutOfMemoryError` * If the inflow of messages is higher than the actor can process the inbox will fill up and can in the worst case cause the JVM crash with an `OutOfMemoryError`
* If the message got lost, we will not notice * If the message gets lost, the sender will not know
## Same protocol Request-Response ## Request-Response
In many interactions a request is followed by a response back from the actor. In Akka Typed the recipient of responses has to be encoded as a field in the message itself, which the recipient can then use to send a response back. When the response message is already a part of the sending actor protocol we can simply use @scala[`ActorContext.self`]@java[`ActorContext.getSelf()`] when constructing the message. Many interactions between actors requires one or more response message being sent back from the receiving actor. A response message can be a result of a query, some form of acknowledgment that the message was received and processed or events that the request subscribed to.
In Akka Typed the recipient of responses has to be encoded as a field in the message itself, which the recipient can then use to send (tell) a response back.
With the following protocol:
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #request-response-protocol }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #request-response-protocol }
The sender would use its own @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`], which it can access through @scala[`ActorContext.self`]@java[`ActorContext.getSelf()`], for the `respondTo`.
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #request-response-send }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #request-response-send }
On the receiving side the @scala[`ActorRef[response]`]@java[`ActorRef<Response>`] can then be used to send one or more responses back:
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #request-response-respond }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #request-response-respond }
TODO sample
**Useful when:** **Useful when:**
* Subscribing to an actor that will send many response messages (of the same protocol) back * Subscribing to an actor that will send many response messages back
* When communicating between a parent and its children, where the protocol can be made include the messages for the interaction
* ???
**Problems:** **Problems:**
* Often the response that the other actor wants to send back is not a part of the sending actor's protocol (see adapted request response or ask) * Actors seldom have a response message from another actor as a part of their protocol (see @ref:[adapted response](#adapted-response))
* It is hard to detect and that a message request was not delivered or processed (see ask) * It is hard to detect that a message request was not delivered or processed (see @ref:[ask](#request-response-with-ask-between-two-actors))
* Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor * Unless the protocol already includes a way to provide context, for example a request id that is also sent in the
response, it is not possible to tie an interaction to some specific context without introducing a new,
separate, actor (see ask or per session child actor)
## Adapted Response ## Adapted Response
Very often the receiving actor does not, and should, know of the protocol of the sending actor, and Most often the sending actor does not, and should not, support receiving the response messages of another actor. In such cases we need to provide an `ActorRef` of the right type and adapt the response message to a type that the sending actor can handle.
will respond with one or more messages that the sending actor cannot receive.
Scala Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #adapted-response } : @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #adapted-response }
@ -71,16 +110,16 @@ the receiving actor. It's recommended to register the adapters in a top level
`Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to `Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to
register them later also if needed. register them later also if needed.
The function is running in the receiving actor and can safely access state of it. The adapter function is running in the receiving actor and can safely access state of it, but if it throws an exception the actor is stopped.
**Useful when:** **Useful when:**
* Subscribing to an actor that will send many response messages back
* Translating between different actor message protocols * Translating between different actor message protocols
* Subscribing to an actor that will send many response messages back
**Problems:** **Problems:**
* It is hard to detect that a message request was not delivered or processed (see ask) * It is hard to detect that a message request was not delivered or processed (see @ref:[ask](#request-response-with-ask-between-two-actors))
* Only one adaption can be made per response message type, if a new one is registered the old one is replaced, * Only one adaption can be made per response message type, if a new one is registered the old one is replaced,
for example different target actors can't have different adaption if they use the same response types, unless some for example different target actors can't have different adaption if they use the same response types, unless some
correlation is encoded in the messages correlation is encoded in the messages
@ -89,65 +128,87 @@ The function is running in the receiving actor and can safely access state of it
separate, actor separate, actor
## 1:1 Request-Response with ask between two actors ## Request-Response with ask between two actors
In an interaction where there is a 1:1 mapping between a request and a response we can use `ask` on the `ActorContext` to interact with another actor. In an interaction where there is a 1:1 mapping between a request and a response we can use `ask` on the `ActorContext` to interact with another actor.
The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`] to put as recipient in the outgoing message. The second step is to transform the `Response` or the failure to produce a response, into a message that is part of the protocol of the sending actor. The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`] to put as recipient in the outgoing message. The second step is to transform the successful `Response` or failure into a message that is part of the protocol of the sending actor.
TODO sample Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #actor-ask }
The function is running in the receiving actor and can safely access state of it. Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #actor-ask }
The response adapting function is running in the receiving actor and can safely access state of it, but if it throws an exception the actor is stopped.
**Useful when:** **Useful when:**
* Single response queries * Single response queries
* When an actor needs to know that the message was processed before continuing * An actor needs to know that the message was processed before continuing
* To allow an actor to resend if a timely response is not produced * To allow an actor to resend if a timely response is not produced
* To keep track of outstanding requests and not overwhelm a recipient with messages (simple backpressure) * To keep track of outstanding requests and not overwhelm a recipient with messages ("backpressure")
* When some context should be attached to the interaction but the protocol does not support that (request id, what query the response was for) * Context should be attached to the interaction but the protocol does not support that (request id, what query the response was for)
**Problems:** **Problems:**
* There can only be a single response to one `ask` * There can only be a single response to one `ask` (see @ref:[per session child Actor](#per-session-child-actor))
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
* Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requestor, but at the same time you do not want to have many false positives * Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requester, but at the same time you do not want to have many false positives
## 1:1 Request-Response with ask from outside the ActorSystem ## Request-Response with ask from outside the ActorSystem
In an interaction where there is a 1:1 mapping between a request and a response we can use @scala[`ActorRef.?` implicitly provided by `akka.actor.typed.scaladsl.AskPattern`]@java[`akka.actor.typed.javadsl.AskPattern.ask`] to send a message to an actor and get a @scala[`Future[Response]`]@java[`CompletionState[Response]`] back. Some times you need to interact with actors from outside of the actor system, this can be done with fire-and-forget as described above or through another version of `ask` that returns a @scala[`Future[Response]`]@java[`CompletionStage<Response>`] that is either completed with a succesful response or failed with a `TimeoutException` if there was no response within the specified timeout.
To do this we use @scala[`ActorRef.ask` (or the symbolic `ActorRef.?`) implicitly provided by `akka.actor.typed.scaladsl.AskPattern`]@java[`akka.actor.typed.javadsl.AskPattern.ask`] to send a message to an actor and get a @scala[`Future[Response]`]@java[`CompletionState[Response]`] back.
TODO sample Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #standalone-ask }
**Useful when:** **Useful when:**
* Single response queries where the response should be passed on to some other actor * Querying an actor from outside of the actor system
* ???
**Problems:** **Problems:**
* There can only be a single response to one `ask` * It is easy to accidentally close over and unsafely mutable state with the callbacks on the returned @scala[`Future`]@java[`CompletionStage`] as those will be executed on a different thread
* There can only be a single response to one `ask` (see @ref:[per session child Actor](#per-session-child-actor))
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
## Per session child Actor ## Per session child Actor
Keeping context for an interaction, or multiple interactions can be done by moving the work for one "session", into a child actor. In some cases a complete response to a request can only be created and sent back after collecting multiple answers from other actors. For these kinds of interaction it can be good to delegate the work to a per "session" child actor. The child could also contain arbitrary logic to implement retrying, failing on timeout, tail chopping, progress inspection etc.
TODO Note that this in fact essentially how `ask` is implemented, if all you need is a single response with a timeout it is better to use `ask`.
The child is created with the context it needs to do the work, including an `ActorRef` that it can respond to. When the complete result is there the child responds with the result and stops itself.
As the protocol of the session actor is not a public API but rather an implementation detail of the parent actor, it may not always make sense to have an explicit protocol and adapt the messages of the actors that the session actor interacts with. For this use case it is possible to express that the actor can receive any message (@scala[`Any`]@java[`Object`]).
Scala
: @@snip [InteractionPatternsSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #per-session-child }
Java
: @@snip [InteractionPatternsTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #per-session-child }
In an actual session child you would likely want to include some form of timeout as well (see @ref:[scheduling messages to self](#scheduling-messages-to-self)).
**Useful when:** **Useful when:**
* A single incoming request should result in multiple interactions with other actors before a result can be built, * A single incoming request should result in multiple interactions with other actors before a result can be built,
for example aggregation of several results for example aggregation of several results
* Handle acknowledgement and retry messages for at-least-once delivery * You need to handle acknowledgement and retry messages for at-least-once delivery
* ???
**Problems:** **Problems:**
* Children have lifecycles that must be managed to not create a resource leak * Children have life cycles that must be managed to not create a resource leak, it can be easy to miss a scenario where the session actor is not stopped
* ??? * It increases complexity, since each such child can execute concurrently with other children and the parent
## Scheduling messages to self ## Scheduling messages to self