remove unused sample for classic getting started guide
* guide was removed previously
This commit is contained in:
parent
5c0f213fba
commit
bc2caf866c
37 changed files with 0 additions and 3377 deletions
|
|
@ -1,193 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
// #print-refs
|
||||
package com.example;
|
||||
|
||||
// #print-refs
|
||||
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
// #print-refs
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.AbstractActor.Receive;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
class PrintMyActorRefActor extends AbstractActor {
|
||||
static Props props() {
|
||||
return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.matchEquals(
|
||||
"printit",
|
||||
p -> {
|
||||
ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
|
||||
System.out.println("Second: " + secondRef);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #print-refs
|
||||
|
||||
// #start-stop
|
||||
class StartStopActor1 extends AbstractActor {
|
||||
static Props props() {
|
||||
return Props.create(StartStopActor1.class, StartStopActor1::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
System.out.println("first started");
|
||||
getContext().actorOf(StartStopActor2.props(), "second");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
System.out.println("first stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.matchEquals(
|
||||
"stop",
|
||||
s -> {
|
||||
getContext().stop(getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
class StartStopActor2 extends AbstractActor {
|
||||
|
||||
static Props props() {
|
||||
return Props.create(StartStopActor2.class, StartStopActor2::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
System.out.println("second started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
System.out.println("second stopped");
|
||||
}
|
||||
|
||||
// Actor.emptyBehavior is a useful placeholder when we don't
|
||||
// want to handle any messages in the actor.
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().build();
|
||||
}
|
||||
}
|
||||
// #start-stop
|
||||
|
||||
// #supervise
|
||||
class SupervisingActor extends AbstractActor {
|
||||
static Props props() {
|
||||
return Props.create(SupervisingActor.class, SupervisingActor::new);
|
||||
}
|
||||
|
||||
ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.matchEquals(
|
||||
"failChild",
|
||||
f -> {
|
||||
child.tell("fail", getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
class SupervisedActor extends AbstractActor {
|
||||
static Props props() {
|
||||
return Props.create(SupervisedActor.class, SupervisedActor::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
System.out.println("supervised actor started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
System.out.println("supervised actor stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.matchEquals(
|
||||
"fail",
|
||||
f -> {
|
||||
System.out.println("supervised actor fails now");
|
||||
throw new Exception("I failed!");
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #supervise
|
||||
|
||||
// #print-refs
|
||||
public class ActorHierarchyExperiments {
|
||||
public static void main(String[] args) throws java.io.IOException {
|
||||
ActorSystem system = ActorSystem.create("testSystem");
|
||||
|
||||
ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
|
||||
System.out.println("First: " + firstRef);
|
||||
firstRef.tell("printit", ActorRef.noSender());
|
||||
|
||||
System.out.println(">>> Press ENTER to exit <<<");
|
||||
try {
|
||||
System.in.read();
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
// #print-refs
|
||||
|
||||
class ActorHierarchyExperimentsTest extends JUnitSuite {
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartAndStopActors() {
|
||||
// #start-stop-main
|
||||
ActorRef first = system.actorOf(StartStopActor1.props(), "first");
|
||||
first.tell("stop", ActorRef.noSender());
|
||||
// #start-stop-main
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuperviseActors() {
|
||||
// #supervise-main
|
||||
ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
|
||||
supervisingActor.tell("failChild", ActorRef.noSender());
|
||||
// #supervise-main
|
||||
}
|
||||
}
|
||||
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
// #iot-app
|
||||
package com.example;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ActorRef;
|
||||
|
||||
public class IotMain {
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
ActorSystem system = ActorSystem.create("iot-system");
|
||||
|
||||
try {
|
||||
// Create top level supervisor
|
||||
ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");
|
||||
|
||||
System.out.println("Press ENTER to exit the system");
|
||||
System.in.read();
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
// #iot-app
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
// #iot-supervisor
|
||||
package com.example;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorLogging;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class IotSupervisor extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(IotSupervisor.class, IotSupervisor::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("IoT Application started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("IoT Application stopped");
|
||||
}
|
||||
|
||||
// No need to handle any messages
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().build();
|
||||
}
|
||||
}
|
||||
// #iot-supervisor
|
||||
|
|
@ -1,100 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
// #full-device
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.AbstractActor.Receive;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, () -> new Device(groupId, deviceId));
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(
|
||||
RecordTemperature.class,
|
||||
r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(
|
||||
ReadTemperature.class,
|
||||
r -> {
|
||||
getSender()
|
||||
.tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #full-device
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import jdocs.tutorial_3.Device.ReadTemperature;
|
||||
import jdocs.tutorial_3.Device.RecordTemperature;
|
||||
import jdocs.tutorial_3.Device.RespondTemperature;
|
||||
import jdocs.tutorial_3.Device.TemperatureRecorded;
|
||||
|
||||
class DeviceInProgress1 {
|
||||
|
||||
// #read-protocol-1
|
||||
public static final class ReadTemperature {}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(Optional<Double> value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
// #read-protocol-1
|
||||
|
||||
}
|
||||
|
|
@ -1,18 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
class DeviceInProgress3 {
|
||||
|
||||
// #write-protocol-1
|
||||
public static final class RecordTemperature {
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
// #write-protocol-1
|
||||
}
|
||||
|
|
@ -1,71 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
// #device-read-test
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
// #device-read-test
|
||||
|
||||
// #device-write-read-test
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
// #device-write-read-test
|
||||
|
||||
}
|
||||
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_3.inprogress2;
|
||||
|
||||
// #device-with-read
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, () -> new Device(groupId, deviceId));
|
||||
}
|
||||
|
||||
// #read-protocol-2
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
// #read-protocol-2
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(
|
||||
ReadTemperature.class,
|
||||
r -> {
|
||||
getSender()
|
||||
.tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
// #device-with-read
|
||||
|
|
@ -1,116 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
// #device-with-register
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import jdocs.tutorial_4.DeviceManager.DeviceRegistered;
|
||||
import jdocs.tutorial_4.DeviceManager.RequestTrackDevice;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, () -> new Device(groupId, deviceId));
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(
|
||||
RequestTrackDevice.class,
|
||||
r -> {
|
||||
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
|
||||
getSender().tell(new DeviceRegistered(), getSelf());
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
r.groupId,
|
||||
r.deviceId,
|
||||
this.groupId,
|
||||
this.deviceId);
|
||||
}
|
||||
})
|
||||
.match(
|
||||
RecordTemperature.class,
|
||||
r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(
|
||||
ReadTemperature.class,
|
||||
r -> {
|
||||
getSender()
|
||||
.tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #device-with-register
|
||||
|
|
@ -1,130 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import jdocs.tutorial_4.Device;
|
||||
import jdocs.tutorial_4.DeviceManager;
|
||||
|
||||
// #device-group-full
|
||||
// #device-group-remove
|
||||
// #device-group-register
|
||||
public class DeviceGroup extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
public DeviceGroup(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId) {
|
||||
return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
|
||||
}
|
||||
// #device-group-register
|
||||
// #device-group-remove
|
||||
|
||||
public static final class RequestDeviceList {
|
||||
final long requestId;
|
||||
|
||||
public RequestDeviceList(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReplyDeviceList {
|
||||
final long requestId;
|
||||
final Set<String> ids;
|
||||
|
||||
public ReplyDeviceList(long requestId, Set<String> ids) {
|
||||
this.requestId = requestId;
|
||||
this.ids = ids;
|
||||
}
|
||||
}
|
||||
// #device-group-remove
|
||||
// #device-group-register
|
||||
|
||||
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
|
||||
// #device-group-register
|
||||
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
// #device-group-register
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceGroup {} stopped", groupId);
|
||||
}
|
||||
|
||||
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
|
||||
if (this.groupId.equals(trackMsg.groupId)) {
|
||||
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
|
||||
if (deviceActor != null) {
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId);
|
||||
deviceActor =
|
||||
getContext()
|
||||
.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
// #device-group-register
|
||||
getContext().watch(deviceActor);
|
||||
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
|
||||
// #device-group-register
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
}
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}. ",
|
||||
trackMsg.groupId,
|
||||
this.groupId);
|
||||
}
|
||||
}
|
||||
// #device-group-register
|
||||
// #device-group-remove
|
||||
|
||||
private void onDeviceList(RequestDeviceList r) {
|
||||
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
|
||||
}
|
||||
// #device-group-remove
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef deviceActor = t.getActor();
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
log.info("Device actor for {} has been terminated", deviceId);
|
||||
actorToDeviceId.remove(deviceActor);
|
||||
deviceIdToActor.remove(deviceId);
|
||||
}
|
||||
// #device-group-register
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
|
||||
// #device-group-register
|
||||
// #device-group-remove
|
||||
.match(RequestDeviceList.class, this::onDeviceList)
|
||||
// #device-group-remove
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
// #device-group-register
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #device-group-register
|
||||
// #device-group-remove
|
||||
// #device-group-full
|
||||
|
|
@ -1,138 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public class DeviceGroupTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
// #device-group-test-registration
|
||||
@Test
|
||||
public void testRegisterDeviceActor() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertNotEquals(deviceActor1, deviceActor2);
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreRequestsForWrongGroupId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
|
||||
probe.expectNoMessage();
|
||||
}
|
||||
// #device-group-test-registration
|
||||
|
||||
// #device-group-test3
|
||||
@Test
|
||||
public void testReturnSameActorForSameDeviceId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertEquals(deviceActor1, deviceActor2);
|
||||
}
|
||||
// #device-group-test3
|
||||
|
||||
// #device-group-list-terminate-test
|
||||
@Test
|
||||
public void testListActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevicesAfterOneShutsDown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef toShutDown = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
|
||||
probe.watch(toShutDown);
|
||||
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
probe.expectTerminated(toShutDown);
|
||||
|
||||
// using awaitAssert to retry because it might take longer for the groupActor
|
||||
// to see the Terminated, that order is undefined
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList r = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(1L, r.requestId);
|
||||
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
// #device-group-list-terminate-test
|
||||
}
|
||||
|
|
@ -1,82 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
// #device-manager-full
|
||||
public class DeviceManager extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(DeviceManager.class, DeviceManager::new);
|
||||
}
|
||||
|
||||
// #device-manager-msgs
|
||||
public static final class RequestTrackDevice {
|
||||
public final String groupId;
|
||||
public final String deviceId;
|
||||
|
||||
public RequestTrackDevice(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DeviceRegistered {}
|
||||
|
||||
// #device-manager-msgs
|
||||
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceManager started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceManager stopped");
|
||||
}
|
||||
|
||||
private void onTrackDevice(RequestTrackDevice trackMsg) {
|
||||
String groupId = trackMsg.groupId;
|
||||
ActorRef ref = groupIdToActor.get(groupId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device group actor for {}", groupId);
|
||||
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
|
||||
getContext().watch(groupActor);
|
||||
groupActor.forward(trackMsg, getContext());
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
actorToGroupId.put(groupActor, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef groupActor = t.getActor();
|
||||
String groupId = actorToGroupId.get(groupActor);
|
||||
log.info("Device group actor for {} has been terminated", groupId);
|
||||
actorToGroupId.remove(groupActor);
|
||||
groupIdToActor.remove(groupId);
|
||||
}
|
||||
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #device-manager-full
|
||||
|
|
@ -1,95 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
// #device-registration-tests
|
||||
@Test
|
||||
public void testReplyToRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
assertEquals(deviceActor, probe.getLastSender());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreWrongRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
|
||||
probe.expectNoMessage();
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
|
||||
probe.expectNoMessage();
|
||||
}
|
||||
// #device-registration-tests
|
||||
|
||||
// #device-read-test
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
// #device-read-test
|
||||
|
||||
// #device-write-read-test
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
// #device-write-read-test
|
||||
|
||||
}
|
||||
|
|
@ -1,113 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import jdocs.tutorial_5.DeviceManager.DeviceRegistered;
|
||||
import jdocs.tutorial_5.DeviceManager.RequestTrackDevice;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, () -> new Device(groupId, deviceId));
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(
|
||||
RequestTrackDevice.class,
|
||||
r -> {
|
||||
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
|
||||
getSender().tell(new DeviceRegistered(), getSelf());
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
r.groupId,
|
||||
r.deviceId,
|
||||
this.groupId,
|
||||
this.deviceId);
|
||||
}
|
||||
})
|
||||
.match(
|
||||
RecordTemperature.class,
|
||||
r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(
|
||||
ReadTemperature.class,
|
||||
r -> {
|
||||
getSender()
|
||||
.tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,197 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
// #query-added
|
||||
public class DeviceGroup extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
public DeviceGroup(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId) {
|
||||
return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
|
||||
}
|
||||
|
||||
public static final class RequestDeviceList {
|
||||
final long requestId;
|
||||
|
||||
public RequestDeviceList(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReplyDeviceList {
|
||||
final long requestId;
|
||||
final Set<String> ids;
|
||||
|
||||
public ReplyDeviceList(long requestId, Set<String> ids) {
|
||||
this.requestId = requestId;
|
||||
this.ids = ids;
|
||||
}
|
||||
}
|
||||
|
||||
// #query-protocol
|
||||
public static final class RequestAllTemperatures {
|
||||
final long requestId;
|
||||
|
||||
public RequestAllTemperatures(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondAllTemperatures {
|
||||
final long requestId;
|
||||
final Map<String, TemperatureReading> temperatures;
|
||||
|
||||
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
|
||||
this.requestId = requestId;
|
||||
this.temperatures = temperatures;
|
||||
}
|
||||
}
|
||||
|
||||
public static interface TemperatureReading {}
|
||||
|
||||
public static final class Temperature implements TemperatureReading {
|
||||
public final double value;
|
||||
|
||||
public Temperature(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Temperature that = (Temperature) o;
|
||||
|
||||
return Double.compare(that.value, value) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
long temp = Double.doubleToLongBits(value);
|
||||
return (int) (temp ^ (temp >>> 32));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Temperature{" + "value=" + value + '}';
|
||||
}
|
||||
}
|
||||
|
||||
public enum TemperatureNotAvailable implements TemperatureReading {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
public enum DeviceNotAvailable implements TemperatureReading {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
public enum DeviceTimedOut implements TemperatureReading {
|
||||
INSTANCE
|
||||
}
|
||||
// #query-protocol
|
||||
|
||||
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceGroup {} stopped", groupId);
|
||||
}
|
||||
|
||||
// #query-added
|
||||
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
|
||||
if (this.groupId.equals(trackMsg.groupId)) {
|
||||
ActorRef ref = deviceIdToActor.get(trackMsg.deviceId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId);
|
||||
ActorRef deviceActor =
|
||||
getContext()
|
||||
.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
getContext().watch(deviceActor);
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
}
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
groupId,
|
||||
this.groupId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onDeviceList(RequestDeviceList r) {
|
||||
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef deviceActor = t.getActor();
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
log.info("Device actor for {} has been terminated", deviceId);
|
||||
actorToDeviceId.remove(deviceActor);
|
||||
deviceIdToActor.remove(deviceId);
|
||||
}
|
||||
// #query-added
|
||||
|
||||
private void onAllTemperatures(RequestAllTemperatures r) {
|
||||
// since Java collections are mutable, we want to avoid sharing them between actors (since
|
||||
// multiple Actors (threads)
|
||||
// modifying the same mutable data-structure is not safe), and perform a defensive copy of the
|
||||
// mutable map:
|
||||
//
|
||||
// Feel free to use your favourite immutable data-structures library with Akka in Java
|
||||
// applications!
|
||||
Map<ActorRef, String> actorToDeviceIdCopy = new HashMap<>(this.actorToDeviceId);
|
||||
|
||||
getContext()
|
||||
.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceIdCopy,
|
||||
r.requestId,
|
||||
getSender(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
// #query-added
|
||||
return receiveBuilder()
|
||||
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(RequestDeviceList.class, this::onDeviceList)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
// #query-added
|
||||
// ... other cases omitted
|
||||
.match(RequestAllTemperatures.class, this::onAllTemperatures)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #query-added
|
||||
|
|
@ -1,149 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
// #query-full
|
||||
// #query-outline
|
||||
public class DeviceGroupQuery extends AbstractActor {
|
||||
public static final class CollectionTimeout {}
|
||||
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final Map<ActorRef, String> actorToDeviceId;
|
||||
final long requestId;
|
||||
final ActorRef requester;
|
||||
|
||||
Cancellable queryTimeoutTimer;
|
||||
|
||||
public DeviceGroupQuery(
|
||||
Map<ActorRef, String> actorToDeviceId,
|
||||
long requestId,
|
||||
ActorRef requester,
|
||||
FiniteDuration timeout) {
|
||||
this.actorToDeviceId = actorToDeviceId;
|
||||
this.requestId = requestId;
|
||||
this.requester = requester;
|
||||
|
||||
queryTimeoutTimer =
|
||||
getContext()
|
||||
.getSystem()
|
||||
.scheduler()
|
||||
.scheduleOnce(
|
||||
timeout,
|
||||
getSelf(),
|
||||
new CollectionTimeout(),
|
||||
getContext().getDispatcher(),
|
||||
getSelf());
|
||||
}
|
||||
|
||||
public static Props props(
|
||||
Map<ActorRef, String> actorToDeviceId,
|
||||
long requestId,
|
||||
ActorRef requester,
|
||||
FiniteDuration timeout) {
|
||||
return Props.create(
|
||||
DeviceGroupQuery.class,
|
||||
() -> new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
|
||||
getContext().watch(deviceActor);
|
||||
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
queryTimeoutTimer.cancel();
|
||||
}
|
||||
|
||||
// #query-outline
|
||||
// #query-state
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
|
||||
}
|
||||
|
||||
public Receive waitingForReplies(
|
||||
Map<String, DeviceGroup.TemperatureReading> repliesSoFar, Set<ActorRef> stillWaiting) {
|
||||
return receiveBuilder()
|
||||
.match(
|
||||
Device.RespondTemperature.class,
|
||||
r -> {
|
||||
ActorRef deviceActor = getSender();
|
||||
DeviceGroup.TemperatureReading reading =
|
||||
r.value
|
||||
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
|
||||
.orElse(DeviceGroup.TemperatureNotAvailable.INSTANCE);
|
||||
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
|
||||
})
|
||||
.match(
|
||||
Terminated.class,
|
||||
t -> {
|
||||
receivedResponse(
|
||||
t.getActor(),
|
||||
DeviceGroup.DeviceNotAvailable.INSTANCE,
|
||||
stillWaiting,
|
||||
repliesSoFar);
|
||||
})
|
||||
.match(
|
||||
CollectionTimeout.class,
|
||||
t -> {
|
||||
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
|
||||
for (ActorRef deviceActor : stillWaiting) {
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
replies.put(deviceId, DeviceGroup.DeviceTimedOut.INSTANCE);
|
||||
}
|
||||
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
// #query-state
|
||||
|
||||
// #query-collect-reply
|
||||
public void receivedResponse(
|
||||
ActorRef deviceActor,
|
||||
DeviceGroup.TemperatureReading reading,
|
||||
Set<ActorRef> stillWaiting,
|
||||
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
|
||||
getContext().unwatch(deviceActor);
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
|
||||
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
|
||||
newStillWaiting.remove(deviceActor);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
|
||||
newRepliesSoFar.put(deviceId, reading);
|
||||
if (newStillWaiting.isEmpty()) {
|
||||
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
} else {
|
||||
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
|
||||
}
|
||||
}
|
||||
// #query-collect-reply
|
||||
// #query-outline
|
||||
}
|
||||
// #query-outline
|
||||
// #query-full
|
||||
|
|
@ -1,217 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DeviceGroupQueryTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
// #query-test-normal
|
||||
@Test
|
||||
public void testReturnTemperatureValueForWorkingDevices() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor =
|
||||
system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response =
|
||||
requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEquals(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
// #query-test-normal
|
||||
|
||||
// #query-test-no-reading
|
||||
@Test
|
||||
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor =
|
||||
system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response =
|
||||
requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", DeviceGroup.TemperatureNotAvailable.INSTANCE);
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEquals(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
// #query-test-no-reading
|
||||
|
||||
// #query-test-stopped
|
||||
@Test
|
||||
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor =
|
||||
system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response =
|
||||
requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", DeviceGroup.DeviceNotAvailable.INSTANCE);
|
||||
|
||||
assertEquals(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
// #query-test-stopped
|
||||
|
||||
// #query-test-stopped-later
|
||||
@Test
|
||||
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor =
|
||||
system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response =
|
||||
requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEquals(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
// #query-test-stopped-later
|
||||
|
||||
// #query-test-timeout
|
||||
@Test
|
||||
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor =
|
||||
system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(1, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response =
|
||||
requester.expectMsgClass(
|
||||
java.time.Duration.ofSeconds(5), DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", DeviceGroup.DeviceTimedOut.INSTANCE);
|
||||
|
||||
assertEquals(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
// #query-test-timeout
|
||||
|
||||
}
|
||||
|
|
@ -1,173 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public class DeviceGroupTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterDeviceActor() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertNotEquals(deviceActor1, deviceActor2);
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreRequestsForWrongGroupId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
|
||||
probe.expectNoMessage();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnSameActorForSameDeviceId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertEquals(deviceActor1, deviceActor2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevicesAfterOneShutsDown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef toShutDown = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
|
||||
probe.watch(toShutDown);
|
||||
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
probe.expectTerminated(toShutDown);
|
||||
|
||||
// using awaitAssert to retry because it might take longer for the groupActor
|
||||
// to see the Terminated, that order is undefined
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList r = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(1L, r.requestId);
|
||||
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
// #group-query-integration-test
|
||||
@Test
|
||||
public void testCollectTemperaturesFromAllActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor3 = probe.getLastSender();
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
// No temperature for device 3
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
|
||||
DeviceGroup.RespondAllTemperatures response =
|
||||
probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(0L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
expectedTemperatures.put("device3", DeviceGroup.TemperatureNotAvailable.INSTANCE);
|
||||
|
||||
assertEquals(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
// #group-query-integration-test
|
||||
}
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DeviceManager extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(DeviceManager.class, DeviceManager::new);
|
||||
}
|
||||
|
||||
public static final class RequestTrackDevice {
|
||||
public final String groupId;
|
||||
public final String deviceId;
|
||||
|
||||
public RequestTrackDevice(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DeviceRegistered {}
|
||||
|
||||
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceManager started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceManager stopped");
|
||||
}
|
||||
|
||||
private void onTrackDevice(RequestTrackDevice trackMsg) {
|
||||
String groupId = trackMsg.groupId;
|
||||
ActorRef ref = groupIdToActor.get(groupId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device group actor for {}", groupId);
|
||||
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
|
||||
getContext().watch(groupActor);
|
||||
groupActor.forward(trackMsg, getContext());
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
actorToGroupId.put(groupActor, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef groupActor = t.getActor();
|
||||
String groupId = actorToGroupId.get(groupActor);
|
||||
log.info("Device group actor for {} has been terminated", groupId);
|
||||
actorToGroupId.remove(groupActor);
|
||||
groupIdToActor.remove(groupId);
|
||||
}
|
||||
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,88 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyToRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
assertEquals(deviceActor, probe.getLastSender());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreWrongRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
|
||||
probe.expectNoMessage();
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
|
||||
probe.expectNoMessage();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,133 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
// Prevent package clashes with the Java examples:
|
||||
package docs.tutorial_1
|
||||
|
||||
//#print-refs
|
||||
package com.example
|
||||
|
||||
import akka.actor.{ Actor, ActorSystem, Props }
|
||||
import scala.io.StdIn
|
||||
|
||||
object PrintMyActorRefActor {
|
||||
def props: Props =
|
||||
Props(new PrintMyActorRefActor)
|
||||
}
|
||||
|
||||
class PrintMyActorRefActor extends Actor {
|
||||
override def receive: Receive = {
|
||||
case "printit" =>
|
||||
val secondRef = context.actorOf(Props.empty, "second-actor")
|
||||
println(s"Second: $secondRef")
|
||||
}
|
||||
}
|
||||
//#print-refs
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
//#start-stop
|
||||
object StartStopActor1 {
|
||||
def props: Props =
|
||||
Props(new StartStopActor1)
|
||||
}
|
||||
|
||||
class StartStopActor1 extends Actor {
|
||||
override def preStart(): Unit = {
|
||||
println("first started")
|
||||
context.actorOf(StartStopActor2.props, "second")
|
||||
}
|
||||
override def postStop(): Unit = println("first stopped")
|
||||
|
||||
override def receive: Receive = {
|
||||
case "stop" => context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
object StartStopActor2 {
|
||||
def props: Props =
|
||||
Props(new StartStopActor2)
|
||||
}
|
||||
|
||||
class StartStopActor2 extends Actor {
|
||||
override def preStart(): Unit = println("second started")
|
||||
override def postStop(): Unit = println("second stopped")
|
||||
|
||||
// Actor.emptyBehavior is a useful placeholder when we don't
|
||||
// want to handle any messages in the actor.
|
||||
override def receive: Receive = Actor.emptyBehavior
|
||||
}
|
||||
//#start-stop
|
||||
|
||||
//#supervise
|
||||
object SupervisingActor {
|
||||
def props: Props =
|
||||
Props(new SupervisingActor)
|
||||
}
|
||||
|
||||
class SupervisingActor extends Actor {
|
||||
val child = context.actorOf(SupervisedActor.props, "supervised-actor")
|
||||
|
||||
override def receive: Receive = {
|
||||
case "failChild" => child ! "fail"
|
||||
}
|
||||
}
|
||||
|
||||
object SupervisedActor {
|
||||
def props: Props =
|
||||
Props(new SupervisedActor)
|
||||
}
|
||||
|
||||
class SupervisedActor extends Actor {
|
||||
override def preStart(): Unit = println("supervised actor started")
|
||||
override def postStop(): Unit = println("supervised actor stopped")
|
||||
|
||||
override def receive: Receive = {
|
||||
case "fail" =>
|
||||
println("supervised actor fails now")
|
||||
throw new Exception("I failed!")
|
||||
}
|
||||
}
|
||||
//#supervise
|
||||
|
||||
class ActorHierarchyExperiments extends AkkaSpec {
|
||||
"create top and child actor" in {
|
||||
// format: OFF
|
||||
//#print-refs
|
||||
|
||||
object ActorHierarchyExperiments extends App {
|
||||
val system = ActorSystem("testSystem")
|
||||
|
||||
val firstRef = system.actorOf(PrintMyActorRefActor.props, "first-actor")
|
||||
println(s"First: $firstRef")
|
||||
firstRef ! "printit"
|
||||
|
||||
println(">>> Press ENTER to exit <<<")
|
||||
try StdIn.readLine()
|
||||
finally system.terminate()
|
||||
}
|
||||
//#print-refs
|
||||
// format: ON
|
||||
}
|
||||
|
||||
"start and stop actors" in {
|
||||
// format: OFF
|
||||
//#start-stop-main
|
||||
|
||||
val first = system.actorOf(StartStopActor1.props, "first")
|
||||
first ! "stop"
|
||||
//#start-stop-main
|
||||
// format: ON
|
||||
}
|
||||
|
||||
"supervise actors" in {
|
||||
// format: OFF
|
||||
//#supervise-main
|
||||
|
||||
val supervisingActor = system.actorOf(SupervisingActor.props, "supervising-actor")
|
||||
supervisingActor ! "failChild"
|
||||
//#supervise-main
|
||||
// format: ON
|
||||
}
|
||||
}
|
||||
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_2
|
||||
|
||||
//#iot-app
|
||||
package com.example
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import scala.io.StdIn
|
||||
|
||||
object IotApp {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val system = ActorSystem("iot-system")
|
||||
|
||||
try {
|
||||
// Create top level supervisor
|
||||
val supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor")
|
||||
// Exit the system after ENTER is pressed
|
||||
StdIn.readLine()
|
||||
} finally {
|
||||
system.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#iot-app
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_2
|
||||
|
||||
//#iot-supervisor
|
||||
package com.example
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
|
||||
object IotSupervisor {
|
||||
def props(): Props = Props(new IotSupervisor)
|
||||
}
|
||||
|
||||
class IotSupervisor extends Actor with ActorLogging {
|
||||
override def preStart(): Unit = log.info("IoT Application started")
|
||||
override def postStop(): Unit = log.info("IoT Application stopped")
|
||||
|
||||
// No need to handle any messages
|
||||
override def receive = Actor.emptyBehavior
|
||||
|
||||
}
|
||||
//#iot-supervisor
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_3
|
||||
|
||||
//#full-device
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
|
||||
object Device {
|
||||
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
|
||||
|
||||
final case class RecordTemperature(requestId: Long, value: Double)
|
||||
final case class TemperatureRecorded(requestId: Long)
|
||||
|
||||
final case class ReadTemperature(requestId: Long)
|
||||
final case class RespondTemperature(requestId: Long, value: Option[Double])
|
||||
}
|
||||
|
||||
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
|
||||
import Device._
|
||||
var lastTemperatureReading: Option[Double] = None
|
||||
|
||||
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
|
||||
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
|
||||
|
||||
override def receive: Receive = {
|
||||
case RecordTemperature(id, value) =>
|
||||
log.info("Recorded temperature reading {} with {}", value, id)
|
||||
lastTemperatureReading = Some(value)
|
||||
sender() ! TemperatureRecorded(id)
|
||||
|
||||
case ReadTemperature(id) =>
|
||||
sender() ! RespondTemperature(id, lastTemperatureReading)
|
||||
}
|
||||
}
|
||||
//#full-device
|
||||
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_3
|
||||
|
||||
object DeviceInProgress1 {
|
||||
|
||||
object Device {
|
||||
//#read-protocol-1
|
||||
final case object ReadTemperature
|
||||
final case class RespondTemperature(value: Option[Double])
|
||||
//#read-protocol-1
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object DeviceInProgress2 {
|
||||
|
||||
//#device-with-read
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
|
||||
object Device {
|
||||
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
|
||||
|
||||
//#read-protocol-2
|
||||
final case class ReadTemperature(requestId: Long)
|
||||
final case class RespondTemperature(requestId: Long, value: Option[Double])
|
||||
//#read-protocol-2
|
||||
}
|
||||
|
||||
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
|
||||
import Device._
|
||||
|
||||
var lastTemperatureReading: Option[Double] = None
|
||||
|
||||
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
|
||||
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
|
||||
|
||||
override def receive: Receive = {
|
||||
case ReadTemperature(id) =>
|
||||
sender() ! RespondTemperature(id, lastTemperatureReading)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#device-with-read
|
||||
|
||||
}
|
||||
|
||||
object DeviceInProgress3 {
|
||||
|
||||
object Device {
|
||||
//#write-protocol-1
|
||||
final case class RecordTemperature(value: Double)
|
||||
//#write-protocol-1
|
||||
}
|
||||
}
|
||||
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_3
|
||||
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class DeviceSpec extends AkkaSpec {
|
||||
|
||||
"Device actor" must {
|
||||
|
||||
//#device-read-test
|
||||
"reply with empty reading if no temperature is known" in {
|
||||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
|
||||
val response = probe.expectMsgType[Device.RespondTemperature]
|
||||
response.requestId should ===(42L)
|
||||
response.value should ===(None)
|
||||
}
|
||||
//#device-read-test
|
||||
|
||||
//#device-write-read-test
|
||||
"reply with latest temperature reading" in {
|
||||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
|
||||
val response1 = probe.expectMsgType[Device.RespondTemperature]
|
||||
response1.requestId should ===(2L)
|
||||
response1.value should ===(Some(24.0))
|
||||
|
||||
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
|
||||
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
|
||||
val response2 = probe.expectMsgType[Device.RespondTemperature]
|
||||
response2.requestId should ===(4L)
|
||||
response2.value should ===(Some(55.0))
|
||||
}
|
||||
//#device-write-read-test
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,50 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_4
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
|
||||
//#device-with-register
|
||||
object Device {
|
||||
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
|
||||
|
||||
final case class RecordTemperature(requestId: Long, value: Double)
|
||||
final case class TemperatureRecorded(requestId: Long)
|
||||
|
||||
final case class ReadTemperature(requestId: Long)
|
||||
final case class RespondTemperature(requestId: Long, value: Option[Double])
|
||||
}
|
||||
|
||||
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
|
||||
import Device._
|
||||
|
||||
var lastTemperatureReading: Option[Double] = None
|
||||
|
||||
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
|
||||
|
||||
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
|
||||
|
||||
override def receive: Receive = {
|
||||
case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) =>
|
||||
sender() ! DeviceManager.DeviceRegistered
|
||||
|
||||
case DeviceManager.RequestTrackDevice(groupId, deviceId) =>
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
groupId,
|
||||
deviceId,
|
||||
this.groupId,
|
||||
this.deviceId)
|
||||
|
||||
case RecordTemperature(id, value) =>
|
||||
log.info("Recorded temperature reading {} with {}", value, id)
|
||||
lastTemperatureReading = Some(value)
|
||||
sender() ! TemperatureRecorded(id)
|
||||
|
||||
case ReadTemperature(id) =>
|
||||
sender() ! RespondTemperature(id, lastTemperatureReading)
|
||||
}
|
||||
}
|
||||
//#device-with-register
|
||||
|
|
@ -1,73 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_4
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
import DeviceGroup._
|
||||
import DeviceManager.RequestTrackDevice
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
//#device-group-full
|
||||
//#device-group-register
|
||||
object DeviceGroup {
|
||||
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
|
||||
//#device-group-register
|
||||
|
||||
final case class RequestDeviceList(requestId: Long)
|
||||
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
|
||||
//#device-group-register
|
||||
}
|
||||
//#device-group-register
|
||||
//#device-group-register
|
||||
//#device-group-remove
|
||||
|
||||
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
|
||||
var deviceIdToActor = Map.empty[String, ActorRef]
|
||||
//#device-group-register
|
||||
var actorToDeviceId = Map.empty[ActorRef, String]
|
||||
//#device-group-register
|
||||
|
||||
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
|
||||
|
||||
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
|
||||
|
||||
override def receive: Receive = {
|
||||
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
|
||||
deviceIdToActor.get(trackMsg.deviceId) match {
|
||||
case Some(deviceActor) =>
|
||||
deviceActor.forward(trackMsg)
|
||||
case None =>
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId)
|
||||
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
|
||||
//#device-group-register
|
||||
context.watch(deviceActor)
|
||||
actorToDeviceId += deviceActor -> trackMsg.deviceId
|
||||
//#device-group-register
|
||||
deviceIdToActor += trackMsg.deviceId -> deviceActor
|
||||
deviceActor.forward(trackMsg)
|
||||
}
|
||||
|
||||
case RequestTrackDevice(groupId, deviceId) =>
|
||||
log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)
|
||||
//#device-group-register
|
||||
//#device-group-remove
|
||||
|
||||
case RequestDeviceList(requestId) =>
|
||||
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
|
||||
//#device-group-remove
|
||||
|
||||
case Terminated(deviceActor) =>
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
log.info("Device actor for {} has been terminated", deviceId)
|
||||
actorToDeviceId -= deviceActor
|
||||
deviceIdToActor -= deviceId
|
||||
|
||||
//#device-group-register
|
||||
}
|
||||
}
|
||||
//#device-group-remove
|
||||
//#device-group-register
|
||||
//#device-group-full
|
||||
|
|
@ -1,107 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_4
|
||||
|
||||
import akka.actor.PoisonPill
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class DeviceGroupSpec extends AkkaSpec {
|
||||
|
||||
"DeviceGroup actor" must {
|
||||
|
||||
//#device-group-test-registration
|
||||
"be able to register a device actor" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
deviceActor1 should !==(deviceActor2)
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
|
||||
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
}
|
||||
|
||||
"ignore requests for wrong groupId" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
|
||||
probe.expectNoMessage()
|
||||
}
|
||||
//#device-group-test-registration
|
||||
|
||||
//#device-group-test3
|
||||
"return same actor for same deviceId" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
|
||||
deviceActor1 should ===(deviceActor2)
|
||||
}
|
||||
//#device-group-test3
|
||||
|
||||
//#device-group-list-terminate-test
|
||||
"be able to list active devices" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
|
||||
}
|
||||
|
||||
"be able to list active devices after one shuts down" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val toShutDown = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
|
||||
|
||||
probe.watch(toShutDown)
|
||||
toShutDown ! PoisonPill
|
||||
probe.expectTerminated(toShutDown)
|
||||
|
||||
// using awaitAssert to retry because it might take longer for the groupActor
|
||||
// to see the Terminated, that order is undefined
|
||||
probe.awaitAssert {
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
|
||||
}
|
||||
}
|
||||
//#device-group-list-terminate-test
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_4
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
import DeviceManager.RequestTrackDevice
|
||||
|
||||
//#device-manager-full
|
||||
object DeviceManager {
|
||||
def props(): Props = Props(new DeviceManager)
|
||||
|
||||
//#device-manager-msgs
|
||||
final case class RequestTrackDevice(groupId: String, deviceId: String)
|
||||
case object DeviceRegistered
|
||||
//#device-manager-msgs
|
||||
}
|
||||
|
||||
class DeviceManager extends Actor with ActorLogging {
|
||||
var groupIdToActor = Map.empty[String, ActorRef]
|
||||
var actorToGroupId = Map.empty[ActorRef, String]
|
||||
|
||||
override def preStart(): Unit = log.info("DeviceManager started")
|
||||
|
||||
override def postStop(): Unit = log.info("DeviceManager stopped")
|
||||
|
||||
override def receive = {
|
||||
case trackMsg @ RequestTrackDevice(groupId, _) =>
|
||||
groupIdToActor.get(groupId) match {
|
||||
case Some(ref) =>
|
||||
ref.forward(trackMsg)
|
||||
case None =>
|
||||
log.info("Creating device group actor for {}", groupId)
|
||||
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
|
||||
context.watch(groupActor)
|
||||
groupActor.forward(trackMsg)
|
||||
groupIdToActor += groupId -> groupActor
|
||||
actorToGroupId += groupActor -> groupId
|
||||
}
|
||||
|
||||
case Terminated(groupActor) =>
|
||||
val groupId = actorToGroupId(groupActor)
|
||||
log.info("Device group actor for {} has been terminated", groupId)
|
||||
actorToGroupId -= groupActor
|
||||
groupIdToActor -= groupId
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
//#device-manager-full
|
||||
|
|
@ -1,70 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_4
|
||||
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class DeviceSpec extends AkkaSpec {
|
||||
|
||||
"Device actor" must {
|
||||
|
||||
//#device-registration-tests
|
||||
"reply to registration requests" in {
|
||||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
probe.lastSender should ===(deviceActor)
|
||||
}
|
||||
|
||||
"ignore wrong registration requests" in {
|
||||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
|
||||
probe.expectNoMessage()
|
||||
|
||||
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref)
|
||||
probe.expectNoMessage()
|
||||
}
|
||||
//#device-registration-tests
|
||||
|
||||
"reply with empty reading if no temperature is known" in {
|
||||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
|
||||
val response = probe.expectMsgType[Device.RespondTemperature]
|
||||
response.requestId should ===(42L)
|
||||
response.value should ===(None)
|
||||
}
|
||||
|
||||
"reply with latest temperature reading" in {
|
||||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
|
||||
val response1 = probe.expectMsgType[Device.RespondTemperature]
|
||||
response1.requestId should ===(2L)
|
||||
response1.value should ===(Some(24.0))
|
||||
|
||||
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
|
||||
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
|
||||
val response2 = probe.expectMsgType[Device.RespondTemperature]
|
||||
response2.requestId should ===(4L)
|
||||
response2.value should ===(Some(55.0))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_5
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
|
||||
object Device {
|
||||
|
||||
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
|
||||
|
||||
final case class RecordTemperature(requestId: Long, value: Double)
|
||||
final case class TemperatureRecorded(requestId: Long)
|
||||
|
||||
final case class ReadTemperature(requestId: Long)
|
||||
final case class RespondTemperature(requestId: Long, value: Option[Double])
|
||||
}
|
||||
|
||||
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
|
||||
import Device._
|
||||
|
||||
var lastTemperatureReading: Option[Double] = None
|
||||
|
||||
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
|
||||
|
||||
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
|
||||
|
||||
override def receive: Receive = {
|
||||
case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) =>
|
||||
sender() ! DeviceManager.DeviceRegistered
|
||||
|
||||
case DeviceManager.RequestTrackDevice(groupId, deviceId) =>
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
groupId,
|
||||
deviceId,
|
||||
this.groupId,
|
||||
this.deviceId)
|
||||
|
||||
case RecordTemperature(id, value) =>
|
||||
log.info("Recorded temperature reading {} with {}", value, id)
|
||||
lastTemperatureReading = Some(value)
|
||||
sender() ! TemperatureRecorded(id)
|
||||
|
||||
case ReadTemperature(id) =>
|
||||
sender() ! RespondTemperature(id, lastTemperatureReading)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_5
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
import DeviceGroup._
|
||||
import DeviceManager.RequestTrackDevice
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object DeviceGroup {
|
||||
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
|
||||
|
||||
final case class RequestDeviceList(requestId: Long)
|
||||
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
|
||||
|
||||
//#query-protocol
|
||||
final case class RequestAllTemperatures(requestId: Long)
|
||||
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
|
||||
|
||||
sealed trait TemperatureReading
|
||||
final case class Temperature(value: Double) extends TemperatureReading
|
||||
case object TemperatureNotAvailable extends TemperatureReading
|
||||
case object DeviceNotAvailable extends TemperatureReading
|
||||
case object DeviceTimedOut extends TemperatureReading
|
||||
//#query-protocol
|
||||
}
|
||||
|
||||
//#query-added
|
||||
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
|
||||
var deviceIdToActor = Map.empty[String, ActorRef]
|
||||
var actorToDeviceId = Map.empty[ActorRef, String]
|
||||
var nextCollectionId = 0L
|
||||
|
||||
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
|
||||
|
||||
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
|
||||
|
||||
override def receive: Receive = {
|
||||
//#query-added
|
||||
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
|
||||
deviceIdToActor.get(trackMsg.deviceId) match {
|
||||
case Some(ref) =>
|
||||
ref.forward(trackMsg)
|
||||
case None =>
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId)
|
||||
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
|
||||
context.watch(deviceActor)
|
||||
deviceActor.forward(trackMsg)
|
||||
deviceIdToActor += trackMsg.deviceId -> deviceActor
|
||||
actorToDeviceId += deviceActor -> trackMsg.deviceId
|
||||
}
|
||||
|
||||
case RequestTrackDevice(groupId, deviceId) =>
|
||||
log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)
|
||||
|
||||
case RequestDeviceList(requestId) =>
|
||||
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
|
||||
|
||||
case Terminated(deviceActor) =>
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
log.info("Device actor for {} has been terminated", deviceId)
|
||||
actorToDeviceId -= deviceActor
|
||||
deviceIdToActor -= deviceId
|
||||
|
||||
//#query-added
|
||||
// ... other cases omitted
|
||||
|
||||
case RequestAllTemperatures(requestId) =>
|
||||
context.actorOf(
|
||||
DeviceGroupQuery
|
||||
.props(actorToDeviceId = actorToDeviceId, requestId = requestId, requester = sender(), 3.seconds))
|
||||
}
|
||||
|
||||
}
|
||||
//#query-added
|
||||
|
|
@ -1,100 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_5
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
//#query-full
|
||||
//#query-outline
|
||||
object DeviceGroupQuery {
|
||||
case object CollectionTimeout
|
||||
|
||||
def props(
|
||||
actorToDeviceId: Map[ActorRef, String],
|
||||
requestId: Long,
|
||||
requester: ActorRef,
|
||||
timeout: FiniteDuration): Props = {
|
||||
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
|
||||
}
|
||||
}
|
||||
|
||||
class DeviceGroupQuery(
|
||||
actorToDeviceId: Map[ActorRef, String],
|
||||
requestId: Long,
|
||||
requester: ActorRef,
|
||||
timeout: FiniteDuration)
|
||||
extends Actor
|
||||
with ActorLogging {
|
||||
import DeviceGroupQuery._
|
||||
import context.dispatcher
|
||||
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
|
||||
|
||||
override def preStart(): Unit = {
|
||||
actorToDeviceId.keysIterator.foreach { deviceActor =>
|
||||
context.watch(deviceActor)
|
||||
deviceActor ! Device.ReadTemperature(0)
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
queryTimeoutTimer.cancel()
|
||||
}
|
||||
|
||||
//#query-outline
|
||||
//#query-state
|
||||
override def receive: Receive =
|
||||
waitingForReplies(Map.empty, actorToDeviceId.keySet)
|
||||
|
||||
def waitingForReplies(
|
||||
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
|
||||
stillWaiting: Set[ActorRef]): Receive = {
|
||||
case Device.RespondTemperature(0, valueOption) =>
|
||||
val deviceActor = sender()
|
||||
val reading = valueOption match {
|
||||
case Some(value) => DeviceGroup.Temperature(value)
|
||||
case None => DeviceGroup.TemperatureNotAvailable
|
||||
}
|
||||
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
|
||||
|
||||
case Terminated(deviceActor) =>
|
||||
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
|
||||
|
||||
case CollectionTimeout =>
|
||||
val timedOutReplies =
|
||||
stillWaiting.map { deviceActor =>
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
deviceId -> DeviceGroup.DeviceTimedOut
|
||||
}
|
||||
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
|
||||
context.stop(self)
|
||||
}
|
||||
//#query-state
|
||||
|
||||
//#query-collect-reply
|
||||
def receivedResponse(
|
||||
deviceActor: ActorRef,
|
||||
reading: DeviceGroup.TemperatureReading,
|
||||
stillWaiting: Set[ActorRef],
|
||||
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]): Unit = {
|
||||
context.unwatch(deviceActor)
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
val newStillWaiting = stillWaiting - deviceActor
|
||||
|
||||
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
|
||||
if (newStillWaiting.isEmpty) {
|
||||
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
|
||||
context.stop(self)
|
||||
} else {
|
||||
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
|
||||
}
|
||||
}
|
||||
//#query-collect-reply
|
||||
|
||||
//#query-outline
|
||||
}
|
||||
//#query-outline
|
||||
//#query-full
|
||||
|
|
@ -1,154 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_5
|
||||
|
||||
import akka.actor.PoisonPill
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class DeviceGroupQuerySpec extends AkkaSpec {
|
||||
|
||||
"DeviceGroupQuery" must {
|
||||
|
||||
//#query-test-normal
|
||||
"return temperature value for working devices" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
|
||||
|
||||
requester.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.Temperature(2.0))))
|
||||
}
|
||||
//#query-test-normal
|
||||
|
||||
//#query-test-no-reading
|
||||
"return TemperatureNotAvailable for devices with no readings" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
|
||||
|
||||
requester.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures =
|
||||
Map("device1" -> DeviceGroup.TemperatureNotAvailable, "device2" -> DeviceGroup.Temperature(2.0))))
|
||||
}
|
||||
//#query-test-no-reading
|
||||
|
||||
//#query-test-stopped
|
||||
"return DeviceNotAvailable if device stops before answering" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
device2.ref ! PoisonPill
|
||||
|
||||
requester.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.DeviceNotAvailable)))
|
||||
}
|
||||
//#query-test-stopped
|
||||
|
||||
//#query-test-stopped-later
|
||||
"return temperature reading even if device stops after answering" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
|
||||
device2.ref ! PoisonPill
|
||||
|
||||
requester.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.Temperature(2.0))))
|
||||
}
|
||||
//#query-test-stopped-later
|
||||
|
||||
//#query-test-timeout
|
||||
"return DeviceTimedOut if device does not answer in time" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(
|
||||
DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 1.second))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
|
||||
requester.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.DeviceTimedOut)))
|
||||
}
|
||||
//#query-test-timeout
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,136 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_5
|
||||
|
||||
import akka.actor.PoisonPill
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class DeviceGroupSpec extends AkkaSpec {
|
||||
|
||||
"DeviceGroup actor" must {
|
||||
|
||||
"be able to register a device actor" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
deviceActor1 should !==(deviceActor2)
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
|
||||
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
}
|
||||
|
||||
"ignore requests for wrong groupId" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
|
||||
probe.expectNoMsg(500.milliseconds)
|
||||
}
|
||||
|
||||
"return same actor for same deviceId" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
|
||||
deviceActor1 should ===(deviceActor2)
|
||||
}
|
||||
|
||||
"be able to list active devices" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
|
||||
}
|
||||
|
||||
"be able to list active devices after one shuts down" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val toShutDown = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
|
||||
|
||||
probe.watch(toShutDown)
|
||||
toShutDown ! PoisonPill
|
||||
probe.expectTerminated(toShutDown)
|
||||
|
||||
// using awaitAssert to retry because it might take longer for the groupActor
|
||||
// to see the Terminated, that order is undefined
|
||||
probe.awaitAssert {
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
|
||||
}
|
||||
}
|
||||
|
||||
//#group-query-integration-test
|
||||
"be able to collect temperatures from all active devices" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor3 = probe.lastSender
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
|
||||
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
// No temperature for device3
|
||||
|
||||
groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
|
||||
probe.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 0,
|
||||
temperatures = Map(
|
||||
"device1" -> DeviceGroup.Temperature(1.0),
|
||||
"device2" -> DeviceGroup.Temperature(2.0),
|
||||
"device3" -> DeviceGroup.TemperatureNotAvailable)))
|
||||
}
|
||||
//#group-query-integration-test
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,47 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package tutorial_5
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
import DeviceManager.RequestTrackDevice
|
||||
|
||||
object DeviceManager {
|
||||
def props(): Props = Props(new DeviceManager)
|
||||
|
||||
final case class RequestTrackDevice(groupId: String, deviceId: String)
|
||||
case object DeviceRegistered
|
||||
}
|
||||
|
||||
class DeviceManager extends Actor with ActorLogging {
|
||||
var groupIdToActor = Map.empty[String, ActorRef]
|
||||
var actorToGroupId = Map.empty[ActorRef, String]
|
||||
|
||||
override def preStart(): Unit = log.info("DeviceManager started")
|
||||
|
||||
override def postStop(): Unit = log.info("DeviceManager stopped")
|
||||
|
||||
override def receive = {
|
||||
case trackMsg @ RequestTrackDevice(groupId, _) =>
|
||||
groupIdToActor.get(groupId) match {
|
||||
case Some(ref) =>
|
||||
ref.forward(trackMsg)
|
||||
case None =>
|
||||
log.info("Creating device group actor for {}", groupId)
|
||||
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
|
||||
context.watch(groupActor)
|
||||
groupActor.forward(trackMsg)
|
||||
groupIdToActor += groupId -> groupActor
|
||||
actorToGroupId += groupActor -> groupId
|
||||
}
|
||||
|
||||
case Terminated(groupActor) =>
|
||||
val groupId = actorToGroupId(groupActor)
|
||||
log.info("Device group actor for {} has been terminated", groupId)
|
||||
actorToGroupId -= groupActor
|
||||
groupIdToActor -= groupId
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue