=doc #17291 more stream test docs

This commit is contained in:
Martynas Mickevičius 2015-06-19 10:03:55 +03:00
parent b4272b77c2
commit 27dd698706
3 changed files with 191 additions and 24 deletions

View file

@ -7,27 +7,99 @@ import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import scala.util._
import scala.concurrent.duration._
import scala.concurrent._
import akka.testkit.TestProbe
import akka.pattern.pipe
import akka.pattern
class StreamTestKitDocSpec extends AkkaSpec {
implicit val mat = ActorFlowMaterializer()
"test source probe" in {
"strict collection" in {
//#strict-collection
val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right)
//#test-source-probe
TestSource.probe[Int]
.toMat(Sink.cancelled)(Keep.left)
.run()
.expectCancellation()
//#test-source-probe
val future = Source(1 to 4).runWith(sinkUnderTest)
Await.ready(future, 100.millis)
val Success(result) = future.value.get
assert(result == 20)
//#strict-collection
}
"grouped part of infinite stream" in {
//#grouped-infinite
val sourceUnderTest = Source.repeat(1).map(_ * 2)
val future = sourceUnderTest.grouped(10).runWith(Sink.head)
Await.ready(future, 100.millis)
val Success(result) = future.value.get
assert(result == Seq.fill(10)(2))
//#grouped-infinite
}
"folded stream" in {
//#folded-stream
val flowUnderTest = Flow[Int].takeWhile(_ < 5)
val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
Await.ready(future, 100.millis)
val Success(result) = future.value.get
assert(result == (1 to 4))
//#folded-stream
}
"pipe to test probe" in {
import system.dispatcher
//#pipeto-testprobe
val sourceUnderTest = Source(1 to 4).grouped(2)
val probe = TestProbe()
sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref)
probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4)))
//#pipeto-testprobe
}
"sink actor ref" in {
//#sink-actorref
val sourceUnderTest = Source(0.seconds, 200.millis, ())
val probe = TestProbe()
val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run()
probe.expectMsg(1.second, ())
probe.expectNoMsg(100.millis)
probe.expectMsg(200.millis, ())
cancellable.cancel()
probe.expectMsg(200.millis, "completed")
//#sink-actorref
}
"source actor ref" in {
//#source-actorref
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(8, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
Await.ready(future, 100.millis)
val Success(result) = future.value.get
assert(result == "123")
//#source-actorref
}
"test sink probe" in {
//#test-sink-probe
Source(1 to 4)
.filter(_ % 2 == 0)
.map(_ * 2)
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
sourceUnderTest
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(4, 8)
@ -35,4 +107,54 @@ class StreamTestKitDocSpec extends AkkaSpec {
//#test-sink-probe
}
"test source probe" in {
//#test-source-probe
val sinkUnderTest = Sink.cancelled
TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.left)
.run()
.expectCancellation()
//#test-source-probe
}
"injecting failure" in {
//#injecting-failure
val sinkUnderTest = Sink.head[Int]
val (probe, future) = TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.both)
.run()
probe.sendError(new Exception("boom"))
Await.ready(future, 100.millis)
val Failure(exception) = future.value.get
assert(exception.getMessage == "boom")
//#injecting-failure
}
"test source and a sink" in {
import system.dispatcher
//#test-source-and-sink
val flowUnderTest = Flow[Int].mapAsyncUnordered(2) { sleep =>
pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep))
}
val (pub, sub) = TestSource.probe[Int]
.via(flowUnderTest)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
sub.request(n = 3)
pub.sendNext(3)
pub.sendNext(2)
pub.sendNext(1)
sub.expectNextUnordered(1, 2, 3)
pub.sendError(new Exception("Power surge in the linear subroutine C-47!"))
val ex = sub.expectError
assert(ex.getMessage.contains("C-47"))
//#test-source-and-sink
}
}

View file

@ -4,22 +4,67 @@
Testing streams
###############
Akka Streams comes with an :mod:`akka-stream-testkit` module that provides tools which can be used for controlling and asserting various parts of the stream pipeline.
Verifying behaviour of Akka Stream sources, flows and sinks can be done using various code patterns and libraries. Here we will discuss testing these elements using:
Probe Sink
==========
- simple sources, sinks and flows;
- sources and sinks in combination with :class:`TestProbe` from the :mod:`akka-testkit` module;
- sources and sinks specifically crafted for writing tests from the :mod:`akka-stream-testkit` module.
Using probe as a `Sink` allows manual control over demand and assertions over elements coming downstream. Streams testkit provides a sink that materializes to a :class:`TestSubscriber.Probe`.
It is important to keep your data processing pipeline as separate sources, flows and sinks. This makes them easily testable by wiring them up to other sources or sinks, or some test harnesses that :mod:`akka-testkit` or :mod:`akka-stream-testkit` provide.
Built in sources, sinks and combinators
=======================================
Testing a custom sink can be as simple as attaching a source that emits elements from a predefined collection, running a constructed test flow and asserting on the results that sink produced. Here is an example of a test for a sink:
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#strict-collection
The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here :code:`grouped` combinator and :code:`Sink.head` are very useful.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#grouped-infinite
When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that eases assertions.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#folded-stream
TestKit
=======
Akka Stream offers integration with Actors out of the box. This support can be used for writing stream tests that use familiar :class:`TestProbe` from the :mod:`akka-testkit` API.
One of the more straightforward tests would be to materialize stream to a :class:`Future` and then use :code:`pipe` pattern to pipe the result of that future to the probe.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#pipeto-testprobe
Instead of materializing to a future, we can use a :class:`Sink.actorRef` that sends all incoming elements to the given :class:`ActorRef`. Now we can use assertion methods on :class:`TestProbe` and expect elements one by one as they arrive. We can also assert stream completion by expecting for :code:`onCompleteMessage` which was given to :class:`Sink.actorRef`.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#sink-actorref
Similarly to :class:`Sink.actorRef` that provides control over received elements, we can use :class:`Source.actorRef` and have full control over elements to be sent.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#source-actorref
Streams TestKit
===============
You may have noticed various code patterns that emerge when testing stream pipelines. Akka Stream has a separate :mod:`akka-stream-testkit` module that provides tools specifically for writing stream tests. This module comes with two main components that are :class:`TestSource` and :class:`TestSink` which provide sources and sinks that materialize to probes that allow fluent API.
.. note::
Be sure to add the module :mod:`akka-stream-testkit` to your dependencies.
A sink returned by :code:`TestSink.probe` allows manual control over demand and assertions over elements coming downstream.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-sink-probe
Probe Source
============
A source that materializes to :class:`TestPublisher.Probe` can be used for asserting demand or controlling when stream is completed or ended with an error.
A source returned by :code:`TestSource.probe` can be used for asserting demand or controlling when stream is completed or ended with an error.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-probe
*TODO*
You can also inject exceptions and test sink behaviour on error conditions.
List by example various operations on probes. Using probes without a sink.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#injecting-failure
Test source and sink can be used together in combination when testing flows.
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-and-sink