From 27dd69870684b38d3c5efbea61205dd28e046864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Fri, 19 Jun 2015 10:03:55 +0300 Subject: [PATCH 1/3] =doc #17291 more stream test docs --- .../docs/stream/StreamTestKitDocSpec.scala | 144 ++++++++++++++++-- akka-docs-dev/rst/scala/stream-testkit.rst | 65 ++++++-- .../scala/akka/stream/scaladsl/Source.scala | 6 +- 3 files changed, 191 insertions(+), 24 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala index 25764d8d4e..d169bb38d3 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala @@ -7,27 +7,99 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ +import scala.util._ +import scala.concurrent.duration._ +import scala.concurrent._ +import akka.testkit.TestProbe +import akka.pattern.pipe +import akka.pattern class StreamTestKitDocSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() - "test source probe" in { + "strict collection" in { + //#strict-collection + val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right) - //#test-source-probe - TestSource.probe[Int] - .toMat(Sink.cancelled)(Keep.left) - .run() - .expectCancellation() - //#test-source-probe + val future = Source(1 to 4).runWith(sinkUnderTest) + Await.ready(future, 100.millis) + val Success(result) = future.value.get + assert(result == 20) + //#strict-collection + } + + "grouped part of infinite stream" in { + //#grouped-infinite + val sourceUnderTest = Source.repeat(1).map(_ * 2) + + val future = sourceUnderTest.grouped(10).runWith(Sink.head) + Await.ready(future, 100.millis) + val Success(result) = future.value.get + assert(result == Seq.fill(10)(2)) + //#grouped-infinite + } + + "folded stream" in { + //#folded-stream + val flowUnderTest = Flow[Int].takeWhile(_ < 5) + + val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _)) + Await.ready(future, 100.millis) + val Success(result) = future.value.get + assert(result == (1 to 4)) + //#folded-stream + } + + "pipe to test probe" in { + import system.dispatcher + //#pipeto-testprobe + val sourceUnderTest = Source(1 to 4).grouped(2) + + val probe = TestProbe() + sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref) + probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4))) + //#pipeto-testprobe + } + + "sink actor ref" in { + //#sink-actorref + val sourceUnderTest = Source(0.seconds, 200.millis, ()) + + val probe = TestProbe() + val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run() + + probe.expectMsg(1.second, ()) + probe.expectNoMsg(100.millis) + probe.expectMsg(200.millis, ()) + cancellable.cancel() + probe.expectMsg(200.millis, "completed") + //#sink-actorref + } + + "source actor ref" in { + //#source-actorref + val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) + + val (ref, future) = Source.actorRef(8, OverflowStrategy.fail) + .toMat(sinkUnderTest)(Keep.both).run() + + ref ! 1 + ref ! 2 + ref ! 3 + ref ! akka.actor.Status.Success("done") + + Await.ready(future, 100.millis) + val Success(result) = future.value.get + assert(result == "123") + //#source-actorref } "test sink probe" in { - //#test-sink-probe - Source(1 to 4) - .filter(_ % 2 == 0) - .map(_ * 2) + val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2) + + sourceUnderTest .runWith(TestSink.probe[Int]) .request(2) .expectNext(4, 8) @@ -35,4 +107,54 @@ class StreamTestKitDocSpec extends AkkaSpec { //#test-sink-probe } + "test source probe" in { + //#test-source-probe + val sinkUnderTest = Sink.cancelled + + TestSource.probe[Int] + .toMat(sinkUnderTest)(Keep.left) + .run() + .expectCancellation() + //#test-source-probe + } + + "injecting failure" in { + //#injecting-failure + val sinkUnderTest = Sink.head[Int] + + val (probe, future) = TestSource.probe[Int] + .toMat(sinkUnderTest)(Keep.both) + .run() + probe.sendError(new Exception("boom")) + + Await.ready(future, 100.millis) + val Failure(exception) = future.value.get + assert(exception.getMessage == "boom") + //#injecting-failure + } + + "test source and a sink" in { + import system.dispatcher + //#test-source-and-sink + val flowUnderTest = Flow[Int].mapAsyncUnordered(2) { sleep => + pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep)) + } + + val (pub, sub) = TestSource.probe[Int] + .via(flowUnderTest) + .toMat(TestSink.probe[Int])(Keep.both) + .run() + + sub.request(n = 3) + pub.sendNext(3) + pub.sendNext(2) + pub.sendNext(1) + sub.expectNextUnordered(1, 2, 3) + + pub.sendError(new Exception("Power surge in the linear subroutine C-47!")) + val ex = sub.expectError + assert(ex.getMessage.contains("C-47")) + //#test-source-and-sink + } + } diff --git a/akka-docs-dev/rst/scala/stream-testkit.rst b/akka-docs-dev/rst/scala/stream-testkit.rst index ccddf84077..b1f6b1af60 100644 --- a/akka-docs-dev/rst/scala/stream-testkit.rst +++ b/akka-docs-dev/rst/scala/stream-testkit.rst @@ -4,22 +4,67 @@ Testing streams ############### -Akka Streams comes with an :mod:`akka-stream-testkit` module that provides tools which can be used for controlling and asserting various parts of the stream pipeline. +Verifying behaviour of Akka Stream sources, flows and sinks can be done using various code patterns and libraries. Here we will discuss testing these elements using: -Probe Sink -========== +- simple sources, sinks and flows; +- sources and sinks in combination with :class:`TestProbe` from the :mod:`akka-testkit` module; +- sources and sinks specifically crafted for writing tests from the :mod:`akka-stream-testkit` module. -Using probe as a `Sink` allows manual control over demand and assertions over elements coming downstream. Streams testkit provides a sink that materializes to a :class:`TestSubscriber.Probe`. +It is important to keep your data processing pipeline as separate sources, flows and sinks. This makes them easily testable by wiring them up to other sources or sinks, or some test harnesses that :mod:`akka-testkit` or :mod:`akka-stream-testkit` provide. + +Built in sources, sinks and combinators +======================================= + +Testing a custom sink can be as simple as attaching a source that emits elements from a predefined collection, running a constructed test flow and asserting on the results that sink produced. Here is an example of a test for a sink: + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#strict-collection + +The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here :code:`grouped` combinator and :code:`Sink.head` are very useful. + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#grouped-infinite + +When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that eases assertions. + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#folded-stream + +TestKit +======= + +Akka Stream offers integration with Actors out of the box. This support can be used for writing stream tests that use familiar :class:`TestProbe` from the :mod:`akka-testkit` API. + +One of the more straightforward tests would be to materialize stream to a :class:`Future` and then use :code:`pipe` pattern to pipe the result of that future to the probe. + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#pipeto-testprobe + +Instead of materializing to a future, we can use a :class:`Sink.actorRef` that sends all incoming elements to the given :class:`ActorRef`. Now we can use assertion methods on :class:`TestProbe` and expect elements one by one as they arrive. We can also assert stream completion by expecting for :code:`onCompleteMessage` which was given to :class:`Sink.actorRef`. + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#sink-actorref + +Similarly to :class:`Sink.actorRef` that provides control over received elements, we can use :class:`Source.actorRef` and have full control over elements to be sent. + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#source-actorref + +Streams TestKit +=============== + +You may have noticed various code patterns that emerge when testing stream pipelines. Akka Stream has a separate :mod:`akka-stream-testkit` module that provides tools specifically for writing stream tests. This module comes with two main components that are :class:`TestSource` and :class:`TestSink` which provide sources and sinks that materialize to probes that allow fluent API. + +.. note:: + + Be sure to add the module :mod:`akka-stream-testkit` to your dependencies. + +A sink returned by :code:`TestSink.probe` allows manual control over demand and assertions over elements coming downstream. .. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-sink-probe -Probe Source -============ - -A source that materializes to :class:`TestPublisher.Probe` can be used for asserting demand or controlling when stream is completed or ended with an error. +A source returned by :code:`TestSource.probe` can be used for asserting demand or controlling when stream is completed or ended with an error. .. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-probe -*TODO* +You can also inject exceptions and test sink behaviour on error conditions. -List by example various operations on probes. Using probes without a sink. +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#injecting-failure + +Test source and sink can be used together in combination when testing flows. + +.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-and-sink diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index f5377d91fc..02bdcc7ef0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -328,9 +328,9 @@ object Source extends SourceApply { * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does * not matter. * - * The stream can be completed successfully by sending the actor referende an [[akka.actor.Status.Success]] - * messagein which case already buffered elements will be signalled before signalling completion, - * or by sending a [[akka.actor.PoisonPill]] in which case completion will be signalled immediatly. + * The stream can be completed successfully by sending the actor reference an [[akka.actor.Status.Success]] + * message in which case already buffered elements will be signalled before signalling completion, + * or by sending a [[akka.actor.PoisonPill]] in which case completion will be signalled immediately. * * The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the * actor reference. In case the Actor is still draining its internal buffer (after having received From 5671bbe6968fd8dc9589adf97ed15c66b6ef6e9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Fri, 19 Jun 2015 14:42:43 +0300 Subject: [PATCH 2/3] =doc #17291 java docs code --- akka-docs-dev/rst/java/stream-testkit.rst | 65 +++++++++++++++++++---- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-testkit.rst b/akka-docs-dev/rst/java/stream-testkit.rst index 5ae3b948a2..873ab4be49 100644 --- a/akka-docs-dev/rst/java/stream-testkit.rst +++ b/akka-docs-dev/rst/java/stream-testkit.rst @@ -4,22 +4,67 @@ Testing streams ############### -Akka Streams comes with an :mod:`akka-stream-testkit` module that provides tools which can be used for controlling and asserting various parts of the stream pipeline. +Verifying behaviour of Akka Stream sources, flows and sinks can be done using various code patterns and libraries. Here we will discuss testing these elements using: -Probe Sink -========== +- simple sources, sinks and flows; +- sources and sinks in combination with :class:`TestProbe` from the :mod:`akka-testkit` module; +- sources and sinks specifically crafted for writing tests from the :mod:`akka-stream-testkit` module. -Using probe as a `Sink` allows manual control over demand and assertions over elements coming downstream. Streams testkit provides a sink that materializes to a :class:`TestSubscriber.Probe`. +It is important to keep your data processing pipeline as separate sources, flows and sinks. This makes them easily testable by wiring them up to other sources or sinks, or some test harnesses that :mod:`akka-testkit` or :mod:`akka-stream-testkit` provide. + +Built in sources, sinks and combinators +======================================= + +Testing a custom sink can be as simple as attaching a source that emits elements from a predefined collection, running a constructed test flow and asserting on the results that sink produced. Here is an example of a test for a sink: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#strict-collection + +The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here :code:`grouped` combinator and :code:`Sink.head` are very useful. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#grouped-infinite + +When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that eases assertions. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#folded-stream + +TestKit +======= + +Akka Stream offers integration with Actors out of the box. This support can be used for writing stream tests that use familiar :class:`TestProbe` from the :mod:`akka-testkit` API. + +One of the more straightforward tests would be to materialize stream to a :class:`Future` and then use :code:`pipe` pattern to pipe the result of that future to the probe. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#pipeto-testprobe + +Instead of materializing to a future, we can use a :class:`Sink.actorRef` that sends all incoming elements to the given :class:`ActorRef`. Now we can use assertion methods on :class:`TestProbe` and expect elements one by one as they arrive. We can also assert stream completion by expecting for :code:`onCompleteMessage` which was given to :class:`Sink.actorRef`. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#sink-actorref + +Similarly to :class:`Sink.actorRef` that provides control over received elements, we can use :class:`Source.actorRef` and have full control over elements to be sent. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#source-actorref + +Streams TestKit +=============== + +You may have noticed various code patterns that emerge when testing stream pipelines. Akka Stream has a separate :mod:`akka-stream-testkit` module that provides tools specifically for writing stream tests. This module comes with two main components that are :class:`TestSource` and :class:`TestSink` which provide sources and sinks that materialize to probes that allow fluent API. + +.. note:: + + Be sure to add the module :mod:`akka-stream-testkit` to your dependencies. + +A sink returned by :code:`TestSink.probe` allows manual control over demand and assertions over elements coming downstream. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#test-sink-probe -Probe Source -============ - -A source that materializes to :class:`TestPublisher.Probe` can be used for asserting demand or controlling when stream is completed or ended with an error. +A source returned by :code:`TestSource.probe` can be used for asserting demand or controlling when stream is completed or ended with an error. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#test-source-probe -*TODO* +You can also inject exceptions and test sink behaviour on error conditions. -List by example various operations on probes. Using probes without a sink. +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#injecting-failure + +Test source and sink can be used together in combination when testing flows. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#test-source-and-sink From 4841b17ca2231cae9bb3f15ec705c24cd9d0ec0b Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Mon, 22 Jun 2015 13:54:49 +0300 Subject: [PATCH 3/3] =doc #17291 grammar fixes and Await.result instead of Await.ready --- akka-docs-dev/rst/java/stream-testkit.rst | 4 ++-- .../docs/stream/StreamTestKitDocSpec.scala | 19 ++++++++----------- akka-docs-dev/rst/scala/stream-testkit.rst | 4 ++-- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-testkit.rst b/akka-docs-dev/rst/java/stream-testkit.rst index 873ab4be49..8de0cb1e93 100644 --- a/akka-docs-dev/rst/java/stream-testkit.rst +++ b/akka-docs-dev/rst/java/stream-testkit.rst @@ -19,11 +19,11 @@ Testing a custom sink can be as simple as attaching a source that emits elements .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#strict-collection -The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here :code:`grouped` combinator and :code:`Sink.head` are very useful. +The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here the :code:`grouped` combinator and :code:`Sink.head` are very useful. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#grouped-infinite -When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that eases assertions. +When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that ease assertions. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTestKitDocTest.java#folded-stream diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala index d169bb38d3..60dfc8d20e 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala @@ -23,8 +23,7 @@ class StreamTestKitDocSpec extends AkkaSpec { val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right) val future = Source(1 to 4).runWith(sinkUnderTest) - Await.ready(future, 100.millis) - val Success(result) = future.value.get + val result = Await.result(future, 100.millis) assert(result == 20) //#strict-collection } @@ -34,8 +33,7 @@ class StreamTestKitDocSpec extends AkkaSpec { val sourceUnderTest = Source.repeat(1).map(_ * 2) val future = sourceUnderTest.grouped(10).runWith(Sink.head) - Await.ready(future, 100.millis) - val Success(result) = future.value.get + val result = Await.result(future, 100.millis) assert(result == Seq.fill(10)(2)) //#grouped-infinite } @@ -45,8 +43,7 @@ class StreamTestKitDocSpec extends AkkaSpec { val flowUnderTest = Flow[Int].takeWhile(_ < 5) val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _)) - Await.ready(future, 100.millis) - val Success(result) = future.value.get + val result = Await.result(future, 100.millis) assert(result == (1 to 4)) //#folded-stream } @@ -64,14 +61,15 @@ class StreamTestKitDocSpec extends AkkaSpec { "sink actor ref" in { //#sink-actorref - val sourceUnderTest = Source(0.seconds, 200.millis, ()) + case object Tick + val sourceUnderTest = Source(0.seconds, 200.millis, Tick) val probe = TestProbe() val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run() - probe.expectMsg(1.second, ()) + probe.expectMsg(1.second, Tick) probe.expectNoMsg(100.millis) - probe.expectMsg(200.millis, ()) + probe.expectMsg(200.millis, Tick) cancellable.cancel() probe.expectMsg(200.millis, "completed") //#sink-actorref @@ -89,8 +87,7 @@ class StreamTestKitDocSpec extends AkkaSpec { ref ! 3 ref ! akka.actor.Status.Success("done") - Await.ready(future, 100.millis) - val Success(result) = future.value.get + val result = Await.result(future, 100.millis) assert(result == "123") //#source-actorref } diff --git a/akka-docs-dev/rst/scala/stream-testkit.rst b/akka-docs-dev/rst/scala/stream-testkit.rst index b1f6b1af60..0f642455b6 100644 --- a/akka-docs-dev/rst/scala/stream-testkit.rst +++ b/akka-docs-dev/rst/scala/stream-testkit.rst @@ -19,11 +19,11 @@ Testing a custom sink can be as simple as attaching a source that emits elements .. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#strict-collection -The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here :code:`grouped` combinator and :code:`Sink.head` are very useful. +The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here the :code:`grouped` combinator and :code:`Sink.head` are very useful. .. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#grouped-infinite -When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that eases assertions. +When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that ease assertions. .. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#folded-stream