diff --git a/akka-docs-new/src/test/java/jdocs/quickstart/HelloWorldActor.java b/akka-docs-new/src/test/java/jdocs/quickstart/HelloWorldActor.java new file mode 100644 index 0000000000..d467c426d0 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/quickstart/HelloWorldActor.java @@ -0,0 +1,16 @@ +package jdocs.quickstart; + +import akka.actor.AbstractActor; +import akka.actor.AbstractActor.Receive; + +//#actor-impl +public class HelloWorldActor extends AbstractActor { + public Receive createReceive() { + return receiveBuilder() + .match(String.class, msg -> { + System.out.println("Hello " + msg); + }) + .build(); + } +} +//#actor-impl \ No newline at end of file diff --git a/akka-docs-new/src/test/java/jdocs/quickstart/HelloWorldMain.java b/akka-docs-new/src/test/java/jdocs/quickstart/HelloWorldMain.java new file mode 100644 index 0000000000..bbf7ec4213 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/quickstart/HelloWorldMain.java @@ -0,0 +1,29 @@ +//#full-example +package jdocs.quickstart; + +import java.io.IOException; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.ActorSystem; + +public class HelloWorldMain { + public static void main(String[] args) throws IOException { + //#create-send + ActorSystem system = ActorSystem.create("hello-world-actor-system"); + try { + // Create hello world actor + ActorRef helloWorldActor = system.actorOf(Props.create(HelloWorldActor.class), "HelloWorldActor"); + // Send message to actor + helloWorldActor.tell("World", ActorRef.noSender()); + + System.out.println("Press ENTER to exit the system"); + System.in.read(); + } finally { + system.terminate(); + } + //#create-send + } +} + +//#full-example diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_1/IotMain.java b/akka-docs-new/src/test/java/jdocs/tutorial_1/IotMain.java new file mode 100644 index 0000000000..49b49d9b23 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_1/IotMain.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_1; + +//#iot-app + +import java.io.IOException; + +import akka.actor.ActorSystem; +import akka.actor.ActorRef; + +public class IotMain { + + public static void main(String[] args) throws IOException { + ActorSystem system = ActorSystem.create("iot-system"); + + try { + // Create top level supervisor + ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor"); + + System.out.println("Press ENTER to exit the system"); + System.in.read(); + } finally { + system.terminate(); + } + } + +} +//#iot-app diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_1/IotSupervisor.java b/akka-docs-new/src/test/java/jdocs/tutorial_1/IotSupervisor.java new file mode 100644 index 0000000000..07f3d28c51 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_1/IotSupervisor.java @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_1; + +//#iot-supervisor + +import akka.actor.AbstractActor; +import akka.actor.ActorLogging; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +public class IotSupervisor extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + public static Props props() { + return Props.create(IotSupervisor.class); + } + + @Override + public void preStart() { + log.info("IoT Application started"); + } + + @Override + public void postStop() { + log.info("IoT Application stopped"); + } + + // No need to handle any messages + @Override + public Receive createReceive() { + return receiveBuilder() + .build(); + } + +} +//#iot-supervisor diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_2/Device.java b/akka-docs-new/src/test/java/jdocs/tutorial_2/Device.java new file mode 100644 index 0000000000..8d3f51b298 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_2/Device.java @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_2; + +//#full-device + +import java.util.Optional; + +import akka.actor.AbstractActor; +import akka.actor.AbstractActor.Receive; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +public class Device extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + final String deviceId; + + public Device(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + + public static Props props(String groupId, String deviceId) { + return Props.create(Device.class, groupId, deviceId); + } + + public static final class RecordTemperature { + final long requestId; + final double value; + + public RecordTemperature(long requestId, double value) { + this.requestId = requestId; + this.value = value; + } + } + + public static final class TemperatureRecorded { + final long requestId; + + public TemperatureRecorded(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReadTemperature { + final long requestId; + + public ReadTemperature(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondTemperature { + final long requestId; + final Optional value; + + public RespondTemperature(long requestId, Optional value) { + this.requestId = requestId; + this.value = value; + } + } + + Optional lastTemperatureReading = Optional.empty(); + + @Override + public void preStart() { + log.info("Device actor {}-{} started", groupId, deviceId); + } + + @Override + public void postStop() { + log.info("Device actor {}-{} stopped", groupId, deviceId); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(RecordTemperature.class, r -> { + log.info("Recorded temperature reading {} with {}", r.value, r.requestId); + lastTemperatureReading = Optional.of(r.value); + getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); + }) + .match(ReadTemperature.class, r -> { + getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); + }) + .build(); + } +} +//#full-device diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress.java b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress.java new file mode 100644 index 0000000000..6b1801c3a5 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress.java @@ -0,0 +1,26 @@ +package jdocs.tutorial_2; + +import java.util.Optional; + +import jdocs.tutorial_2.Device.ReadTemperature; +import jdocs.tutorial_2.Device.RecordTemperature; +import jdocs.tutorial_2.Device.RespondTemperature; +import jdocs.tutorial_2.Device.TemperatureRecorded; + +class DeviceInProgress1 { + + //#read-protocol-1 + public static final class ReadTemperature { + } + + public static final class RespondTemperature { + final Optional value; + + public RespondTemperature(Optional value) { + this.value = value; + } + } + //#read-protocol-1 + +} + diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress2.java b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress2.java new file mode 100644 index 0000000000..2a46359392 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress2.java @@ -0,0 +1,71 @@ +package jdocs.tutorial_2.inprogress2; + +//#device-with-read + +import java.util.Optional; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +class Device extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + final String deviceId; + + public Device(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + + public static Props props(String groupId, String deviceId) { + return Props.create(Device.class, groupId, deviceId); + } + + //#read-protocol-2 + public static final class ReadTemperature { + long requestId; + + public ReadTemperature(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondTemperature { + long requestId; + Optional value; + + public RespondTemperature(long requestId, Optional value) { + this.requestId = requestId; + this.value = value; + } + } + //#read-protocol-2 + + Optional lastTemperatureReading = Optional.empty(); + + @Override + public void preStart() { + log.info("Device actor {}-{} started", groupId, deviceId); + } + + @Override + public void postStop() { + log.info("Device actor {}-{} stopped", groupId, deviceId); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(ReadTemperature.class, r -> { + getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); + }) + .build(); + } + +} + +//#device-with-read diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress3.java b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress3.java new file mode 100644 index 0000000000..6deb67678a --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceInProgress3.java @@ -0,0 +1,14 @@ +package jdocs.tutorial_2; + +class DeviceInProgress3 { + + //#write-protocol-1 + public static final class RecordTemperature { + final double value; + + public RecordTemperature(double value) { + this.value = value; + } + } + //#write-protocol-1 +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceTest.java new file mode 100644 index 0000000000..0cd3bb5bea --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_2/DeviceTest.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_2; + +import java.util.Optional; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import org.scalatest.junit.JUnitSuite; + +import akka.actor.ActorSystem; +import akka.actor.ActorRef; +import akka.testkit.javadsl.TestKit; + +public class DeviceTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + //#device-read-test + @Test + public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); + Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(42L, response.requestId); + assertEquals(Optional.empty(), response.value); + } + //#device-read-test + + //#device-write-read-test + @Test + public void testReplyWithLatestTemperatureReading() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); + Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(2L, response1.requestId); + assertEquals(Optional.of(24.0), response1.value); + + deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); + assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); + Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(4L, response2.requestId); + assertEquals(Optional.of(55.0), response2.value); + } + //#device-write-read-test + +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_3/Device.java b/akka-docs-new/src/test/java/jdocs/tutorial_3/Device.java new file mode 100644 index 0000000000..3bc1eae1a8 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_3/Device.java @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_3; + +//#device-with-register + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import jdocs.tutorial_3.DeviceManager.DeviceRegistered; +import jdocs.tutorial_3.DeviceManager.RequestTrackDevice; + +import java.util.Optional; + +public class Device extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + final String deviceId; + + public Device(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + + public static Props props(String groupId, String deviceId) { + return Props.create(Device.class, groupId, deviceId); + } + + public static final class RecordTemperature { + final long requestId; + final double value; + + public RecordTemperature(long requestId, double value) { + this.requestId = requestId; + this.value = value; + } + } + + public static final class TemperatureRecorded { + final long requestId; + + public TemperatureRecorded(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReadTemperature { + final long requestId; + + public ReadTemperature(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondTemperature { + final long requestId; + final Optional value; + + public RespondTemperature(long requestId, Optional value) { + this.requestId = requestId; + this.value = value; + } + } + + Optional lastTemperatureReading = Optional.empty(); + + @Override + public void preStart() { + log.info("Device actor {}-{} started", groupId, deviceId); + } + + @Override + public void postStop() { + log.info("Device actor {}-{} stopped", groupId, deviceId); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(RequestTrackDevice.class, r -> { + if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) { + getSender().tell(new DeviceRegistered(), getSelf()); + } else { + log.warning( + "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", + r.groupId, r.deviceId, this.groupId, this.deviceId + ); + } + }) + .match(RecordTemperature.class, r -> { + log.info("Recorded temperature reading {} with {}", r.value, r.requestId); + lastTemperatureReading = Optional.of(r.value); + getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); + }) + .match(ReadTemperature.class, r -> { + getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); + }) + .build(); + } +} +//#device-with-register diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceGroup.java b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceGroup.java new file mode 100644 index 0000000000..7a67b34c18 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceGroup.java @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_3; + +import java.util.Set; +import java.util.Map; +import java.util.HashMap; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import jdocs.tutorial_3.Device; +import jdocs.tutorial_3.DeviceManager; + +//#device-group-full +public class DeviceGroup extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + public DeviceGroup(String groupId) { + this.groupId = groupId; + } + + //#device-group-register + public static Props props(String groupId) { + return Props.create(DeviceGroup.class, groupId); + } + //#device-group-register + + public static final class RequestDeviceList { + final long requestId; + + public RequestDeviceList(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReplyDeviceList { + final long requestId; + final Set ids; + + public ReplyDeviceList(long requestId, Set ids) { + this.requestId = requestId; + this.ids = ids; + } + } + //#device-group-register +//#device-group-register +//#device-group-register +//#device-group-remove + + final Map deviceIdToActor = new HashMap<>(); + //#device-group-register + final Map actorToDeviceId = new HashMap<>(); + //#device-group-register + + @Override + public void preStart() { + log.info("DeviceGroup {} started", groupId); + } + + @Override + public void postStop() { + log.info("DeviceGroup {} stopped", groupId); + } + + private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { + if (this.groupId.equals(trackMsg.groupId)) { + ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId); + if (deviceActor != null) { + deviceActor.forward(trackMsg, getContext()); + } else { + log.info("Creating device actor for {}", trackMsg.deviceId); + deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); + //#device-group-register + getContext().watch(deviceActor); + actorToDeviceId.put(deviceActor, trackMsg.deviceId); + //#device-group-register + deviceIdToActor.put(trackMsg.deviceId, deviceActor); + deviceActor.forward(trackMsg, getContext()); + } + } else { + log.warning( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, this.groupId + ); + } + } + + private void onDeviceList(RequestDeviceList r) { + getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf()); + } + + private void onTerminated(Terminated t) { + ActorRef deviceActor = t.getActor(); + String deviceId = actorToDeviceId.get(deviceActor); + log.info("Device actor for {} has been terminated", deviceId); + actorToDeviceId.remove(deviceActor); + deviceIdToActor.remove(deviceId); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice) + .match(RequestDeviceList.class, this::onDeviceList) + .match(Terminated.class, this::onTerminated) + .build(); + } +} +//#device-group-remove +//#device-group-register +//#device-group-full diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceGroupTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceGroupTest.java new file mode 100644 index 0000000000..dfd40e9976 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceGroupTest.java @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_3; + +import java.util.stream.Stream; +import java.util.stream.Collectors; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.scalatest.junit.JUnitSuite; + +public class DeviceGroupTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + //#device-group-test-registration + @Test + public void testRegisterDeviceActor() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + assertNotEquals(deviceActor1, deviceActor2); + + // Check that the device actors are workingl + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); + assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + } + + @Test + public void testIgnoreRequestsForWrongGroupId() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef()); + probe.expectNoMsg(); + } + //#device-group-test-registration + + //#device-group-test3 + @Test + public void testReturnSameActorForSameDeviceId() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + assertEquals(deviceActor1, deviceActor2); + } + //#device-group-test3 + + //#device-group-list-terminate-test + @Test + public void testListActiveDevices() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); + DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + } + + @Test + public void teestListActiveDevicesAfterOneShutsDown() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef toShutDown = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); + DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + + probe.watch(toShutDown); + toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender()); + probe.expectTerminated(toShutDown); + + groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef()); + reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(1L, reply.requestId); + assertEquals(Stream.of("device2").collect(Collectors.toSet()), reply.ids); + } + //#device-group-list-terminate-test +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceManager.java b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceManager.java new file mode 100644 index 0000000000..8a1fb813ac --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceManager.java @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.tutorial_3; + +import java.util.Map; +import java.util.HashMap; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +//#device-manager-full +public class DeviceManager extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + public static Props props() { + return Props.create(DeviceManager.class); + } + + //#device-manager-msgs + public static final class RequestTrackDevice { + public final String groupId; + public final String deviceId; + + public RequestTrackDevice(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + } + + public static final class DeviceRegistered { + } + + //#device-manager-msgs + final Map groupIdToActor = new HashMap<>(); + final Map actorToGroupId = new HashMap<>(); + + @Override + public void preStart() { + log.info("DeviceManager started"); + } + + @Override + public void postStop() { + log.info("DeviceManager stopped"); + } + + private void onTrackDevice(RequestTrackDevice trackMsg) { + String groupId = trackMsg.groupId; + ActorRef ref = groupIdToActor.get(groupId); + if (ref != null) { + ref.forward(trackMsg, getContext()); + } else { + log.info("Creating device group actor for {}", groupId); + ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId); + getContext().watch(groupActor); + groupActor.forward(trackMsg, getContext()); + groupIdToActor.put(groupId, groupActor); + actorToGroupId.put(groupActor, groupId); + } + } + + private void onTerminated(Terminated t) { + ActorRef groupActor = t.getActor(); + String groupId = actorToGroupId.get(groupActor); + log.info("Device group actor for {} has been terminated", groupId); + actorToGroupId.remove(groupActor); + groupIdToActor.remove(groupId); + } + + public Receive createReceive() { + return receiveBuilder() + .match(RequestTrackDevice.class, this::onTrackDevice) + .match(Terminated.class, this::onTerminated) + .build(); + } + +} +//#device-manager-full diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceTest.java new file mode 100644 index 0000000000..156641e3de --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_3/DeviceTest.java @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_3; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import org.scalatest.junit.JUnitSuite; + +import java.util.Optional; + +public class DeviceTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + //#device-registration-tests + @Test + public void testReplyToRegistrationRequests() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + assertEquals(deviceActor, probe.getLastSender()); + } + + @Test + public void testIgnoreWrongRegistrationRequests() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef()); + probe.expectNoMsg(); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef()); + probe.expectNoMsg(); + } + //#device-registration-tests + + //#device-read-test + @Test + public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); + Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(42L, response.requestId); + assertEquals(Optional.empty(), response.value); + } + //#device-read-test + + //#device-write-read-test + @Test + public void testReplyWithLatestTemperatureReading() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); + Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(2L, response1.requestId); + assertEquals(Optional.of(24.0), response1.value); + + deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); + assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); + Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(4L, response2.requestId); + assertEquals(Optional.of(55.0), response2.value); + } + //#device-write-read-test + +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/Device.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/Device.java new file mode 100644 index 0000000000..766dcd37a3 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/Device.java @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_4; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import jdocs.tutorial_4.DeviceManager.DeviceRegistered; +import jdocs.tutorial_4.DeviceManager.RequestTrackDevice; + +import java.util.Optional; + +public class Device extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + final String deviceId; + + public Device(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + + public static Props props(String groupId, String deviceId) { + return Props.create(Device.class, groupId, deviceId); + } + + public static final class RecordTemperature { + final long requestId; + final double value; + + public RecordTemperature(long requestId, double value) { + this.requestId = requestId; + this.value = value; + } + } + + public static final class TemperatureRecorded { + final long requestId; + + public TemperatureRecorded(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReadTemperature { + final long requestId; + + public ReadTemperature(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondTemperature { + final long requestId; + final Optional value; + + public RespondTemperature(long requestId, Optional value) { + this.requestId = requestId; + this.value = value; + } + } + + Optional lastTemperatureReading = Optional.empty(); + + @Override + public void preStart() { + log.info("Device actor {}-{} started", groupId, deviceId); + } + + @Override + public void postStop() { + log.info("Device actor {}-{} stopped", groupId, deviceId); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(RequestTrackDevice.class, r -> { + if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) { + getSender().tell(new DeviceRegistered(), getSelf()); + } else { + log.warning( + "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", + r.groupId, r.deviceId, this.groupId, this.deviceId + ); + } + }) + .match(RecordTemperature.class, r -> { + log.info("Recorded temperature reading {} with {}", r.value, r.requestId); + lastTemperatureReading = Optional.of(r.value); + getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); + }) + .match(ReadTemperature.class, r -> { + getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); + }) + .build(); + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroup.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroup.java new file mode 100644 index 0000000000..3e41da0e8b --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroup.java @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_4; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +//#query-added +public class DeviceGroup extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + public DeviceGroup(String groupId) { + this.groupId = groupId; + } + + public static Props props(String groupId) { + return Props.create(DeviceGroup.class, groupId); + } + + public static final class RequestDeviceList { + final long requestId; + + public RequestDeviceList(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReplyDeviceList { + final long requestId; + final Set ids; + + public ReplyDeviceList(long requestId, Set ids) { + this.requestId = requestId; + this.ids = ids; + } + } + + //#query-protocol + public static final class RequestAllTemperatures { + final long requestId; + + public RequestAllTemperatures(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondAllTemperatures { + final long requestId; + final Map temperatures; + + public RespondAllTemperatures(long requestId, Map temperatures) { + this.requestId = requestId; + this.temperatures = temperatures; + } + } + + public static interface TemperatureReading { + } + + public static final class Temperature implements TemperatureReading { + public final double value; + + public Temperature(double value) { + this.value = value; + } + } + + public static final class TemperatureNotAvailable implements TemperatureReading { + } + + public static final class DeviceNotAvailable implements TemperatureReading { + } + + public static final class DeviceTimedOut implements TemperatureReading { + } + //#query-protocol + + + final Map deviceIdToActor = new HashMap<>(); + final Map actorToDeviceId = new HashMap<>(); + final long nextCollectionId = 0L; + + @Override + public void preStart() { + log.info("DeviceGroup {} started", groupId); + } + + @Override + public void postStop() { + log.info("DeviceGroup {} stopped", groupId); + } + + //#query-added + private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { + if (this.groupId.equals(trackMsg.groupId)) { + ActorRef ref = deviceIdToActor.get(trackMsg.deviceId); + if (ref != null) { + ref.forward(trackMsg, getContext()); + } else { + log.info("Creating device actor for {}", trackMsg.deviceId); + ActorRef deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); + getContext().watch(deviceActor); + deviceActor.forward(trackMsg, getContext()); + actorToDeviceId.put(deviceActor, trackMsg.deviceId); + deviceIdToActor.put(trackMsg.deviceId, deviceActor); + } + } else { + log.warning( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, this.groupId + ); + } + } + + private void onDeviceList(RequestDeviceList r) { + getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf()); + } + + private void onTerminated(Terminated t) { + ActorRef deviceActor = t.getActor(); + String deviceId = actorToDeviceId.get(deviceActor); + log.info("Device actor for {} has been terminated", deviceId); + actorToDeviceId.remove(deviceActor); + deviceIdToActor.remove(deviceId); + } + //#query-added + + private void onAllTemperatures(RequestAllTemperatures r) { + getContext().actorOf(DeviceGroupQuery.props( + actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS))); + } + + @Override + public Receive createReceive() { + //#query-added + return receiveBuilder() + .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice) + .match(RequestDeviceList.class, this::onDeviceList) + .match(Terminated.class, this::onTerminated) + //#query-added + // ... other cases omitted + .match(RequestAllTemperatures.class, this::onAllTemperatures) + .build(); + } +} +//#query-added diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupQuery.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupQuery.java new file mode 100644 index 0000000000..24699c4fa7 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupQuery.java @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_4; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import scala.concurrent.duration.FiniteDuration; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.actor.Terminated; + +import akka.event.Logging; +import akka.event.LoggingAdapter; + +//#query-full +//#query-outline +public class DeviceGroupQuery extends AbstractActor { + public static final class CollectionTimeout { + } + + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final Map actorToDeviceId; + final long requestId; + final ActorRef requester; + + Cancellable queryTimeoutTimer; + + public DeviceGroupQuery(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { + this.actorToDeviceId = actorToDeviceId; + this.requestId = requestId; + this.requester = requester; + + queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( + timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() + ); + } + + public static Props props(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { + return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout); + } + + @Override + public void preStart() { + for (ActorRef deviceActor : actorToDeviceId.keySet()) { + getContext().watch(deviceActor); + deviceActor.tell(new Device.ReadTemperature(0L), getSelf()); + } + } + + @Override + public void postStop() { + queryTimeoutTimer.cancel(); + } + + //#query-outline + //#query-state + @Override + public Receive createReceive() { + return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet()); + } + + public Receive waitingForReplies( + Map repliesSoFar, + Set stillWaiting) { + return receiveBuilder() + .match(Device.RespondTemperature.class, r -> { + ActorRef deviceActor = getSender(); + DeviceGroup.TemperatureReading reading = r.value + .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v)) + .orElse(new DeviceGroup.TemperatureNotAvailable()); + receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar); + }) + .match(Terminated.class, t -> { + receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar); + }) + .match(CollectionTimeout.class, t -> { + Map replies = new HashMap<>(repliesSoFar); + for (ActorRef deviceActor : stillWaiting) { + String deviceId = actorToDeviceId.get(deviceActor); + replies.put(deviceId, new DeviceGroup.DeviceTimedOut()); + } + requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf()); + getContext().stop(getSelf()); + }) + .build(); + } + //#query-state + + //#query-collect-reply + public void receivedResponse(ActorRef deviceActor, + DeviceGroup.TemperatureReading reading, + Set stillWaiting, + Map repliesSoFar) { + getContext().unwatch(deviceActor); + String deviceId = actorToDeviceId.get(deviceActor); + + Set newStillWaiting = new HashSet<>(stillWaiting); + newStillWaiting.remove(deviceActor); + + Map newRepliesSoFar = new HashMap<>(repliesSoFar); + newRepliesSoFar.put(deviceId, reading); + if (newStillWaiting.isEmpty()) { + requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf()); + getContext().stop(getSelf()); + } else { + getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting)); + } + } + //#query-collect-reply + //#query-outline +} +//#query-outline +//#query-full \ No newline at end of file diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupQueryTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupQueryTest.java new file mode 100644 index 0000000000..f4f83f9711 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupQueryTest.java @@ -0,0 +1,229 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_4; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.scalatest.junit.JUnitSuite; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +public class DeviceGroupQueryTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + //#query-test-normal + @Test + public void testReturnTemperatureValueForWorkingDevices() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + //#query-test-normal + + //#query-test-no-reading + @Test + public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef()); + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable()); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + //#query-test-no-reading + + //#query-test-stopped + @Test + public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable()); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + //#query-test-stopped + + //#query-test-stopped-later + @Test + public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); + device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + //#query-test-stopped-later + + //#query-test-timeout + @Test + public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass( + FiniteDuration.create(5, TimeUnit.SECONDS), + DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut()); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + //#query-test-timeout + + public static void assertEqualTemperatures(Map expected, Map actual) { + for (Map.Entry entry : expected.entrySet()) { + DeviceGroup.TemperatureReading expectedReading = entry.getValue(); + DeviceGroup.TemperatureReading actualReading = actual.get(entry.getKey()); + assertNotNull(actualReading); + assertEquals(expectedReading.getClass(), actualReading.getClass()); + if (expectedReading instanceof DeviceGroup.Temperature) { + assertEquals(((DeviceGroup.Temperature) expectedReading).value, ((DeviceGroup.Temperature) actualReading).value, 0.01); + } + } + assertEquals(expected.size(), actual.size()); + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupTest.java new file mode 100644 index 0000000000..7c3139ba9c --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceGroupTest.java @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_4; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.scalatest.junit.JUnitSuite; + +import static jdocs.tutorial_4.DeviceGroupQueryTest.assertEqualTemperatures; + +public class DeviceGroupTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testRegisterDeviceActor() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + assertNotEquals(deviceActor1, deviceActor2); + + // Check that the device actors are working + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); + assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + } + + @Test + public void testIgnoreRequestsForWrongGroupId() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef()); + probe.expectNoMsg(); + } + + @Test + public void testReturnSameActorForSameDeviceId() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + assertEquals(deviceActor1, deviceActor2); + } + + @Test + public void testListActiveDevices() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); + DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + } + + @Test + public void testListActiveDevicesAfterOneShutsDown() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef toShutDown = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); + DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + + probe.watch(toShutDown); + toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender()); + probe.expectTerminated(toShutDown); + + groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef()); + reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(1L, reply.requestId); + assertEquals(Stream.of("device2").collect(Collectors.toSet()), reply.ids); + } + + //#group-query-integration-test + @Test + public void testCollectTemperaturesFromAllActiveDevices() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor3 = probe.getLastSender(); + + // Check that the device actors are working + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); + assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + // No temperature for device 3 + + groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef()); + DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(0L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable()); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + //#group-query-integration-test +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceManager.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceManager.java new file mode 100644 index 0000000000..6f2e1dd672 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceManager.java @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.tutorial_4; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import java.util.HashMap; +import java.util.Map; + +public class DeviceManager extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + public static Props props() { + return Props.create(DeviceManager.class); + } + + public static final class RequestTrackDevice { + public final String groupId; + public final String deviceId; + + public RequestTrackDevice(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + } + + public static final class DeviceRegistered { + } + + final Map groupIdToActor = new HashMap<>(); + final Map actorToGroupId = new HashMap<>(); + + @Override + public void preStart() { + log.info("DeviceManager started"); + } + + @Override + public void postStop() { + log.info("DeviceManager stopped"); + } + + private void onTrackDevice(RequestTrackDevice trackMsg) { + String groupId = trackMsg.groupId; + ActorRef ref = groupIdToActor.get(groupId); + if (ref != null) { + ref.forward(trackMsg, getContext()); + } else { + log.info("Creating device group actor for {}", groupId); + ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId); + getContext().watch(groupActor); + groupActor.forward(trackMsg, getContext()); + groupIdToActor.put(groupId, groupActor); + actorToGroupId.put(groupActor, groupId); + } + } + + private void onTerminated(Terminated t) { + ActorRef groupActor = t.getActor(); + String groupId = actorToGroupId.get(groupActor); + log.info("Device group actor for {} has been terminated", groupId); + actorToGroupId.remove(groupActor); + groupIdToActor.remove(groupId); + } + + public Receive createReceive() { + return receiveBuilder() + .match(RequestTrackDevice.class, this::onTrackDevice) + .match(Terminated.class, this::onTerminated) + .build(); + } + +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceTest.java new file mode 100644 index 0000000000..09b20766c2 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_4/DeviceTest.java @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_4; + +import java.util.Optional; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import org.scalatest.junit.JUnitSuite; + +public class DeviceTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testReplyToRegistrationRequests() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + assertEquals(deviceActor, probe.getLastSender()); + } + + @Test + public void testIgnoreWrongRegistrationRequests() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef()); + probe.expectNoMsg(); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef()); + probe.expectNoMsg(); + } + + @Test + public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); + Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(42L, response.requestId); + assertEquals(Optional.empty(), response.value); + } + + @Test + public void testReplyWithLatestTemperatureReading() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); + Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(2L, response1.requestId); + assertEquals(Optional.of(24.0), response1.value); + + deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); + assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); + Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(4L, response2.requestId); + assertEquals(Optional.of(55.0), response2.value); + } + +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/Device.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/Device.java new file mode 100644 index 0000000000..00d6b5e131 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/Device.java @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import jdocs.tutorial_5.DeviceManager.DeviceRegistered; +import jdocs.tutorial_5.DeviceManager.RequestTrackDevice; + +import java.util.Optional; + +public class Device extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + final String deviceId; + + public Device(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + + public static Props props(String groupId, String deviceId) { + return Props.create(Device.class, groupId, deviceId); + } + + public static final class RecordTemperature { + final long requestId; + final double value; + + public RecordTemperature(long requestId, double value) { + this.requestId = requestId; + this.value = value; + } + } + + public static final class TemperatureRecorded { + final long requestId; + + public TemperatureRecorded(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReadTemperature { + final long requestId; + + public ReadTemperature(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondTemperature { + final long requestId; + final Optional value; + + public RespondTemperature(long requestId, Optional value) { + this.requestId = requestId; + this.value = value; + } + } + + Optional lastTemperatureReading = Optional.empty(); + + @Override + public void preStart() { + log.info("Device actor {}-{} started", groupId, deviceId); + } + + @Override + public void postStop() { + log.info("Device actor {}-{} stopped", groupId, deviceId); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(RequestTrackDevice.class, r -> { + if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) { + getSender().tell(new DeviceRegistered(), getSelf()); + } else { + log.warning( + "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", + r.groupId, r.deviceId, this.groupId, this.deviceId + ); + } + }) + .match(RecordTemperature.class, r -> { + log.info("Recorded temperature reading {} with {}", r.value, r.requestId); + lastTemperatureReading = Optional.of(r.value); + getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); + }) + .match(ReadTemperature.class, r -> { + getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); + }) + .build(); + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroup.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroup.java new file mode 100644 index 0000000000..de09c84fb8 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroup.java @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class DeviceGroup extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final String groupId; + + public DeviceGroup(String groupId) { + this.groupId = groupId; + } + + public static Props props(String groupId) { + return Props.create(DeviceGroup.class, groupId); + } + + public static final class RequestDeviceList { + final long requestId; + + public RequestDeviceList(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReplyDeviceList { + final long requestId; + final Set ids; + + public ReplyDeviceList(long requestId, Set ids) { + this.requestId = requestId; + this.ids = ids; + } + } + + public static final class RequestAllTemperatures { + final long requestId; + + public RequestAllTemperatures(long requestId) { + this.requestId = requestId; + } + } + + public static final class RespondAllTemperatures { + final long requestId; + final Map temperatures; + + public RespondAllTemperatures(long requestId, Map temperatures) { + this.requestId = requestId; + this.temperatures = temperatures; + } + } + + public static interface TemperatureReading { + } + + public static final class Temperature implements TemperatureReading { + public final double value; + + public Temperature(double value) { + this.value = value; + } + } + + public static final class TemperatureNotAvailable implements TemperatureReading { + } + + public static final class DeviceNotAvailable implements TemperatureReading { + } + + public static final class DeviceTimedOut implements TemperatureReading { + } + + final Map deviceIdToActor = new HashMap<>(); + final Map actorToDeviceId = new HashMap<>(); + final long nextCollectionId = 0L; + + @Override + public void preStart() { + log.info("DeviceGroup {} started", groupId); + } + + @Override + public void postStop() { + log.info("DeviceGroup {} stopped", groupId); + } + + private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { + if (this.groupId.equals(trackMsg.groupId)) { + ActorRef ref = deviceIdToActor.get(trackMsg.deviceId); + if (ref != null) { + ref.forward(trackMsg, getContext()); + } else { + log.info("Creating device actor for {}", trackMsg.deviceId); + ActorRef deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); + getContext().watch(deviceActor); + deviceActor.forward(trackMsg, getContext()); + actorToDeviceId.put(deviceActor, trackMsg.deviceId); + deviceIdToActor.put(trackMsg.deviceId, deviceActor); + } + } else { + log.warning( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, this.groupId + ); + } + } + + private void onDeviceList(RequestDeviceList r) { + getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf()); + } + + private void onTerminated(Terminated t) { + ActorRef deviceActor = t.getActor(); + String deviceId = actorToDeviceId.get(deviceActor); + log.info("Device actor for {} has been terminated", deviceId); + actorToDeviceId.remove(deviceActor); + deviceIdToActor.remove(deviceId); + } + + private void onAllTemperatures(RequestAllTemperatures r) { + getContext().actorOf(DeviceGroupQuery.props( + actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS))); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice) + .match(RequestDeviceList.class, this::onDeviceList) + .match(Terminated.class, this::onTerminated) + .match(RequestAllTemperatures.class, this::onAllTemperatures) + .build(); + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java new file mode 100644 index 0000000000..07df119dc8 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import akka.actor.*; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class DeviceGroupQuery extends AbstractActor { + public static final class CollectionTimeout { + } + + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + final Map actorToDeviceId; + final long requestId; + final ActorRef requester; + + Cancellable queryTimeoutTimer; + + public DeviceGroupQuery(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { + this.actorToDeviceId = actorToDeviceId; + this.requestId = requestId; + this.requester = requester; + + queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( + timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() + ); + } + + public static Props props(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { + return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout); + } + + @Override + public void preStart() { + for (ActorRef deviceActor : actorToDeviceId.keySet()) { + getContext().watch(deviceActor); + deviceActor.tell(new Device.ReadTemperature(0L), getSelf()); + } + } + + @Override + public void postStop() { + queryTimeoutTimer.cancel(); + } + + @Override + public Receive createReceive() { + return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet()); + } + + public Receive waitingForReplies( + Map repliesSoFar, + Set stillWaiting) { + return receiveBuilder() + .match(Device.RespondTemperature.class, r -> { + ActorRef deviceActor = getSender(); + DeviceGroup.TemperatureReading reading = r.value + .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v)) + .orElse(new DeviceGroup.TemperatureNotAvailable()); + receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar); + }) + .match(Terminated.class, t -> { + if (stillWaiting.contains(t.getActor())) { + receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar); + } + // else ignore + }) + .match(CollectionTimeout.class, t -> { + Map replies = new HashMap<>(repliesSoFar); + for (ActorRef deviceActor : stillWaiting) { + String deviceId = actorToDeviceId.get(deviceActor); + replies.put(deviceId, new DeviceGroup.DeviceTimedOut()); + } + requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf()); + getContext().stop(getSelf()); + }) + .build(); + } + + public void receivedResponse(ActorRef deviceActor, + DeviceGroup.TemperatureReading reading, + Set stillWaiting, + Map repliesSoFar) { + getContext().unwatch(deviceActor); + String deviceId = actorToDeviceId.get(deviceActor); + + Set newStillWaiting = new HashSet<>(stillWaiting); + newStillWaiting.remove(deviceActor); + + Map newRepliesSoFar = new HashMap<>(repliesSoFar); + newRepliesSoFar.put(deviceId, reading); + if (newStillWaiting.isEmpty()) { + requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf()); + getContext().stop(getSelf()); + } else { + getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting)); + } + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupQueryTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupQueryTest.java new file mode 100644 index 0000000000..26b741b0e6 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupQueryTest.java @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +public class DeviceGroupQueryTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testReturnTemperatureValueForWorkingDevices() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef()); + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable()); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable()); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); + device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() { + TestKit requester = new TestKit(system); + + TestKit device1 = new TestKit(system); + TestKit device2 = new TestKit(system); + + Map actorToDeviceId = new HashMap<>(); + actorToDeviceId.put(device1.getRef(), "device1"); + actorToDeviceId.put(device2.getRef(), "device2"); + + ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId, + 1L, + requester.getRef(), + new FiniteDuration(3, TimeUnit.SECONDS))); + + assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); + + queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); + + DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass( + FiniteDuration.create(5, TimeUnit.SECONDS), + DeviceGroup.RespondAllTemperatures.class); + assertEquals(1L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut()); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } + + public static void assertEqualTemperatures(Map expected, Map actual) { + for (Map.Entry entry : expected.entrySet()) { + DeviceGroup.TemperatureReading expectedReading = entry.getValue(); + DeviceGroup.TemperatureReading actualReading = actual.get(entry.getKey()); + assertNotNull(actualReading); + assertEquals(expectedReading.getClass(), actualReading.getClass()); + if (expectedReading instanceof DeviceGroup.Temperature) { + assertEquals(((DeviceGroup.Temperature) expectedReading).value, ((DeviceGroup.Temperature) actualReading).value, 0.01); + } + } + assertEquals(expected.size(), actual.size()); + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupTest.java new file mode 100644 index 0000000000..932f46d50c --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceGroupTest.java @@ -0,0 +1,164 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import static jdocs.tutorial_5.DeviceGroupQueryTest.assertEqualTemperatures; + +public class DeviceGroupTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testRegisterDeviceActor() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + assertNotEquals(deviceActor1, deviceActor2); + + // Check that the device actors are working + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); + assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + } + + @Test + public void testIgnoreRequestsForWrongGroupId() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef()); + probe.expectNoMsg(); + } + + @Test + public void testReturnSameActorForSameDeviceId() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + assertEquals(deviceActor1, deviceActor2); + } + + @Test + public void testListActiveDevices() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); + DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + } + + @Test + public void testListActiveDevicesAfterOneShutsDown() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef toShutDown = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + + groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); + DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + + probe.watch(toShutDown); + toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender()); + probe.expectTerminated(toShutDown); + + groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef()); + reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); + assertEquals(1L, reply.requestId); + assertEquals(Stream.of("device2").collect(Collectors.toSet()), reply.ids); + } + + @Test + public void testCollectTemperaturesFromAllActiveDevices() { + TestKit probe = new TestKit(system); + ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor1 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor2 = probe.getLastSender(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + ActorRef deviceActor3 = probe.getLastSender(); + + // Check that the device actors are working + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); + assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + // No temperature for device 3 + + groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef()); + DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); + assertEquals(0L, response.requestId); + + Map expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); + expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); + expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable()); + + assertEqualTemperatures(expectedTemperatures, response.temperatures); + } +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceManager.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceManager.java new file mode 100644 index 0000000000..b07fd480db --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceManager.java @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.tutorial_5; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import java.util.HashMap; +import java.util.Map; + +public class DeviceManager extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + public static Props props() { + return Props.create(DeviceManager.class); + } + + public static final class RequestTrackDevice { + public final String groupId; + public final String deviceId; + + public RequestTrackDevice(String groupId, String deviceId) { + this.groupId = groupId; + this.deviceId = deviceId; + } + } + + public static final class DeviceRegistered { + } + + final Map groupIdToActor = new HashMap<>(); + final Map actorToGroupId = new HashMap<>(); + + @Override + public void preStart() { + log.info("DeviceManager started"); + } + + @Override + public void postStop() { + log.info("DeviceManager stopped"); + } + + private void onTrackDevice(RequestTrackDevice trackMsg) { + String groupId = trackMsg.groupId; + ActorRef ref = groupIdToActor.get(groupId); + if (ref != null) { + ref.forward(trackMsg, getContext()); + } else { + log.info("Creating device group actor for {}", groupId); + ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId); + getContext().watch(groupActor); + groupActor.forward(trackMsg, getContext()); + groupIdToActor.put(groupId, groupActor); + actorToGroupId.put(groupActor, groupId); + } + } + + private void onTerminated(Terminated t) { + ActorRef groupActor = t.getActor(); + String groupId = actorToGroupId.get(groupActor); + log.info("Device group actor for {} has been terminated", groupId); + actorToGroupId.remove(groupActor); + groupIdToActor.remove(groupId); + } + + public Receive createReceive() { + return receiveBuilder() + .match(RequestTrackDevice.class, this::onTrackDevice) + .match(Terminated.class, this::onTerminated) + .build(); + } + +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceTest.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceTest.java new file mode 100644 index 0000000000..10c2235718 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/DeviceTest.java @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import java.util.Optional; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.javadsl.TestKit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import org.scalatest.junit.JUnitSuite; + + +public class DeviceTest extends JUnitSuite { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testReplyToRegistrationRequests() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef()); + probe.expectMsgClass(DeviceManager.DeviceRegistered.class); + assertEquals(deviceActor, probe.getLastSender()); + } + + @Test + public void testIgnoreWrongRegistrationRequests() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef()); + probe.expectNoMsg(); + + deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef()); + probe.expectNoMsg(); + } + + @Test + public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); + Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(42L, response.requestId); + assertEquals(Optional.empty(), response.value); + } + + @Test + public void testReplyWithLatestTemperatureReading() { + TestKit probe = new TestKit(system); + ActorRef deviceActor = system.actorOf(Device.props("group", "device")); + + deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); + assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); + Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(2L, response1.requestId); + assertEquals(Optional.of(24.0), response1.value); + + deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); + assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); + + deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); + Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); + assertEquals(4L, response2.requestId); + assertEquals(Optional.of(55.0), response2.value); + } + +} diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/IotMain.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/IotMain.java new file mode 100644 index 0000000000..d0217925e5 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/IotMain.java @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; + +import java.io.IOException; + +public class IotMain { + + public static void main(String[] args) throws IOException { + ActorSystem system = ActorSystem.create("iot-system"); + + try { + // Create top level supervisor + ActorRef supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor"); + + supervisor.tell(new DeviceManager.RequestTrackDevice("mygroup", "device1"), ActorRef.noSender()); + + System.out.println("Press ENTER to exit the system"); + System.in.read(); + } finally { + system.terminate(); + } + } + +} \ No newline at end of file diff --git a/akka-docs-new/src/test/java/jdocs/tutorial_5/IotSupervisor.java b/akka-docs-new/src/test/java/jdocs/tutorial_5/IotSupervisor.java new file mode 100644 index 0000000000..28c7bcd506 --- /dev/null +++ b/akka-docs-new/src/test/java/jdocs/tutorial_5/IotSupervisor.java @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.tutorial_5; + +//#iot-supervisor + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +public class IotSupervisor extends AbstractActor { + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + public static Props props() { + return Props.create(IotSupervisor.class); + } + + @Override + public void preStart() { + log.info("IoT Application started"); + } + + @Override + public void postStop() { + log.info("IoT Application stopped"); + } + + // No need to handle any messages + @Override + public Receive createReceive() { + return receiveBuilder() + .build(); + } + +} +//#iot-supervisor