Move content from akka-docs-new into akka-docs
Now the paradox documentation is no longer functional until we update akka-docs to generate from paradox instead of sphinx
This commit is contained in:
parent
f064d1321a
commit
5507147073
96 changed files with 0 additions and 35 deletions
|
|
@ -0,0 +1,16 @@
|
|||
package jdocs.quickstart;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.AbstractActor.Receive;
|
||||
|
||||
//#actor-impl
|
||||
public class HelloWorldActor extends AbstractActor {
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(String.class, msg -> {
|
||||
System.out.println("Hello " + msg);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
//#actor-impl
|
||||
29
akka-docs/src/test/java/jdocs/quickstart/HelloWorldMain.java
Normal file
29
akka-docs/src/test/java/jdocs/quickstart/HelloWorldMain.java
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
//#full-example
|
||||
package jdocs.quickstart;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.ActorSystem;
|
||||
|
||||
public class HelloWorldMain {
|
||||
public static void main(String[] args) throws IOException {
|
||||
//#create-send
|
||||
ActorSystem system = ActorSystem.create("hello-world-actor-system");
|
||||
try {
|
||||
// Create hello world actor
|
||||
ActorRef helloWorldActor = system.actorOf(Props.create(HelloWorldActor.class), "HelloWorldActor");
|
||||
// Send message to actor
|
||||
helloWorldActor.tell("World", ActorRef.noSender());
|
||||
|
||||
System.out.println("Press ENTER to exit the system");
|
||||
System.in.read();
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
//#create-send
|
||||
}
|
||||
}
|
||||
|
||||
//#full-example
|
||||
30
akka-docs/src/test/java/jdocs/tutorial_1/IotMain.java
Normal file
30
akka-docs/src/test/java/jdocs/tutorial_1/IotMain.java
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_1;
|
||||
|
||||
//#iot-app
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ActorRef;
|
||||
|
||||
public class IotMain {
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
ActorSystem system = ActorSystem.create("iot-system");
|
||||
|
||||
try {
|
||||
// Create top level supervisor
|
||||
ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");
|
||||
|
||||
System.out.println("Press ENTER to exit the system");
|
||||
System.in.read();
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#iot-app
|
||||
39
akka-docs/src/test/java/jdocs/tutorial_1/IotSupervisor.java
Normal file
39
akka-docs/src/test/java/jdocs/tutorial_1/IotSupervisor.java
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_1;
|
||||
|
||||
//#iot-supervisor
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorLogging;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class IotSupervisor extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(IotSupervisor.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("IoT Application started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("IoT Application stopped");
|
||||
}
|
||||
|
||||
// No need to handle any messages
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
//#iot-supervisor
|
||||
94
akka-docs/src/test/java/jdocs/tutorial_2/Device.java
Normal file
94
akka-docs/src/test/java/jdocs/tutorial_2/Device.java
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_2;
|
||||
|
||||
//#full-device
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.AbstractActor.Receive;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, groupId, deviceId);
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RecordTemperature.class, r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(ReadTemperature.class, r -> {
|
||||
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
//#full-device
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
package jdocs.tutorial_2;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import jdocs.tutorial_2.Device.ReadTemperature;
|
||||
import jdocs.tutorial_2.Device.RecordTemperature;
|
||||
import jdocs.tutorial_2.Device.RespondTemperature;
|
||||
import jdocs.tutorial_2.Device.TemperatureRecorded;
|
||||
|
||||
class DeviceInProgress1 {
|
||||
|
||||
//#read-protocol-1
|
||||
public static final class ReadTemperature {
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(Optional<Double> value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
//#read-protocol-1
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,71 @@
|
|||
package jdocs.tutorial_2.inprogress2;
|
||||
|
||||
//#device-with-read
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, groupId, deviceId);
|
||||
}
|
||||
|
||||
//#read-protocol-2
|
||||
public static final class ReadTemperature {
|
||||
long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
long requestId;
|
||||
Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
//#read-protocol-2
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(ReadTemperature.class, r -> {
|
||||
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#device-with-read
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package jdocs.tutorial_2;
|
||||
|
||||
class DeviceInProgress3 {
|
||||
|
||||
//#write-protocol-1
|
||||
public static final class RecordTemperature {
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
//#write-protocol-1
|
||||
}
|
||||
70
akka-docs/src/test/java/jdocs/tutorial_2/DeviceTest.java
Normal file
70
akka-docs/src/test/java/jdocs/tutorial_2/DeviceTest.java
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_2;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
//#device-read-test
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
//#device-read-test
|
||||
|
||||
//#device-write-read-test
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
//#device-write-read-test
|
||||
|
||||
}
|
||||
106
akka-docs/src/test/java/jdocs/tutorial_3/Device.java
Normal file
106
akka-docs/src/test/java/jdocs/tutorial_3/Device.java
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
//#device-with-register
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import jdocs.tutorial_3.DeviceManager.DeviceRegistered;
|
||||
import jdocs.tutorial_3.DeviceManager.RequestTrackDevice;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, groupId, deviceId);
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, r -> {
|
||||
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
|
||||
getSender().tell(new DeviceRegistered(), getSelf());
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
r.groupId, r.deviceId, this.groupId, this.deviceId
|
||||
);
|
||||
}
|
||||
})
|
||||
.match(RecordTemperature.class, r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(ReadTemperature.class, r -> {
|
||||
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
//#device-with-register
|
||||
119
akka-docs/src/test/java/jdocs/tutorial_3/DeviceGroup.java
Normal file
119
akka-docs/src/test/java/jdocs/tutorial_3/DeviceGroup.java
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import jdocs.tutorial_3.Device;
|
||||
import jdocs.tutorial_3.DeviceManager;
|
||||
|
||||
//#device-group-full
|
||||
public class DeviceGroup extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
public DeviceGroup(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
//#device-group-register
|
||||
public static Props props(String groupId) {
|
||||
return Props.create(DeviceGroup.class, groupId);
|
||||
}
|
||||
//#device-group-register
|
||||
|
||||
public static final class RequestDeviceList {
|
||||
final long requestId;
|
||||
|
||||
public RequestDeviceList(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReplyDeviceList {
|
||||
final long requestId;
|
||||
final Set<String> ids;
|
||||
|
||||
public ReplyDeviceList(long requestId, Set<String> ids) {
|
||||
this.requestId = requestId;
|
||||
this.ids = ids;
|
||||
}
|
||||
}
|
||||
//#device-group-register
|
||||
//#device-group-register
|
||||
//#device-group-register
|
||||
//#device-group-remove
|
||||
|
||||
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
|
||||
//#device-group-register
|
||||
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
//#device-group-register
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceGroup {} stopped", groupId);
|
||||
}
|
||||
|
||||
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
|
||||
if (this.groupId.equals(trackMsg.groupId)) {
|
||||
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
|
||||
if (deviceActor != null) {
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId);
|
||||
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
//#device-group-register
|
||||
getContext().watch(deviceActor);
|
||||
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
|
||||
//#device-group-register
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
}
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
groupId, this.groupId
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void onDeviceList(RequestDeviceList r) {
|
||||
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef deviceActor = t.getActor();
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
log.info("Device actor for {} has been terminated", deviceId);
|
||||
actorToDeviceId.remove(deviceActor);
|
||||
deviceIdToActor.remove(deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(RequestDeviceList.class, this::onDeviceList)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
//#device-group-remove
|
||||
//#device-group-register
|
||||
//#device-group-full
|
||||
131
akka-docs/src/test/java/jdocs/tutorial_3/DeviceGroupTest.java
Normal file
131
akka-docs/src/test/java/jdocs/tutorial_3/DeviceGroupTest.java
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public class DeviceGroupTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
//#device-group-test-registration
|
||||
@Test
|
||||
public void testRegisterDeviceActor() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertNotEquals(deviceActor1, deviceActor2);
|
||||
|
||||
// Check that the device actors are workingl
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreRequestsForWrongGroupId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
}
|
||||
//#device-group-test-registration
|
||||
|
||||
//#device-group-test3
|
||||
@Test
|
||||
public void testReturnSameActorForSameDeviceId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertEquals(deviceActor1, deviceActor2);
|
||||
}
|
||||
//#device-group-test3
|
||||
|
||||
//#device-group-list-terminate-test
|
||||
@Test
|
||||
public void testListActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void teestListActiveDevicesAfterOneShutsDown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef toShutDown = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
|
||||
probe.watch(toShutDown);
|
||||
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
probe.expectTerminated(toShutDown);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
|
||||
reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(1L, reply.requestId);
|
||||
assertEquals(Stream.of("device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
//#device-group-list-terminate-test
|
||||
}
|
||||
84
akka-docs/src/test/java/jdocs/tutorial_3/DeviceManager.java
Normal file
84
akka-docs/src/test/java/jdocs/tutorial_3/DeviceManager.java
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
//#device-manager-full
|
||||
public class DeviceManager extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(DeviceManager.class);
|
||||
}
|
||||
|
||||
//#device-manager-msgs
|
||||
public static final class RequestTrackDevice {
|
||||
public final String groupId;
|
||||
public final String deviceId;
|
||||
|
||||
public RequestTrackDevice(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DeviceRegistered {
|
||||
}
|
||||
|
||||
//#device-manager-msgs
|
||||
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceManager started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceManager stopped");
|
||||
}
|
||||
|
||||
private void onTrackDevice(RequestTrackDevice trackMsg) {
|
||||
String groupId = trackMsg.groupId;
|
||||
ActorRef ref = groupIdToActor.get(groupId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device group actor for {}", groupId);
|
||||
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
|
||||
getContext().watch(groupActor);
|
||||
groupActor.forward(trackMsg, getContext());
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
actorToGroupId.put(groupActor, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef groupActor = t.getActor();
|
||||
String groupId = actorToGroupId.get(groupActor);
|
||||
log.info("Device group actor for {} has been terminated", groupId);
|
||||
actorToGroupId.remove(groupActor);
|
||||
groupIdToActor.remove(groupId);
|
||||
}
|
||||
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
//#device-manager-full
|
||||
94
akka-docs/src/test/java/jdocs/tutorial_3/DeviceTest.java
Normal file
94
akka-docs/src/test/java/jdocs/tutorial_3/DeviceTest.java
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_3;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
//#device-registration-tests
|
||||
@Test
|
||||
public void testReplyToRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
assertEquals(deviceActor, probe.getLastSender());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreWrongRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
}
|
||||
//#device-registration-tests
|
||||
|
||||
//#device-read-test
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
//#device-read-test
|
||||
|
||||
//#device-write-read-test
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
//#device-write-read-test
|
||||
|
||||
}
|
||||
103
akka-docs/src/test/java/jdocs/tutorial_4/Device.java
Normal file
103
akka-docs/src/test/java/jdocs/tutorial_4/Device.java
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import jdocs.tutorial_4.DeviceManager.DeviceRegistered;
|
||||
import jdocs.tutorial_4.DeviceManager.RequestTrackDevice;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, groupId, deviceId);
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, r -> {
|
||||
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
|
||||
getSender().tell(new DeviceRegistered(), getSelf());
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
r.groupId, r.deviceId, this.groupId, this.deviceId
|
||||
);
|
||||
}
|
||||
})
|
||||
.match(RecordTemperature.class, r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(ReadTemperature.class, r -> {
|
||||
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
159
akka-docs/src/test/java/jdocs/tutorial_4/DeviceGroup.java
Normal file
159
akka-docs/src/test/java/jdocs/tutorial_4/DeviceGroup.java
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
//#query-added
|
||||
public class DeviceGroup extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
public DeviceGroup(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId) {
|
||||
return Props.create(DeviceGroup.class, groupId);
|
||||
}
|
||||
|
||||
public static final class RequestDeviceList {
|
||||
final long requestId;
|
||||
|
||||
public RequestDeviceList(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReplyDeviceList {
|
||||
final long requestId;
|
||||
final Set<String> ids;
|
||||
|
||||
public ReplyDeviceList(long requestId, Set<String> ids) {
|
||||
this.requestId = requestId;
|
||||
this.ids = ids;
|
||||
}
|
||||
}
|
||||
|
||||
//#query-protocol
|
||||
public static final class RequestAllTemperatures {
|
||||
final long requestId;
|
||||
|
||||
public RequestAllTemperatures(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondAllTemperatures {
|
||||
final long requestId;
|
||||
final Map<String, TemperatureReading> temperatures;
|
||||
|
||||
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
|
||||
this.requestId = requestId;
|
||||
this.temperatures = temperatures;
|
||||
}
|
||||
}
|
||||
|
||||
public static interface TemperatureReading {
|
||||
}
|
||||
|
||||
public static final class Temperature implements TemperatureReading {
|
||||
public final double value;
|
||||
|
||||
public Temperature(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureNotAvailable implements TemperatureReading {
|
||||
}
|
||||
|
||||
public static final class DeviceNotAvailable implements TemperatureReading {
|
||||
}
|
||||
|
||||
public static final class DeviceTimedOut implements TemperatureReading {
|
||||
}
|
||||
//#query-protocol
|
||||
|
||||
|
||||
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
final long nextCollectionId = 0L;
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceGroup {} stopped", groupId);
|
||||
}
|
||||
|
||||
//#query-added
|
||||
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
|
||||
if (this.groupId.equals(trackMsg.groupId)) {
|
||||
ActorRef ref = deviceIdToActor.get(trackMsg.deviceId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId);
|
||||
ActorRef deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
getContext().watch(deviceActor);
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
}
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
groupId, this.groupId
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void onDeviceList(RequestDeviceList r) {
|
||||
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef deviceActor = t.getActor();
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
log.info("Device actor for {} has been terminated", deviceId);
|
||||
actorToDeviceId.remove(deviceActor);
|
||||
deviceIdToActor.remove(deviceId);
|
||||
}
|
||||
//#query-added
|
||||
|
||||
private void onAllTemperatures(RequestAllTemperatures r) {
|
||||
getContext().actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
//#query-added
|
||||
return receiveBuilder()
|
||||
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(RequestDeviceList.class, this::onDeviceList)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
//#query-added
|
||||
// ... other cases omitted
|
||||
.match(RequestAllTemperatures.class, this::onAllTemperatures)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
//#query-added
|
||||
121
akka-docs/src/test/java/jdocs/tutorial_4/DeviceGroupQuery.java
Normal file
121
akka-docs/src/test/java/jdocs/tutorial_4/DeviceGroupQuery.java
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
//#query-full
|
||||
//#query-outline
|
||||
public class DeviceGroupQuery extends AbstractActor {
|
||||
public static final class CollectionTimeout {
|
||||
}
|
||||
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final Map<ActorRef, String> actorToDeviceId;
|
||||
final long requestId;
|
||||
final ActorRef requester;
|
||||
|
||||
Cancellable queryTimeoutTimer;
|
||||
|
||||
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
|
||||
this.actorToDeviceId = actorToDeviceId;
|
||||
this.requestId = requestId;
|
||||
this.requester = requester;
|
||||
|
||||
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
|
||||
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
|
||||
);
|
||||
}
|
||||
|
||||
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
|
||||
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
|
||||
getContext().watch(deviceActor);
|
||||
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
queryTimeoutTimer.cancel();
|
||||
}
|
||||
|
||||
//#query-outline
|
||||
//#query-state
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
|
||||
}
|
||||
|
||||
public Receive waitingForReplies(
|
||||
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
|
||||
Set<ActorRef> stillWaiting) {
|
||||
return receiveBuilder()
|
||||
.match(Device.RespondTemperature.class, r -> {
|
||||
ActorRef deviceActor = getSender();
|
||||
DeviceGroup.TemperatureReading reading = r.value
|
||||
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
|
||||
.orElse(new DeviceGroup.TemperatureNotAvailable());
|
||||
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
|
||||
})
|
||||
.match(Terminated.class, t -> {
|
||||
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
|
||||
})
|
||||
.match(CollectionTimeout.class, t -> {
|
||||
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
|
||||
for (ActorRef deviceActor : stillWaiting) {
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
|
||||
}
|
||||
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
//#query-state
|
||||
|
||||
//#query-collect-reply
|
||||
public void receivedResponse(ActorRef deviceActor,
|
||||
DeviceGroup.TemperatureReading reading,
|
||||
Set<ActorRef> stillWaiting,
|
||||
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
|
||||
getContext().unwatch(deviceActor);
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
|
||||
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
|
||||
newStillWaiting.remove(deviceActor);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
|
||||
newRepliesSoFar.put(deviceId, reading);
|
||||
if (newStillWaiting.isEmpty()) {
|
||||
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
} else {
|
||||
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
|
||||
}
|
||||
}
|
||||
//#query-collect-reply
|
||||
//#query-outline
|
||||
}
|
||||
//#query-outline
|
||||
//#query-full
|
||||
|
|
@ -0,0 +1,229 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DeviceGroupQueryTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
//#query-test-normal
|
||||
@Test
|
||||
public void testReturnTemperatureValueForWorkingDevices() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
//#query-test-normal
|
||||
|
||||
//#query-test-no-reading
|
||||
@Test
|
||||
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable());
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
//#query-test-no-reading
|
||||
|
||||
//#query-test-stopped
|
||||
@Test
|
||||
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable());
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
//#query-test-stopped
|
||||
|
||||
//#query-test-stopped-later
|
||||
@Test
|
||||
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
//#query-test-stopped-later
|
||||
|
||||
//#query-test-timeout
|
||||
@Test
|
||||
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(
|
||||
FiniteDuration.create(5, TimeUnit.SECONDS),
|
||||
DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut());
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
//#query-test-timeout
|
||||
|
||||
public static void assertEqualTemperatures(Map<String, DeviceGroup.TemperatureReading> expected, Map<String, DeviceGroup.TemperatureReading> actual) {
|
||||
for (Map.Entry<String, DeviceGroup.TemperatureReading> entry : expected.entrySet()) {
|
||||
DeviceGroup.TemperatureReading expectedReading = entry.getValue();
|
||||
DeviceGroup.TemperatureReading actualReading = actual.get(entry.getKey());
|
||||
assertNotNull(actualReading);
|
||||
assertEquals(expectedReading.getClass(), actualReading.getClass());
|
||||
if (expectedReading instanceof DeviceGroup.Temperature) {
|
||||
assertEquals(((DeviceGroup.Temperature) expectedReading).value, ((DeviceGroup.Temperature) actualReading).value, 0.01);
|
||||
}
|
||||
}
|
||||
assertEquals(expected.size(), actual.size());
|
||||
}
|
||||
}
|
||||
167
akka-docs/src/test/java/jdocs/tutorial_4/DeviceGroupTest.java
Normal file
167
akka-docs/src/test/java/jdocs/tutorial_4/DeviceGroupTest.java
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import static jdocs.tutorial_4.DeviceGroupQueryTest.assertEqualTemperatures;
|
||||
|
||||
public class DeviceGroupTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterDeviceActor() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertNotEquals(deviceActor1, deviceActor2);
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreRequestsForWrongGroupId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnSameActorForSameDeviceId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertEquals(deviceActor1, deviceActor2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevicesAfterOneShutsDown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef toShutDown = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
|
||||
probe.watch(toShutDown);
|
||||
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
probe.expectTerminated(toShutDown);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
|
||||
reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(1L, reply.requestId);
|
||||
assertEquals(Stream.of("device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
//#group-query-integration-test
|
||||
@Test
|
||||
public void testCollectTemperaturesFromAllActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor3 = probe.getLastSender();
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
// No temperature for device 3
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
|
||||
DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(0L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable());
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
//#group-query-integration-test
|
||||
}
|
||||
80
akka-docs/src/test/java/jdocs/tutorial_4/DeviceManager.java
Normal file
80
akka-docs/src/test/java/jdocs/tutorial_4/DeviceManager.java
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DeviceManager extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(DeviceManager.class);
|
||||
}
|
||||
|
||||
public static final class RequestTrackDevice {
|
||||
public final String groupId;
|
||||
public final String deviceId;
|
||||
|
||||
public RequestTrackDevice(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DeviceRegistered {
|
||||
}
|
||||
|
||||
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceManager started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceManager stopped");
|
||||
}
|
||||
|
||||
private void onTrackDevice(RequestTrackDevice trackMsg) {
|
||||
String groupId = trackMsg.groupId;
|
||||
ActorRef ref = groupIdToActor.get(groupId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device group actor for {}", groupId);
|
||||
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
|
||||
getContext().watch(groupActor);
|
||||
groupActor.forward(trackMsg, getContext());
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
actorToGroupId.put(groupActor, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef groupActor = t.getActor();
|
||||
String groupId = actorToGroupId.get(groupActor);
|
||||
log.info("Device group actor for {} has been terminated", groupId);
|
||||
actorToGroupId.remove(groupActor);
|
||||
groupIdToActor.remove(groupId);
|
||||
}
|
||||
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
88
akka-docs/src/test/java/jdocs/tutorial_4/DeviceTest.java
Normal file
88
akka-docs/src/test/java/jdocs/tutorial_4/DeviceTest.java
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_4;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyToRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
assertEquals(deviceActor, probe.getLastSender());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreWrongRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
|
||||
}
|
||||
102
akka-docs/src/test/java/jdocs/tutorial_5/Device.java
Normal file
102
akka-docs/src/test/java/jdocs/tutorial_5/Device.java
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import jdocs.tutorial_5.DeviceManager.DeviceRegistered;
|
||||
import jdocs.tutorial_5.DeviceManager.RequestTrackDevice;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public class Device extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
final String deviceId;
|
||||
|
||||
public Device(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId, String deviceId) {
|
||||
return Props.create(Device.class, groupId, deviceId);
|
||||
}
|
||||
|
||||
public static final class RecordTemperature {
|
||||
final long requestId;
|
||||
final double value;
|
||||
|
||||
public RecordTemperature(long requestId, double value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureRecorded {
|
||||
final long requestId;
|
||||
|
||||
public TemperatureRecorded(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReadTemperature {
|
||||
final long requestId;
|
||||
|
||||
public ReadTemperature(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondTemperature {
|
||||
final long requestId;
|
||||
final Optional<Double> value;
|
||||
|
||||
public RespondTemperature(long requestId, Optional<Double> value) {
|
||||
this.requestId = requestId;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("Device actor {}-{} started", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, r -> {
|
||||
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
|
||||
getSender().tell(new DeviceRegistered(), getSelf());
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
r.groupId, r.deviceId, this.groupId, this.deviceId
|
||||
);
|
||||
}
|
||||
})
|
||||
.match(RecordTemperature.class, r -> {
|
||||
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
|
||||
})
|
||||
.match(ReadTemperature.class, r -> {
|
||||
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
149
akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroup.java
Normal file
149
akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroup.java
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DeviceGroup extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final String groupId;
|
||||
|
||||
public DeviceGroup(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
public static Props props(String groupId) {
|
||||
return Props.create(DeviceGroup.class, groupId);
|
||||
}
|
||||
|
||||
public static final class RequestDeviceList {
|
||||
final long requestId;
|
||||
|
||||
public RequestDeviceList(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class ReplyDeviceList {
|
||||
final long requestId;
|
||||
final Set<String> ids;
|
||||
|
||||
public ReplyDeviceList(long requestId, Set<String> ids) {
|
||||
this.requestId = requestId;
|
||||
this.ids = ids;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RequestAllTemperatures {
|
||||
final long requestId;
|
||||
|
||||
public RequestAllTemperatures(long requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RespondAllTemperatures {
|
||||
final long requestId;
|
||||
final Map<String, TemperatureReading> temperatures;
|
||||
|
||||
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
|
||||
this.requestId = requestId;
|
||||
this.temperatures = temperatures;
|
||||
}
|
||||
}
|
||||
|
||||
public static interface TemperatureReading {
|
||||
}
|
||||
|
||||
public static final class Temperature implements TemperatureReading {
|
||||
public final double value;
|
||||
|
||||
public Temperature(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TemperatureNotAvailable implements TemperatureReading {
|
||||
}
|
||||
|
||||
public static final class DeviceNotAvailable implements TemperatureReading {
|
||||
}
|
||||
|
||||
public static final class DeviceTimedOut implements TemperatureReading {
|
||||
}
|
||||
|
||||
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
final long nextCollectionId = 0L;
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceGroup {} stopped", groupId);
|
||||
}
|
||||
|
||||
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
|
||||
if (this.groupId.equals(trackMsg.groupId)) {
|
||||
ActorRef ref = deviceIdToActor.get(trackMsg.deviceId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId);
|
||||
ActorRef deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
getContext().watch(deviceActor);
|
||||
deviceActor.forward(trackMsg, getContext());
|
||||
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
}
|
||||
} else {
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
groupId, this.groupId
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void onDeviceList(RequestDeviceList r) {
|
||||
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef deviceActor = t.getActor();
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
log.info("Device actor for {} has been terminated", deviceId);
|
||||
actorToDeviceId.remove(deviceActor);
|
||||
deviceIdToActor.remove(deviceId);
|
||||
}
|
||||
|
||||
private void onAllTemperatures(RequestAllTemperatures r) {
|
||||
getContext().actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(RequestDeviceList.class, this::onDeviceList)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.match(RequestAllTemperatures.class, this::onAllTemperatures)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
108
akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java
Normal file
108
akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class DeviceGroupQuery extends AbstractActor {
|
||||
public static final class CollectionTimeout {
|
||||
}
|
||||
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
final Map<ActorRef, String> actorToDeviceId;
|
||||
final long requestId;
|
||||
final ActorRef requester;
|
||||
|
||||
Cancellable queryTimeoutTimer;
|
||||
|
||||
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
|
||||
this.actorToDeviceId = actorToDeviceId;
|
||||
this.requestId = requestId;
|
||||
this.requester = requester;
|
||||
|
||||
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
|
||||
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
|
||||
);
|
||||
}
|
||||
|
||||
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
|
||||
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
|
||||
getContext().watch(deviceActor);
|
||||
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
queryTimeoutTimer.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
|
||||
}
|
||||
|
||||
public Receive waitingForReplies(
|
||||
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
|
||||
Set<ActorRef> stillWaiting) {
|
||||
return receiveBuilder()
|
||||
.match(Device.RespondTemperature.class, r -> {
|
||||
ActorRef deviceActor = getSender();
|
||||
DeviceGroup.TemperatureReading reading = r.value
|
||||
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
|
||||
.orElse(new DeviceGroup.TemperatureNotAvailable());
|
||||
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
|
||||
})
|
||||
.match(Terminated.class, t -> {
|
||||
if (stillWaiting.contains(t.getActor())) {
|
||||
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
|
||||
}
|
||||
// else ignore
|
||||
})
|
||||
.match(CollectionTimeout.class, t -> {
|
||||
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
|
||||
for (ActorRef deviceActor : stillWaiting) {
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
|
||||
}
|
||||
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
public void receivedResponse(ActorRef deviceActor,
|
||||
DeviceGroup.TemperatureReading reading,
|
||||
Set<ActorRef> stillWaiting,
|
||||
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
|
||||
getContext().unwatch(deviceActor);
|
||||
String deviceId = actorToDeviceId.get(deviceActor);
|
||||
|
||||
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
|
||||
newStillWaiting.remove(deviceActor);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
|
||||
newRepliesSoFar.put(deviceId, reading);
|
||||
if (newStillWaiting.isEmpty()) {
|
||||
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
} else {
|
||||
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,219 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DeviceGroupQueryTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnTemperatureValueForWorkingDevices() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable());
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable());
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
|
||||
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
|
||||
TestKit requester = new TestKit(system);
|
||||
|
||||
TestKit device1 = new TestKit(system);
|
||||
TestKit device2 = new TestKit(system);
|
||||
|
||||
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
|
||||
actorToDeviceId.put(device1.getRef(), "device1");
|
||||
actorToDeviceId.put(device2.getRef(), "device2");
|
||||
|
||||
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId,
|
||||
1L,
|
||||
requester.getRef(),
|
||||
new FiniteDuration(3, TimeUnit.SECONDS)));
|
||||
|
||||
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
|
||||
|
||||
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
|
||||
|
||||
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(
|
||||
FiniteDuration.create(5, TimeUnit.SECONDS),
|
||||
DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(1L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut());
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
|
||||
public static void assertEqualTemperatures(Map<String, DeviceGroup.TemperatureReading> expected, Map<String, DeviceGroup.TemperatureReading> actual) {
|
||||
for (Map.Entry<String, DeviceGroup.TemperatureReading> entry : expected.entrySet()) {
|
||||
DeviceGroup.TemperatureReading expectedReading = entry.getValue();
|
||||
DeviceGroup.TemperatureReading actualReading = actual.get(entry.getKey());
|
||||
assertNotNull(actualReading);
|
||||
assertEquals(expectedReading.getClass(), actualReading.getClass());
|
||||
if (expectedReading instanceof DeviceGroup.Temperature) {
|
||||
assertEquals(((DeviceGroup.Temperature) expectedReading).value, ((DeviceGroup.Temperature) actualReading).value, 0.01);
|
||||
}
|
||||
}
|
||||
assertEquals(expected.size(), actual.size());
|
||||
}
|
||||
}
|
||||
164
akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupTest.java
Normal file
164
akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupTest.java
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import static jdocs.tutorial_5.DeviceGroupQueryTest.assertEqualTemperatures;
|
||||
|
||||
public class DeviceGroupTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterDeviceActor() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertNotEquals(deviceActor1, deviceActor2);
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreRequestsForWrongGroupId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnSameActorForSameDeviceId() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
assertEquals(deviceActor1, deviceActor2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListActiveDevicesAfterOneShutsDown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef toShutDown = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
|
||||
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(0L, reply.requestId);
|
||||
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
|
||||
|
||||
probe.watch(toShutDown);
|
||||
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
probe.expectTerminated(toShutDown);
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
|
||||
reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
|
||||
assertEquals(1L, reply.requestId);
|
||||
assertEquals(Stream.of("device2").collect(Collectors.toSet()), reply.ids);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCollectTemperaturesFromAllActiveDevices() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor1 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor2 = probe.getLastSender();
|
||||
|
||||
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
ActorRef deviceActor3 = probe.getLastSender();
|
||||
|
||||
// Check that the device actors are working
|
||||
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
|
||||
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
// No temperature for device 3
|
||||
|
||||
groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
|
||||
DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
|
||||
assertEquals(0L, response.requestId);
|
||||
|
||||
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
|
||||
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
|
||||
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
|
||||
expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable());
|
||||
|
||||
assertEqualTemperatures(expectedTemperatures, response.temperatures);
|
||||
}
|
||||
}
|
||||
80
akka-docs/src/test/java/jdocs/tutorial_5/DeviceManager.java
Normal file
80
akka-docs/src/test/java/jdocs/tutorial_5/DeviceManager.java
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DeviceManager extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(DeviceManager.class);
|
||||
}
|
||||
|
||||
public static final class RequestTrackDevice {
|
||||
public final String groupId;
|
||||
public final String deviceId;
|
||||
|
||||
public RequestTrackDevice(String groupId, String deviceId) {
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DeviceRegistered {
|
||||
}
|
||||
|
||||
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
|
||||
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("DeviceManager started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("DeviceManager stopped");
|
||||
}
|
||||
|
||||
private void onTrackDevice(RequestTrackDevice trackMsg) {
|
||||
String groupId = trackMsg.groupId;
|
||||
ActorRef ref = groupIdToActor.get(groupId);
|
||||
if (ref != null) {
|
||||
ref.forward(trackMsg, getContext());
|
||||
} else {
|
||||
log.info("Creating device group actor for {}", groupId);
|
||||
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
|
||||
getContext().watch(groupActor);
|
||||
groupActor.forward(trackMsg, getContext());
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
actorToGroupId.put(groupActor, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onTerminated(Terminated t) {
|
||||
ActorRef groupActor = t.getActor();
|
||||
String groupId = actorToGroupId.get(groupActor);
|
||||
log.info("Device group actor for {} has been terminated", groupId);
|
||||
actorToGroupId.remove(groupActor);
|
||||
groupIdToActor.remove(groupId);
|
||||
}
|
||||
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(RequestTrackDevice.class, this::onTrackDevice)
|
||||
.match(Terminated.class, this::onTerminated)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
89
akka-docs/src/test/java/jdocs/tutorial_5/DeviceTest.java
Normal file
89
akka-docs/src/test/java/jdocs/tutorial_5/DeviceTest.java
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
|
||||
public class DeviceTest extends JUnitSuite {
|
||||
|
||||
static ActorSystem system;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyToRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
|
||||
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
|
||||
assertEquals(deviceActor, probe.getLastSender());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreWrongRegistrationRequests() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
|
||||
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
|
||||
probe.expectNoMsg();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
|
||||
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(42L, response.requestId);
|
||||
assertEquals(Optional.empty(), response.value);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplyWithLatestTemperatureReading() {
|
||||
TestKit probe = new TestKit(system);
|
||||
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
|
||||
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
|
||||
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(2L, response1.requestId);
|
||||
assertEquals(Optional.of(24.0), response1.value);
|
||||
|
||||
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
|
||||
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
|
||||
|
||||
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
|
||||
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
|
||||
assertEquals(4L, response2.requestId);
|
||||
assertEquals(Optional.of(55.0), response2.value);
|
||||
}
|
||||
|
||||
}
|
||||
29
akka-docs/src/test/java/jdocs/tutorial_5/IotMain.java
Normal file
29
akka-docs/src/test/java/jdocs/tutorial_5/IotMain.java
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class IotMain {
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
ActorSystem system = ActorSystem.create("iot-system");
|
||||
|
||||
try {
|
||||
// Create top level supervisor
|
||||
ActorRef supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor");
|
||||
|
||||
supervisor.tell(new DeviceManager.RequestTrackDevice("mygroup", "device1"), ActorRef.noSender());
|
||||
|
||||
System.out.println("Press ENTER to exit the system");
|
||||
System.in.read();
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
38
akka-docs/src/test/java/jdocs/tutorial_5/IotSupervisor.java
Normal file
38
akka-docs/src/test/java/jdocs/tutorial_5/IotSupervisor.java
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.tutorial_5;
|
||||
|
||||
//#iot-supervisor
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class IotSupervisor extends AbstractActor {
|
||||
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
|
||||
|
||||
public static Props props() {
|
||||
return Props.create(IotSupervisor.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
log.info("IoT Application started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
log.info("IoT Application stopped");
|
||||
}
|
||||
|
||||
// No need to handle any messages
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
//#iot-supervisor
|
||||
Loading…
Add table
Add a link
Reference in a new issue