remove unreferenced akka-docs/src/test/*/tutorial_6 (#25363) (#25364)

This commit is contained in:
Roman Filonenko 2018-07-19 08:22:10 +02:00 committed by Konrad `ktoso` Malawski
parent 862a66ecc7
commit 4a8368bfe0
18 changed files with 0 additions and 1683 deletions

View file

@ -1,103 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import jdocs.tutorial_6.DeviceManager.DeviceRegistered;
import jdocs.tutorial_6.DeviceManager.RequestTrackDevice;
import java.util.Optional;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
public static final class RecordTemperature {
final long requestId;
final double value;
public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, r -> {
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
getSender().tell(new DeviceRegistered(), getSelf());
} else {
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
r.groupId, r.deviceId, this.groupId, this.deviceId
);
}
})
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.build();
}
}

View file

@ -1,156 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
public static final class RequestDeviceList {
final long requestId;
public RequestDeviceList(long requestId) {
this.requestId = requestId;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
public static final class RequestAllTemperatures {
final long requestId;
public RequestAllTemperatures(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondAllTemperatures {
final long requestId;
final Map<String, TemperatureReading> temperatures;
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
this.requestId = requestId;
this.temperatures = temperatures;
}
}
public static interface TemperatureReading {
}
public static final class Temperature implements TemperatureReading {
public final double value;
public Temperature(double value) {
this.value = value;
}
}
public static final class TemperatureNotAvailable implements TemperatureReading {
}
public static final class DeviceNotAvailable implements TemperatureReading {
}
public static final class DeviceTimedOut implements TemperatureReading {
}
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
final long nextCollectionId = 0L;
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef ref = deviceIdToActor.get(trackMsg.deviceId);
if (ref != null) {
ref.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
ActorRef deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
getContext().watch(deviceActor);
deviceActor.forward(trackMsg, getContext());
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
private void onDeviceList(RequestDeviceList r) {
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
}
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
private void onAllTemperatures(RequestAllTemperatures r) {
// since Java collections are mutable, we want to avoid sharing them between actors (since multiple Actors (threads)
// modifying the same mutable data-structure is not safe), and perform a defensive copy of the mutable map:
//
// Feel free to use your favourite immutable data-structures library with Akka in Java applications!
Map<ActorRef, String> actorToDeviceIdCopy = new HashMap<>(this.actorToDeviceId);
getContext().actorOf(DeviceGroupQuery.props(
actorToDeviceIdCopy, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.match(RequestDeviceList.class, this::onDeviceList)
.match(Terminated.class, this::onTerminated)
.match(RequestAllTemperatures.class, this::onAllTemperatures)
.build();
}
}

View file

@ -1,109 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class DeviceGroupQuery extends AbstractActor {
public static final class CollectionTimeout {
}
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final Map<ActorRef, String> actorToDeviceId;
final long requestId;
final ActorRef requester;
Cancellable queryTimeoutTimer;
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
this.actorToDeviceId = actorToDeviceId;
this.requestId = requestId;
this.requester = requester;
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
);
}
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
}
@Override
public void preStart() {
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
getContext().watch(deviceActor);
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
}
}
@Override
public void postStop() {
queryTimeoutTimer.cancel();
}
@Override
public Receive createReceive() {
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}
public Receive waitingForReplies(
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
Set<ActorRef> stillWaiting) {
return receiveBuilder()
.match(Device.RespondTemperature.class, r -> {
ActorRef deviceActor = getSender();
DeviceGroup.TemperatureReading reading = r.value
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
.orElse(new DeviceGroup.TemperatureNotAvailable());
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
})
.match(Terminated.class, t -> {
if (stillWaiting.contains(t.getActor())) {
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
}
// else ignore
})
.match(CollectionTimeout.class, t -> {
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
for (ActorRef deviceActor : stillWaiting) {
String deviceId = actorToDeviceId.get(deviceActor);
replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
}
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
getContext().stop(getSelf());
})
.build();
}
public void receivedResponse(ActorRef deviceActor,
DeviceGroup.TemperatureReading reading,
Set<ActorRef> stillWaiting,
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
getContext().unwatch(deviceActor);
String deviceId = actorToDeviceId.get(deviceActor);
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
newStillWaiting.remove(deviceActor);
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
newRepliesSoFar.put(deviceId, reading);
if (newStillWaiting.isEmpty()) {
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
getContext().stop(getSelf());
} else {
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
}
}
}

View file

@ -1,220 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class DeviceGroupQueryTest extends JUnitSuite {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testReturnTemperatureValueForWorkingDevices() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
@Test
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable());
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
@Test
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable());
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
@Test
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
@Test
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(1, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(
java.time.Duration.ofSeconds(5),
DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut());
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
public static void assertEqualTemperatures(Map<String, DeviceGroup.TemperatureReading> expected, Map<String, DeviceGroup.TemperatureReading> actual) {
for (Map.Entry<String, DeviceGroup.TemperatureReading> entry : expected.entrySet()) {
DeviceGroup.TemperatureReading expectedReading = entry.getValue();
DeviceGroup.TemperatureReading actualReading = actual.get(entry.getKey());
assertNotNull(actualReading);
assertEquals(expectedReading.getClass(), actualReading.getClass());
if (expectedReading instanceof DeviceGroup.Temperature) {
assertEquals(((DeviceGroup.Temperature) expectedReading).value, ((DeviceGroup.Temperature) actualReading).value, 0.01);
}
}
assertEquals(expected.size(), actual.size());
}
}

View file

@ -1,171 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static jdocs.tutorial_6.DeviceGroupQueryTest.assertEqualTemperatures;
public class DeviceGroupTest extends JUnitSuite {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testRegisterDeviceActor() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor1 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor2 = probe.getLastSender();
assertNotEquals(deviceActor1, deviceActor2);
// Check that the device actors are working
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
}
@Test
public void testIgnoreRequestsForWrongGroupId() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
probe.expectNoMsg();
}
@Test
public void testReturnSameActorForSameDeviceId() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor1 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor2 = probe.getLastSender();
assertEquals(deviceActor1, deviceActor2);
}
@Test
public void testListActiveDevices() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
}
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef toShutDown = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
probe.watch(toShutDown);
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
probe.expectTerminated(toShutDown);
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert(() -> {
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
DeviceGroup.ReplyDeviceList r =
probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(1L, r.requestId);
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
return null;
});
}
@Test
public void testCollectTemperaturesFromAllActiveDevices() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor1 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor2 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor3 = probe.getLastSender();
// Check that the device actors are working
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
// No temperature for device 3
groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(0L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable());
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
}

View file

@ -1,80 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.HashMap;
import java.util.Map;
public class DeviceManager extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(DeviceManager.class);
}
public static final class RequestTrackDevice {
public final String groupId;
public final String deviceId;
public RequestTrackDevice(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
}
public static final class DeviceRegistered {
}
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
@Override
public void preStart() {
log.info("DeviceManager started");
}
@Override
public void postStop() {
log.info("DeviceManager stopped");
}
private void onTrackDevice(RequestTrackDevice trackMsg) {
String groupId = trackMsg.groupId;
ActorRef ref = groupIdToActor.get(groupId);
if (ref != null) {
ref.forward(trackMsg, getContext());
} else {
log.info("Creating device group actor for {}", groupId);
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
getContext().watch(groupActor);
groupActor.forward(trackMsg, getContext());
groupIdToActor.put(groupId, groupActor);
actorToGroupId.put(groupActor, groupId);
}
}
private void onTerminated(Terminated t) {
ActorRef groupActor = t.getActor();
String groupId = actorToGroupId.get(groupActor);
log.info("Device group actor for {} has been terminated", groupId);
actorToGroupId.remove(groupActor);
groupIdToActor.remove(groupId);
}
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.build();
}
}

View file

@ -1,90 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import java.util.Optional;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.scalatest.junit.JUnitSuite;
public class DeviceTest extends JUnitSuite {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testReplyToRegistrationRequests() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
assertEquals(deviceActor, probe.getLastSender());
}
@Test
public void testIgnoreWrongRegistrationRequests() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
probe.expectNoMsg();
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
probe.expectNoMsg();
}
@Test
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
assertEquals(42L, response.requestId);
assertEquals(Optional.empty(), response.value);
}
@Test
public void testReplyWithLatestTemperatureReading() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
assertEquals(2L, response1.requestId);
assertEquals(Optional.of(24.0), response1.value);
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
assertEquals(4L, response2.requestId);
assertEquals(Optional.of(55.0), response2.value);
}
}

View file

@ -1,30 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.io.IOException;
public class IotMain {
public static void main(String[] args) throws IOException {
ActorSystem system = ActorSystem.create("iot-system");
try {
// Create top level supervisor
ActorRef supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor");
supervisor.tell(new DeviceManager.RequestTrackDevice("mygroup", "device1"), ActorRef.noSender());
System.out.println("Press ENTER to exit the system");
System.in.read();
} finally {
system.terminate();
}
}
}

View file

@ -1,39 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.tutorial_6;
//#iot-supervisor
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class IotSupervisor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(IotSupervisor.class);
}
@Override
public void preStart() {
log.info("IoT Application started");
}
@Override
public void postStop() {
log.info("IoT Application stopped");
}
// No need to handle any messages
@Override
public Receive createReceive() {
return receiveBuilder()
.build();
}
}
//#iot-supervisor

View file

@ -1,47 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
final case class TemperatureRecorded(requestId: Long)
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`)
sender() ! DeviceManager.DeviceRegistered
case DeviceManager.RequestTrackDevice(groupId, deviceId)
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
groupId, deviceId, this.groupId, this.deviceId
)
case RecordTemperature(id, value)
log.info("Recorded temperature reading {} with {}", value, id)
lastTemperatureReading = Some(value)
sender() ! TemperatureRecorded(id)
case ReadTemperature(id)
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}

View file

@ -1,77 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import DeviceGroup._
import DeviceManager.RequestTrackDevice
import scala.concurrent.duration._
object DeviceGroup {
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
final case class RequestDeviceList(requestId: Long)
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
final case class RequestAllTemperatures(requestId: Long)
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
}
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
var actorToDeviceId = Map.empty[ActorRef, String]
var nextCollectionId = 0L
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
// Note the backticks
case trackMsg @ RequestTrackDevice(`groupId`, _)
deviceIdToActor.get(trackMsg.deviceId) match {
case Some(ref)
ref forward trackMsg
case None
log.info("Creating device actor for {}", trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
context.watch(deviceActor)
deviceActor forward trackMsg
deviceIdToActor += trackMsg.deviceId -> deviceActor
actorToDeviceId += deviceActor -> trackMsg.deviceId
}
case RequestTrackDevice(groupId, deviceId)
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
)
case RequestDeviceList(requestId)
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
case Terminated(deviceActor)
val deviceId = actorToDeviceId(deviceActor)
log.info("Device actor for {} has been terminated", deviceId)
actorToDeviceId -= deviceActor
deviceIdToActor -= deviceId
case RequestAllTemperatures(requestId)
context.actorOf(DeviceGroupQuery.props(
actorToDeviceId = actorToDeviceId,
requestId = requestId,
requester = sender(),
3.seconds
))
}
}

View file

@ -1,99 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.Actor.Receive
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import scala.concurrent.duration._
object DeviceGroupQuery {
case object CollectionTimeout
def props(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
): Props = {
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
}
}
class DeviceGroupQuery(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
) extends Actor with ActorLogging {
import DeviceGroupQuery._
import context.dispatcher
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
override def preStart(): Unit = {
actorToDeviceId.keysIterator.foreach { deviceActor
context.watch(deviceActor)
deviceActor ! Device.ReadTemperature(0)
}
}
override def postStop(): Unit = {
queryTimeoutTimer.cancel()
}
override def receive: Receive =
waitingForReplies(
Map.empty,
actorToDeviceId.keySet
)
def waitingForReplies(
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
stillWaiting: Set[ActorRef]
): Receive = {
case Device.RespondTemperature(0, valueOption)
val deviceActor = sender()
val reading = valueOption match {
case Some(value) DeviceGroup.Temperature(value)
case None DeviceGroup.TemperatureNotAvailable
}
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
case Terminated(deviceActor)
if (stillWaiting.contains(deviceActor))
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
// else ignore
case CollectionTimeout
val timedOutReplies =
stillWaiting.map { deviceActor
val deviceId = actorToDeviceId(deviceActor)
deviceId -> DeviceGroup.DeviceTimedOut
}
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
context.stop(self)
}
def receivedResponse(
deviceActor: ActorRef,
reading: DeviceGroup.TemperatureReading,
stillWaiting: Set[ActorRef],
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
): Unit = {
val deviceId = actorToDeviceId(deviceActor)
val newStillWaiting = stillWaiting - deviceActor
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
if (newStillWaiting.isEmpty) {
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
context.stop(self)
} else {
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
}
}
}

View file

@ -1,158 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.PoisonPill
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceGroupQuerySpec extends AkkaSpec {
"DeviceGroupQuery" must {
"return temperature value for working devices" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
"return TemperatureNotAvailable for devices with no readings" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.TemperatureNotAvailable,
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
"return DeviceNotAvailable if device stops before answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceNotAvailable
)
))
}
"return temperature reading even if device stops after answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
"return DeviceTimedOut if device does not answer in time" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 1.second
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceTimedOut
)
))
}
}
}

View file

@ -1,134 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.PoisonPill
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceGroupSpec extends AkkaSpec {
"DeviceGroup actor" must {
"be able to register a device actor" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
deviceActor1 should !==(deviceActor2)
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
}
"ignore requests for wrong groupId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
"return same actor for same deviceId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
deviceActor1 should ===(deviceActor2)
}
"be able to list active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}
"be able to list active devices after one shuts down" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val toShutDown = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
probe.watch(toShutDown)
toShutDown ! PoisonPill
probe.expectTerminated(toShutDown)
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert {
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
}
}
"be able to collect temperatures from all active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor3 = probe.lastSender
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
// No temperature for device3
groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
probe.expectMsg(
DeviceGroup.RespondAllTemperatures(
requestId = 0,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0),
"device3" -> DeviceGroup.TemperatureNotAvailable)))
}
}
}

View file

@ -1,47 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import DeviceManager.RequestTrackDevice
object DeviceManager {
def props(): Props = Props(new DeviceManager)
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
}
class DeviceManager extends Actor with ActorLogging {
var groupIdToActor = Map.empty[String, ActorRef]
var actorToGroupId = Map.empty[ActorRef, String]
override def preStart(): Unit = log.info("DeviceManager started")
override def postStop(): Unit = log.info("DeviceManager stopped")
override def receive = {
case trackMsg @ RequestTrackDevice(groupId, _)
groupIdToActor.get(groupId) match {
case Some(ref)
ref forward trackMsg
case None
log.info("Creating device group actor for {}", groupId)
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
context.watch(groupActor)
groupActor forward trackMsg
groupIdToActor += groupId -> groupActor
actorToGroupId += groupActor -> groupId
}
case Terminated(groupActor)
val groupId = actorToGroupId(groupActor)
log.info("Device group actor for {} has been terminated", groupId)
actorToGroupId -= groupActor
groupIdToActor -= groupId
}
}

View file

@ -1,68 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceSpec extends AkkaSpec {
"Device actor" must {
"reply to registration requests" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
probe.lastSender should ===(deviceActor)
}
"ignore wrong registration requests" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
probe.expectNoMsg(500.milliseconds)
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
"reply with empty reading if no temperature is known" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
val response = probe.expectMsgType[Device.RespondTemperature]
response.requestId should ===(42)
response.value should ===(None)
}
"reply with latest temperature reading" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
val response1 = probe.expectMsgType[Device.RespondTemperature]
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
val response2 = probe.expectMsgType[Device.RespondTemperature]
response2.requestId should ===(4)
response2.value should ===(Some(55.0))
}
}
}

View file

@ -1,30 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.ActorSystem
import DeviceManager.RequestTrackDevice
import scala.io.StdIn
object IotApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("iot-system")
try {
// Create top level supervisor
val supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor")
supervisor ! RequestTrackDevice("mygroup", "device1")
// Exit the system after ENTER is pressed
StdIn.readLine()
} finally {
system.terminate()
}
}
}

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package tutorial_6
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
object IotSupervisor {
def props(): Props = Props(new IotSupervisor)
}
class IotSupervisor extends Actor with ActorLogging {
val deviceManager: ActorRef = context.system.actorOf(DeviceManager.props(), "device-manager")
override def preStart(): Unit = log.info("IoT Application started")
override def postStop(): Unit = log.info("IoT Application stopped")
// No need to handle any messages
override def receive = Actor.emptyBehavior
}