doc: stylish getting started tutuorial, #24717 (#27596)

* Move messages from DeviceProtocol to Device (in .java)
* rename Message to Command
* rename createBehavior to create
* rename message handle methods to onX
* private constructor
* get rid of DeviceManagerProtocol too
* rename Message to Command
This commit is contained in:
Patrik Nordwall 2019-09-03 13:07:21 +02:00 committed by GitHub
parent 491d2f772f
commit b2bef35f0c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
41 changed files with 833 additions and 933 deletions

View file

@ -24,7 +24,7 @@ import akka.actor.typed.javadsl.Receive;
class PrintMyActorRefActor extends AbstractBehavior<String> {
static Behavior<String> createBehavior() {
static Behavior<String> create() {
return Behaviors.setup(PrintMyActorRefActor::new);
}
@ -50,7 +50,7 @@ class PrintMyActorRefActor extends AbstractBehavior<String> {
// #start-stop
class StartStopActor1 extends AbstractBehavior<String> {
static Behavior<String> createBehavior() {
static Behavior<String> create() {
return Behaviors.setup(context -> new StartStopActor1());
}
@ -62,11 +62,11 @@ class StartStopActor1 extends AbstractBehavior<String> {
public Receive<String> createReceive() {
return newReceiveBuilder()
.onMessageEquals("stop", Behaviors::stopped)
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private Behavior<String> postStop() {
private Behavior<String> onPostStop() {
System.out.println("first stopped");
return this;
}
@ -74,7 +74,7 @@ class StartStopActor1 extends AbstractBehavior<String> {
class StartStopActor2 extends AbstractBehavior<String> {
static Behavior<String> createBehavior() {
static Behavior<String> create() {
return Behaviors.setup(context -> new StartStopActor2());
}
@ -84,10 +84,10 @@ class StartStopActor2 extends AbstractBehavior<String> {
@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onSignal(PostStop.class, signal -> postStop()).build();
return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build();
}
private Behavior<String> postStop() {
private Behavior<String> onPostStop() {
System.out.println("second stopped");
return this;
}
@ -97,7 +97,7 @@ class StartStopActor2 extends AbstractBehavior<String> {
// #supervise
class SupervisingActor extends AbstractBehavior<String> {
static Behavior<String> createBehavior() {
static Behavior<String> create() {
return Behaviors.setup(SupervisingActor::new);
}
@ -106,17 +106,16 @@ class SupervisingActor extends AbstractBehavior<String> {
private SupervisingActor(ActorContext<String> context) {
child =
context.spawn(
Behaviors.supervise(SupervisedActor.createBehavior())
.onFailure(SupervisorStrategy.restart()),
Behaviors.supervise(SupervisedActor.create()).onFailure(SupervisorStrategy.restart()),
"supervised-actor");
}
@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onMessageEquals("failChild", this::failChild).build();
return newReceiveBuilder().onMessageEquals("failChild", this::onFailChild).build();
}
private Behavior<String> failChild() {
private Behavior<String> onFailChild() {
child.tell("fail");
return this;
}
@ -124,7 +123,7 @@ class SupervisingActor extends AbstractBehavior<String> {
class SupervisedActor extends AbstractBehavior<String> {
static Behavior<String> createBehavior() {
static Behavior<String> create() {
return Behaviors.setup(context -> new SupervisedActor());
}
@ -162,7 +161,7 @@ class SupervisedActor extends AbstractBehavior<String> {
class Main extends AbstractBehavior<String> {
static Behavior<String> createBehavior() {
static Behavior<String> create() {
return Behaviors.setup(Main::new);
}
@ -178,7 +177,7 @@ class Main extends AbstractBehavior<String> {
}
private Behavior<String> start() {
ActorRef<String> firstRef = context.spawn(PrintMyActorRefActor.createBehavior(), "first-actor");
ActorRef<String> firstRef = context.spawn(PrintMyActorRefActor.create(), "first-actor");
System.out.println("First: " + firstRef);
firstRef.tell("printit");
@ -188,7 +187,7 @@ class Main extends AbstractBehavior<String> {
public class ActorHierarchyExperiments {
public static void main(String[] args) {
ActorRef<String> testSystem = ActorSystem.create(Main.createBehavior(), "testSystem");
ActorRef<String> testSystem = ActorSystem.create(Main.create(), "testSystem");
testSystem.tell("start");
}
}
@ -201,7 +200,7 @@ class ActorHierarchyExperimentsTest extends JUnitSuite {
@Test
public void testStartAndStopActors() {
// #start-stop-main
ActorRef<String> first = testKit.spawn(StartStopActor1.createBehavior(), "first");
ActorRef<String> first = testKit.spawn(StartStopActor1.create(), "first");
first.tell("stop");
// #start-stop-main
}
@ -210,7 +209,7 @@ class ActorHierarchyExperimentsTest extends JUnitSuite {
public void testSuperviseActors() throws Exception {
// #supervise-main
ActorRef<String> supervisingActor =
testKit.spawn(SupervisingActor.createBehavior(), "supervising-actor");
testKit.spawn(SupervisingActor.create(), "supervising-actor");
supervisingActor.tell("failChild");
// #supervise-main
Thread.sleep(200); // allow for the println/logging to complete

View file

@ -11,7 +11,7 @@ public class IotMain {
public static void main(String[] args) {
// Create ActorSystem and top level supervisor
ActorSystem.create(IotSupervisor.createBehavior(), "iot-system");
ActorSystem.create(IotSupervisor.create(), "iot-system");
}
}
// #iot-app

View file

@ -14,7 +14,7 @@ import akka.actor.typed.javadsl.Receive;
public class IotSupervisor extends AbstractBehavior<Void> {
public static Behavior<Void> createBehavior() {
public static Behavior<Void> create() {
return Behaviors.setup(IotSupervisor::new);
}
@ -28,10 +28,10 @@ public class IotSupervisor extends AbstractBehavior<Void> {
// No need to handle any messages
@Override
public Receive<Void> createReceive() {
return newReceiveBuilder().onSignal(PostStop.class, signal -> postStop()).build();
return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build();
}
private IotSupervisor postStop() {
private IotSupervisor onPostStop() {
context.getLog().info("IoT Application stopped");
return this;
}

View file

@ -8,6 +8,7 @@ package jdocs.typed.tutorial_3;
import java.util.Optional;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.PostStop;
import akka.actor.typed.javadsl.AbstractBehavior;
@ -15,28 +16,63 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
// #full-device
import static jdocs.typed.tutorial_3.DeviceProtocol.*;
/*
//#full-device
import static com.lightbend.akka.sample.DeviceProtocol.*;
//#full-device
*/
// #full-device
public class Device extends AbstractBehavior<Device.Command> {
public class Device extends AbstractBehavior<DeviceMessage> {
public interface Command {}
public static Behavior<DeviceMessage> createBehavior(String groupId, String deviceId) {
// #write-protocol
public static final class RecordTemperature implements Command {
final long requestId;
final double value;
final ActorRef<TemperatureRecorded> replyTo;
public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
this.requestId = requestId;
this.value = value;
this.replyTo = replyTo;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
// #write-protocol
public static final class ReadTemperature implements Command {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
public static Behavior<Command> create(String groupId, String deviceId) {
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
}
private final ActorContext<DeviceMessage> context;
private final ActorContext<Command> context;
private final String groupId;
private final String deviceId;
private Optional<Double> lastTemperatureReading = Optional.empty();
public Device(ActorContext<DeviceMessage> context, String groupId, String deviceId) {
private Device(ActorContext<Command> context, String groupId, String deviceId) {
this.context = context;
this.groupId = groupId;
this.deviceId = deviceId;
@ -45,27 +81,27 @@ public class Device extends AbstractBehavior<DeviceMessage> {
}
@Override
public Receive<DeviceMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(RecordTemperature.class, this::recordTemperature)
.onMessage(ReadTemperature.class, this::readTemperature)
.onSignal(PostStop.class, signal -> postStop())
.onMessage(RecordTemperature.class, this::onRecordTemperature)
.onMessage(ReadTemperature.class, this::onReadTemperature)
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private Behavior<DeviceMessage> recordTemperature(RecordTemperature r) {
private Behavior<Command> onRecordTemperature(RecordTemperature r) {
context.getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
r.replyTo.tell(new TemperatureRecorded(r.requestId));
return this;
}
private Behavior<DeviceMessage> readTemperature(ReadTemperature r) {
private Behavior<Command> onReadTemperature(ReadTemperature r) {
r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading));
return this;
}
private Behavior<DeviceMessage> postStop() {
private Behavior<Command> onPostStop() {
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
return Behaviors.stopped();
}

View file

@ -1,58 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.typed.tutorial_3;
import akka.actor.typed.ActorRef;
import java.util.Optional;
abstract class DeviceProtocol {
// no instances of DeviceProtocol class
private DeviceProtocol() {}
interface DeviceMessage {}
// #write-protocol
public static final class RecordTemperature implements DeviceMessage {
final long requestId;
final double value;
final ActorRef<TemperatureRecorded> replyTo;
public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
this.requestId = requestId;
this.value = value;
this.replyTo = replyTo;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
// #write-protocol
public static final class ReadTemperature implements DeviceMessage {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
}

View file

@ -15,10 +15,8 @@ import java.util.Optional;
import static org.junit.Assert.assertEquals;
// #device-read-test
import static jdocs.typed.tutorial_3.DeviceProtocol.*;
/*
//#device-read-test
import static com.lightbend.akka.sample.DeviceProtocol.*;
public class DeviceTest {
//#device-read-test
@ -30,10 +28,11 @@ public class DeviceTest extends org.scalatest.junit.JUnitSuite {
@Test
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
TestProbe<RespondTemperature> probe = testKit.createTestProbe(RespondTemperature.class);
ActorRef<DeviceMessage> deviceActor = testKit.spawn(Device.createBehavior("group", "device"));
deviceActor.tell(new ReadTemperature(42L, probe.getRef()));
RespondTemperature response = probe.receiveMessage();
TestProbe<Device.RespondTemperature> probe =
testKit.createTestProbe(Device.RespondTemperature.class);
ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
deviceActor.tell(new Device.ReadTemperature(42L, probe.getRef()));
Device.RespondTemperature response = probe.receiveMessage();
assertEquals(42L, response.requestId);
assertEquals(Optional.empty(), response.value);
}
@ -42,23 +41,25 @@ public class DeviceTest extends org.scalatest.junit.JUnitSuite {
// #device-write-read-test
@Test
public void testReplyWithLatestTemperatureReading() {
TestProbe<TemperatureRecorded> recordProbe = testKit.createTestProbe(TemperatureRecorded.class);
TestProbe<RespondTemperature> readProbe = testKit.createTestProbe(RespondTemperature.class);
ActorRef<DeviceMessage> deviceActor = testKit.spawn(Device.createBehavior("group", "device"));
TestProbe<Device.TemperatureRecorded> recordProbe =
testKit.createTestProbe(Device.TemperatureRecorded.class);
TestProbe<Device.RespondTemperature> readProbe =
testKit.createTestProbe(Device.RespondTemperature.class);
ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
deviceActor.tell(new RecordTemperature(1L, 24.0, recordProbe.getRef()));
deviceActor.tell(new Device.RecordTemperature(1L, 24.0, recordProbe.getRef()));
assertEquals(1L, recordProbe.receiveMessage().requestId);
deviceActor.tell(new ReadTemperature(2L, readProbe.getRef()));
RespondTemperature response1 = readProbe.receiveMessage();
deviceActor.tell(new Device.ReadTemperature(2L, readProbe.getRef()));
Device.RespondTemperature response1 = readProbe.receiveMessage();
assertEquals(2L, response1.requestId);
assertEquals(Optional.of(24.0), response1.value);
deviceActor.tell(new RecordTemperature(3L, 55.0, recordProbe.getRef()));
deviceActor.tell(new Device.RecordTemperature(3L, 55.0, recordProbe.getRef()));
assertEquals(3L, recordProbe.receiveMessage().requestId);
deviceActor.tell(new ReadTemperature(4L, readProbe.getRef()));
RespondTemperature response2 = readProbe.receiveMessage();
deviceActor.tell(new Device.ReadTemperature(4L, readProbe.getRef()));
Device.RespondTemperature response2 = readProbe.receiveMessage();
assertEquals(4L, response2.requestId);
assertEquals(Optional.of(55.0), response2.value);
}

View file

@ -9,13 +9,11 @@ import akka.actor.typed.ActorRef;
import java.util.Optional;
// #read-protocol-1
abstract class DeviceProtocol {
// no instances of DeviceProtocol class
private DeviceProtocol() {}
public class Device {
interface DeviceMessage {}
public interface Command {}
public static final class ReadTemperature implements DeviceMessage {
public static final class ReadTemperature implements Command {
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(ActorRef<RespondTemperature> replyTo) {

View file

@ -6,6 +6,7 @@ package jdocs.typed.tutorial_3.inprogress2;
// #device-with-read
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.PostStop;
import akka.actor.typed.javadsl.AbstractBehavior;
@ -15,28 +16,42 @@ import akka.actor.typed.javadsl.Receive;
import java.util.Optional;
// #device-with-read
import static jdocs.typed.tutorial_3.inprogress2.DeviceProtocol.*;
/*
//#device-with-read
import static com.lightbend.akka.sample.DeviceProtocol.*;
//#device-with-read
*/
// #device-with-read
// #read-protocol-2
public class Device extends AbstractBehavior<Device.Command> {
public interface Command {}
public class Device extends AbstractBehavior<DeviceMessage> {
public static final class ReadTemperature implements Command {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public static Behavior<DeviceMessage> createBehavior(String groupId, String deviceId) {
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
// #read-protocol-2
public static Behavior<Command> create(String groupId, String deviceId) {
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
}
private final ActorContext<DeviceMessage> context;
private final ActorContext<Command> context;
private final String groupId;
private final String deviceId;
private Optional<Double> lastTemperatureReading = Optional.empty();
public Device(ActorContext<DeviceMessage> context, String groupId, String deviceId) {
private Device(ActorContext<Command> context, String groupId, String deviceId) {
this.context = context;
this.groupId = groupId;
this.deviceId = deviceId;
@ -45,22 +60,24 @@ public class Device extends AbstractBehavior<DeviceMessage> {
}
@Override
public Receive<DeviceMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(ReadTemperature.class, this::readTemperature)
.onSignal(PostStop.class, signal -> postStop())
.onMessage(ReadTemperature.class, this::onReadTemperature)
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private Behavior<DeviceMessage> readTemperature(ReadTemperature r) {
private Behavior<Command> onReadTemperature(ReadTemperature r) {
r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading));
return this;
}
private Device postStop() {
private Device onPostStop() {
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
return this;
}
// #read-protocol-2
}
// #read-protocol-2
// #device-with-read

View file

@ -1,38 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.typed.tutorial_3.inprogress2;
import akka.actor.typed.ActorRef;
import java.util.Optional;
// #read-protocol-2
abstract class DeviceProtocol {
// no instances of DeviceProtocol class
private DeviceProtocol() {}
interface DeviceMessage {}
public static final class ReadTemperature implements DeviceMessage {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
}
// #read-protocol-2

View file

@ -4,18 +4,12 @@
package jdocs.typed.tutorial_3.inprogress3;
import akka.actor.typed.ActorRef;
public class Device {
import java.util.Optional;
abstract class DeviceProtocol {
// no instances of DeviceProtocol class
private DeviceProtocol() {}
interface DeviceMessage {}
public interface Command {}
// #write-protocol-1
public static final class RecordTemperature implements DeviceMessage {
public static final class RecordTemperature implements Command {
final double value;
public RecordTemperature(double value) {

View file

@ -8,6 +8,7 @@ package jdocs.typed.tutorial_4;
import java.util.Optional;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.PostStop;
import akka.actor.typed.javadsl.AbstractBehavior;
@ -15,28 +16,67 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
// #device-with-passivate
import static jdocs.typed.tutorial_4.DeviceProtocol.*;
/*
//#device-with-passivate
import static com.lightbend.akka.sample.DeviceProtocol.*;
//#device-with-passivate
*/
// #device-with-passivate
public class Device extends AbstractBehavior<Device.Command> {
public class Device extends AbstractBehavior<DeviceMessage> {
public interface Command {}
public static Behavior<DeviceMessage> createBehavior(String groupId, String deviceId) {
public static final class RecordTemperature implements Command {
final long requestId;
final double value;
final ActorRef<TemperatureRecorded> replyTo;
public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
this.requestId = requestId;
this.value = value;
this.replyTo = replyTo;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature implements Command {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
// #passivate-msg
static enum Passivate implements Command {
INSTANCE
}
// #passivate-msg
public static Behavior<Command> create(String groupId, String deviceId) {
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
}
private final ActorContext<DeviceMessage> context;
private final ActorContext<Command> context;
private final String groupId;
private final String deviceId;
private Optional<Double> lastTemperatureReading = Optional.empty();
public Device(ActorContext<DeviceMessage> context, String groupId, String deviceId) {
private Device(ActorContext<Command> context, String groupId, String deviceId) {
this.context = context;
this.groupId = groupId;
this.deviceId = deviceId;
@ -45,28 +85,28 @@ public class Device extends AbstractBehavior<DeviceMessage> {
}
@Override
public Receive<DeviceMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(RecordTemperature.class, this::recordTemperature)
.onMessage(ReadTemperature.class, this::readTemperature)
.onMessage(RecordTemperature.class, this::onRecordTemperature)
.onMessage(ReadTemperature.class, this::onReadTemperature)
.onMessage(Passivate.class, m -> Behaviors.stopped())
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private Behavior<DeviceMessage> recordTemperature(RecordTemperature r) {
private Behavior<Command> onRecordTemperature(RecordTemperature r) {
context.getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
r.replyTo.tell(new TemperatureRecorded(r.requestId));
return this;
}
private Behavior<DeviceMessage> readTemperature(ReadTemperature r) {
private Behavior<Command> onReadTemperature(ReadTemperature r) {
r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading));
return this;
}
private Behavior<DeviceMessage> postStop() {
private Behavior<Command> onPostStop() {
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
return Behaviors.stopped();
}

View file

@ -15,26 +15,20 @@ import akka.actor.typed.javadsl.Receive;
import java.util.HashMap;
import java.util.Map;
import static jdocs.typed.tutorial_4.DeviceManagerProtocol.*;
import static jdocs.typed.tutorial_4.DeviceProtocol.DeviceMessage;
// #device-group-full
// #device-group-remove
// #device-group-register
public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
public static Behavior<DeviceGroupMessage> createBehavior(String groupId) {
return Behaviors.setup(context -> new DeviceGroup(context, groupId));
}
public interface Command {}
// #device-terminated
private class DeviceTerminated implements DeviceGroupMessage {
public final ActorRef<DeviceProtocol.DeviceMessage> device;
private class DeviceTerminated implements Command {
public final ActorRef<Device.Command> device;
public final String groupId;
public final String deviceId;
DeviceTerminated(
ActorRef<DeviceProtocol.DeviceMessage> device, String groupId, String deviceId) {
DeviceTerminated(ActorRef<Device.Command> device, String groupId, String deviceId) {
this.device = device;
this.groupId = groupId;
this.deviceId = deviceId;
@ -42,32 +36,35 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
}
// #device-terminated
private final ActorContext<DeviceGroupMessage> context;
private final String groupId;
private final Map<String, ActorRef<DeviceMessage>> deviceIdToActor = new HashMap<>();
public static Behavior<Command> create(String groupId) {
return Behaviors.setup(context -> new DeviceGroup(context, groupId));
}
public DeviceGroup(ActorContext<DeviceGroupMessage> context, String groupId) {
private final ActorContext<Command> context;
private final String groupId;
private final Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
private DeviceGroup(ActorContext<Command> context, String groupId) {
this.context = context;
this.groupId = groupId;
context.getLog().info("DeviceGroup {} started", groupId);
}
private DeviceGroup onTrackDevice(RequestTrackDevice trackMsg) {
private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef<DeviceMessage> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
trackMsg.replyTo.tell(new DeviceRegistered(deviceActor));
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
} else {
context.getLog().info("Creating device actor for {}", trackMsg.deviceId);
deviceActor =
context.spawn(
Device.createBehavior(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
context.spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
// #device-group-register
context.watchWith(
deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
// #device-group-register
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
trackMsg.replyTo.tell(new DeviceRegistered(deviceActor));
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
}
} else {
context
@ -83,8 +80,8 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
// #device-group-register
// #device-group-remove
private DeviceGroup onDeviceList(RequestDeviceList r) {
r.replyTo.tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()));
private DeviceGroup onDeviceList(DeviceManager.RequestDeviceList r) {
r.replyTo.tell(new DeviceManager.ReplyDeviceList(r.requestId, deviceIdToActor.keySet()));
return this;
}
// #device-group-remove
@ -97,20 +94,23 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
// #device-group-register
@Override
public Receive<DeviceGroupMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(RequestTrackDevice.class, this::onTrackDevice)
.onMessage(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
// #device-group-register
// #device-group-remove
.onMessage(RequestDeviceList.class, r -> r.groupId.equals(groupId), this::onDeviceList)
.onMessage(
DeviceManager.RequestDeviceList.class,
r -> r.groupId.equals(groupId),
this::onDeviceList)
// #device-group-remove
.onMessage(DeviceTerminated.class, this::onTerminated)
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
// #device-group-register
.build();
}
private DeviceGroup postStop() {
private DeviceGroup onPostStop() {
context.getLog().info("DeviceGroup {} stopped", groupId);
return this;
}

View file

@ -16,8 +16,11 @@ import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static jdocs.typed.tutorial_4.DeviceManagerProtocol.*;
import static jdocs.typed.tutorial_4.DeviceProtocol.*;
import static jdocs.typed.tutorial_4.DeviceManager.DeviceRegistered;
import static jdocs.typed.tutorial_4.DeviceManager.RequestTrackDevice;
import static jdocs.typed.tutorial_4.DeviceManager.ReplyDeviceList;
import static jdocs.typed.tutorial_4.DeviceManager.RequestDeviceList;
public class DeviceGroupTest extends JUnitSuite {
@ -27,7 +30,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testReplyToRegistrationRequests() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device", probe.getRef()));
DeviceRegistered registered1 = probe.receiveMessage();
@ -38,17 +41,18 @@ public class DeviceGroupTest extends JUnitSuite {
assertNotEquals(registered1.device, registered2.device);
// Check that the device actors are working
TestProbe<TemperatureRecorded> recordProbe = testKit.createTestProbe(TemperatureRecorded.class);
registered1.device.tell(new RecordTemperature(0L, 1.0, recordProbe.getRef()));
TestProbe<Device.TemperatureRecorded> recordProbe =
testKit.createTestProbe(Device.TemperatureRecorded.class);
registered1.device.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef()));
assertEquals(0L, recordProbe.receiveMessage().requestId);
registered2.device.tell(new RecordTemperature(1L, 2.0, recordProbe.getRef()));
registered2.device.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef()));
assertEquals(1L, recordProbe.receiveMessage().requestId);
}
@Test
public void testIgnoreWrongRegistrationRequests() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("wrongGroup", "device1", probe.getRef()));
probe.expectNoMessage();
}
@ -58,7 +62,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testReturnSameActorForSameDeviceId() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device", probe.getRef()));
DeviceRegistered registered1 = probe.receiveMessage();
@ -74,7 +78,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testListActiveDevices() {
TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
registeredProbe.receiveMessage();
@ -93,7 +97,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
DeviceRegistered registered1 = registeredProbe.receiveMessage();
@ -101,7 +105,7 @@ public class DeviceGroupTest extends JUnitSuite {
groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef()));
DeviceRegistered registered2 = registeredProbe.receiveMessage();
ActorRef<DeviceMessage> toShutDown = registered1.device;
ActorRef<Device.Command> toShutDown = registered1.device;
TestProbe<ReplyDeviceList> deviceListProbe = testKit.createTestProbe(ReplyDeviceList.class);
@ -110,7 +114,7 @@ public class DeviceGroupTest extends JUnitSuite {
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
toShutDown.tell(Passivate.INSTANCE);
toShutDown.tell(Device.Passivate.INSTANCE);
registeredProbe.expectTerminated(toShutDown, registeredProbe.getRemainingOrDefault());
// using awaitAssert to retry because it might take longer for the groupActor

View file

@ -15,17 +15,62 @@ import akka.actor.typed.javadsl.Receive;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static jdocs.typed.tutorial_4.DeviceManagerProtocol.*;
import java.util.Set;
// #device-manager-full
public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
// #device-registration-msgs
public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
public static Behavior<DeviceManagerMessage> createBehavior() {
return Behaviors.setup(DeviceManager::new);
public interface Command {}
public static final class RequestTrackDevice
implements DeviceManager.Command, DeviceGroup.Command {
public final String groupId;
public final String deviceId;
public final ActorRef<DeviceRegistered> replyTo;
public RequestTrackDevice(String groupId, String deviceId, ActorRef<DeviceRegistered> replyTo) {
this.groupId = groupId;
this.deviceId = deviceId;
this.replyTo = replyTo;
}
}
private static class DeviceGroupTerminated implements DeviceManagerMessage {
public static final class DeviceRegistered {
public final ActorRef<Device.Command> device;
public DeviceRegistered(ActorRef<Device.Command> device) {
this.device = device;
}
}
// #device-registration-msgs
// #device-list-msgs
public static final class RequestDeviceList
implements DeviceManager.Command, DeviceGroup.Command {
final long requestId;
final String groupId;
final ActorRef<ReplyDeviceList> replyTo;
public RequestDeviceList(long requestId, String groupId, ActorRef<ReplyDeviceList> replyTo) {
this.requestId = requestId;
this.groupId = groupId;
this.replyTo = replyTo;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
// #device-list-msgs
private static class DeviceGroupTerminated implements DeviceManager.Command {
public final String groupId;
DeviceGroupTerminated(String groupId) {
@ -33,23 +78,27 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
}
}
private final ActorContext<DeviceManagerMessage> context;
private final Map<String, ActorRef<DeviceGroupMessage>> groupIdToActor = new HashMap<>();
public static Behavior<Command> create() {
return Behaviors.setup(DeviceManager::new);
}
public DeviceManager(ActorContext<DeviceManagerMessage> context) {
private final ActorContext<Command> context;
private final Map<String, ActorRef<DeviceGroup.Command>> groupIdToActor = new HashMap<>();
private DeviceManager(ActorContext<Command> context) {
this.context = context;
context.getLog().info("DeviceManager started");
}
private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) {
String groupId = trackMsg.groupId;
ActorRef<DeviceGroupMessage> ref = groupIdToActor.get(groupId);
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId);
if (ref != null) {
ref.tell(trackMsg);
} else {
context.getLog().info("Creating device group actor for {}", groupId);
ActorRef<DeviceGroupMessage> groupActor =
context.spawn(DeviceGroup.createBehavior(groupId), "group-" + groupId);
ActorRef<DeviceGroup.Command> groupActor =
context.spawn(DeviceGroup.create(groupId), "group-" + groupId);
context.watchWith(groupActor, new DeviceGroupTerminated(groupId));
groupActor.tell(trackMsg);
groupIdToActor.put(groupId, groupActor);
@ -58,7 +107,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
}
private DeviceManager onRequestDeviceList(RequestDeviceList request) {
ActorRef<DeviceGroupMessage> ref = groupIdToActor.get(request.groupId);
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(request.groupId);
if (ref != null) {
ref.tell(request);
} else {
@ -73,18 +122,20 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
return this;
}
public Receive<DeviceManagerMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(RequestTrackDevice.class, this::onTrackDevice)
.onMessage(RequestDeviceList.class, this::onRequestDeviceList)
.onMessage(DeviceGroupTerminated.class, this::onTerminated)
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private DeviceManager postStop() {
private DeviceManager onPostStop() {
context.getLog().info("DeviceManager stopped");
return this;
}
// #device-registration-msgs
}
// #device-registration-msgs
// #device-manager-full

View file

@ -1,67 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.typed.tutorial_4;
import akka.actor.typed.ActorRef;
import java.util.Set;
// #device-registration-msgs
abstract class DeviceManagerProtocol {
// no instances of DeviceManagerProtocol class
private DeviceManagerProtocol() {}
interface DeviceManagerMessage {}
interface DeviceGroupMessage {}
public static final class RequestTrackDevice implements DeviceManagerMessage, DeviceGroupMessage {
public final String groupId;
public final String deviceId;
public final ActorRef<DeviceRegistered> replyTo;
public RequestTrackDevice(String groupId, String deviceId, ActorRef<DeviceRegistered> replyTo) {
this.groupId = groupId;
this.deviceId = deviceId;
this.replyTo = replyTo;
}
}
public static final class DeviceRegistered {
public final ActorRef<DeviceProtocol.DeviceMessage> device;
public DeviceRegistered(ActorRef<DeviceProtocol.DeviceMessage> device) {
this.device = device;
}
}
// #device-registration-msgs
// #device-list-msgs
public static final class RequestDeviceList implements DeviceManagerMessage, DeviceGroupMessage {
final long requestId;
final String groupId;
final ActorRef<ReplyDeviceList> replyTo;
public RequestDeviceList(long requestId, String groupId, ActorRef<ReplyDeviceList> replyTo) {
this.requestId = requestId;
this.groupId = groupId;
this.replyTo = replyTo;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
// #device-list-msgs
// #device-registration-msgs
}
// #device-registration-msgs

View file

@ -11,9 +11,11 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import static jdocs.typed.tutorial_4.DeviceManagerProtocol.*;
import static org.junit.Assert.assertNotEquals;
import static jdocs.typed.tutorial_4.DeviceManager.DeviceRegistered;
import static jdocs.typed.tutorial_4.DeviceManager.RequestTrackDevice;
public class DeviceManagerTest extends JUnitSuite {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@ -21,7 +23,7 @@ public class DeviceManagerTest extends JUnitSuite {
@Test
public void testReplyToRegistrationRequests() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceManagerMessage> managerActor = testKit.spawn(DeviceManager.createBehavior());
ActorRef<DeviceManager.Command> managerActor = testKit.spawn(DeviceManager.create());
managerActor.tell(new RequestTrackDevice("group1", "device", probe.getRef()));
DeviceRegistered registered1 = probe.receiveMessage();

View file

@ -1,64 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.typed.tutorial_4;
import akka.actor.typed.ActorRef;
import java.util.Optional;
import java.util.Set;
abstract class DeviceProtocol {
// no instances of DeviceProtocol class
private DeviceProtocol() {}
interface DeviceMessage {}
public static final class RecordTemperature implements DeviceMessage {
final long requestId;
final double value;
final ActorRef<TemperatureRecorded> replyTo;
public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
this.requestId = requestId;
this.value = value;
this.replyTo = replyTo;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature implements DeviceMessage {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
// #passivate-msg
static enum Passivate implements DeviceMessage {
INSTANCE
}
// #passivate-msg
}

View file

@ -14,7 +14,6 @@ import org.scalatest.junit.JUnitSuite;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static jdocs.typed.tutorial_4.DeviceProtocol.*;
public class DeviceTest extends JUnitSuite {
@ -23,10 +22,11 @@ public class DeviceTest extends JUnitSuite {
// #device-read-test
@Test
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
TestProbe<RespondTemperature> probe = testKit.createTestProbe(RespondTemperature.class);
ActorRef<DeviceMessage> deviceActor = testKit.spawn(Device.createBehavior("group", "device"));
deviceActor.tell(new ReadTemperature(42L, probe.getRef()));
RespondTemperature response = probe.receiveMessage();
TestProbe<Device.RespondTemperature> probe =
testKit.createTestProbe(Device.RespondTemperature.class);
ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
deviceActor.tell(new Device.ReadTemperature(42L, probe.getRef()));
Device.RespondTemperature response = probe.receiveMessage();
assertEquals(42L, response.requestId);
assertEquals(Optional.empty(), response.value);
}
@ -35,23 +35,25 @@ public class DeviceTest extends JUnitSuite {
// #device-write-read-test
@Test
public void testReplyWithLatestTemperatureReading() {
TestProbe<TemperatureRecorded> recordProbe = testKit.createTestProbe(TemperatureRecorded.class);
TestProbe<RespondTemperature> readProbe = testKit.createTestProbe(RespondTemperature.class);
ActorRef<DeviceMessage> deviceActor = testKit.spawn(Device.createBehavior("group", "device"));
TestProbe<Device.TemperatureRecorded> recordProbe =
testKit.createTestProbe(Device.TemperatureRecorded.class);
TestProbe<Device.RespondTemperature> readProbe =
testKit.createTestProbe(Device.RespondTemperature.class);
ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
deviceActor.tell(new RecordTemperature(1L, 24.0, recordProbe.getRef()));
deviceActor.tell(new Device.RecordTemperature(1L, 24.0, recordProbe.getRef()));
assertEquals(1L, recordProbe.receiveMessage().requestId);
deviceActor.tell(new ReadTemperature(2L, readProbe.getRef()));
RespondTemperature response1 = readProbe.receiveMessage();
deviceActor.tell(new Device.ReadTemperature(2L, readProbe.getRef()));
Device.RespondTemperature response1 = readProbe.receiveMessage();
assertEquals(2L, response1.requestId);
assertEquals(Optional.of(24.0), response1.value);
deviceActor.tell(new RecordTemperature(3L, 55.0, recordProbe.getRef()));
deviceActor.tell(new Device.RecordTemperature(3L, 55.0, recordProbe.getRef()));
assertEquals(3L, recordProbe.receiveMessage().requestId);
deviceActor.tell(new ReadTemperature(4L, readProbe.getRef()));
RespondTemperature response2 = readProbe.receiveMessage();
deviceActor.tell(new Device.ReadTemperature(4L, readProbe.getRef()));
Device.RespondTemperature response2 = readProbe.receiveMessage();
assertEquals(4L, response2.requestId);
assertEquals(Optional.of(55.0), response2.value);
}

View file

@ -6,6 +6,7 @@ package jdocs.typed.tutorial_5;
import java.util.Optional;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.PostStop;
import akka.actor.typed.javadsl.AbstractBehavior;
@ -13,21 +14,67 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import static jdocs.typed.tutorial_5.DeviceProtocol.*;
public class Device extends AbstractBehavior<Device.Command> {
public class Device extends AbstractBehavior<DeviceMessage> {
public interface Command {}
public static Behavior<DeviceMessage> createBehavior(String groupId, String deviceId) {
public static final class RecordTemperature implements Command {
final long requestId;
final double value;
final ActorRef<TemperatureRecorded> replyTo;
public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
this.requestId = requestId;
this.value = value;
this.replyTo = replyTo;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature implements Command {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final String deviceId;
final Optional<Double> value;
public RespondTemperature(long requestId, String deviceId, Optional<Double> value) {
this.requestId = requestId;
this.deviceId = deviceId;
this.value = value;
}
}
static enum Passivate implements Command {
INSTANCE
}
public static Behavior<Command> create(String groupId, String deviceId) {
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
}
private final ActorContext<DeviceMessage> context;
private final ActorContext<Command> context;
private final String groupId;
private final String deviceId;
private Optional<Double> lastTemperatureReading = Optional.empty();
public Device(ActorContext<DeviceMessage> context, String groupId, String deviceId) {
private Device(ActorContext<Command> context, String groupId, String deviceId) {
this.context = context;
this.groupId = groupId;
this.deviceId = deviceId;
@ -36,28 +83,28 @@ public class Device extends AbstractBehavior<DeviceMessage> {
}
@Override
public Receive<DeviceMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(RecordTemperature.class, this::recordTemperature)
.onMessage(ReadTemperature.class, this::readTemperature)
.onMessage(RecordTemperature.class, this::onRecordTemperature)
.onMessage(ReadTemperature.class, this::onReadTemperature)
.onMessage(Passivate.class, m -> Behaviors.stopped())
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private Behavior<DeviceMessage> recordTemperature(RecordTemperature r) {
private Behavior<Command> onRecordTemperature(RecordTemperature r) {
context.getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
r.replyTo.tell(new TemperatureRecorded(r.requestId));
return this;
}
private Behavior<DeviceMessage> readTemperature(ReadTemperature r) {
private Behavior<Command> onReadTemperature(ReadTemperature r) {
r.replyTo.tell(new RespondTemperature(r.requestId, deviceId, lastTemperatureReading));
return this;
}
private Behavior<DeviceMessage> postStop() {
private Behavior<Command> onPostStop() {
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
return Behaviors.stopped();
}

View file

@ -16,54 +16,51 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import static jdocs.typed.tutorial_5.DeviceManagerProtocol.*;
import static jdocs.typed.tutorial_5.DeviceProtocol.DeviceMessage;
// #query-added
public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
public static Behavior<DeviceGroupMessage> createBehavior(String groupId) {
return Behaviors.setup(context -> new DeviceGroup(context, groupId));
}
public interface Command {}
private class DeviceTerminated implements DeviceGroupMessage {
public final ActorRef<DeviceProtocol.DeviceMessage> device;
private class DeviceTerminated implements Command {
public final ActorRef<Device.Command> device;
public final String groupId;
public final String deviceId;
DeviceTerminated(
ActorRef<DeviceProtocol.DeviceMessage> device, String groupId, String deviceId) {
DeviceTerminated(ActorRef<Device.Command> device, String groupId, String deviceId) {
this.device = device;
this.groupId = groupId;
this.deviceId = deviceId;
}
}
private final ActorContext<DeviceGroupMessage> context;
private final String groupId;
private final Map<String, ActorRef<DeviceMessage>> deviceIdToActor = new HashMap<>();
public static Behavior<Command> create(String groupId) {
return Behaviors.setup(context -> new DeviceGroup(context, groupId));
}
public DeviceGroup(ActorContext<DeviceGroupMessage> context, String groupId) {
private final ActorContext<Command> context;
private final String groupId;
private final Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
private DeviceGroup(ActorContext<Command> context, String groupId) {
this.context = context;
this.groupId = groupId;
context.getLog().info("DeviceGroup {} started", groupId);
}
// #query-added
private DeviceGroup onTrackDevice(RequestTrackDevice trackMsg) {
private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef<DeviceMessage> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
trackMsg.replyTo.tell(new DeviceRegistered(deviceActor));
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
} else {
context.getLog().info("Creating device actor for {}", trackMsg.deviceId);
deviceActor =
context.spawn(
Device.createBehavior(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
context.spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
context.watchWith(
deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
trackMsg.replyTo.tell(new DeviceRegistered(deviceActor));
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
}
} else {
context
@ -76,8 +73,8 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
return this;
}
private DeviceGroup onDeviceList(RequestDeviceList r) {
r.replyTo.tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()));
private DeviceGroup onDeviceList(DeviceManager.RequestDeviceList r) {
r.replyTo.tell(new DeviceManager.ReplyDeviceList(r.requestId, deviceIdToActor.keySet()));
return this;
}
@ -87,14 +84,14 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
return this;
}
private DeviceGroup postStop() {
private DeviceGroup onPostStop() {
context.getLog().info("DeviceGroup {} stopped", groupId);
return this;
}
// #query-added
private DeviceGroup onAllTemperatures(RequestAllTemperatures r) {
private DeviceGroup onAllTemperatures(DeviceManager.RequestAllTemperatures r) {
// since Java collections are mutable, we want to avoid sharing them between actors (since
// multiple Actors (threads)
// modifying the same mutable data-structure is not safe), and perform a defensive copy of the
@ -102,27 +99,32 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroupMessage> {
//
// Feel free to use your favourite immutable data-structures library with Akka in Java
// applications!
Map<String, ActorRef<DeviceMessage>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor);
Map<String, ActorRef<Device.Command>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor);
context.spawnAnonymous(
DeviceGroupQuery.createBehavior(
DeviceGroupQuery.create(
deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3)));
return this;
}
@Override
public Receive<DeviceGroupMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
// #query-added
.onMessage(RequestTrackDevice.class, this::onTrackDevice)
.onMessage(RequestDeviceList.class, r -> r.groupId.equals(groupId), this::onDeviceList)
.onMessage(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.onMessage(
DeviceManager.RequestDeviceList.class,
r -> r.groupId.equals(groupId),
this::onDeviceList)
.onMessage(DeviceTerminated.class, this::onTerminated)
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
// #query-added
// ... other cases omitted
.onMessage(
RequestAllTemperatures.class, r -> r.groupId.equals(groupId), this::onAllTemperatures)
DeviceManager.RequestAllTemperatures.class,
r -> r.groupId.equals(groupId),
this::onAllTemperatures)
.build();
}
}

View file

@ -18,16 +18,36 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static jdocs.typed.tutorial_5.DeviceManagerProtocol.*;
// #query-full
// #query-outline
public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQueryMessage> {
public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQuery.Command> {
public static Behavior<DeviceGroupQueryMessage> createBehavior(
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor,
public interface Command {}
private static enum CollectionTimeout implements Command {
INSTANCE
}
static class WrappedRespondTemperature implements Command {
final Device.RespondTemperature response;
WrappedRespondTemperature(Device.RespondTemperature response) {
this.response = response;
}
}
private static class DeviceTerminated implements Command {
final String deviceId;
private DeviceTerminated(String deviceId) {
this.deviceId = deviceId;
}
}
public static Behavior<Command> create(
Map<String, ActorRef<Device.Command>> deviceIdToActor,
long requestId,
ActorRef<RespondAllTemperatures> requester,
ActorRef<DeviceManager.RespondAllTemperatures> requester,
Duration timeout) {
return Behaviors.setup(
context ->
@ -37,56 +57,34 @@ public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQueryMessage>
deviceIdToActor, requestId, requester, timeout, context, timers)));
}
private static enum CollectionTimeout implements DeviceGroupQueryMessage {
INSTANCE
}
static class WrappedRespondTemperature implements DeviceGroupQueryMessage {
final DeviceProtocol.RespondTemperature response;
WrappedRespondTemperature(DeviceProtocol.RespondTemperature response) {
this.response = response;
}
}
private static class DeviceTerminated implements DeviceGroupQueryMessage {
final String deviceId;
private DeviceTerminated(String deviceId) {
this.deviceId = deviceId;
}
}
private final long requestId;
private final ActorRef<RespondAllTemperatures> requester;
private final ActorRef<DeviceManager.RespondAllTemperatures> requester;
// #query-outline
// #query-state
private Map<String, TemperatureReading> repliesSoFar = new HashMap<>();
private Map<String, DeviceManager.TemperatureReading> repliesSoFar = new HashMap<>();
private final Set<String> stillWaiting;
// #query-state
// #query-outline
public DeviceGroupQuery(
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor,
private DeviceGroupQuery(
Map<String, ActorRef<Device.Command>> deviceIdToActor,
long requestId,
ActorRef<RespondAllTemperatures> requester,
ActorRef<DeviceManager.RespondAllTemperatures> requester,
Duration timeout,
ActorContext<DeviceGroupQueryMessage> context,
TimerScheduler<DeviceGroupQueryMessage> timers) {
ActorContext<Command> context,
TimerScheduler<Command> timers) {
this.requestId = requestId;
this.requester = requester;
timers.startSingleTimer(CollectionTimeout.class, CollectionTimeout.INSTANCE, timeout);
ActorRef<DeviceProtocol.RespondTemperature> respondTemperatureAdapter =
context.messageAdapter(
DeviceProtocol.RespondTemperature.class, WrappedRespondTemperature::new);
ActorRef<Device.RespondTemperature> respondTemperatureAdapter =
context.messageAdapter(Device.RespondTemperature.class, WrappedRespondTemperature::new);
for (Map.Entry<String, ActorRef<DeviceProtocol.DeviceMessage>> entry :
deviceIdToActor.entrySet()) {
for (Map.Entry<String, ActorRef<Device.Command>> entry : deviceIdToActor.entrySet()) {
context.watchWith(entry.getValue(), new DeviceTerminated(entry.getKey()));
entry.getValue().tell(new DeviceProtocol.ReadTemperature(0L, respondTemperatureAdapter));
entry.getValue().tell(new Device.ReadTemperature(0L, respondTemperatureAdapter));
}
stillWaiting = new HashSet<>(deviceIdToActor.keySet());
}
@ -94,7 +92,7 @@ public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQueryMessage>
// #query-outline
// #query-state
@Override
public Receive<DeviceGroupQueryMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(WrappedRespondTemperature.class, this::onRespondTemperature)
.onMessage(DeviceTerminated.class, this::onDeviceTerminated)
@ -102,12 +100,12 @@ public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQueryMessage>
.build();
}
private Behavior<DeviceGroupQueryMessage> onRespondTemperature(WrappedRespondTemperature r) {
TemperatureReading reading =
private Behavior<Command> onRespondTemperature(WrappedRespondTemperature r) {
DeviceManager.TemperatureReading reading =
r.response
.value
.map(v -> (TemperatureReading) new Temperature(v))
.orElse(TemperatureNotAvailable.INSTANCE);
.map(v -> (DeviceManager.TemperatureReading) new DeviceManager.Temperature(v))
.orElse(DeviceManager.TemperatureNotAvailable.INSTANCE);
String deviceId = r.response.deviceId;
repliesSoFar.put(deviceId, reading);
@ -116,17 +114,17 @@ public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQueryMessage>
return respondWhenAllCollected();
}
private Behavior<DeviceGroupQueryMessage> onDeviceTerminated(DeviceTerminated terminated) {
private Behavior<Command> onDeviceTerminated(DeviceTerminated terminated) {
if (stillWaiting.contains(terminated.deviceId)) {
repliesSoFar.put(terminated.deviceId, DeviceNotAvailable.INSTANCE);
repliesSoFar.put(terminated.deviceId, DeviceManager.DeviceNotAvailable.INSTANCE);
stillWaiting.remove(terminated.deviceId);
}
return respondWhenAllCollected();
}
private Behavior<DeviceGroupQueryMessage> onCollectionTimeout(CollectionTimeout timeout) {
private Behavior<Command> onCollectionTimeout(CollectionTimeout timeout) {
for (String deviceId : stillWaiting) {
repliesSoFar.put(deviceId, DeviceTimedOut.INSTANCE);
repliesSoFar.put(deviceId, DeviceManager.DeviceTimedOut.INSTANCE);
}
stillWaiting.clear();
return respondWhenAllCollected();
@ -134,9 +132,9 @@ public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQueryMessage>
// #query-state
// #query-collect-reply
private Behavior<DeviceGroupQueryMessage> respondWhenAllCollected() {
private Behavior<Command> respondWhenAllCollected() {
if (stillWaiting.isEmpty()) {
requester.tell(new RespondAllTemperatures(requestId, repliesSoFar));
requester.tell(new DeviceManager.RespondAllTemperatures(requestId, repliesSoFar));
return Behaviors.stopped();
} else {
return this;

View file

@ -16,10 +16,15 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static jdocs.typed.tutorial_5.DeviceManagerProtocol.*;
import static jdocs.typed.tutorial_5.DeviceProtocol.*;
import static org.junit.Assert.assertEquals;
import static jdocs.typed.tutorial_5.DeviceManager.RespondAllTemperatures;
import static jdocs.typed.tutorial_5.DeviceManager.TemperatureReading;
import static jdocs.typed.tutorial_5.DeviceManager.Temperature;
import static jdocs.typed.tutorial_5.DeviceManager.TemperatureNotAvailable;
import static jdocs.typed.tutorial_5.DeviceManager.DeviceTimedOut;
import static jdocs.typed.tutorial_5.DeviceManager.DeviceNotAvailable;
public class DeviceGroupQueryTest extends JUnitSuite {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@ -29,28 +34,28 @@ public class DeviceGroupQueryTest extends JUnitSuite {
public void testReturnTemperatureValueForWorkingDevices() {
TestProbe<RespondAllTemperatures> requester =
testKit.createTestProbe(RespondAllTemperatures.class);
TestProbe<DeviceMessage> device1 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<DeviceMessage> device2 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor = new HashMap<>();
Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
deviceIdToActor.put("device1", device1.getRef());
deviceIdToActor.put("device2", device2.getRef());
ActorRef<DeviceGroupQueryMessage> queryActor =
ActorRef<DeviceGroupQuery.Command> queryActor =
testKit.spawn(
DeviceGroupQuery.createBehavior(
DeviceGroupQuery.create(
deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
device1.expectMessageClass(ReadTemperature.class);
device2.expectMessageClass(ReadTemperature.class);
device1.expectMessageClass(Device.ReadTemperature.class);
device2.expectMessageClass(Device.ReadTemperature.class);
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device1", Optional.of(1.0))));
new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device2", Optional.of(2.0))));
new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
RespondAllTemperatures response = requester.receiveMessage();
assertEquals(1L, response.requestId);
@ -68,28 +73,28 @@ public class DeviceGroupQueryTest extends JUnitSuite {
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
TestProbe<RespondAllTemperatures> requester =
testKit.createTestProbe(RespondAllTemperatures.class);
TestProbe<DeviceMessage> device1 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<DeviceMessage> device2 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor = new HashMap<>();
Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
deviceIdToActor.put("device1", device1.getRef());
deviceIdToActor.put("device2", device2.getRef());
ActorRef<DeviceGroupQueryMessage> queryActor =
ActorRef<DeviceGroupQuery.Command> queryActor =
testKit.spawn(
DeviceGroupQuery.createBehavior(
DeviceGroupQuery.create(
deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
assertEquals(0L, device1.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device1", Optional.empty())));
new Device.RespondTemperature(0L, "device1", Optional.empty())));
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device2", Optional.of(2.0))));
new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
RespondAllTemperatures response = requester.receiveMessage();
assertEquals(1L, response.requestId);
@ -107,24 +112,24 @@ public class DeviceGroupQueryTest extends JUnitSuite {
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
TestProbe<RespondAllTemperatures> requester =
testKit.createTestProbe(RespondAllTemperatures.class);
TestProbe<DeviceMessage> device1 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<DeviceMessage> device2 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor = new HashMap<>();
Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
deviceIdToActor.put("device1", device1.getRef());
deviceIdToActor.put("device2", device2.getRef());
ActorRef<DeviceGroupQueryMessage> queryActor =
ActorRef<DeviceGroupQuery.Command> queryActor =
testKit.spawn(
DeviceGroupQuery.createBehavior(
DeviceGroupQuery.create(
deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
assertEquals(0L, device1.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device1", Optional.of(1.0))));
new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
device2.stop();
@ -144,28 +149,28 @@ public class DeviceGroupQueryTest extends JUnitSuite {
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
TestProbe<RespondAllTemperatures> requester =
testKit.createTestProbe(RespondAllTemperatures.class);
TestProbe<DeviceMessage> device1 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<DeviceMessage> device2 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor = new HashMap<>();
Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
deviceIdToActor.put("device1", device1.getRef());
deviceIdToActor.put("device2", device2.getRef());
ActorRef<DeviceGroupQueryMessage> queryActor =
ActorRef<DeviceGroupQuery.Command> queryActor =
testKit.spawn(
DeviceGroupQuery.createBehavior(
DeviceGroupQuery.create(
deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
assertEquals(0L, device1.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device1", Optional.of(1.0))));
new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device2", Optional.of(2.0))));
new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
device2.stop();
@ -185,24 +190,24 @@ public class DeviceGroupQueryTest extends JUnitSuite {
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
TestProbe<RespondAllTemperatures> requester =
testKit.createTestProbe(RespondAllTemperatures.class);
TestProbe<DeviceMessage> device1 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<DeviceMessage> device2 = testKit.createTestProbe(DeviceMessage.class);
TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
Map<String, ActorRef<DeviceProtocol.DeviceMessage>> deviceIdToActor = new HashMap<>();
Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
deviceIdToActor.put("device1", device1.getRef());
deviceIdToActor.put("device2", device2.getRef());
ActorRef<DeviceGroupQueryMessage> queryActor =
ActorRef<DeviceGroupQuery.Command> queryActor =
testKit.spawn(
DeviceGroupQuery.createBehavior(
DeviceGroupQuery.create(
deviceIdToActor, 1L, requester.getRef(), Duration.ofMillis(200)));
assertEquals(0L, device1.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(ReadTemperature.class).requestId);
assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
queryActor.tell(
new DeviceGroupQuery.WrappedRespondTemperature(
new RespondTemperature(0L, "device1", Optional.of(1.0))));
new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
// no reply from device2

View file

@ -16,11 +16,19 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static jdocs.typed.tutorial_5.DeviceManagerProtocol.*;
import static jdocs.typed.tutorial_5.DeviceProtocol.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static jdocs.typed.tutorial_5.DeviceManager.RespondAllTemperatures;
import static jdocs.typed.tutorial_5.DeviceManager.TemperatureReading;
import static jdocs.typed.tutorial_5.DeviceManager.Temperature;
import static jdocs.typed.tutorial_5.DeviceManager.TemperatureNotAvailable;
import static jdocs.typed.tutorial_5.DeviceManager.DeviceRegistered;
import static jdocs.typed.tutorial_5.DeviceManager.RequestTrackDevice;
import static jdocs.typed.tutorial_5.DeviceManager.ReplyDeviceList;
import static jdocs.typed.tutorial_5.DeviceManager.RequestDeviceList;
import static jdocs.typed.tutorial_5.DeviceManager.RequestAllTemperatures;
public class DeviceGroupTest extends JUnitSuite {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@ -28,7 +36,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testReplyToRegistrationRequests() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device", probe.getRef()));
DeviceRegistered registered1 = probe.receiveMessage();
@ -39,17 +47,18 @@ public class DeviceGroupTest extends JUnitSuite {
assertNotEquals(registered1.device, registered2.device);
// Check that the device actors are working
TestProbe<TemperatureRecorded> recordProbe = testKit.createTestProbe(TemperatureRecorded.class);
registered1.device.tell(new RecordTemperature(0L, 1.0, recordProbe.getRef()));
TestProbe<Device.TemperatureRecorded> recordProbe =
testKit.createTestProbe(Device.TemperatureRecorded.class);
registered1.device.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef()));
assertEquals(0L, recordProbe.receiveMessage().requestId);
registered2.device.tell(new RecordTemperature(1L, 2.0, recordProbe.getRef()));
registered2.device.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef()));
assertEquals(1L, recordProbe.receiveMessage().requestId);
}
@Test
public void testIgnoreWrongRegistrationRequests() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("wrongGroup", "device1", probe.getRef()));
probe.expectNoMessage();
}
@ -57,7 +66,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testReturnSameActorForSameDeviceId() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device", probe.getRef()));
DeviceRegistered registered1 = probe.receiveMessage();
@ -71,7 +80,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testListActiveDevices() {
TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
registeredProbe.receiveMessage();
@ -90,7 +99,7 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
DeviceRegistered registered1 = registeredProbe.receiveMessage();
@ -98,7 +107,7 @@ public class DeviceGroupTest extends JUnitSuite {
groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef()));
registeredProbe.receiveMessage();
ActorRef<DeviceMessage> toShutDown = registered1.device;
ActorRef<Device.Command> toShutDown = registered1.device;
TestProbe<ReplyDeviceList> deviceListProbe = testKit.createTestProbe(ReplyDeviceList.class);
@ -107,7 +116,7 @@ public class DeviceGroupTest extends JUnitSuite {
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
toShutDown.tell(Passivate.INSTANCE);
toShutDown.tell(Device.Passivate.INSTANCE);
registeredProbe.expectTerminated(toShutDown, registeredProbe.getRemainingOrDefault());
// using awaitAssert to retry because it might take longer for the groupActor
@ -126,22 +135,23 @@ public class DeviceGroupTest extends JUnitSuite {
@Test
public void testCollectTemperaturesFromAllActiveDevices() {
TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceGroupMessage> groupActor = testKit.spawn(DeviceGroup.createBehavior("group"));
ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
ActorRef<DeviceMessage> deviceActor1 = registeredProbe.receiveMessage().device;
ActorRef<Device.Command> deviceActor1 = registeredProbe.receiveMessage().device;
groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef()));
ActorRef<DeviceMessage> deviceActor2 = registeredProbe.receiveMessage().device;
ActorRef<Device.Command> deviceActor2 = registeredProbe.receiveMessage().device;
groupActor.tell(new RequestTrackDevice("group", "device3", registeredProbe.getRef()));
ActorRef<DeviceMessage> deviceActor3 = registeredProbe.receiveMessage().device;
ActorRef<Device.Command> deviceActor3 = registeredProbe.receiveMessage().device;
// Check that the device actors are working
TestProbe<TemperatureRecorded> recordProbe = testKit.createTestProbe(TemperatureRecorded.class);
deviceActor1.tell(new RecordTemperature(0L, 1.0, recordProbe.getRef()));
TestProbe<Device.TemperatureRecorded> recordProbe =
testKit.createTestProbe(Device.TemperatureRecorded.class);
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef()));
assertEquals(0L, recordProbe.receiveMessage().requestId);
deviceActor2.tell(new RecordTemperature(1L, 2.0, recordProbe.getRef()));
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef()));
assertEquals(1L, recordProbe.receiveMessage().requestId);
// No temperature for device 3

View file

@ -15,16 +15,128 @@ import akka.actor.typed.javadsl.Receive;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static jdocs.typed.tutorial_5.DeviceManagerProtocol.*;
public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
public interface Command {}
public static Behavior<DeviceManagerMessage> createBehavior() {
return Behaviors.setup(DeviceManager::new);
public static final class RequestTrackDevice
implements DeviceManager.Command, DeviceGroup.Command {
public final String groupId;
public final String deviceId;
public final ActorRef<DeviceRegistered> replyTo;
public RequestTrackDevice(String groupId, String deviceId, ActorRef<DeviceRegistered> replyTo) {
this.groupId = groupId;
this.deviceId = deviceId;
this.replyTo = replyTo;
}
}
private static class DeviceGroupTerminated implements DeviceManagerMessage {
public static final class DeviceRegistered {
public final ActorRef<Device.Command> device;
public DeviceRegistered(ActorRef<Device.Command> device) {
this.device = device;
}
}
public static final class RequestDeviceList
implements DeviceManager.Command, DeviceGroup.Command {
final long requestId;
final String groupId;
final ActorRef<ReplyDeviceList> replyTo;
public RequestDeviceList(long requestId, String groupId, ActorRef<ReplyDeviceList> replyTo) {
this.requestId = requestId;
this.groupId = groupId;
this.replyTo = replyTo;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
// #query-protocol
public static final class RequestAllTemperatures
implements DeviceGroupQuery.Command, DeviceGroup.Command, Command {
final long requestId;
final String groupId;
final ActorRef<RespondAllTemperatures> replyTo;
public RequestAllTemperatures(
long requestId, String groupId, ActorRef<RespondAllTemperatures> replyTo) {
this.requestId = requestId;
this.groupId = groupId;
this.replyTo = replyTo;
}
}
public static final class RespondAllTemperatures {
final long requestId;
final Map<String, TemperatureReading> temperatures;
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
this.requestId = requestId;
this.temperatures = temperatures;
}
}
public interface TemperatureReading {}
public static final class Temperature implements TemperatureReading {
public final double value;
public Temperature(double value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Temperature that = (Temperature) o;
return Double.compare(that.value, value) == 0;
}
@Override
public int hashCode() {
long temp = Double.doubleToLongBits(value);
return (int) (temp ^ (temp >>> 32));
}
@Override
public String toString() {
return "Temperature{" + "value=" + value + '}';
}
}
public enum TemperatureNotAvailable implements TemperatureReading {
INSTANCE
}
public enum DeviceNotAvailable implements TemperatureReading {
INSTANCE
}
public enum DeviceTimedOut implements TemperatureReading {
INSTANCE
}
// #query-protocol
private static class DeviceGroupTerminated implements DeviceManager.Command {
public final String groupId;
DeviceGroupTerminated(String groupId) {
@ -32,23 +144,27 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
}
}
private final ActorContext<DeviceManagerMessage> context;
private final Map<String, ActorRef<DeviceGroupMessage>> groupIdToActor = new HashMap<>();
private final ActorContext<Command> context;
private final Map<String, ActorRef<DeviceGroup.Command>> groupIdToActor = new HashMap<>();
public DeviceManager(ActorContext<DeviceManagerMessage> context) {
public static Behavior<Command> create() {
return Behaviors.setup(DeviceManager::new);
}
private DeviceManager(ActorContext<Command> context) {
this.context = context;
context.getLog().info("DeviceManager started");
}
private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) {
String groupId = trackMsg.groupId;
ActorRef<DeviceGroupMessage> ref = groupIdToActor.get(groupId);
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId);
if (ref != null) {
ref.tell(trackMsg);
} else {
context.getLog().info("Creating device group actor for {}", groupId);
ActorRef<DeviceGroupMessage> groupActor =
context.spawn(DeviceGroup.createBehavior(groupId), "group-" + groupId);
ActorRef<DeviceGroup.Command> groupActor =
context.spawn(DeviceGroup.create(groupId), "group-" + groupId);
context.watchWith(groupActor, new DeviceGroupTerminated(groupId));
groupActor.tell(trackMsg);
groupIdToActor.put(groupId, groupActor);
@ -57,7 +173,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
}
private DeviceManager onRequestDeviceList(RequestDeviceList request) {
ActorRef<DeviceGroupMessage> ref = groupIdToActor.get(request.groupId);
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(request.groupId);
if (ref != null) {
ref.tell(request);
} else {
@ -67,7 +183,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
}
private DeviceManager onRequestAllTemperatures(RequestAllTemperatures request) {
ActorRef<DeviceGroupMessage> ref = groupIdToActor.get(request.groupId);
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(request.groupId);
if (ref != null) {
ref.tell(request);
} else {
@ -82,17 +198,17 @@ public class DeviceManager extends AbstractBehavior<DeviceManagerMessage> {
return this;
}
public Receive<DeviceManagerMessage> createReceive() {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(RequestTrackDevice.class, this::onTrackDevice)
.onMessage(RequestDeviceList.class, this::onRequestDeviceList)
.onMessage(RequestAllTemperatures.class, this::onRequestAllTemperatures)
.onMessage(DeviceGroupTerminated.class, this::onTerminated)
.onSignal(PostStop.class, signal -> postStop())
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}
private DeviceManager postStop() {
private DeviceManager onPostStop() {
context.getLog().info("DeviceManager stopped");
return this;
}

View file

@ -1,134 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.typed.tutorial_5;
import akka.actor.typed.ActorRef;
import java.util.Map;
import java.util.Set;
abstract class DeviceManagerProtocol {
// no instances of DeviceManagerProtocol class
private DeviceManagerProtocol() {}
interface DeviceManagerMessage {}
interface DeviceGroupMessage {}
public static final class RequestTrackDevice implements DeviceManagerMessage, DeviceGroupMessage {
public final String groupId;
public final String deviceId;
public final ActorRef<DeviceRegistered> replyTo;
public RequestTrackDevice(String groupId, String deviceId, ActorRef<DeviceRegistered> replyTo) {
this.groupId = groupId;
this.deviceId = deviceId;
this.replyTo = replyTo;
}
}
public static final class DeviceRegistered {
public final ActorRef<DeviceProtocol.DeviceMessage> device;
public DeviceRegistered(ActorRef<DeviceProtocol.DeviceMessage> device) {
this.device = device;
}
}
public static final class RequestDeviceList implements DeviceManagerMessage, DeviceGroupMessage {
final long requestId;
final String groupId;
final ActorRef<ReplyDeviceList> replyTo;
public RequestDeviceList(long requestId, String groupId, ActorRef<ReplyDeviceList> replyTo) {
this.requestId = requestId;
this.groupId = groupId;
this.replyTo = replyTo;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
// #query-protocol
interface DeviceGroupQueryMessage {}
public static final class RequestAllTemperatures
implements DeviceGroupQueryMessage, DeviceGroupMessage, DeviceManagerMessage {
final long requestId;
final String groupId;
final ActorRef<RespondAllTemperatures> replyTo;
public RequestAllTemperatures(
long requestId, String groupId, ActorRef<RespondAllTemperatures> replyTo) {
this.requestId = requestId;
this.groupId = groupId;
this.replyTo = replyTo;
}
}
public static final class RespondAllTemperatures {
final long requestId;
final Map<String, TemperatureReading> temperatures;
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
this.requestId = requestId;
this.temperatures = temperatures;
}
}
public static interface TemperatureReading {}
public static final class Temperature implements TemperatureReading {
public final double value;
public Temperature(double value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Temperature that = (Temperature) o;
return Double.compare(that.value, value) == 0;
}
@Override
public int hashCode() {
long temp = Double.doubleToLongBits(value);
return (int) (temp ^ (temp >>> 32));
}
@Override
public String toString() {
return "Temperature{" + "value=" + value + '}';
}
}
public enum TemperatureNotAvailable implements TemperatureReading {
INSTANCE
}
public enum DeviceNotAvailable implements TemperatureReading {
INSTANCE
}
public enum DeviceTimedOut implements TemperatureReading {
INSTANCE
}
// #query-protocol
}

View file

@ -11,9 +11,11 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import static jdocs.typed.tutorial_5.DeviceManagerProtocol.*;
import static org.junit.Assert.assertNotEquals;
import static jdocs.typed.tutorial_5.DeviceManager.DeviceRegistered;
import static jdocs.typed.tutorial_5.DeviceManager.RequestTrackDevice;
public class DeviceManagerTest extends JUnitSuite {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@ -21,7 +23,7 @@ public class DeviceManagerTest extends JUnitSuite {
@Test
public void testReplyToRegistrationRequests() {
TestProbe<DeviceRegistered> probe = testKit.createTestProbe(DeviceRegistered.class);
ActorRef<DeviceManagerMessage> managerActor = testKit.spawn(DeviceManager.createBehavior());
ActorRef<DeviceManager.Command> managerActor = testKit.spawn(DeviceManager.create());
managerActor.tell(new RequestTrackDevice("group1", "device", probe.getRef()));
DeviceRegistered registered1 = probe.receiveMessage();

View file

@ -1,62 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.typed.tutorial_5;
import akka.actor.typed.ActorRef;
import java.util.Optional;
abstract class DeviceProtocol {
// no instances of DeviceProtocol class
private DeviceProtocol() {}
interface DeviceMessage {}
public static final class RecordTemperature implements DeviceMessage {
final long requestId;
final double value;
final ActorRef<TemperatureRecorded> replyTo;
public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
this.requestId = requestId;
this.value = value;
this.replyTo = replyTo;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature implements DeviceMessage {
final long requestId;
final ActorRef<RespondTemperature> replyTo;
public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
public static final class RespondTemperature {
final long requestId;
final String deviceId;
final Optional<Double> value;
public RespondTemperature(long requestId, String deviceId, Optional<Double> value) {
this.requestId = requestId;
this.deviceId = deviceId;
this.value = value;
}
}
static enum Passivate implements DeviceMessage {
INSTANCE
}
}

View file

@ -13,7 +13,6 @@ import org.scalatest.junit.JUnitSuite;
import java.util.Optional;
import static jdocs.typed.tutorial_5.DeviceProtocol.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@ -23,33 +22,36 @@ public class DeviceTest extends JUnitSuite {
@Test
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
TestProbe<RespondTemperature> probe = testKit.createTestProbe(RespondTemperature.class);
ActorRef<DeviceMessage> deviceActor = testKit.spawn(Device.createBehavior("group", "device"));
deviceActor.tell(new ReadTemperature(42L, probe.getRef()));
RespondTemperature response = probe.receiveMessage();
TestProbe<Device.RespondTemperature> probe =
testKit.createTestProbe(Device.RespondTemperature.class);
ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
deviceActor.tell(new Device.ReadTemperature(42L, probe.getRef()));
Device.RespondTemperature response = probe.receiveMessage();
assertEquals(42L, response.requestId);
assertEquals(Optional.empty(), response.value);
}
@Test
public void testReplyWithLatestTemperatureReading() {
TestProbe<TemperatureRecorded> recordProbe = testKit.createTestProbe(TemperatureRecorded.class);
TestProbe<RespondTemperature> readProbe = testKit.createTestProbe(RespondTemperature.class);
ActorRef<DeviceMessage> deviceActor = testKit.spawn(Device.createBehavior("group", "device"));
TestProbe<Device.TemperatureRecorded> recordProbe =
testKit.createTestProbe(Device.TemperatureRecorded.class);
TestProbe<Device.RespondTemperature> readProbe =
testKit.createTestProbe(Device.RespondTemperature.class);
ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
deviceActor.tell(new RecordTemperature(1L, 24.0, recordProbe.getRef()));
deviceActor.tell(new Device.RecordTemperature(1L, 24.0, recordProbe.getRef()));
assertEquals(1L, recordProbe.receiveMessage().requestId);
deviceActor.tell(new ReadTemperature(2L, readProbe.getRef()));
RespondTemperature response1 = readProbe.receiveMessage();
deviceActor.tell(new Device.ReadTemperature(2L, readProbe.getRef()));
Device.RespondTemperature response1 = readProbe.receiveMessage();
assertEquals(2L, response1.requestId);
assertEquals(Optional.of(24.0), response1.value);
deviceActor.tell(new RecordTemperature(3L, 55.0, recordProbe.getRef()));
deviceActor.tell(new Device.RecordTemperature(3L, 55.0, recordProbe.getRef()));
assertEquals(3L, recordProbe.receiveMessage().requestId);
deviceActor.tell(new ReadTemperature(4L, readProbe.getRef()));
RespondTemperature response2 = readProbe.receiveMessage();
deviceActor.tell(new Device.ReadTemperature(4L, readProbe.getRef()));
Device.RespondTemperature response2 = readProbe.receiveMessage();
assertEquals(4L, response2.requestId);
assertEquals(Optional.of(55.0), response2.value);
}