diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala index 403b3638fe..91cb15f1b6 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala @@ -5,8 +5,10 @@ package akka.actor.typed.scaladsl import akka.Done -import akka.actor.typed.{ PostStop, TypedAkkaSpecWithShutdown } -import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.TE +import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } +import akka.actor.typed.{ Behavior, PostStop, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } +import akka.testkit.EventFilter import scala.concurrent.Promise @@ -58,4 +60,19 @@ class StopSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { } + "PostStop" should { + "immediately throw when a deferred behavior (setup) is passed in as postStop" in { + val ex = intercept[IllegalArgumentException] { + Behaviors.stopped( + // illegal: + Behaviors.setup[String] { _ ⇒ + throw TE("boom!") + } + ) + } + + ex.getMessage should include("Behavior used as `postStop` behavior in Stopped(...) was a deferred one ") + } + } + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 5a47ed902c..0f22ba147c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -233,7 +233,23 @@ object Behavior { * that PostStop can be sent to previous behavior from `finishTerminate`. */ private[akka] class StoppedBehavior[T](val postStop: OptionVal[Behavior[T]]) extends Behavior[T] { - override def toString = "Stopped" + validatePostStop(postStop) + + @throws[IllegalArgumentException] + private final def validatePostStop(postStop: OptionVal[Behavior[T]]): Unit = { + postStop match { + case OptionVal.Some(b: DeferredBehavior[_]) ⇒ + throw new IllegalArgumentException(s"Behavior used as `postStop` behavior in Stopped(...) was a deferred one [${b.toString}], which is not supported (it would never be evaluated).") + case _ ⇒ // all good + } + } + + override def toString = "Stopped" + { + postStop match { + case OptionVal.Some(_) ⇒ "(postStop)" + case _ ⇒ "()" + } + } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index d13e8338dc..7b9000ae2c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -195,7 +195,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc /** * Configurator for creating [[akka.dispatch.Dispatcher]]. - * Returns the same dispatcher instance for for each invocation + * Returns the same dispatcher instance for each invocation * of the `dispatcher()` method. */ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) @@ -228,7 +228,7 @@ private[akka] object BalancingDispatcherConfigurator { /** * Configurator for creating [[akka.dispatch.BalancingDispatcher]]. - * Returns the same dispatcher instance for for each invocation + * Returns the same dispatcher instance for each invocation * of the `dispatcher()` method. */ class BalancingDispatcherConfigurator(_config: Config, _prerequisites: DispatcherPrerequisites) @@ -278,7 +278,7 @@ class BalancingDispatcherConfigurator(_config: Config, _prerequisites: Dispatche /** * Configurator for creating [[akka.dispatch.PinnedDispatcher]]. - * Returns new dispatcher instance for for each invocation + * Returns new dispatcher instance for each invocation * of the `dispatcher()` method. */ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index fe8abcab5b..aa33e1b04a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -683,12 +683,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh updateLatestGossip(newGossip) - logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", ")) if (joiningNode == selfUniqueAddress) { + logInfo("Node [{}] is JOINING itself (with roles [{}]) and forming new cluster", joiningNode.address, roles.mkString(", ")) if (localMembers.isEmpty) leaderActions() // important for deterministic oldest when bootstrapping - } else + } else { + logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", ")) sender() ! Welcome(selfUniqueAddress, latestGossip) + } publishMembershipState() } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/map.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/map.md index 6d5d0fa891..38d7915f8c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/map.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/map.md @@ -27,4 +27,11 @@ Transform each element in the stream by calling a mapping function with it and p @@@ +## Examples + + +Scala +: @@snip [Flow.scala]($akka$/akka-docs/src/test/scala/docs/stream/operators/Map.scala) { #imports #map } + + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/combine.md b/akka-docs/src/main/paradox/stream/operators/Source/combine.md index f4045abfe2..55bf156864 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/combine.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/combine.md @@ -23,3 +23,11 @@ Combine several sources, using a given strategy such as merge or concat, into on @@@ +## Examples + + +Scala +: @@snip [combine.scala]($akka$/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #combine } + + + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/from.md b/akka-docs/src/main/paradox/stream/operators/Source/from.md index cfc6858d2f..6f56423ca8 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/from.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/from.md @@ -4,10 +4,19 @@ Stream the values of an `Iterable`. @ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala]($akka$/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala) { #from } + +@@@ + ## Description Stream the values of an `Iterable`. Make sure the `Iterable` is immutable or at least not modified after being used -as a source. +as a source. Otherwise the stream may fail with `ConcurrentModificationException` or other more subtle errors may occur. @@@div { .callout } @@ -17,3 +26,8 @@ as a source. @@@ + +## Examples + +Java +: @@snip [from.java]($akka$/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #imports #source-from-example } diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md b/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md index 4588c78d53..fee6c52cb1 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md @@ -26,3 +26,7 @@ If the future fails the stream is failed with that exception. @@@ +## Example +Scala +: @@snip [SourceFromFuture.scala]($akka$/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture } + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/single.md b/akka-docs/src/main/paradox/stream/operators/Source/single.md index 92dfdad3bc..8541e26b6d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/single.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/single.md @@ -25,3 +25,12 @@ Stream a single object @@@ +## Examples + +Scala +: @@snip [source.scala]($akka$/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #source-single } + +Java +: @@snip [source.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #imports #source-single } + + diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java new file mode 100644 index 0000000000..e13bc9dfa3 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.stream.operators; + +//#imports +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Source; + +import java.util.Arrays; + +//#imports + +public class SourceDocExamples { + + public static void fromExample() { + //#source-from-example + final ActorSystem system = ActorSystem.create("SourceFromExample"); + final Materializer materializer = ActorMaterializer.create(system); + + Source ints = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5)); + ints.runForeach(System.out::println, materializer); + + String text = "Perfection is finally attained not when there is no longer more to add," + + "but when there is no longer anything to take away."; + Source words = Source.from(Arrays.asList(text.split("\\s"))); + words.runForeach(System.out::println, materializer); + //#source-from-example + } + +} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/Device.java b/akka-docs/src/test/java/jdocs/tutorial_6/Device.java deleted file mode 100644 index 9439d19c80..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/Device.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import akka.actor.AbstractActor; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import jdocs.tutorial_6.DeviceManager.DeviceRegistered; -import jdocs.tutorial_6.DeviceManager.RequestTrackDevice; - -import java.util.Optional; - -public class Device extends AbstractActor { - private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); - - final String groupId; - - final String deviceId; - - public Device(String groupId, String deviceId) { - this.groupId = groupId; - this.deviceId = deviceId; - } - - public static Props props(String groupId, String deviceId) { - return Props.create(Device.class, groupId, deviceId); - } - - public static final class RecordTemperature { - final long requestId; - final double value; - - public RecordTemperature(long requestId, double value) { - this.requestId = requestId; - this.value = value; - } - } - - public static final class TemperatureRecorded { - final long requestId; - - public TemperatureRecorded(long requestId) { - this.requestId = requestId; - } - } - - public static final class ReadTemperature { - final long requestId; - - public ReadTemperature(long requestId) { - this.requestId = requestId; - } - } - - public static final class RespondTemperature { - final long requestId; - final Optional value; - - public RespondTemperature(long requestId, Optional value) { - this.requestId = requestId; - this.value = value; - } - } - - Optional lastTemperatureReading = Optional.empty(); - - @Override - public void preStart() { - log.info("Device actor {}-{} started", groupId, deviceId); - } - - @Override - public void postStop() { - log.info("Device actor {}-{} stopped", groupId, deviceId); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match(RequestTrackDevice.class, r -> { - if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) { - getSender().tell(new DeviceRegistered(), getSelf()); - } else { - log.warning( - "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", - r.groupId, r.deviceId, this.groupId, this.deviceId - ); - } - }) - .match(RecordTemperature.class, r -> { - log.info("Recorded temperature reading {} with {}", r.value, r.requestId); - lastTemperatureReading = Optional.of(r.value); - getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); - }) - .match(ReadTemperature.class, r -> { - getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); - }) - .build(); - } -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroup.java b/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroup.java deleted file mode 100644 index 7af6659bae..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroup.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.actor.Terminated; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import scala.concurrent.duration.FiniteDuration; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -public class DeviceGroup extends AbstractActor { - private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); - - final String groupId; - - public DeviceGroup(String groupId) { - this.groupId = groupId; - } - - public static Props props(String groupId) { - return Props.create(DeviceGroup.class, groupId); - } - - public static final class RequestDeviceList { - final long requestId; - - public RequestDeviceList(long requestId) { - this.requestId = requestId; - } - } - - public static final class ReplyDeviceList { - final long requestId; - final Set ids; - - public ReplyDeviceList(long requestId, Set ids) { - this.requestId = requestId; - this.ids = ids; - } - } - - public static final class RequestAllTemperatures { - final long requestId; - - public RequestAllTemperatures(long requestId) { - this.requestId = requestId; - } - } - - public static final class RespondAllTemperatures { - final long requestId; - final Map temperatures; - - public RespondAllTemperatures(long requestId, Map temperatures) { - this.requestId = requestId; - this.temperatures = temperatures; - } - } - - public static interface TemperatureReading { - } - - public static final class Temperature implements TemperatureReading { - public final double value; - - public Temperature(double value) { - this.value = value; - } - } - - public static final class TemperatureNotAvailable implements TemperatureReading { - } - - public static final class DeviceNotAvailable implements TemperatureReading { - } - - public static final class DeviceTimedOut implements TemperatureReading { - } - - final Map deviceIdToActor = new HashMap<>(); - final Map actorToDeviceId = new HashMap<>(); - final long nextCollectionId = 0L; - - @Override - public void preStart() { - log.info("DeviceGroup {} started", groupId); - } - - @Override - public void postStop() { - log.info("DeviceGroup {} stopped", groupId); - } - - private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { - if (this.groupId.equals(trackMsg.groupId)) { - ActorRef ref = deviceIdToActor.get(trackMsg.deviceId); - if (ref != null) { - ref.forward(trackMsg, getContext()); - } else { - log.info("Creating device actor for {}", trackMsg.deviceId); - ActorRef deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); - getContext().watch(deviceActor); - deviceActor.forward(trackMsg, getContext()); - actorToDeviceId.put(deviceActor, trackMsg.deviceId); - deviceIdToActor.put(trackMsg.deviceId, deviceActor); - } - } else { - log.warning( - "Ignoring TrackDevice request for {}. This actor is responsible for {}.", - groupId, this.groupId - ); - } - } - - private void onDeviceList(RequestDeviceList r) { - getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf()); - } - - private void onTerminated(Terminated t) { - ActorRef deviceActor = t.getActor(); - String deviceId = actorToDeviceId.get(deviceActor); - log.info("Device actor for {} has been terminated", deviceId); - actorToDeviceId.remove(deviceActor); - deviceIdToActor.remove(deviceId); - } - - private void onAllTemperatures(RequestAllTemperatures r) { - // since Java collections are mutable, we want to avoid sharing them between actors (since multiple Actors (threads) - // modifying the same mutable data-structure is not safe), and perform a defensive copy of the mutable map: - // - // Feel free to use your favourite immutable data-structures library with Akka in Java applications! - Map actorToDeviceIdCopy = new HashMap<>(this.actorToDeviceId); - - getContext().actorOf(DeviceGroupQuery.props( - actorToDeviceIdCopy, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS))); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice) - .match(RequestDeviceList.class, this::onDeviceList) - .match(Terminated.class, this::onTerminated) - .match(RequestAllTemperatures.class, this::onAllTemperatures) - .build(); - } -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupQuery.java b/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupQuery.java deleted file mode 100644 index 17993bdafc..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupQuery.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import akka.actor.*; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import scala.concurrent.duration.FiniteDuration; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public class DeviceGroupQuery extends AbstractActor { - public static final class CollectionTimeout { - } - - private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); - - final Map actorToDeviceId; - final long requestId; - final ActorRef requester; - - Cancellable queryTimeoutTimer; - - public DeviceGroupQuery(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { - this.actorToDeviceId = actorToDeviceId; - this.requestId = requestId; - this.requester = requester; - - queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( - timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() - ); - } - - public static Props props(Map actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { - return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout); - } - - @Override - public void preStart() { - for (ActorRef deviceActor : actorToDeviceId.keySet()) { - getContext().watch(deviceActor); - deviceActor.tell(new Device.ReadTemperature(0L), getSelf()); - } - } - - @Override - public void postStop() { - queryTimeoutTimer.cancel(); - } - - @Override - public Receive createReceive() { - return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet()); - } - - public Receive waitingForReplies( - Map repliesSoFar, - Set stillWaiting) { - return receiveBuilder() - .match(Device.RespondTemperature.class, r -> { - ActorRef deviceActor = getSender(); - DeviceGroup.TemperatureReading reading = r.value - .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v)) - .orElse(new DeviceGroup.TemperatureNotAvailable()); - receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar); - }) - .match(Terminated.class, t -> { - if (stillWaiting.contains(t.getActor())) { - receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar); - } - // else ignore - }) - .match(CollectionTimeout.class, t -> { - Map replies = new HashMap<>(repliesSoFar); - for (ActorRef deviceActor : stillWaiting) { - String deviceId = actorToDeviceId.get(deviceActor); - replies.put(deviceId, new DeviceGroup.DeviceTimedOut()); - } - requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf()); - getContext().stop(getSelf()); - }) - .build(); - } - - public void receivedResponse(ActorRef deviceActor, - DeviceGroup.TemperatureReading reading, - Set stillWaiting, - Map repliesSoFar) { - getContext().unwatch(deviceActor); - String deviceId = actorToDeviceId.get(deviceActor); - - Set newStillWaiting = new HashSet<>(stillWaiting); - newStillWaiting.remove(deviceActor); - - Map newRepliesSoFar = new HashMap<>(repliesSoFar); - newRepliesSoFar.put(deviceId, reading); - if (newStillWaiting.isEmpty()) { - requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf()); - getContext().stop(getSelf()); - } else { - getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting)); - } - } -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupQueryTest.java b/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupQueryTest.java deleted file mode 100644 index ca1a3d44fe..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupQueryTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.testkit.javadsl.TestKit; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import scala.concurrent.duration.FiniteDuration; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -public class DeviceGroupQueryTest extends JUnitSuite { - - static ActorSystem system; - - @BeforeClass - public static void setup() { - system = ActorSystem.create(); - } - - @AfterClass - public static void teardown() { - TestKit.shutdownActorSystem(system); - system = null; - } - - @Test - public void testReturnTemperatureValueForWorkingDevices() { - TestKit requester = new TestKit(system); - - TestKit device1 = new TestKit(system); - TestKit device2 = new TestKit(system); - - Map actorToDeviceId = new HashMap<>(); - actorToDeviceId.put(device1.getRef(), "device1"); - actorToDeviceId.put(device2.getRef(), "device2"); - - ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId, - 1L, - requester.getRef(), - new FiniteDuration(3, TimeUnit.SECONDS))); - - assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); - assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); - - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); - - DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); - assertEquals(1L, response.requestId); - - Map expectedTemperatures = new HashMap<>(); - expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); - expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); - - assertEqualTemperatures(expectedTemperatures, response.temperatures); - } - - @Test - public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() { - TestKit requester = new TestKit(system); - - TestKit device1 = new TestKit(system); - TestKit device2 = new TestKit(system); - - Map actorToDeviceId = new HashMap<>(); - actorToDeviceId.put(device1.getRef(), "device1"); - actorToDeviceId.put(device2.getRef(), "device2"); - - ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId, - 1L, - requester.getRef(), - new FiniteDuration(3, TimeUnit.SECONDS))); - - assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); - assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); - - queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef()); - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); - - DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); - assertEquals(1L, response.requestId); - - Map expectedTemperatures = new HashMap<>(); - expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable()); - expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); - - assertEqualTemperatures(expectedTemperatures, response.temperatures); - } - - @Test - public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() { - TestKit requester = new TestKit(system); - - TestKit device1 = new TestKit(system); - TestKit device2 = new TestKit(system); - - Map actorToDeviceId = new HashMap<>(); - actorToDeviceId.put(device1.getRef(), "device1"); - actorToDeviceId.put(device2.getRef(), "device2"); - - ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId, - 1L, - requester.getRef(), - new FiniteDuration(3, TimeUnit.SECONDS))); - - assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); - assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); - - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); - device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); - - DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); - assertEquals(1L, response.requestId); - - Map expectedTemperatures = new HashMap<>(); - expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); - expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable()); - - assertEqualTemperatures(expectedTemperatures, response.temperatures); - } - - @Test - public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() { - TestKit requester = new TestKit(system); - - TestKit device1 = new TestKit(system); - TestKit device2 = new TestKit(system); - - Map actorToDeviceId = new HashMap<>(); - actorToDeviceId.put(device1.getRef(), "device1"); - actorToDeviceId.put(device2.getRef(), "device2"); - - ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId, - 1L, - requester.getRef(), - new FiniteDuration(3, TimeUnit.SECONDS))); - - assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); - assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); - - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); - device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); - - DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); - assertEquals(1L, response.requestId); - - Map expectedTemperatures = new HashMap<>(); - expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); - expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); - - assertEqualTemperatures(expectedTemperatures, response.temperatures); - } - - @Test - public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() { - TestKit requester = new TestKit(system); - - TestKit device1 = new TestKit(system); - TestKit device2 = new TestKit(system); - - Map actorToDeviceId = new HashMap<>(); - actorToDeviceId.put(device1.getRef(), "device1"); - actorToDeviceId.put(device2.getRef(), "device2"); - - ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId, - 1L, - requester.getRef(), - new FiniteDuration(1, TimeUnit.SECONDS))); - - assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); - assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); - - queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); - - DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass( - java.time.Duration.ofSeconds(5), - DeviceGroup.RespondAllTemperatures.class); - assertEquals(1L, response.requestId); - - Map expectedTemperatures = new HashMap<>(); - expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); - expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut()); - - assertEqualTemperatures(expectedTemperatures, response.temperatures); - } - - public static void assertEqualTemperatures(Map expected, Map actual) { - for (Map.Entry entry : expected.entrySet()) { - DeviceGroup.TemperatureReading expectedReading = entry.getValue(); - DeviceGroup.TemperatureReading actualReading = actual.get(entry.getKey()); - assertNotNull(actualReading); - assertEquals(expectedReading.getClass(), actualReading.getClass()); - if (expectedReading instanceof DeviceGroup.Temperature) { - assertEquals(((DeviceGroup.Temperature) expectedReading).value, ((DeviceGroup.Temperature) actualReading).value, 0.01); - } - } - assertEquals(expected.size(), actual.size()); - } -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupTest.java b/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupTest.java deleted file mode 100644 index 0cd68b2c78..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceGroupTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.testkit.javadsl.TestKit; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -import static jdocs.tutorial_6.DeviceGroupQueryTest.assertEqualTemperatures; - -public class DeviceGroupTest extends JUnitSuite { - - static ActorSystem system; - - @BeforeClass - public static void setup() { - system = ActorSystem.create(); - } - - @AfterClass - public static void teardown() { - TestKit.shutdownActorSystem(system); - system = null; - } - - @Test - public void testRegisterDeviceActor() { - TestKit probe = new TestKit(system); - ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor1 = probe.getLastSender(); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor2 = probe.getLastSender(); - assertNotEquals(deviceActor1, deviceActor2); - - // Check that the device actors are working - deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); - assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); - deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); - assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); - } - - @Test - public void testIgnoreRequestsForWrongGroupId() { - TestKit probe = new TestKit(system); - ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); - - groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef()); - probe.expectNoMsg(); - } - - @Test - public void testReturnSameActorForSameDeviceId() { - TestKit probe = new TestKit(system); - ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor1 = probe.getLastSender(); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor2 = probe.getLastSender(); - assertEquals(deviceActor1, deviceActor2); - } - - @Test - public void testListActiveDevices() { - TestKit probe = new TestKit(system); - ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - - groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); - DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); - assertEquals(0L, reply.requestId); - assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); - } - - @Test - public void testListActiveDevicesAfterOneShutsDown() { - TestKit probe = new TestKit(system); - ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef toShutDown = probe.getLastSender(); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - - groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); - DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); - assertEquals(0L, reply.requestId); - assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); - - probe.watch(toShutDown); - toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender()); - probe.expectTerminated(toShutDown); - - // using awaitAssert to retry because it might take longer for the groupActor - // to see the Terminated, that order is undefined - probe.awaitAssert(() -> { - groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef()); - DeviceGroup.ReplyDeviceList r = - probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); - assertEquals(1L, r.requestId); - assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids); - return null; - }); - } - - @Test - public void testCollectTemperaturesFromAllActiveDevices() { - TestKit probe = new TestKit(system); - ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor1 = probe.getLastSender(); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor2 = probe.getLastSender(); - - groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - ActorRef deviceActor3 = probe.getLastSender(); - - // Check that the device actors are working - deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); - assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); - deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); - assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); - // No temperature for device 3 - - groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef()); - DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); - assertEquals(0L, response.requestId); - - Map expectedTemperatures = new HashMap<>(); - expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); - expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); - expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable()); - - assertEqualTemperatures(expectedTemperatures, response.temperatures); - } -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceManager.java b/akka-docs/src/test/java/jdocs/tutorial_6/DeviceManager.java deleted file mode 100644 index ee4132050f..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceManager.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.actor.Terminated; -import akka.event.Logging; -import akka.event.LoggingAdapter; - -import java.util.HashMap; -import java.util.Map; - -public class DeviceManager extends AbstractActor { - private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); - - public static Props props() { - return Props.create(DeviceManager.class); - } - - public static final class RequestTrackDevice { - public final String groupId; - public final String deviceId; - - public RequestTrackDevice(String groupId, String deviceId) { - this.groupId = groupId; - this.deviceId = deviceId; - } - } - - public static final class DeviceRegistered { - } - - final Map groupIdToActor = new HashMap<>(); - final Map actorToGroupId = new HashMap<>(); - - @Override - public void preStart() { - log.info("DeviceManager started"); - } - - @Override - public void postStop() { - log.info("DeviceManager stopped"); - } - - private void onTrackDevice(RequestTrackDevice trackMsg) { - String groupId = trackMsg.groupId; - ActorRef ref = groupIdToActor.get(groupId); - if (ref != null) { - ref.forward(trackMsg, getContext()); - } else { - log.info("Creating device group actor for {}", groupId); - ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId); - getContext().watch(groupActor); - groupActor.forward(trackMsg, getContext()); - groupIdToActor.put(groupId, groupActor); - actorToGroupId.put(groupActor, groupId); - } - } - - private void onTerminated(Terminated t) { - ActorRef groupActor = t.getActor(); - String groupId = actorToGroupId.get(groupActor); - log.info("Device group actor for {} has been terminated", groupId); - actorToGroupId.remove(groupActor); - groupIdToActor.remove(groupId); - } - - public Receive createReceive() { - return receiveBuilder() - .match(RequestTrackDevice.class, this::onTrackDevice) - .match(Terminated.class, this::onTerminated) - .build(); - } - -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceTest.java b/akka-docs/src/test/java/jdocs/tutorial_6/DeviceTest.java deleted file mode 100644 index ed5414665d..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/DeviceTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import java.util.Optional; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.javadsl.TestKit; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertEquals; - -import org.scalatest.junit.JUnitSuite; - - -public class DeviceTest extends JUnitSuite { - - static ActorSystem system; - - @BeforeClass - public static void setup() { - system = ActorSystem.create(); - } - - @AfterClass - public static void teardown() { - TestKit.shutdownActorSystem(system); - system = null; - } - - @Test - public void testReplyToRegistrationRequests() { - TestKit probe = new TestKit(system); - ActorRef deviceActor = system.actorOf(Device.props("group", "device")); - - deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef()); - probe.expectMsgClass(DeviceManager.DeviceRegistered.class); - assertEquals(deviceActor, probe.getLastSender()); - } - - @Test - public void testIgnoreWrongRegistrationRequests() { - TestKit probe = new TestKit(system); - ActorRef deviceActor = system.actorOf(Device.props("group", "device")); - - deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef()); - probe.expectNoMsg(); - - deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef()); - probe.expectNoMsg(); - } - - @Test - public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { - TestKit probe = new TestKit(system); - ActorRef deviceActor = system.actorOf(Device.props("group", "device")); - deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); - Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); - assertEquals(42L, response.requestId); - assertEquals(Optional.empty(), response.value); - } - - @Test - public void testReplyWithLatestTemperatureReading() { - TestKit probe = new TestKit(system); - ActorRef deviceActor = system.actorOf(Device.props("group", "device")); - - deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); - assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); - - deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); - Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); - assertEquals(2L, response1.requestId); - assertEquals(Optional.of(24.0), response1.value); - - deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); - assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); - - deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); - Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); - assertEquals(4L, response2.requestId); - assertEquals(Optional.of(55.0), response2.value); - } - -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/IotMain.java b/akka-docs/src/test/java/jdocs/tutorial_6/IotMain.java deleted file mode 100644 index 46ce4d8b03..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/IotMain.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; - -import java.io.IOException; - -public class IotMain { - - public static void main(String[] args) throws IOException { - ActorSystem system = ActorSystem.create("iot-system"); - - try { - // Create top level supervisor - ActorRef supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor"); - - supervisor.tell(new DeviceManager.RequestTrackDevice("mygroup", "device1"), ActorRef.noSender()); - - System.out.println("Press ENTER to exit the system"); - System.in.read(); - } finally { - system.terminate(); - } - } - -} diff --git a/akka-docs/src/test/java/jdocs/tutorial_6/IotSupervisor.java b/akka-docs/src/test/java/jdocs/tutorial_6/IotSupervisor.java deleted file mode 100644 index 30ffff48a4..0000000000 --- a/akka-docs/src/test/java/jdocs/tutorial_6/IotSupervisor.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package jdocs.tutorial_6; - -//#iot-supervisor - -import akka.actor.AbstractActor; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; - -public class IotSupervisor extends AbstractActor { - private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); - - public static Props props() { - return Props.create(IotSupervisor.class); - } - - @Override - public void preStart() { - log.info("IoT Application started"); - } - - @Override - public void postStop() { - log.info("IoT Application stopped"); - } - - // No need to handle any messages - @Override - public Receive createReceive() { - return receiveBuilder() - .build(); - } - -} -//#iot-supervisor diff --git a/akka-docs/src/test/scala/docs/stream/operators/Map.scala b/akka-docs/src/test/scala/docs/stream/operators/Map.scala new file mode 100644 index 0000000000..37f8e6c180 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/Map.scala @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.stream.operators + +//#imports +import akka.NotUsed +import akka.stream.scaladsl._ + +//#imports + +object Map { + + //#map + val source: Source[Int, NotUsed] = Source(1 to 10) + val mapped: Source[String, NotUsed] = source.map(elem ⇒ elem.toString) + //#map +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala new file mode 100644 index 0000000000..c1d32457b5 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.stream.operators + +object SourceOperators { + + def fromFuture = { + //#sourceFromFuture + + import akka.actor.ActorSystem + import akka.stream.ActorMaterializer + import akka.{ Done, NotUsed } + import akka.stream.scaladsl._ + + import scala.concurrent.Future + + implicit val system: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + + val source: Source[Int, NotUsed] = Source.fromFuture(Future.successful(10)) + val sink: Sink[Int, Future[Done]] = Sink.foreach((i: Int) ⇒ println(i)) + + val done: Future[Done] = source.runWith(sink) //10 + //#sourceFromFuture + } +} diff --git a/akka-docs/src/test/scala/tutorial_6/Device.scala b/akka-docs/src/test/scala/tutorial_6/Device.scala deleted file mode 100644 index e8c466905a..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/Device.scala +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.{ Actor, ActorLogging, Props } - -object Device { - - def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) - - final case class RecordTemperature(requestId: Long, value: Double) - final case class TemperatureRecorded(requestId: Long) - - final case class ReadTemperature(requestId: Long) - final case class RespondTemperature(requestId: Long, value: Option[Double]) -} - -class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { - import Device._ - - var lastTemperatureReading: Option[Double] = None - - override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) - - override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) - - override def receive: Receive = { - case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) ⇒ - sender() ! DeviceManager.DeviceRegistered - - case DeviceManager.RequestTrackDevice(groupId, deviceId) ⇒ - log.warning( - "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", - groupId, deviceId, this.groupId, this.deviceId - ) - - case RecordTemperature(id, value) ⇒ - log.info("Recorded temperature reading {} with {}", value, id) - lastTemperatureReading = Some(value) - sender() ! TemperatureRecorded(id) - - case ReadTemperature(id) ⇒ - sender() ! RespondTemperature(id, lastTemperatureReading) - } -} diff --git a/akka-docs/src/test/scala/tutorial_6/DeviceGroup.scala b/akka-docs/src/test/scala/tutorial_6/DeviceGroup.scala deleted file mode 100644 index ad95cf56e9..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/DeviceGroup.scala +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import DeviceGroup._ -import DeviceManager.RequestTrackDevice -import scala.concurrent.duration._ - -object DeviceGroup { - - def props(groupId: String): Props = Props(new DeviceGroup(groupId)) - - final case class RequestDeviceList(requestId: Long) - final case class ReplyDeviceList(requestId: Long, ids: Set[String]) - - final case class RequestAllTemperatures(requestId: Long) - final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading]) - - sealed trait TemperatureReading - final case class Temperature(value: Double) extends TemperatureReading - case object TemperatureNotAvailable extends TemperatureReading - case object DeviceNotAvailable extends TemperatureReading - case object DeviceTimedOut extends TemperatureReading -} - -class DeviceGroup(groupId: String) extends Actor with ActorLogging { - var deviceIdToActor = Map.empty[String, ActorRef] - var actorToDeviceId = Map.empty[ActorRef, String] - var nextCollectionId = 0L - - override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) - - override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) - - override def receive: Receive = { - // Note the backticks - case trackMsg @ RequestTrackDevice(`groupId`, _) ⇒ - deviceIdToActor.get(trackMsg.deviceId) match { - case Some(ref) ⇒ - ref forward trackMsg - case None ⇒ - log.info("Creating device actor for {}", trackMsg.deviceId) - val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId) - context.watch(deviceActor) - deviceActor forward trackMsg - deviceIdToActor += trackMsg.deviceId -> deviceActor - actorToDeviceId += deviceActor -> trackMsg.deviceId - } - - case RequestTrackDevice(groupId, deviceId) ⇒ - log.warning( - "Ignoring TrackDevice request for {}. This actor is responsible for {}.", - groupId, this.groupId - ) - - case RequestDeviceList(requestId) ⇒ - sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet) - - case Terminated(deviceActor) ⇒ - val deviceId = actorToDeviceId(deviceActor) - log.info("Device actor for {} has been terminated", deviceId) - actorToDeviceId -= deviceActor - deviceIdToActor -= deviceId - - case RequestAllTemperatures(requestId) ⇒ - context.actorOf(DeviceGroupQuery.props( - actorToDeviceId = actorToDeviceId, - requestId = requestId, - requester = sender(), - 3.seconds - )) - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/DeviceGroupQuery.scala b/akka-docs/src/test/scala/tutorial_6/DeviceGroupQuery.scala deleted file mode 100644 index 8e1b6e2824..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/DeviceGroupQuery.scala +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.Actor.Receive -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } - -import scala.concurrent.duration._ - -object DeviceGroupQuery { - - case object CollectionTimeout - - def props( - actorToDeviceId: Map[ActorRef, String], - requestId: Long, - requester: ActorRef, - timeout: FiniteDuration - ): Props = { - Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout)) - } -} - -class DeviceGroupQuery( - actorToDeviceId: Map[ActorRef, String], - requestId: Long, - requester: ActorRef, - timeout: FiniteDuration -) extends Actor with ActorLogging { - import DeviceGroupQuery._ - import context.dispatcher - val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout) - - override def preStart(): Unit = { - actorToDeviceId.keysIterator.foreach { deviceActor ⇒ - context.watch(deviceActor) - deviceActor ! Device.ReadTemperature(0) - } - - } - - override def postStop(): Unit = { - queryTimeoutTimer.cancel() - } - - override def receive: Receive = - waitingForReplies( - Map.empty, - actorToDeviceId.keySet - ) - - def waitingForReplies( - repliesSoFar: Map[String, DeviceGroup.TemperatureReading], - stillWaiting: Set[ActorRef] - ): Receive = { - case Device.RespondTemperature(0, valueOption) ⇒ - val deviceActor = sender() - val reading = valueOption match { - case Some(value) ⇒ DeviceGroup.Temperature(value) - case None ⇒ DeviceGroup.TemperatureNotAvailable - } - receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar) - - case Terminated(deviceActor) ⇒ - if (stillWaiting.contains(deviceActor)) - receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar) - // else ignore - - case CollectionTimeout ⇒ - val timedOutReplies = - stillWaiting.map { deviceActor ⇒ - val deviceId = actorToDeviceId(deviceActor) - deviceId -> DeviceGroup.DeviceTimedOut - } - requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies) - context.stop(self) - } - - def receivedResponse( - deviceActor: ActorRef, - reading: DeviceGroup.TemperatureReading, - stillWaiting: Set[ActorRef], - repliesSoFar: Map[String, DeviceGroup.TemperatureReading] - ): Unit = { - val deviceId = actorToDeviceId(deviceActor) - val newStillWaiting = stillWaiting - deviceActor - - val newRepliesSoFar = repliesSoFar + (deviceId -> reading) - if (newStillWaiting.isEmpty) { - requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar) - context.stop(self) - } else { - context.become(waitingForReplies(newRepliesSoFar, newStillWaiting)) - } - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/DeviceGroupQuerySpec.scala b/akka-docs/src/test/scala/tutorial_6/DeviceGroupQuerySpec.scala deleted file mode 100644 index d9a084e977..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/DeviceGroupQuerySpec.scala +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.PoisonPill -import akka.testkit.{ AkkaSpec, TestProbe } - -import scala.concurrent.duration._ - -class DeviceGroupQuerySpec extends AkkaSpec { - - "DeviceGroupQuery" must { - - "return temperature value for working devices" in { - val requester = TestProbe() - - val device1 = TestProbe() - val device2 = TestProbe() - - val queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), - requestId = 1, - requester = requester.ref, - timeout = 3.seconds - )) - - device1.expectMsg(Device.ReadTemperature(requestId = 0)) - device2.expectMsg(Device.ReadTemperature(requestId = 0)) - - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) - - requester.expectMsg(DeviceGroup.RespondAllTemperatures( - requestId = 1, - temperatures = Map( - "device1" -> DeviceGroup.Temperature(1.0), - "device2" -> DeviceGroup.Temperature(2.0) - ) - )) - } - - "return TemperatureNotAvailable for devices with no readings" in { - val requester = TestProbe() - - val device1 = TestProbe() - val device2 = TestProbe() - - val queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), - requestId = 1, - requester = requester.ref, - timeout = 3.seconds - )) - - device1.expectMsg(Device.ReadTemperature(requestId = 0)) - device2.expectMsg(Device.ReadTemperature(requestId = 0)) - - queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref) - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) - - requester.expectMsg(DeviceGroup.RespondAllTemperatures( - requestId = 1, - temperatures = Map( - "device1" -> DeviceGroup.TemperatureNotAvailable, - "device2" -> DeviceGroup.Temperature(2.0) - ) - )) - } - - "return DeviceNotAvailable if device stops before answering" in { - val requester = TestProbe() - - val device1 = TestProbe() - val device2 = TestProbe() - - val queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), - requestId = 1, - requester = requester.ref, - timeout = 3.seconds - )) - - device1.expectMsg(Device.ReadTemperature(requestId = 0)) - device2.expectMsg(Device.ReadTemperature(requestId = 0)) - - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) - device2.ref ! PoisonPill - - requester.expectMsg(DeviceGroup.RespondAllTemperatures( - requestId = 1, - temperatures = Map( - "device1" -> DeviceGroup.Temperature(1.0), - "device2" -> DeviceGroup.DeviceNotAvailable - ) - )) - } - - "return temperature reading even if device stops after answering" in { - val requester = TestProbe() - - val device1 = TestProbe() - val device2 = TestProbe() - - val queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), - requestId = 1, - requester = requester.ref, - timeout = 3.seconds - )) - - device1.expectMsg(Device.ReadTemperature(requestId = 0)) - device2.expectMsg(Device.ReadTemperature(requestId = 0)) - - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) - device2.ref ! PoisonPill - - requester.expectMsg(DeviceGroup.RespondAllTemperatures( - requestId = 1, - temperatures = Map( - "device1" -> DeviceGroup.Temperature(1.0), - "device2" -> DeviceGroup.Temperature(2.0) - ) - )) - } - - "return DeviceTimedOut if device does not answer in time" in { - val requester = TestProbe() - - val device1 = TestProbe() - val device2 = TestProbe() - - val queryActor = system.actorOf(DeviceGroupQuery.props( - actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), - requestId = 1, - requester = requester.ref, - timeout = 1.second - )) - - device1.expectMsg(Device.ReadTemperature(requestId = 0)) - device2.expectMsg(Device.ReadTemperature(requestId = 0)) - - queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) - - requester.expectMsg(DeviceGroup.RespondAllTemperatures( - requestId = 1, - temperatures = Map( - "device1" -> DeviceGroup.Temperature(1.0), - "device2" -> DeviceGroup.DeviceTimedOut - ) - )) - } - - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/DeviceGroupSpec.scala b/akka-docs/src/test/scala/tutorial_6/DeviceGroupSpec.scala deleted file mode 100644 index ce25e83681..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/DeviceGroupSpec.scala +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.PoisonPill -import akka.testkit.{ AkkaSpec, TestProbe } - -import scala.concurrent.duration._ - -class DeviceGroupSpec extends AkkaSpec { - - "DeviceGroup actor" must { - - "be able to register a device actor" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor1 = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor2 = probe.lastSender - deviceActor1 should !==(deviceActor2) - - // Check that the device actors are working - deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) - deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) - } - - "ignore requests for wrong groupId" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref) - probe.expectNoMsg(500.milliseconds) - } - - "return same actor for same deviceId" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor1 = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor2 = probe.lastSender - - deviceActor1 should ===(deviceActor2) - } - - "be able to list active devices" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - - groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) - probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) - } - - "be able to list active devices after one shuts down" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val toShutDown = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - - groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) - probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) - - probe.watch(toShutDown) - toShutDown ! PoisonPill - probe.expectTerminated(toShutDown) - - // using awaitAssert to retry because it might take longer for the groupActor - // to see the Terminated, that order is undefined - probe.awaitAssert { - groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref) - probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2"))) - } - } - - "be able to collect temperatures from all active devices" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor1 = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor2 = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor3 = probe.lastSender - - // Check that the device actors are working - deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) - deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) - // No temperature for device3 - - groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref) - probe.expectMsg( - DeviceGroup.RespondAllTemperatures( - requestId = 0, - temperatures = Map( - "device1" -> DeviceGroup.Temperature(1.0), - "device2" -> DeviceGroup.Temperature(2.0), - "device3" -> DeviceGroup.TemperatureNotAvailable))) - } - - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/DeviceManager.scala b/akka-docs/src/test/scala/tutorial_6/DeviceManager.scala deleted file mode 100644 index 1f72e502eb..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/DeviceManager.scala +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import DeviceManager.RequestTrackDevice - -object DeviceManager { - def props(): Props = Props(new DeviceManager) - - final case class RequestTrackDevice(groupId: String, deviceId: String) - case object DeviceRegistered -} - -class DeviceManager extends Actor with ActorLogging { - var groupIdToActor = Map.empty[String, ActorRef] - var actorToGroupId = Map.empty[ActorRef, String] - - override def preStart(): Unit = log.info("DeviceManager started") - - override def postStop(): Unit = log.info("DeviceManager stopped") - - override def receive = { - case trackMsg @ RequestTrackDevice(groupId, _) ⇒ - groupIdToActor.get(groupId) match { - case Some(ref) ⇒ - ref forward trackMsg - case None ⇒ - log.info("Creating device group actor for {}", groupId) - val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId) - context.watch(groupActor) - groupActor forward trackMsg - groupIdToActor += groupId -> groupActor - actorToGroupId += groupActor -> groupId - } - - case Terminated(groupActor) ⇒ - val groupId = actorToGroupId(groupActor) - log.info("Device group actor for {} has been terminated", groupId) - actorToGroupId -= groupActor - groupIdToActor -= groupId - - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/DeviceSpec.scala b/akka-docs/src/test/scala/tutorial_6/DeviceSpec.scala deleted file mode 100644 index df626d5238..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/DeviceSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.testkit.{ AkkaSpec, TestProbe } - -import scala.concurrent.duration._ - -class DeviceSpec extends AkkaSpec { - - "Device actor" must { - - "reply to registration requests" in { - val probe = TestProbe() - val deviceActor = system.actorOf(Device.props("group", "device")) - - deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - probe.lastSender should ===(deviceActor) - } - - "ignore wrong registration requests" in { - val probe = TestProbe() - val deviceActor = system.actorOf(Device.props("group", "device")) - - deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref) - probe.expectNoMsg(500.milliseconds) - - deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref) - probe.expectNoMsg(500.milliseconds) - } - - "reply with empty reading if no temperature is known" in { - val probe = TestProbe() - val deviceActor = system.actorOf(Device.props("group", "device")) - - deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref) - val response = probe.expectMsgType[Device.RespondTemperature] - response.requestId should ===(42) - response.value should ===(None) - } - - "reply with latest temperature reading" in { - val probe = TestProbe() - val deviceActor = system.actorOf(Device.props("group", "device")) - - deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) - - deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref) - val response1 = probe.expectMsgType[Device.RespondTemperature] - response1.requestId should ===(2) - response1.value should ===(Some(24.0)) - - deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 3)) - - deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref) - val response2 = probe.expectMsgType[Device.RespondTemperature] - response2.requestId should ===(4) - response2.value should ===(Some(55.0)) - } - - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/IotApp.scala b/akka-docs/src/test/scala/tutorial_6/IotApp.scala deleted file mode 100644 index af09d08188..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/IotApp.scala +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.ActorSystem -import DeviceManager.RequestTrackDevice - -import scala.io.StdIn - -object IotApp { - - def main(args: Array[String]): Unit = { - val system = ActorSystem("iot-system") - - try { - // Create top level supervisor - val supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor") - - supervisor ! RequestTrackDevice("mygroup", "device1") - - // Exit the system after ENTER is pressed - StdIn.readLine() - } finally { - system.terminate() - } - } - -} diff --git a/akka-docs/src/test/scala/tutorial_6/IotSupervisor.scala b/akka-docs/src/test/scala/tutorial_6/IotSupervisor.scala deleted file mode 100644 index de246b0eb0..0000000000 --- a/akka-docs/src/test/scala/tutorial_6/IotSupervisor.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package tutorial_6 - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props } - -object IotSupervisor { - - def props(): Props = Props(new IotSupervisor) - -} - -class IotSupervisor extends Actor with ActorLogging { - val deviceManager: ActorRef = context.system.actorOf(DeviceManager.props(), "device-manager") - - override def preStart(): Unit = log.info("IoT Application started") - - override def postStop(): Unit = log.info("IoT Application stopped") - - // No need to handle any messages - override def receive = Actor.emptyBehavior - -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala index 3ffbda096c..7bb5299d67 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala @@ -207,7 +207,7 @@ object SSLEngineProviderSetup { } /** - * Setup for for defining a `SSLEngineProvider` that is passed in when ActorSystem + * Setup for defining a `SSLEngineProvider` that is passed in when ActorSystem * is created rather than creating one from configured class name. That is useful * when the SSLEngineProvider implementation require other external constructor parameters * or is created before the ActorSystem is created. diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala index 3b20e68dfe..a36836efb0 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala @@ -73,7 +73,7 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg case event @ Warning(logSource, logClass, message) ⇒ withMdc(logSource, event) { event match { - case e: LogEventWithCause ⇒ Logger(logClass, logSource).warn(markerIfPresent(event), if (message != null) message.toString else null, e.cause) + case e: LogEventWithCause ⇒ Logger(logClass, logSource).warn(markerIfPresent(event), if (message != null) message.toString else e.cause.getLocalizedMessage, e.cause) case _ ⇒ Logger(logClass, logSource).warn(markerIfPresent(event), if (message != null) message.toString else null) } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 6a33288fb4..c75e120bab 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -12,7 +12,10 @@ import akka.actor.Status; import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; +//#imports import akka.stream.*; + +//#imports import akka.stream.scaladsl.FlowSpec; import akka.util.ConstantFun; import akka.stream.stage.*; @@ -328,6 +331,22 @@ public class SourceTest extends StreamTest { assertEquals("A", result); } + @Test + public void mustBeAbleToUseSingle() throws Exception { + //#source-single + CompletionStage> future = Source.single("A").runWith(Sink.seq(), materializer); + CompletableFuture> completableFuture = future.toCompletableFuture(); + completableFuture.thenAccept(result -> System.out.printf("collected elements: %s\n", result)); + // result list will contain exactly one element "A" + + //#source-single + // DO NOT use get() directly in your production code! + List result = completableFuture.get(); + assertEquals(1, result.size()); + assertEquals("A", result.get(0)); + + } + @Test public void mustBeAbleToUsePrefixAndTail() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 4bc9ef101d..a46a06a214 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -5,10 +5,14 @@ package akka.stream.scaladsl import akka.testkit.DefaultTimeout -import org.scalatest.time.{ Span, Millis } +import org.scalatest.time.{ Millis, Span } + import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ +//#imports import akka.stream._ + +//#imports import akka.stream.testkit._ import akka.NotUsed import akka.testkit.EventFilter @@ -24,6 +28,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout { implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) "Single Source" must { + + "produce exactly one element" in { + implicit val ec = system.dispatcher + //#source-single + val s: Future[immutable.Seq[Int]] = Source.single(1).runWith(Sink.seq) + s.foreach(list ⇒ println(s"Collected elements: $list")) // prints: Collected elements: List(1) + + //#source-single + + s.futureValue should ===(immutable.Seq(1)) + + } + "produce element" in { val p = Source.single(1).runWith(Sink.asPublisher(false)) val c = TestSubscriber.manualProbe[Int]() @@ -103,7 +120,6 @@ class SourceSpec extends StreamSpec with DefaultTimeout { val out = TestSubscriber.manualProbe[Int] Source.combine(source(0), source(1), source(2))(Merge(_)).to(Sink.fromSubscriber(out)).run() - val sub = out.expectSubscription() sub.request(3) @@ -141,6 +157,20 @@ class SourceSpec extends StreamSpec with DefaultTimeout { out.expectComplete() } + "combine using Concat strategy two inputs with simplified API" in { + //#combine + val sources = immutable.Seq( + Source(List(1, 2, 3)), + Source(List(10, 20, 30))) + + Source.combine(sources(0), sources(1))(Concat(_)) + .runWith(Sink.seq) + // This will produce the Seq(1, 2, 3, 10, 20, 30) + //#combine + .futureValue should ===(immutable.Seq(1, 2, 3, 10, 20, 30)) + + } + "combine from two inputs with combinedMat and take a materialized value" in { val queueSource = Source.queue[Int](1, OverflowStrategy.dropBuffer) val intSeqSource = Source(1 to 3)