Merge pull request #17783 from 2m/wip-more-stream-test-docs
=doc #17291 more stream test docs
This commit is contained in:
commit
e898f49d86
4 changed files with 243 additions and 34 deletions
|
|
@ -7,27 +7,96 @@ import akka.stream._
|
|||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.scaladsl._
|
||||
import scala.util._
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent._
|
||||
import akka.testkit.TestProbe
|
||||
import akka.pattern.pipe
|
||||
import akka.pattern
|
||||
|
||||
class StreamTestKitDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
|
||||
"test source probe" in {
|
||||
"strict collection" in {
|
||||
//#strict-collection
|
||||
val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right)
|
||||
|
||||
//#test-source-probe
|
||||
TestSource.probe[Int]
|
||||
.toMat(Sink.cancelled)(Keep.left)
|
||||
.run()
|
||||
.expectCancellation()
|
||||
//#test-source-probe
|
||||
val future = Source(1 to 4).runWith(sinkUnderTest)
|
||||
val result = Await.result(future, 100.millis)
|
||||
assert(result == 20)
|
||||
//#strict-collection
|
||||
}
|
||||
|
||||
"grouped part of infinite stream" in {
|
||||
//#grouped-infinite
|
||||
val sourceUnderTest = Source.repeat(1).map(_ * 2)
|
||||
|
||||
val future = sourceUnderTest.grouped(10).runWith(Sink.head)
|
||||
val result = Await.result(future, 100.millis)
|
||||
assert(result == Seq.fill(10)(2))
|
||||
//#grouped-infinite
|
||||
}
|
||||
|
||||
"folded stream" in {
|
||||
//#folded-stream
|
||||
val flowUnderTest = Flow[Int].takeWhile(_ < 5)
|
||||
|
||||
val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
|
||||
val result = Await.result(future, 100.millis)
|
||||
assert(result == (1 to 4))
|
||||
//#folded-stream
|
||||
}
|
||||
|
||||
"pipe to test probe" in {
|
||||
import system.dispatcher
|
||||
//#pipeto-testprobe
|
||||
val sourceUnderTest = Source(1 to 4).grouped(2)
|
||||
|
||||
val probe = TestProbe()
|
||||
sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref)
|
||||
probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4)))
|
||||
//#pipeto-testprobe
|
||||
}
|
||||
|
||||
"sink actor ref" in {
|
||||
//#sink-actorref
|
||||
case object Tick
|
||||
val sourceUnderTest = Source(0.seconds, 200.millis, Tick)
|
||||
|
||||
val probe = TestProbe()
|
||||
val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run()
|
||||
|
||||
probe.expectMsg(1.second, Tick)
|
||||
probe.expectNoMsg(100.millis)
|
||||
probe.expectMsg(200.millis, Tick)
|
||||
cancellable.cancel()
|
||||
probe.expectMsg(200.millis, "completed")
|
||||
//#sink-actorref
|
||||
}
|
||||
|
||||
"source actor ref" in {
|
||||
//#source-actorref
|
||||
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
|
||||
|
||||
val (ref, future) = Source.actorRef(8, OverflowStrategy.fail)
|
||||
.toMat(sinkUnderTest)(Keep.both).run()
|
||||
|
||||
ref ! 1
|
||||
ref ! 2
|
||||
ref ! 3
|
||||
ref ! akka.actor.Status.Success("done")
|
||||
|
||||
val result = Await.result(future, 100.millis)
|
||||
assert(result == "123")
|
||||
//#source-actorref
|
||||
}
|
||||
|
||||
"test sink probe" in {
|
||||
|
||||
//#test-sink-probe
|
||||
Source(1 to 4)
|
||||
.filter(_ % 2 == 0)
|
||||
.map(_ * 2)
|
||||
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
|
||||
|
||||
sourceUnderTest
|
||||
.runWith(TestSink.probe[Int])
|
||||
.request(2)
|
||||
.expectNext(4, 8)
|
||||
|
|
@ -35,4 +104,54 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
//#test-sink-probe
|
||||
}
|
||||
|
||||
"test source probe" in {
|
||||
//#test-source-probe
|
||||
val sinkUnderTest = Sink.cancelled
|
||||
|
||||
TestSource.probe[Int]
|
||||
.toMat(sinkUnderTest)(Keep.left)
|
||||
.run()
|
||||
.expectCancellation()
|
||||
//#test-source-probe
|
||||
}
|
||||
|
||||
"injecting failure" in {
|
||||
//#injecting-failure
|
||||
val sinkUnderTest = Sink.head[Int]
|
||||
|
||||
val (probe, future) = TestSource.probe[Int]
|
||||
.toMat(sinkUnderTest)(Keep.both)
|
||||
.run()
|
||||
probe.sendError(new Exception("boom"))
|
||||
|
||||
Await.ready(future, 100.millis)
|
||||
val Failure(exception) = future.value.get
|
||||
assert(exception.getMessage == "boom")
|
||||
//#injecting-failure
|
||||
}
|
||||
|
||||
"test source and a sink" in {
|
||||
import system.dispatcher
|
||||
//#test-source-and-sink
|
||||
val flowUnderTest = Flow[Int].mapAsyncUnordered(2) { sleep =>
|
||||
pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep))
|
||||
}
|
||||
|
||||
val (pub, sub) = TestSource.probe[Int]
|
||||
.via(flowUnderTest)
|
||||
.toMat(TestSink.probe[Int])(Keep.both)
|
||||
.run()
|
||||
|
||||
sub.request(n = 3)
|
||||
pub.sendNext(3)
|
||||
pub.sendNext(2)
|
||||
pub.sendNext(1)
|
||||
sub.expectNextUnordered(1, 2, 3)
|
||||
|
||||
pub.sendError(new Exception("Power surge in the linear subroutine C-47!"))
|
||||
val ex = sub.expectError
|
||||
assert(ex.getMessage.contains("C-47"))
|
||||
//#test-source-and-sink
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,22 +4,67 @@
|
|||
Testing streams
|
||||
###############
|
||||
|
||||
Akka Streams comes with an :mod:`akka-stream-testkit` module that provides tools which can be used for controlling and asserting various parts of the stream pipeline.
|
||||
Verifying behaviour of Akka Stream sources, flows and sinks can be done using various code patterns and libraries. Here we will discuss testing these elements using:
|
||||
|
||||
Probe Sink
|
||||
==========
|
||||
- simple sources, sinks and flows;
|
||||
- sources and sinks in combination with :class:`TestProbe` from the :mod:`akka-testkit` module;
|
||||
- sources and sinks specifically crafted for writing tests from the :mod:`akka-stream-testkit` module.
|
||||
|
||||
Using probe as a `Sink` allows manual control over demand and assertions over elements coming downstream. Streams testkit provides a sink that materializes to a :class:`TestSubscriber.Probe`.
|
||||
It is important to keep your data processing pipeline as separate sources, flows and sinks. This makes them easily testable by wiring them up to other sources or sinks, or some test harnesses that :mod:`akka-testkit` or :mod:`akka-stream-testkit` provide.
|
||||
|
||||
Built in sources, sinks and combinators
|
||||
=======================================
|
||||
|
||||
Testing a custom sink can be as simple as attaching a source that emits elements from a predefined collection, running a constructed test flow and asserting on the results that sink produced. Here is an example of a test for a sink:
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#strict-collection
|
||||
|
||||
The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some condition. Here the :code:`grouped` combinator and :code:`Sink.head` are very useful.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#grouped-infinite
|
||||
|
||||
When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that tests various edge cases of the flow and sinks that ease assertions.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#folded-stream
|
||||
|
||||
TestKit
|
||||
=======
|
||||
|
||||
Akka Stream offers integration with Actors out of the box. This support can be used for writing stream tests that use familiar :class:`TestProbe` from the :mod:`akka-testkit` API.
|
||||
|
||||
One of the more straightforward tests would be to materialize stream to a :class:`Future` and then use :code:`pipe` pattern to pipe the result of that future to the probe.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#pipeto-testprobe
|
||||
|
||||
Instead of materializing to a future, we can use a :class:`Sink.actorRef` that sends all incoming elements to the given :class:`ActorRef`. Now we can use assertion methods on :class:`TestProbe` and expect elements one by one as they arrive. We can also assert stream completion by expecting for :code:`onCompleteMessage` which was given to :class:`Sink.actorRef`.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#sink-actorref
|
||||
|
||||
Similarly to :class:`Sink.actorRef` that provides control over received elements, we can use :class:`Source.actorRef` and have full control over elements to be sent.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#source-actorref
|
||||
|
||||
Streams TestKit
|
||||
===============
|
||||
|
||||
You may have noticed various code patterns that emerge when testing stream pipelines. Akka Stream has a separate :mod:`akka-stream-testkit` module that provides tools specifically for writing stream tests. This module comes with two main components that are :class:`TestSource` and :class:`TestSink` which provide sources and sinks that materialize to probes that allow fluent API.
|
||||
|
||||
.. note::
|
||||
|
||||
Be sure to add the module :mod:`akka-stream-testkit` to your dependencies.
|
||||
|
||||
A sink returned by :code:`TestSink.probe` allows manual control over demand and assertions over elements coming downstream.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-sink-probe
|
||||
|
||||
Probe Source
|
||||
============
|
||||
|
||||
A source that materializes to :class:`TestPublisher.Probe` can be used for asserting demand or controlling when stream is completed or ended with an error.
|
||||
A source returned by :code:`TestSource.probe` can be used for asserting demand or controlling when stream is completed or ended with an error.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-probe
|
||||
|
||||
*TODO*
|
||||
You can also inject exceptions and test sink behaviour on error conditions.
|
||||
|
||||
List by example various operations on probes. Using probes without a sink.
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#injecting-failure
|
||||
|
||||
Test source and sink can be used together in combination when testing flows.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-and-sink
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue