* ActorContext as constructor parameter in AbstractBehavior, #27689 * additional test * additional doc clarification * another rebase
This commit is contained in:
parent
0719de035b
commit
91db18b564
66 changed files with 442 additions and 323 deletions
|
|
@ -13,7 +13,9 @@ public class BlockingActor extends AbstractBehavior<Integer> {
|
|||
return Behaviors.setup(BlockingActor::new);
|
||||
}
|
||||
|
||||
private BlockingActor(ActorContext<Integer> context) {}
|
||||
private BlockingActor(ActorContext<Integer> context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive<Integer> createReceive() {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ public class BlockingDispatcherTest {
|
|||
Behaviors.setup(
|
||||
context -> {
|
||||
ActorRef<Integer> actor1 = context.spawn(BlockingActor.create(), "BlockingActor");
|
||||
ActorRef<Integer> actor2 = context.spawn(new PrintActor(), "PrintActor");
|
||||
ActorRef<Integer> actor2 = context.spawn(PrintActor.create(), "PrintActor");
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
actor1.tell(i);
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ public class DispatcherDocTest {
|
|||
// #defining-dispatcher-in-code
|
||||
ActorRef<Integer> myActor =
|
||||
context.spawn(
|
||||
new PrintActor(), "PrintActor", DispatcherSelector.fromConfig("my-dispatcher"));
|
||||
PrintActor.create(), "PrintActor", DispatcherSelector.fromConfig("my-dispatcher"));
|
||||
// #defining-dispatcher-in-code
|
||||
}
|
||||
|
||||
|
|
@ -29,7 +29,7 @@ public class DispatcherDocTest {
|
|||
// #defining-fixed-pool-size-dispatcher
|
||||
ActorRef<Integer> myActor =
|
||||
context.spawn(
|
||||
new PrintActor(),
|
||||
PrintActor.create(),
|
||||
"PrintActor",
|
||||
DispatcherSelector.fromConfig("blocking-io-dispatcher"));
|
||||
// #defining-fixed-pool-size-dispatcher
|
||||
|
|
@ -39,7 +39,9 @@ public class DispatcherDocTest {
|
|||
// #defining-pinned-dispatcher
|
||||
ActorRef<Integer> myActor =
|
||||
context.spawn(
|
||||
new PrintActor(), "PrintActor", DispatcherSelector.fromConfig("my-pinned-dispatcher"));
|
||||
PrintActor.create(),
|
||||
"PrintActor",
|
||||
DispatcherSelector.fromConfig("my-pinned-dispatcher"));
|
||||
// #defining-pinned-dispatcher
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,11 @@ import akka.actor.typed.javadsl.*;
|
|||
class PrintActor extends AbstractBehavior<Integer> {
|
||||
|
||||
public static Behavior<Integer> create() {
|
||||
return Behaviors.setup(context -> new PrintActor());
|
||||
return Behaviors.setup(PrintActor::new);
|
||||
}
|
||||
|
||||
private PrintActor(ActorContext<Integer> context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ class SeparateDispatcherFutureActor extends AbstractBehavior<Integer> {
|
|||
}
|
||||
|
||||
private SeparateDispatcherFutureActor(ActorContext<Integer> context) {
|
||||
super(context);
|
||||
ec =
|
||||
context
|
||||
.getSystem()
|
||||
|
|
|
|||
|
|
@ -52,13 +52,11 @@ interface SharedMutableStateDocTest {
|
|||
}
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
|
||||
private String state = "";
|
||||
private Set<String> mySet = new HashSet<>();
|
||||
|
||||
public MyActor(ActorContext<Command> context) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -96,12 +94,13 @@ interface SharedMutableStateDocTest {
|
|||
// Turn the future result into a message that is sent to
|
||||
// self when future completes
|
||||
CompletableFuture<String> futureResult = expensiveCalculation();
|
||||
context.pipeToSelf(
|
||||
futureResult,
|
||||
(result, failure) -> {
|
||||
if (result != null) return new UpdateState(result);
|
||||
else throw new RuntimeException(failure);
|
||||
});
|
||||
getContext()
|
||||
.pipeToSelf(
|
||||
futureResult,
|
||||
(result, failure) -> {
|
||||
if (result != null) return new UpdateState(result);
|
||||
else throw new RuntimeException(failure);
|
||||
});
|
||||
|
||||
// Another example of incorrect approach
|
||||
// mutating actor state from ask future callback
|
||||
|
|
@ -110,7 +109,7 @@ interface SharedMutableStateDocTest {
|
|||
message.otherActor,
|
||||
Query::new,
|
||||
Duration.ofSeconds(3),
|
||||
context.getSystem().scheduler());
|
||||
getContext().getSystem().scheduler());
|
||||
response.whenComplete(
|
||||
(result, failure) -> {
|
||||
if (result != null) state = "new state: " + result;
|
||||
|
|
@ -118,15 +117,16 @@ interface SharedMutableStateDocTest {
|
|||
|
||||
// use context.ask instead, turns the completion
|
||||
// into a message sent to self
|
||||
context.ask(
|
||||
String.class,
|
||||
message.otherActor,
|
||||
Duration.ofSeconds(3),
|
||||
Query::new,
|
||||
(result, failure) -> {
|
||||
if (result != null) return new UpdateState(result);
|
||||
else throw new RuntimeException(failure);
|
||||
});
|
||||
getContext()
|
||||
.ask(
|
||||
String.class,
|
||||
message.otherActor,
|
||||
Duration.ofSeconds(3),
|
||||
Query::new,
|
||||
(result, failure) -> {
|
||||
if (result != null) return new UpdateState(result);
|
||||
else throw new RuntimeException(failure);
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.actor.typed.ActorRef;
|
|||
import akka.actor.typed.ActorSystem;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.AbstractBehavior;
|
||||
import akka.actor.typed.javadsl.ActorContext;
|
||||
import akka.actor.typed.javadsl.Adapter;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.typed.javadsl.Receive;
|
||||
|
|
@ -79,14 +80,15 @@ public interface ResumableProjectionExample {
|
|||
}
|
||||
|
||||
public static Behavior<Command> create(String id, ExampleStore store) {
|
||||
return Behaviors.setup(context -> new TheOneWhoWritesToQueryJournal(store));
|
||||
return Behaviors.setup(context -> new TheOneWhoWritesToQueryJournal(context, store));
|
||||
}
|
||||
|
||||
private final ExampleStore store;
|
||||
|
||||
private ComplexState state = new ComplexState();
|
||||
|
||||
private TheOneWhoWritesToQueryJournal(ExampleStore store) {
|
||||
private TheOneWhoWritesToQueryJournal(ActorContext<Command> context, ExampleStore store) {
|
||||
super(context);
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -28,10 +28,8 @@ class PrintMyActorRefActor extends AbstractBehavior<String> {
|
|||
return Behaviors.setup(PrintMyActorRefActor::new);
|
||||
}
|
||||
|
||||
private final ActorContext<String> context;
|
||||
|
||||
private PrintMyActorRefActor(ActorContext<String> context) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -40,7 +38,7 @@ class PrintMyActorRefActor extends AbstractBehavior<String> {
|
|||
}
|
||||
|
||||
private Behavior<String> printIt() {
|
||||
ActorRef<String> secondRef = context.spawn(Behaviors.empty(), "second-actor");
|
||||
ActorRef<String> secondRef = getContext().spawn(Behaviors.empty(), "second-actor");
|
||||
System.out.println("Second: " + secondRef);
|
||||
return this;
|
||||
}
|
||||
|
|
@ -51,10 +49,11 @@ class PrintMyActorRefActor extends AbstractBehavior<String> {
|
|||
class StartStopActor1 extends AbstractBehavior<String> {
|
||||
|
||||
static Behavior<String> create() {
|
||||
return Behaviors.setup(context -> new StartStopActor1());
|
||||
return Behaviors.setup(StartStopActor1::new);
|
||||
}
|
||||
|
||||
private StartStopActor1() {
|
||||
private StartStopActor1(ActorContext<String> context) {
|
||||
super(context);
|
||||
System.out.println("first started");
|
||||
}
|
||||
|
||||
|
|
@ -75,10 +74,11 @@ class StartStopActor1 extends AbstractBehavior<String> {
|
|||
class StartStopActor2 extends AbstractBehavior<String> {
|
||||
|
||||
static Behavior<String> create() {
|
||||
return Behaviors.setup(context -> new StartStopActor2());
|
||||
return Behaviors.setup(StartStopActor2::new);
|
||||
}
|
||||
|
||||
private StartStopActor2() {
|
||||
private StartStopActor2(ActorContext<String> context) {
|
||||
super(context);
|
||||
System.out.println("second started");
|
||||
}
|
||||
|
||||
|
|
@ -104,6 +104,7 @@ class SupervisingActor extends AbstractBehavior<String> {
|
|||
private final ActorRef<String> child;
|
||||
|
||||
private SupervisingActor(ActorContext<String> context) {
|
||||
super(context);
|
||||
child =
|
||||
context.spawn(
|
||||
Behaviors.supervise(SupervisedActor.create()).onFailure(SupervisorStrategy.restart()),
|
||||
|
|
@ -124,10 +125,11 @@ class SupervisingActor extends AbstractBehavior<String> {
|
|||
class SupervisedActor extends AbstractBehavior<String> {
|
||||
|
||||
static Behavior<String> create() {
|
||||
return Behaviors.setup(context -> new SupervisedActor());
|
||||
return Behaviors.setup(SupervisedActor::new);
|
||||
}
|
||||
|
||||
private SupervisedActor() {
|
||||
private SupervisedActor(ActorContext<String> context) {
|
||||
super(context);
|
||||
System.out.println("supervised actor started");
|
||||
}
|
||||
|
||||
|
|
@ -165,10 +167,8 @@ class Main extends AbstractBehavior<String> {
|
|||
return Behaviors.setup(Main::new);
|
||||
}
|
||||
|
||||
private final ActorContext<String> context;
|
||||
|
||||
private Main(ActorContext<String> context) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -177,7 +177,7 @@ class Main extends AbstractBehavior<String> {
|
|||
}
|
||||
|
||||
private Behavior<String> start() {
|
||||
ActorRef<String> firstRef = context.spawn(PrintMyActorRefActor.create(), "first-actor");
|
||||
ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor");
|
||||
|
||||
System.out.println("First: " + firstRef);
|
||||
firstRef.tell("printit");
|
||||
|
|
|
|||
|
|
@ -18,10 +18,8 @@ public class IotSupervisor extends AbstractBehavior<Void> {
|
|||
return Behaviors.setup(IotSupervisor::new);
|
||||
}
|
||||
|
||||
private final ActorContext<Void> context;
|
||||
|
||||
public IotSupervisor(ActorContext<Void> context) {
|
||||
this.context = context;
|
||||
private IotSupervisor(ActorContext<Void> context) {
|
||||
super(context);
|
||||
context.getLog().info("IoT Application started");
|
||||
}
|
||||
|
||||
|
|
@ -32,7 +30,7 @@ public class IotSupervisor extends AbstractBehavior<Void> {
|
|||
}
|
||||
|
||||
private IotSupervisor onPostStop() {
|
||||
context.getLog().info("IoT Application stopped");
|
||||
getContext().getLog().info("IoT Application stopped");
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,14 +66,13 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final String groupId;
|
||||
private final String deviceId;
|
||||
|
||||
private Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
private Device(ActorContext<Command> context, String groupId, String deviceId) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
|
||||
|
|
@ -90,7 +89,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Behavior<Command> onRecordTemperature(RecordTemperature r) {
|
||||
context.getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
getContext().getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
r.replyTo.tell(new TemperatureRecorded(r.requestId));
|
||||
return this;
|
||||
|
|
@ -102,7 +101,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Behavior<Command> onPostStop() {
|
||||
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
getContext().getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
return Behaviors.stopped();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,14 +45,13 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final String groupId;
|
||||
private final String deviceId;
|
||||
|
||||
private Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
private Device(ActorContext<Command> context, String groupId, String deviceId) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
|
||||
|
|
@ -73,7 +72,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Device onPostStop() {
|
||||
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
getContext().getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
return this;
|
||||
}
|
||||
// #read-protocol-2
|
||||
|
|
|
|||
|
|
@ -70,14 +70,13 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final String groupId;
|
||||
private final String deviceId;
|
||||
|
||||
private Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
private Device(ActorContext<Command> context, String groupId, String deviceId) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
|
||||
|
|
@ -95,7 +94,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Behavior<Command> onRecordTemperature(RecordTemperature r) {
|
||||
context.getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
getContext().getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
r.replyTo.tell(new TemperatureRecorded(r.requestId));
|
||||
return this;
|
||||
|
|
@ -107,7 +106,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Behavior<Command> onPostStop() {
|
||||
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
getContext().getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
return Behaviors.stopped();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,12 +40,11 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
return Behaviors.setup(context -> new DeviceGroup(context, groupId));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final String groupId;
|
||||
private final Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
|
||||
|
||||
private DeviceGroup(ActorContext<Command> context, String groupId) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
this.groupId = groupId;
|
||||
context.getLog().info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
|
@ -56,18 +55,19 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
if (deviceActor != null) {
|
||||
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
|
||||
} else {
|
||||
context.getLog().info("Creating device actor for {}", trackMsg.deviceId);
|
||||
getContext().getLog().info("Creating device actor for {}", trackMsg.deviceId);
|
||||
deviceActor =
|
||||
context.spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
getContext()
|
||||
.spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
// #device-group-register
|
||||
context.watchWith(
|
||||
deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
|
||||
getContext()
|
||||
.watchWith(deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
|
||||
// #device-group-register
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
|
||||
}
|
||||
} else {
|
||||
context
|
||||
getContext()
|
||||
.getLog()
|
||||
.warn(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
|
|
@ -87,7 +87,7 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
// #device-group-remove
|
||||
|
||||
private DeviceGroup onTerminated(DeviceTerminated t) {
|
||||
context.getLog().info("Device actor for {} has been terminated", t.deviceId);
|
||||
getContext().getLog().info("Device actor for {} has been terminated", t.deviceId);
|
||||
deviceIdToActor.remove(t.deviceId);
|
||||
return this;
|
||||
}
|
||||
|
|
@ -111,7 +111,7 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
}
|
||||
|
||||
private DeviceGroup onPostStop() {
|
||||
context.getLog().info("DeviceGroup {} stopped", groupId);
|
||||
getContext().getLog().info("DeviceGroup {} stopped", groupId);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,11 +82,10 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
return Behaviors.setup(DeviceManager::new);
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final Map<String, ActorRef<DeviceGroup.Command>> groupIdToActor = new HashMap<>();
|
||||
|
||||
private DeviceManager(ActorContext<Command> context) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
context.getLog().info("DeviceManager started");
|
||||
}
|
||||
|
||||
|
|
@ -96,10 +95,10 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
if (ref != null) {
|
||||
ref.tell(trackMsg);
|
||||
} else {
|
||||
context.getLog().info("Creating device group actor for {}", groupId);
|
||||
getContext().getLog().info("Creating device group actor for {}", groupId);
|
||||
ActorRef<DeviceGroup.Command> groupActor =
|
||||
context.spawn(DeviceGroup.create(groupId), "group-" + groupId);
|
||||
context.watchWith(groupActor, new DeviceGroupTerminated(groupId));
|
||||
getContext().spawn(DeviceGroup.create(groupId), "group-" + groupId);
|
||||
getContext().watchWith(groupActor, new DeviceGroupTerminated(groupId));
|
||||
groupActor.tell(trackMsg);
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
}
|
||||
|
|
@ -117,7 +116,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
}
|
||||
|
||||
private DeviceManager onTerminated(DeviceGroupTerminated t) {
|
||||
context.getLog().info("Device group actor for {} has been terminated", t.groupId);
|
||||
getContext().getLog().info("Device group actor for {} has been terminated", t.groupId);
|
||||
groupIdToActor.remove(t.groupId);
|
||||
return this;
|
||||
}
|
||||
|
|
@ -132,7 +131,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
}
|
||||
|
||||
private DeviceManager onPostStop() {
|
||||
context.getLog().info("DeviceManager stopped");
|
||||
getContext().getLog().info("DeviceManager stopped");
|
||||
return this;
|
||||
}
|
||||
// #device-registration-msgs
|
||||
|
|
|
|||
|
|
@ -68,14 +68,13 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
return Behaviors.setup(context -> new Device(context, groupId, deviceId));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final String groupId;
|
||||
private final String deviceId;
|
||||
|
||||
private Optional<Double> lastTemperatureReading = Optional.empty();
|
||||
|
||||
private Device(ActorContext<Command> context, String groupId, String deviceId) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
this.groupId = groupId;
|
||||
this.deviceId = deviceId;
|
||||
|
||||
|
|
@ -93,7 +92,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Behavior<Command> onRecordTemperature(RecordTemperature r) {
|
||||
context.getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
getContext().getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
|
||||
lastTemperatureReading = Optional.of(r.value);
|
||||
r.replyTo.tell(new TemperatureRecorded(r.requestId));
|
||||
return this;
|
||||
|
|
@ -105,7 +104,7 @@ public class Device extends AbstractBehavior<Device.Command> {
|
|||
}
|
||||
|
||||
private Behavior<Command> onPostStop() {
|
||||
context.getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
getContext().getLog().info("Device actor {}-{} stopped", groupId, deviceId);
|
||||
return Behaviors.stopped();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,12 +37,11 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
return Behaviors.setup(context -> new DeviceGroup(context, groupId));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final String groupId;
|
||||
private final Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
|
||||
|
||||
private DeviceGroup(ActorContext<Command> context, String groupId) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
this.groupId = groupId;
|
||||
context.getLog().info("DeviceGroup {} started", groupId);
|
||||
}
|
||||
|
|
@ -54,16 +53,17 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
if (deviceActor != null) {
|
||||
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
|
||||
} else {
|
||||
context.getLog().info("Creating device actor for {}", trackMsg.deviceId);
|
||||
getContext().getLog().info("Creating device actor for {}", trackMsg.deviceId);
|
||||
deviceActor =
|
||||
context.spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
context.watchWith(
|
||||
deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
|
||||
getContext()
|
||||
.spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
|
||||
getContext()
|
||||
.watchWith(deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
|
||||
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
|
||||
trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
|
||||
}
|
||||
} else {
|
||||
context
|
||||
getContext()
|
||||
.getLog()
|
||||
.warn(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
|
|
@ -79,13 +79,13 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
}
|
||||
|
||||
private DeviceGroup onTerminated(DeviceTerminated t) {
|
||||
context.getLog().info("Device actor for {} has been terminated", t.deviceId);
|
||||
getContext().getLog().info("Device actor for {} has been terminated", t.deviceId);
|
||||
deviceIdToActor.remove(t.deviceId);
|
||||
return this;
|
||||
}
|
||||
|
||||
private DeviceGroup onPostStop() {
|
||||
context.getLog().info("DeviceGroup {} stopped", groupId);
|
||||
getContext().getLog().info("DeviceGroup {} stopped", groupId);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -101,9 +101,10 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
|
|||
// applications!
|
||||
Map<String, ActorRef<Device.Command>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor);
|
||||
|
||||
context.spawnAnonymous(
|
||||
DeviceGroupQuery.create(
|
||||
deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3)));
|
||||
getContext()
|
||||
.spawnAnonymous(
|
||||
DeviceGroupQuery.create(
|
||||
deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3)));
|
||||
|
||||
return this;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQuery.Command>
|
|||
Duration timeout,
|
||||
ActorContext<Command> context,
|
||||
TimerScheduler<Command> timers) {
|
||||
super(context);
|
||||
this.requestId = requestId;
|
||||
this.requester = requester;
|
||||
|
||||
|
|
|
|||
|
|
@ -144,7 +144,6 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
}
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
private final Map<String, ActorRef<DeviceGroup.Command>> groupIdToActor = new HashMap<>();
|
||||
|
||||
public static Behavior<Command> create() {
|
||||
|
|
@ -152,7 +151,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
}
|
||||
|
||||
private DeviceManager(ActorContext<Command> context) {
|
||||
this.context = context;
|
||||
super(context);
|
||||
context.getLog().info("DeviceManager started");
|
||||
}
|
||||
|
||||
|
|
@ -162,10 +161,10 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
if (ref != null) {
|
||||
ref.tell(trackMsg);
|
||||
} else {
|
||||
context.getLog().info("Creating device group actor for {}", groupId);
|
||||
getContext().getLog().info("Creating device group actor for {}", groupId);
|
||||
ActorRef<DeviceGroup.Command> groupActor =
|
||||
context.spawn(DeviceGroup.create(groupId), "group-" + groupId);
|
||||
context.watchWith(groupActor, new DeviceGroupTerminated(groupId));
|
||||
getContext().spawn(DeviceGroup.create(groupId), "group-" + groupId);
|
||||
getContext().watchWith(groupActor, new DeviceGroupTerminated(groupId));
|
||||
groupActor.tell(trackMsg);
|
||||
groupIdToActor.put(groupId, groupActor);
|
||||
}
|
||||
|
|
@ -193,7 +192,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
}
|
||||
|
||||
private DeviceManager onTerminated(DeviceGroupTerminated t) {
|
||||
context.getLog().info("Device group actor for {} has been terminated", t.groupId);
|
||||
getContext().getLog().info("Device group actor for {} has been terminated", t.groupId);
|
||||
groupIdToActor.remove(t.groupId);
|
||||
return this;
|
||||
}
|
||||
|
|
@ -209,7 +208,7 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
|
|||
}
|
||||
|
||||
private DeviceManager onPostStop() {
|
||||
context.getLog().info("DeviceManager stopped");
|
||||
getContext().getLog().info("DeviceManager stopped");
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue