/* * Copyright (C) 2009-2018 Lightbend Inc. */ package jdocs.tutorial_5; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import scala.concurrent.duration.FiniteDuration; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Props; import akka.actor.Terminated; import akka.event.Logging; import akka.event.LoggingAdapter; //#query-full //#query-outline public class DeviceGroupQuery extends AbstractActor { public static final class CollectionTimeout { } private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); final Map actorToDeviceId; final long requestId; final ActorRef requester; Cancellable queryTimeoutTimer; public DeviceGroupQuery(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { this.actorToDeviceId = actorToDeviceId; this.requestId = requestId; this.requester = requester; queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() ); } public static Props props(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { return Props.create(DeviceGroupQuery.class, () -> new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout)); } @Override public void preStart() { for (ActorRef deviceActor : actorToDeviceId.keySet()) { getContext().watch(deviceActor); deviceActor.tell(new Device.ReadTemperature(0L), getSelf()); } } @Override public void postStop() { queryTimeoutTimer.cancel(); } //#query-outline //#query-state @Override public Receive createReceive() { return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet()); } public Receive waitingForReplies( Map repliesSoFar, Set stillWaiting) { return receiveBuilder() .match(Device.RespondTemperature.class, r -> { ActorRef deviceActor = getSender(); DeviceGroup.TemperatureReading reading = r.value .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v)) .orElse(DeviceGroup.TemperatureNotAvailable.INSTANCE); receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar); }) .match(Terminated.class, t -> { receivedResponse(t.getActor(), DeviceGroup.DeviceNotAvailable.INSTANCE, stillWaiting, repliesSoFar); }) .match(CollectionTimeout.class, t -> { Map replies = new HashMap<>(repliesSoFar); for (ActorRef deviceActor : stillWaiting) { String deviceId = actorToDeviceId.get(deviceActor); replies.put(deviceId, DeviceGroup.DeviceTimedOut.INSTANCE); } requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf()); getContext().stop(getSelf()); }) .build(); } //#query-state //#query-collect-reply public void receivedResponse(ActorRef deviceActor, DeviceGroup.TemperatureReading reading, Set stillWaiting, Map repliesSoFar) { getContext().unwatch(deviceActor); String deviceId = actorToDeviceId.get(deviceActor); Set newStillWaiting = new HashSet<>(stillWaiting); newStillWaiting.remove(deviceActor); Map newRepliesSoFar = new HashMap<>(repliesSoFar); newRepliesSoFar.put(deviceId, reading); if (newStillWaiting.isEmpty()) { requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf()); getContext().stop(getSelf()); } else { getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting)); } } //#query-collect-reply //#query-outline } //#query-outline //#query-full