From f930bcdda8126b88c1e36b55a813a1e3a9c2487f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 16 Apr 2015 20:13:43 +0200 Subject: [PATCH] =str #15191 Verify that stage actors are stopped * found one bug of too early actor stop and thereby missing cancel of upstream, in fan-in tests ""work with one immediately failed and one nonempty publisher" --- .../akka/stream/testkit/StreamTestKit.scala | 19 ++- .../akka/stream/testkit/TwoStreamsSetup.scala | 17 +-- .../stream/actor/ActorPublisherSpec.scala | 64 +++++----- .../akka/stream/extra/FlowTimedSpec.scala | 4 +- .../stream/io/InputStreamSourceSpec.scala | 2 +- .../akka/stream/io/OutputStreamSinkSpec.scala | 6 +- .../scala/akka/stream/io/StreamTcpSpec.scala | 19 +-- .../stream/io/SynchronousFileSinkSpec.scala | 10 +- .../stream/io/SynchronousFileSourceSpec.scala | 6 +- .../stream/scaladsl/ActorRefSinkSpec.scala | 5 +- .../stream/scaladsl/ActorRefSourceSpec.scala | 9 +- .../akka/stream/scaladsl/BidiFlowSpec.scala | 3 +- .../akka/stream/scaladsl/FlowBufferSpec.scala | 5 +- .../stream/scaladsl/FlowConcatAllSpec.scala | 9 +- .../akka/stream/scaladsl/FlowFoldSpec.scala | 7 +- .../stream/scaladsl/FlowForeachSpec.scala | 9 +- .../stream/scaladsl/FlowFromFutureSpec.scala | 9 +- .../stream/scaladsl/FlowGroupBySpec.scala | 114 +++++++++--------- .../scaladsl/FlowGroupedWithinSpec.scala | 3 +- .../stream/scaladsl/FlowIteratorSpec.scala | 15 +-- .../stream/scaladsl/FlowMapAsyncSpec.scala | 13 +- .../scaladsl/FlowMapAsyncUnorderedSpec.scala | 11 +- .../stream/scaladsl/FlowMapConcatSpec.scala | 17 +-- .../stream/scaladsl/FlowOnCompleteSpec.scala | 9 +- .../scaladsl/FlowPrefixAndTailSpec.scala | 18 +-- .../akka/stream/scaladsl/FlowScanSpec.scala | 7 +- .../scala/akka/stream/scaladsl/FlowSpec.scala | 3 +- .../stream/scaladsl/FlowSplitWhenSpec.scala | 89 +++++++------- .../akka/stream/scaladsl/FlowStageSpec.scala | 17 +-- .../stream/scaladsl/FlowTakeWithinSpec.scala | 3 +- .../scaladsl/FlowTimerTransformerSpec.scala | 5 +- .../stream/scaladsl/GraphBalanceSpec.scala | 11 +- .../stream/scaladsl/GraphBroadcastSpec.scala | 9 +- .../stream/scaladsl/GraphConcatSpec.scala | 15 +-- .../stream/scaladsl/GraphFlexiMergeSpec.scala | 23 ++-- .../stream/scaladsl/GraphFlexiRouteSpec.scala | 23 ++-- .../akka/stream/scaladsl/GraphMergeSpec.scala | 7 +- .../akka/stream/scaladsl/GraphUnzipSpec.scala | 5 +- .../akka/stream/scaladsl/GraphZipSpec.scala | 11 +- .../akka/stream/scaladsl/HeadSinkSpec.scala | 7 +- .../stream/scaladsl/PublisherSinkSpec.scala | 3 +- .../stream/scaladsl/SubscriberSinkSpec.scala | 3 +- .../SubstreamSubscriptionTimeoutSpec.scala | 13 +- .../akka/stream/scaladsl/TickSourceSpec.scala | 5 +- .../impl/ActorFlowMaterializerImpl.scala | 15 +-- .../main/scala/akka/stream/impl/FanIn.scala | 2 +- 46 files changed, 373 insertions(+), 306 deletions(-) diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index b3b01028b6..4529881f73 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -7,6 +7,7 @@ import akka.stream.FlowMaterializer import com.typesafe.config.ConfigFactory import scala.language.existentials +import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.stream.impl.{ StreamSupervisor, ActorFlowMaterializerImpl, EmptyPublisher, ErrorPublisher } import akka.testkit.TestProbe @@ -14,6 +15,10 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.concurrent.duration.FiniteDuration import akka.actor.DeadLetterSuppression import scala.util.control.NoStackTrace +import akka.stream.FlowMaterializer +import akka.stream.impl.ActorFlowMaterializerImpl +import akka.stream.impl.StreamSupervisor +import akka.actor.ActorRef object StreamTestKit { @@ -181,17 +186,19 @@ object StreamTestKit { case class TE(message: String) extends RuntimeException(message) with NoStackTrace - def checkThatAllStagesAreStopped[T](block: ⇒ T)(implicit materializer: FlowMaterializer): T = + def assertAllStagesStopped[T](block: ⇒ T)(implicit materializer: FlowMaterializer): T = materializer match { case impl: ActorFlowMaterializerImpl ⇒ impl.supervisor ! StreamSupervisor.StopChildren val result = block val probe = TestProbe()(impl.system) - probe.awaitAssert { - impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - val children = probe.expectMsgType[StreamSupervisor.Children].children - assert(children.isEmpty, - s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") + probe.within(5.seconds) { + probe.awaitAssert { + impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) + val children = probe.expectMsgType[StreamSupervisor.Children].children + assert(children.isEmpty, + s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") + } } result case _ ⇒ block diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index a5f7d419f1..d77633fcf1 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -5,6 +5,7 @@ import akka.stream.scaladsl._ import org.reactivestreams.Publisher import scala.collection.immutable import scala.util.control.NoStackTrace +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped abstract class TwoStreamsSetup extends AkkaSpec { @@ -51,42 +52,42 @@ abstract class TwoStreamsSetup extends AkkaSpec { def soonToCompletePublisher[T]: Publisher[T] = StreamTestKit.lazyEmptyPublisher[T] def commonTests() = { - "work with two immediately completed publishers" in { + "work with two immediately completed publishers" in assertAllStagesStopped { val subscriber = setup(completedPublisher, completedPublisher) subscriber.expectSubscriptionAndComplete() } - "work with two delayed completed publishers" in { + "work with two delayed completed publishers" in assertAllStagesStopped { val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher) subscriber.expectSubscriptionAndComplete() } - "work with one immediately completed and one delayed completed publisher" in { + "work with one immediately completed and one delayed completed publisher" in assertAllStagesStopped { val subscriber = setup(completedPublisher, soonToCompletePublisher) subscriber.expectSubscriptionAndComplete() } - "work with two immediately failed publishers" in { + "work with two immediately failed publishers" in assertAllStagesStopped { val subscriber = setup(failedPublisher, failedPublisher) subscriber.expectSubscriptionAndError(TestException) } - "work with two delayed failed publishers" in { + "work with two delayed failed publishers" in assertAllStagesStopped { val subscriber = setup(soonToFailPublisher, soonToFailPublisher) subscriber.expectSubscriptionAndError(TestException) } // Warning: The two test cases below are somewhat implementation specific and might fail if the implementation // is changed. They are here to be an early warning though. - "work with one immediately failed and one delayed failed publisher (case 1)" in { + "work with one immediately failed and one delayed failed publisher (case 1)" in assertAllStagesStopped { val subscriber = setup(soonToFailPublisher, failedPublisher) subscriber.expectSubscriptionAndError(TestException) } - "work with one immediately failed and one delayed failed publisher (case 2)" in { + "work with one immediately failed and one delayed failed publisher (case 2)" in assertAllStagesStopped { val subscriber = setup(failedPublisher, soonToFailPublisher) subscriber.expectSubscriptionAndError(TestException) } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 7defef30f5..dddae24dd7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -285,28 +285,30 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic "work together with Flow and ActorSubscriber" in { implicit val materializer = ActorFlowMaterializer() - val probe = TestProbe() + StreamTestKit.assertAllStagesStopped { + val probe = TestProbe() - val source: Source[Int, ActorRef] = Source.actorPublisher(senderProps) - val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref)) + val source: Source[Int, ActorRef] = Source.actorPublisher(senderProps) + val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref)) - val (snd, rcv) = source.collect { - case n if n % 2 == 0 ⇒ "elem-" + n - }.toMat(sink)(Keep.both).run() + val (snd, rcv) = source.collect { + case n if n % 2 == 0 ⇒ "elem-" + n + }.toMat(sink)(Keep.both).run() - (1 to 3) foreach { snd ! _ } - probe.expectMsg("elem-2") + (1 to 3) foreach { snd ! _ } + probe.expectMsg("elem-2") - (4 to 500) foreach { n ⇒ - if (n % 19 == 0) Thread.sleep(50) // simulate bursts - snd ! n + (4 to 500) foreach { n ⇒ + if (n % 19 == 0) Thread.sleep(50) // simulate bursts + snd ! n + } + + (4 to 500 by 2) foreach { n ⇒ probe.expectMsg("elem-" + n) } + + watch(snd) + rcv ! PoisonPill + expectTerminated(snd) } - - (4 to 500 by 2) foreach { n ⇒ probe.expectMsg("elem-" + n) } - - watch(snd) - rcv ! PoisonPill - expectTerminated(snd) } "work in a FlowGraph" in { @@ -349,22 +351,24 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic "be able to define a subscription-timeout, after which it should shut down" in { implicit val materializer = ActorFlowMaterializer() - val timeout = 150.millis - val a = system.actorOf(timeoutingProps(testActor, timeout)) - val pub = ActorPublisher(a) + StreamTestKit.assertAllStagesStopped { + val timeout = 150.millis + val a = system.actorOf(timeoutingProps(testActor, timeout)) + val pub = ActorPublisher(a) - // don't subscribe for `timeout` millis, so it will shut itself down - expectMsg("timed-out") + // don't subscribe for `timeout` millis, so it will shut itself down + expectMsg("timed-out") - // now subscribers will already be rejected, while the actor could perform some clean-up - val sub = StreamTestKit.SubscriberProbe() - pub.subscribe(sub) - sub.expectSubscriptionAndError() + // now subscribers will already be rejected, while the actor could perform some clean-up + val sub = StreamTestKit.SubscriberProbe() + pub.subscribe(sub) + sub.expectSubscriptionAndError() - expectMsg("cleaned-up") - // termination is tiggered by user code - watch(a) - expectTerminated(a) + expectMsg("cleaned-up") + // termination is tiggered by user code + watch(a) + expectTerminated(a) + } } "be able to define a subscription-timeout, which is cancelled by the first incoming Subscriber" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index 5b0b29e4f6..ef9792c2d7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -72,7 +72,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { "Timed Flow" must { import akka.stream.extra.Implicits.TimedFlowDsl - "measure time it between elements matching a predicate" in { + "measure time it between elements matching a predicate" in StreamTestKit.assertAllStagesStopped { val probe = TestProbe() val flow: Flow[Int, Long, _] = Flow[Int].map(_.toLong).timedIntervalBetween(in ⇒ in % 2 == 1, d ⇒ probe.ref ! d) @@ -91,7 +91,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { info(s"Got duration (first): $duration") } - "measure time from start to complete, by wrapping operations" in { + "measure time from start to complete, by wrapping operations" in StreamTestKit.assertAllStagesStopped { val probe = TestProbe() // making sure the types come out as expected diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index 5a52051b64..c7228c23b1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -18,7 +18,7 @@ class InputStreamSourceSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfi implicit val materializer = ActorFlowMaterializer(settings) "InputStreamSource" must { - "read bytes from InputStream" in checkThatAllStagesAreStopped { + "read bytes from InputStream" in assertAllStagesStopped { val f = InputStreamSource(() ⇒ new InputStream { @volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt) override def read(): Int = { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala index c6549b83c3..6d45cc2919 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -21,7 +21,7 @@ class OutputStreamSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig implicit val materializer = ActorFlowMaterializer(settings) "OutputStreamSink" must { - "write bytes to void OutputStream" in checkThatAllStagesAreStopped { + "write bytes to void OutputStream" in assertAllStagesStopped { val p = TestProbe() val datas = List(ByteString("a"), ByteString("c"), ByteString("c")) @@ -37,7 +37,7 @@ class OutputStreamSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig Await.ready(completion, 3.seconds) } - "close underlying stream when error received" in checkThatAllStagesAreStopped { + "close underlying stream when error received" in assertAllStagesStopped { val p = TestProbe() Source.failed(new TE("Boom!")) .runWith(OutputStreamSink(() ⇒ new OutputStream { @@ -48,7 +48,7 @@ class OutputStreamSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig p.expectMsg("closed") } - "close underlying stream when completion received" in checkThatAllStagesAreStopped { + "close underlying stream when completion received" in assertAllStagesStopped { val p = TestProbe() Source.empty .runWith(OutputStreamSink(() ⇒ new OutputStream { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala index 9a3eaa08b8..b1f059783d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala @@ -17,6 +17,7 @@ import akka.stream.scaladsl.Flow import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.scaladsl._ import akka.stream.testkit.TestUtils.temporaryServerAddress +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class StreamTcpSpec extends AkkaSpec with TcpHelper { import akka.stream.io.TcpHelper._ @@ -24,7 +25,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { "Outgoing TCP stream" must { - "work in the happy case" in { + "work in the happy case" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -75,7 +76,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { } - "work when client closes write, then remote closes write" in { + "work when client closes write, then remote closes write" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -105,7 +106,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "work when remote closes write, then client closes write" in { + "work when remote closes write, then client closes write" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -133,7 +134,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "work when client closes read, then client closes write" in { + "work when client closes read, then client closes write" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -165,7 +166,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "work when client closes write, then client closes read" in { + "work when client closes write, then client closes read" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -198,7 +199,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "work when client closes read, then server closes write, then client closes write" in { + "work when client closes read, then server closes write, then client closes write" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -227,7 +228,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "shut everything down if client signals error" in { + "shut everything down if client signals error" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -254,7 +255,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "shut everything down if client signals error after remote has closed write" in { + "shut everything down if client signals error after remote has closed write" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() @@ -282,7 +283,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { serverConnection.expectTerminated() } - "shut down both streams when connection is aborted remotely" in { + "shut down both streams when connection is aborted remotely" in assertAllStagesStopped { // Client gets a PeerClosed event and does not know that the write side is also closed val testData = ByteString(1, 2, 3, 4, 5) val server = new Server() diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala index 27900b0252..16738cfcd9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala @@ -35,7 +35,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxCon val TestByteStrings = TestLines.map(ByteString(_)) "SynchronousFile Sink" must { - "write lines to a file" in checkThatAllStagesAreStopped { + "write lines to a file" in assertAllStagesStopped { targetFile { f ⇒ val completion = Source(TestByteStrings) .runWith(SynchronousFileSink(f)) @@ -46,7 +46,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxCon } } - "by default write into existing file" in checkThatAllStagesAreStopped { + "by default write into existing file" in assertAllStagesStopped { targetFile { f ⇒ def write(lines: List[String]) = Source(lines) @@ -65,7 +65,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxCon } } - "allow appending to file" in checkThatAllStagesAreStopped { + "allow appending to file" in assertAllStagesStopped { targetFile { f ⇒ def write(lines: List[String] = TestLines) = Source(lines) @@ -84,7 +84,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxCon } } - "use dedicated file-io-dispatcher by default" in checkThatAllStagesAreStopped { + "use dedicated file-io-dispatcher by default" in assertAllStagesStopped { targetFile { f ⇒ val sys = ActorSystem("dispatcher-testing", StreamTestKit.UnboundedMailboxConfig) val mat = ActorFlowMaterializer()(sys) @@ -99,7 +99,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxCon } } - "allow overriding the dispatcher using OperationAttributes" in checkThatAllStagesAreStopped { + "allow overriding the dispatcher using OperationAttributes" in assertAllStagesStopped { targetFile { f ⇒ val sys = ActorSystem("dispatcher-testing", StreamTestKit.UnboundedMailboxConfig) val mat = ActorFlowMaterializer()(sys) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index 521b6e56d8..ce76b1e2e9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -61,7 +61,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxC } "File Source" must { - "read contents from a file" in checkThatAllStagesAreStopped { + "read contents from a file" in assertAllStagesStopped { val chunkSize = 512 val bufferAttributes = OperationAttributes.inputBuffer(1, 2) @@ -96,7 +96,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxC c.expectComplete() } - "complete only when all contents of a file have been signalled" in checkThatAllStagesAreStopped { + "complete only when all contents of a file have been signalled" in assertAllStagesStopped { val chunkSize = 256 val bufferAttributes = OperationAttributes.inputBuffer(4, 8) @@ -130,7 +130,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxC c.expectComplete() } - "onError whent trying to read from file which does not exist" in checkThatAllStagesAreStopped { + "onError whent trying to read from file which does not exist" in assertAllStagesStopped { val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[ByteString]() p.subscribe(c) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index 6a54803b0a..508585d85f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props @@ -24,7 +25,7 @@ class ActorRefSinkSpec extends AkkaSpec { "A ActorRefSink" must { - "send the elements to the ActorRef" in { + "send the elements to the ActorRef" in assertAllStagesStopped { Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done")) expectMsg(1) expectMsg(2) @@ -32,7 +33,7 @@ class ActorRefSinkSpec extends AkkaSpec { expectMsg("done") } - "cancel stream when actor terminates" in { + "cancel stream when actor terminates" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Int]() val fw = system.actorOf(Props(classOf[Fw], testActor).withDispatcher("akka.test.stream-dispatcher")) Source(publisher).runWith(Sink.actorRef(fw, onCompleteMessage = "done")) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index d2b2e24281..23bc3fec69 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -10,6 +10,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.actor.PoisonPill import akka.actor.Status +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class ActorRefSourceSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() @@ -44,7 +45,7 @@ class ActorRefSourceSpec extends AkkaSpec { for (n ← 300 to 399) s.expectNext(n) } - "terminate when the stream is cancelled" in { + "terminate when the stream is cancelled" in assertAllStagesStopped { val s = StreamTestKit.SubscriberProbe[Int]() val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink(s)).run() watch(ref) @@ -53,7 +54,7 @@ class ActorRefSourceSpec extends AkkaSpec { expectTerminated(ref) } - "complete the stream when receiving PoisonPill" in { + "complete the stream when receiving PoisonPill" in assertAllStagesStopped { val s = StreamTestKit.SubscriberProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() val sub = s.expectSubscription @@ -61,7 +62,7 @@ class ActorRefSourceSpec extends AkkaSpec { s.expectComplete() } - "complete the stream when receiving Status.Success" in { + "complete the stream when receiving Status.Success" in assertAllStagesStopped { val s = StreamTestKit.SubscriberProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() val sub = s.expectSubscription @@ -69,7 +70,7 @@ class ActorRefSourceSpec extends AkkaSpec { s.expectComplete() } - "fail the stream when receiving Status.Failure" in { + "fail the stream when receiving Status.Failure" in assertAllStagesStopped { val s = StreamTestKit.SubscriberProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() val sub = s.expectSubscription diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index 5188f81752..5d2de8d30f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.collection.immutable import akka.stream.OperationAttributes +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { import OperationAttributes._ @@ -91,7 +92,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { Await.result(f, 1.second) should ===(42) } - "combine materialization values" in { + "combine materialization values" in assertAllStagesStopped { val left = Flow(Sink.head[Int]) { implicit b ⇒ sink ⇒ val bcast = b.add(Broadcast[Int](2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 46423199d6..e65e34ff91 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorFlowMaterializerSettings import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy.Fail.BufferOverflowException import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowBufferSpec extends AkkaSpec { @@ -35,7 +36,7 @@ class FlowBufferSpec extends AkkaSpec { Await.result(future, 3.seconds) should be(1 to 1000) } - "pass elements through a chain of backpressured buffers of different size" in { + "pass elements through a chain of backpressured buffers of different size" in assertAllStagesStopped { val future = Source(1 to 1000) .buffer(1, overflowStrategy = OverflowStrategy.backpressure) .buffer(10, overflowStrategy = OverflowStrategy.backpressure) @@ -155,7 +156,7 @@ class FlowBufferSpec extends AkkaSpec { sub.cancel() } - "fail upstream if buffer is full and configured so" in { + "fail upstream if buffer is full and configured so" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index e3ec71c402..e0fa6e21ee 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -8,6 +8,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowConcatAllSpec extends AkkaSpec { @@ -20,7 +21,7 @@ class FlowConcatAllSpec extends AkkaSpec { val testException = new Exception("test") with NoStackTrace - "work in the happy case" in { + "work in the happy case" in assertAllStagesStopped { val s1 = Source(1 to 2) val s2 = Source(List.empty[Int]) val s3 = Source(List(3)) @@ -48,7 +49,7 @@ class FlowConcatAllSpec extends AkkaSpec { subscriber.expectComplete() } - "on onError on master stream cancel the current open substream and signal error" in { + "on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Source[Int, _]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() @@ -68,7 +69,7 @@ class FlowConcatAllSpec extends AkkaSpec { subUpstream.expectCancellation() } - "on onError on open substream, cancel the master stream and signal error " in { + "on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Source[Int, _]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() @@ -88,7 +89,7 @@ class FlowConcatAllSpec extends AkkaSpec { upstream.expectCancellation() } - "on cancellation cancel the current open substream and the master stream" in { + "on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Source[Int, _]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index 0d9837b260..00ced50547 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -8,26 +8,27 @@ import scala.util.control.NoStackTrace import akka.stream.{ OverflowStrategy, ActorFlowMaterializer } import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowFoldSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() "A Fold" must { - "fold" in { + "fold" in assertAllStagesStopped { val input = 1 to 100 val future = Source(input).runFold(0)(_ + _) val expected = input.fold(0)(_ + _) Await.result(future, remaining) should be(expected) } - "propagate an error" in { + "propagate an error" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = Source[Unit](() ⇒ throw error).runFold(())((_, _) ⇒ ()) the[Exception] thrownBy Await.result(future, remaining) should be(error) } - "complete future with failure when function throws" in { + "complete future with failure when function throws" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = Source.single(1).runFold(0)((_, _) ⇒ throw error) the[Exception] thrownBy Await.result(future, remaining) should be(error) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index df161844bd..62806a2a2a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -7,6 +7,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import scala.concurrent.Await +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowForeachSpec extends AkkaSpec { @@ -15,7 +16,7 @@ class FlowForeachSpec extends AkkaSpec { "A Foreach" must { - "call the procedure for each element" in { + "call the procedure for each element" in assertAllStagesStopped { Source(1 to 3).runForeach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } @@ -25,14 +26,14 @@ class FlowForeachSpec extends AkkaSpec { expectMsg("done") } - "complete the future for an empty stream" in { + "complete the future for an empty stream" in assertAllStagesStopped { Source.empty[String].runForeach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg("done") } - "yield the first error" in { + "yield the first error" in assertAllStagesStopped { val p = StreamTestKit.PublisherProbe[Int]() Source(p).runForeach(testActor ! _) onFailure { case ex ⇒ testActor ! ex @@ -44,7 +45,7 @@ class FlowForeachSpec extends AkkaSpec { expectMsg(ex) } - "complete future with failure when function throws" in { + "complete future with failure when function throws" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = Source.single(1).runForeach(_ ⇒ throw error) the[Exception] thrownBy Await.result(future, remaining) should be(error) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 1868e429af..a5e934e3cc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -9,6 +9,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.{ AkkaSpec, StreamTestKit } @@ -19,7 +20,7 @@ class FlowFromFutureSpec extends AkkaSpec { implicit val materializer = ActorFlowMaterializer(settings) "A Flow based on a Future" must { - "produce one element from already successful Future" in { + "produce one element from already successful Future" in assertAllStagesStopped { val p = Source(Future.successful(1)).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) @@ -30,7 +31,7 @@ class FlowFromFutureSpec extends AkkaSpec { c.expectComplete() } - "produce error from already failed Future" in { + "produce error from already failed Future" in assertAllStagesStopped { val ex = new RuntimeException("test") with NoStackTrace val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() @@ -38,7 +39,7 @@ class FlowFromFutureSpec extends AkkaSpec { c.expectSubscriptionAndError(ex) } - "produce one element when Future is completed" in { + "produce one element when Future is completed" in assertAllStagesStopped { val promise = Promise[Int]() val p = Source(promise.future).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() @@ -65,7 +66,7 @@ class FlowFromFutureSpec extends AkkaSpec { c.expectComplete() } - "produce elements with multiple subscribers" in { + "produce elements with multiple subscribers" in assertAllStagesStopped { val promise = Promise[Int]() val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) val c1 = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index ad1581d3dc..bcf5becfb4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -13,6 +13,7 @@ import akka.stream.testkit.StreamTestKit.TE import org.reactivestreams.Publisher import akka.stream.OperationAttributes import akka.stream.ActorOperationAttributes +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowGroupBySpec extends AkkaSpec { @@ -56,59 +57,62 @@ class FlowGroupBySpec extends AkkaSpec { } "groupBy" must { - "work in the happy case" in new SubstreamsSupport(groupCount = 2) { - val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) - masterSubscriber.expectNoMsg(100.millis) + "work in the happy case" in assertAllStagesStopped { + new SubstreamsSupport(groupCount = 2) { + val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) + masterSubscriber.expectNoMsg(100.millis) - s1.expectNoMsg(100.millis) - s1.request(1) - s1.expectNext(1) - s1.expectNoMsg(100.millis) + s1.expectNoMsg(100.millis) + s1.request(1) + s1.expectNext(1) + s1.expectNoMsg(100.millis) - val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) + val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) - s2.expectNoMsg(100.millis) - s2.request(2) - s2.expectNext(2) + s2.expectNoMsg(100.millis) + s2.request(2) + s2.expectNext(2) - // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box - s1.request(1) - s2.expectNext(4) + // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box + s1.request(1) + s2.expectNext(4) - s2.expectNoMsg(100.millis) + s2.expectNoMsg(100.millis) - s1.expectNext(3) + s1.expectNext(3) - s2.request(1) - // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box - s1.request(1) - s2.expectNext(6) - s2.expectComplete() + s2.request(1) + // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box + s1.request(1) + s2.expectNext(6) + s2.expectComplete() - s1.expectNext(5) - s1.expectComplete() + s1.expectNext(5) + s1.expectComplete() - masterSubscriber.expectComplete() + masterSubscriber.expectComplete() + } } - "accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) { - StreamPuppet(getSubFlow(1).runWith(Sink.publisher)).cancel() + "accept cancellation of substreams" in assertAllStagesStopped { + new SubstreamsSupport(groupCount = 2) { + StreamPuppet(getSubFlow(1).runWith(Sink.publisher)).cancel() - val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) - substream.request(2) - substream.expectNext(2) - substream.expectNext(4) - substream.expectNoMsg(100.millis) + val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) + substream.request(2) + substream.expectNext(2) + substream.expectNext(4) + substream.expectNoMsg(100.millis) - substream.request(2) - substream.expectNext(6) - substream.expectComplete() - - masterSubscriber.expectComplete() + substream.request(2) + substream.expectNext(6) + substream.expectComplete() + masterSubscriber.expectComplete() + } } - "accept cancellation of master stream when not consumed anything" in { + "accept cancellation of master stream when not consumed anything" in assertAllStagesStopped { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() @@ -120,25 +124,27 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.expectCancellation() } - "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { - val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) + "accept cancellation of master stream when substreams are open" in assertAllStagesStopped { + new SubstreamsSupport(groupCount = 3, elementCount = 13) { + val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) - substream.request(1) - substream.expectNext(1) + substream.request(1) + substream.expectNext(1) - masterSubscription.cancel() - masterSubscriber.expectNoMsg(100.millis) + masterSubscription.cancel() + masterSubscriber.expectNoMsg(100.millis) - // Open substreams still work, others are discarded - substream.request(4) - substream.expectNext(4) - substream.expectNext(7) - substream.expectNext(10) - substream.expectNext(13) - substream.expectComplete() + // Open substreams still work, others are discarded + substream.request(4) + substream.expectNext(4) + substream.expectNext(7) + substream.expectNext(10) + substream.expectNext(13) + substream.expectComplete() + } } - "work with empty input stream" in { + "work with empty input stream" in assertAllStagesStopped { val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -146,7 +152,7 @@ class FlowGroupBySpec extends AkkaSpec { subscriber.expectSubscriptionAndComplete() } - "abort on onError from upstream" in { + "abort on onError from upstream" in assertAllStagesStopped { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() @@ -163,7 +169,7 @@ class FlowGroupBySpec extends AkkaSpec { subscriber.expectError(e) } - "abort on onError from upstream when substreams are running" in { + "abort on onError from upstream when substreams are running" in assertAllStagesStopped { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() @@ -190,7 +196,7 @@ class FlowGroupBySpec extends AkkaSpec { } - "fail stream when groupBy function throws" in { + "fail stream when groupBy function throws" in assertAllStagesStopped { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val exc = TE("test") val publisher = Source(publisherProbeProbe) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index 63021e2dbd..ffbe941308 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { @@ -21,7 +22,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "A GroupedWithin" must { - "group elements within the duration" in { + "group elements within the duration" in assertAllStagesStopped { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 13fd58ad34..ec3192ec26 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -12,6 +12,7 @@ import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.OnComplete import akka.stream.testkit.StreamTestKit.OnError import akka.stream.testkit.StreamTestKit.OnNext +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream.impl.SynchronousIterablePublisher import org.reactivestreams.Subscription import akka.testkit.TestProbe @@ -160,7 +161,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { def createSource(elements: Int): Source[Int, Unit] testName must { - "produce elements" in { + "produce elements" in assertAllStagesStopped { val p = createSource(3).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) @@ -174,7 +175,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { c.expectComplete() } - "complete empty" in { + "complete empty" in assertAllStagesStopped { val p = createSource(0).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) @@ -182,7 +183,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { c.expectNoMsg(100.millis) } - "produce elements with multiple subscribers" in { + "produce elements with multiple subscribers" in assertAllStagesStopped { val p = createSource(3).runWith(Sink.fanoutPublisher(2, 4)) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -206,7 +207,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { c2.expectComplete() } - "produce elements to later subscriber" in { + "produce elements to later subscriber" in assertAllStagesStopped { val p = createSource(3).runWith(Sink.fanoutPublisher(2, 4)) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -229,7 +230,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { c1.expectComplete() } - "produce elements with one transformation step" in { + "produce elements with one transformation step" in assertAllStagesStopped { val p = createSource(3).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) @@ -241,7 +242,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { c.expectComplete() } - "produce elements with two transformation steps" in { + "produce elements with two transformation steps" in assertAllStagesStopped { val p = createSource(4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) @@ -252,7 +253,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { c.expectComplete() } - "not produce after cancel" in { + "not produce after cancel" in assertAllStagesStopped { val p = createSource(3).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index e0eb22181c..1c734c2ea6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorFlowMaterializer import akka.stream.stage._ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.testkit.TestLatch import akka.testkit.TestProbe import akka.stream.ActorOperationAttributes.supervisionStrategy @@ -66,7 +67,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "A Flow with mapAsync" must { - "produce future elements" in { + "produce future elements" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher val p = Source(1 to 3).mapAsync(4, n ⇒ Future(n)).runWith(Sink(c)) @@ -119,7 +120,7 @@ class FlowMapAsyncSpec extends AkkaSpec { c.expectNoMsg(200.millis) } - "signal future failure" in { + "signal future failure" in assertAllStagesStopped { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher @@ -136,7 +137,7 @@ class FlowMapAsyncSpec extends AkkaSpec { latch.countDown() } - "signal error from mapAsync" in { + "signal error from mapAsync" in assertAllStagesStopped { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher @@ -155,7 +156,7 @@ class FlowMapAsyncSpec extends AkkaSpec { latch.countDown() } - "resume after future failure" in { + "resume after future failure" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher val p = Source(1 to 5) @@ -171,7 +172,7 @@ class FlowMapAsyncSpec extends AkkaSpec { c.expectComplete() } - "finish after future failure" in { + "finish after future failure" in assertAllStagesStopped { import system.dispatcher Await.result(Source(1 to 3).mapAsync(1, n ⇒ Future { if (n == 3) throw new RuntimeException("err3b") with NoStackTrace @@ -216,7 +217,7 @@ class FlowMapAsyncSpec extends AkkaSpec { c.expectComplete() } - "should handle cancel properly" in { + "should handle cancel properly" in assertAllStagesStopped { val pub = StreamTestKit.PublisherProbe[Int]() val sub = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index dc598a1074..64e1a645b3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -11,6 +11,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.testkit.TestLatch import akka.testkit.TestProbe import akka.stream.ActorOperationAttributes.supervisionStrategy @@ -25,7 +26,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "A Flow with mapAsyncUnordered" must { - "produce future elements in the order they are ready" in { + "produce future elements in the order they are ready" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher val latch = (1 to 4).map(_ -> TestLatch(1)).toMap @@ -73,7 +74,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { c.expectComplete() } - "signal future failure" in { + "signal future failure" in assertAllStagesStopped { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher @@ -90,7 +91,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { latch.countDown() } - "signal error from mapAsyncUnordered" in { + "signal error from mapAsyncUnordered" in assertAllStagesStopped { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher @@ -125,7 +126,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { c.probe.receiveWhile(2.seconds, messages = 5) { case x ⇒ x }.toSet should be(expected) } - "finish after future failure" in { + "finish after future failure" in assertAllStagesStopped { import system.dispatcher Await.result(Source(1 to 3).mapAsyncUnordered(1, n ⇒ Future { if (n == 3) throw new RuntimeException("err3b") with NoStackTrace @@ -170,7 +171,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { c.expectComplete() } - "should handle cancel properly" in { + "should handle cancel properly" in assertAllStagesStopped { val pub = StreamTestKit.PublisherProbe[Int]() val sub = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala index 70ae8b4dc6..b412ed6c68 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala @@ -8,6 +8,7 @@ import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream.ActorFlowMaterializer class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { @@ -32,13 +33,15 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { val settings = ActorFlowMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 2) implicit val materializer = ActorFlowMaterializer(settings) - val s = SubscriberProbe[Int] - val input = (1 to 20).grouped(5).toList - Source(input).mapConcat(identity).map(x ⇒ { Thread.sleep(10); x }).runWith(Sink(s)) - val sub = s.expectSubscription() - sub.request(100) - for (i ← 1 to 20) s.expectNext(i) - s.expectComplete() + assertAllStagesStopped { + val s = SubscriberProbe[Int] + val input = (1 to 20).grouped(5).toList + Source(input).mapConcat(identity).map(x ⇒ { Thread.sleep(10); x }).runWith(Sink(s)) + val sub = s.expectSubscription() + sub.request(100) + for (i ← 1 to 20) s.expectNext(i) + s.expectComplete() + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala index 1fd2e7acb2..ad04ef9135 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala @@ -10,6 +10,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import akka.testkit.TestProbe @@ -23,7 +24,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "A Flow with onComplete" must { - "invoke callback on normal completion" in { + "invoke callback on normal completion" in assertAllStagesStopped { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() @@ -35,7 +36,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { onCompleteProbe.expectMsg(Success(())) } - "yield the first error" in { + "yield the first error" in assertAllStagesStopped { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() @@ -47,7 +48,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { onCompleteProbe.expectNoMsg(100.millis) } - "invoke callback for an empty stream" in { + "invoke callback for an empty stream" in assertAllStagesStopped { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() @@ -58,7 +59,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { onCompleteProbe.expectNoMsg(100.millis) } - "invoke callback after transform and foreach steps " in { + "invoke callback after transform and foreach steps " in assertAllStagesStopped { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() import system.dispatcher // for the Future.onComplete diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index cb46c3f2be..63a5cb5e06 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -7,11 +7,13 @@ import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.testkit.StreamTestKit.PublisherProbe +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped +import org.reactivestreams.Subscriber class FlowPrefixAndTailSpec extends AkkaSpec { @@ -46,7 +48,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { tailSubscriber.expectSubscriptionAndComplete() } - "work on longer inputs" in { + "work on longer inputs" in assertAllStagesStopped { val futureSink = newHeadSink val fut = Source(1 to 10).prefixAndTail(5).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) @@ -57,7 +59,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { Await.result(fut2, 3.seconds) should be(6 to 10) } - "handle zero take count" in { + "handle zero take count" in assertAllStagesStopped { val futureSink = newHeadSink val fut = Source(1 to 10).prefixAndTail(0).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) @@ -79,7 +81,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { Await.result(fut2, 3.seconds) should be(1 to 10) } - "work if size of take is equal to stream size" in { + "work if size of take is equal to stream size" in assertAllStagesStopped { val futureSink = newHeadSink val fut = Source(1 to 10).prefixAndTail(10).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) @@ -90,7 +92,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectSubscriptionAndComplete() } - "handle onError when no substream open" in { + "handle onError when no substream open" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int, _])]() @@ -108,7 +110,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectError(testException) } - "handle onError when substream is open" in { + "handle onError when substream is open" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int, _])]() @@ -135,7 +137,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { } - "handle master stream cancellation" in { + "handle master stream cancellation" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int, _])]() @@ -153,7 +155,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { upstream.expectCancellation() } - "handle substream cancellation" in { + "handle substream cancellation" in assertAllStagesStopped { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int, _])]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 68e344a390..93b1eccc01 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.AkkaSpec import akka.stream.ActorOperationAttributes import akka.stream.Supervision +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowScanSpec extends AkkaSpec { @@ -25,17 +26,17 @@ class FlowScanSpec extends AkkaSpec { def scan(s: Source[Int, Unit], duration: Duration = 5.seconds): immutable.Seq[Int] = Await.result(s.scan(0)(_ + _).runFold(immutable.Seq.empty[Int])(_ :+ _), duration) - "Scan" in { + "Scan" in assertAllStagesStopped { val v = Vector.fill(random.nextInt(100, 1000))(random.nextInt()) scan(Source(v)) should be(v.scan(0)(_ + _)) } - "Scan empty failed" in { + "Scan empty failed" in assertAllStagesStopped { val e = new Exception("fail!") intercept[Exception](scan(Source.failed[Int](e))) should be theSameInstanceAs (e) } - "Scan empty" in { + "Scan empty" in assertAllStagesStopped { val v = Vector.empty[Int] scan(Source(v)) should be(v.scan(0)(_ + _)) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 55c94f2748..5720895d54 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -16,6 +16,7 @@ import akka.stream.ActorFlowMaterializer import akka.stream.impl._ import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.testkit.ChainSetup +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.testkit._ import akka.testkit.TestEvent.{ UnMute, Mute } import com.typesafe.config.ConfigFactory @@ -247,7 +248,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece c1.expectComplete } - "be materializable several times with fanout publisher" in { + "be materializable several times with fanout publisher" in assertAllStagesStopped { val flow = Source(List(1, 2, 3)).map(_.toString) val p1 = flow.runWith(Sink.fanoutPublisher(2, 2)) val p2 = flow.runWith(Sink.fanoutPublisher(2, 2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 35c0ff46e0..d2f87a10f0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -10,6 +10,7 @@ import akka.stream.Supervision.resumingDecider import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.TE +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import org.reactivestreams.Publisher import akka.stream.ActorOperationAttributes @@ -55,60 +56,66 @@ class FlowSplitWhenSpec extends AkkaSpec { "splitWhen" must { - "work in the happy case" in new SubstreamsSupport(elementCount = 4) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) - masterSubscriber.expectNoMsg(100.millis) + "work in the happy case" in assertAllStagesStopped { + new SubstreamsSupport(elementCount = 4) { + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + masterSubscriber.expectNoMsg(100.millis) - s1.request(2) - s1.expectNext(1) - s1.expectNext(2) - s1.request(1) - s1.expectComplete() + s1.request(2) + s1.expectNext(1) + s1.expectNext(2) + s1.request(1) + s1.expectComplete() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) - s2.request(1) - s2.expectNext(3) - s2.expectNoMsg(100.millis) + s2.request(1) + s2.expectNext(3) + s2.expectNoMsg(100.millis) - s2.request(1) - s2.expectNext(4) - s2.request(1) - s2.expectComplete() + s2.request(1) + s2.expectNext(4) + s2.request(1) + s2.expectComplete() - masterSubscriber.expectComplete() + masterSubscriber.expectComplete() + } } - "support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) - s1.cancel() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + "support cancelling substreams" in assertAllStagesStopped { + new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + s1.cancel() + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) - s2.request(4) - s2.expectNext(5) - s2.expectNext(6) - s2.expectNext(7) - s2.expectNext(8) - s2.request(1) - s2.expectComplete() + s2.request(4) + s2.expectNext(5) + s2.expectNext(6) + s2.expectNext(7) + s2.expectNext(8) + s2.request(1) + s2.expectComplete() - masterSubscription.request(1) - masterSubscriber.expectComplete() + masterSubscription.request(1) + masterSubscriber.expectComplete() + } } - "support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) - masterSubscription.cancel() - s1.request(4) - s1.expectNext(1) - s1.expectNext(2) - s1.expectNext(3) - s1.expectNext(4) - s1.request(1) - s1.expectComplete() + "support cancelling the master stream" in assertAllStagesStopped { + new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + masterSubscription.cancel() + s1.request(4) + s1.expectNext(1) + s1.expectNext(2) + s1.expectNext(3) + s1.expectNext(4) + s1.request(1) + s1.expectComplete() + } } - "fail stream when splitWhen function throws" in { + "fail stream when splitWhen function throws" in assertAllStagesStopped { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val exc = TE("test") val publisher = Source(publisherProbeProbe) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index 1dbed8e8fd..2981f5e578 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -9,6 +9,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.testkit.{ EventFilter, TestProbe } import com.typesafe.config.ConfigFactory import akka.stream.stage._ @@ -21,7 +22,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug implicit val materializer = ActorFlowMaterializer(settings) "A Flow with transform operations" must { - "produce one-to-one transformation as expected" in { + "produce one-to-one transformation as expected" in assertAllStagesStopped { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { @@ -44,7 +45,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug subscriber.expectComplete() } - "produce one-to-several transformation as expected" in { + "produce one-to-several transformation as expected" in assertAllStagesStopped { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { @@ -192,7 +193,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug c2.expectComplete() } - "support emit onUpstreamFinish" in { + "support emit onUpstreamFinish" in assertAllStagesStopped { val p = Source(List("a")).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[String, String] { @@ -215,7 +216,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug c.expectComplete() } - "allow early finish" in { + "allow early finish" in assertAllStagesStopped { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { @@ -241,7 +242,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug proc.expectCancellation() } - "report error when exception is thrown" in { + "report error when exception is thrown" in assertAllStagesStopped { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { @@ -268,7 +269,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } } - "support emit of final elements when onUpstreamFailure" in { + "support emit of final elements when onUpstreamFailure" in assertAllStagesStopped { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem). @@ -295,7 +296,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } } - "support cancel as expected" in { + "support cancel as expected" in assertAllStagesStopped { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { @@ -317,7 +318,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug subscriber.expectNoMsg(200.millis) } - "support producing elements from empty inputs" in { + "support producing elements from empty inputs" in assertAllStagesStopped { val p = Source(List.empty[Int]).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala index 03825588b2..f304cd32d0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowTakeWithinSpec extends AkkaSpec { @@ -37,7 +38,7 @@ class FlowTakeWithinSpec extends AkkaSpec { c.expectNoMsg(200.millis) } - "deliver bufferd elements onComplete before the timeout" in { + "deliver bufferd elements onComplete before the timeout" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[Int]() Source(1 to 3).takeWithin(1.second).to(Sink(c)).run() val cSub = c.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala index e9bfdc65ac..2c0fcab3ea 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala @@ -10,13 +10,14 @@ import akka.stream.ActorFlowMaterializer import akka.stream.TimerTransformer import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class FlowTimerTransformerSpec extends AkkaSpec { implicit val materializer = ActorFlowMaterializer() "A Flow with TimerTransformer operations" must { - "produce scheduled ticks as expected" in { + "produce scheduled ticks as expected" in assertAllStagesStopped { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). timerTransform(() ⇒ new TimerTransformer[Int, Int] { @@ -64,7 +65,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { pSub.sendComplete() } - "propagate error if onTimer throws an exception" in { + "propagate error if onTimer throws an exception" in assertAllStagesStopped { val exception = new Exception("Expected exception to the rule") with NoStackTrace val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index a91e6beab1..0a3528a509 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -7,6 +7,7 @@ import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class GraphBalanceSpec extends AkkaSpec { @@ -18,7 +19,7 @@ class GraphBalanceSpec extends AkkaSpec { "A balance" must { import FlowGraph.Implicits._ - "balance between subscribers which signal demand" in { + "balance between subscribers which signal demand" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -72,7 +73,7 @@ class GraphBalanceSpec extends AkkaSpec { s2.expectComplete() } - "support waiting for demand from all non-cancelled downstream subscriptions" in { + "support waiting for demand from all non-cancelled downstream subscriptions" in assertAllStagesStopped { val s1 = StreamTestKit.SubscriberProbe[Int]() val (p2, p3) = FlowGraph.closed(Sink.publisher[Int], Sink.publisher[Int])(Keep.both) { implicit b ⇒ @@ -141,7 +142,7 @@ class GraphBalanceSpec extends AkkaSpec { Await.result(r3, 3.seconds) should be(numElementsForSink +- 2000) } - "produce to second even though first cancels" in { + "produce to second even though first cancels" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -162,7 +163,7 @@ class GraphBalanceSpec extends AkkaSpec { c2.expectComplete() } - "produce to first even though second cancels" in { + "produce to first even though second cancels" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -183,7 +184,7 @@ class GraphBalanceSpec extends AkkaSpec { c1.expectComplete() } - "cancel upstream when downstreams cancel" in { + "cancel upstream when downstreams cancel" in assertAllStagesStopped { val p1 = StreamTestKit.PublisherProbe[Int]() val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index 92dcc829f7..fa6769a129 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -6,6 +6,7 @@ import scala.concurrent.duration._ import akka.stream.{ OverflowStrategy, ActorFlowMaterializerSettings } import akka.stream.ActorFlowMaterializer import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class GraphBroadcastSpec extends AkkaSpec { @@ -17,7 +18,7 @@ class GraphBroadcastSpec extends AkkaSpec { "A broadcast" must { import FlowGraph.Implicits._ - "broadcast to other subscriber" in { + "broadcast to other subscriber" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -117,7 +118,7 @@ class GraphBroadcastSpec extends AkkaSpec { Await.result(result, 3.seconds) should be(List.fill(22)(List(1, 2, 3))) } - "produce to other even though downstream cancels" in { + "produce to other even though downstream cancels" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -138,7 +139,7 @@ class GraphBroadcastSpec extends AkkaSpec { c2.expectComplete() } - "produce to downstream even though other cancels" in { + "produce to downstream even though other cancels" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -159,7 +160,7 @@ class GraphBroadcastSpec extends AkkaSpec { c1.expectComplete() } - "cancel upstream when downstreams cancel" in { + "cancel upstream when downstreams cancel" in assertAllStagesStopped { val p1 = StreamTestKit.PublisherProbe[Int]() val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index bdc57e9b0a..481b3f3f1c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -10,6 +10,7 @@ import akka.stream.scaladsl._ import akka.stream.testkit.StreamTestKit import akka.stream.testkit.TwoStreamsSetup import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class GraphConcatSpec extends TwoStreamsSetup { @@ -27,7 +28,7 @@ class GraphConcatSpec extends TwoStreamsSetup { "Concat" must { import FlowGraph.Implicits._ - "work in the happy case" in { + "work in the happy case" in assertAllStagesStopped { val probe = StreamTestKit.SubscriberProbe[Int]() FlowGraph.closed() { implicit b ⇒ @@ -56,7 +57,7 @@ class GraphConcatSpec extends TwoStreamsSetup { commonTests() - "work with one immediately completed and one nonempty publisher" in { + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(5) @@ -76,7 +77,7 @@ class GraphConcatSpec extends TwoStreamsSetup { subscriber2.expectComplete() } - "work with one delayed completed and one nonempty publisher" in { + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(5) @@ -96,7 +97,7 @@ class GraphConcatSpec extends TwoStreamsSetup { subscriber2.expectComplete() } - "work with one immediately failed and one nonempty publisher" in { + "work with one immediately failed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectSubscriptionAndError(TestException) @@ -104,7 +105,7 @@ class GraphConcatSpec extends TwoStreamsSetup { subscriber2.expectSubscriptionAndError(TestException) } - "work with one nonempty and one delayed failed publisher" in { + "work with one nonempty and one delayed failed publisher" in assertAllStagesStopped { // This test and the next one are materialization order dependent and rely on the fact // that there are only 3 submodules in the graph that gets created and that an immutable // set (what they are stored in internally) of size 4 or less is an optimized version that @@ -120,7 +121,7 @@ class GraphConcatSpec extends TwoStreamsSetup { if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) } - "work with one delayed failed and one nonempty publisher" in { + "work with one delayed failed and one nonempty publisher" in assertAllStagesStopped { // This test and the previous one are materialization order dependent and rely on the fact // that there are only 3 submodules in the graph that gets created and that an immutable // set (what they are stored in internally) of size 4 or less is an optimized version that @@ -136,7 +137,7 @@ class GraphConcatSpec extends TwoStreamsSetup { if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) } - "correctly handle async errors in secondary upstream" in { + "correctly handle async errors in secondary upstream" in assertAllStagesStopped { val promise = Promise[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index b0899713d4..badfb7bc97 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -7,6 +7,7 @@ import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl.FlexiMerge._ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit.{ PublisherProbe, AutoPublisher, OnNext, SubscriberProbe } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import org.reactivestreams.Publisher import akka.stream._ import scala.util.control.NoStackTrace @@ -176,7 +177,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { "FlexiMerge" must { - "build simple fair merge" in { + "build simple fair merge" in assertAllStagesStopped { val p = FlowGraph.closed(out) { implicit b ⇒ o ⇒ val merge = b.add(fairString) @@ -195,7 +196,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "be able to have two fleximerges in a graph" in { + "be able to have two fleximerges in a graph" in assertAllStagesStopped { val p = FlowGraph.closed(in1, in2, out)((i1, i2, o) ⇒ o) { implicit b ⇒ (in1, in2, o) ⇒ val m1 = b.add(fairString) @@ -462,7 +463,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "support cancel of input" in { + "support cancel of input" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ @@ -501,7 +502,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "finish when all inputs cancelled" in { + "finish when all inputs cancelled" in assertAllStagesStopped { val publisher1 = PublisherProbe[String] val publisher2 = PublisherProbe[String] val publisher3 = PublisherProbe[String] @@ -538,7 +539,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "handle failure" in { + "handle failure" in assertAllStagesStopped { val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ o ⇒ @@ -568,7 +569,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "propagate failure" in { + "propagate failure" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ @@ -585,7 +586,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectSubscriptionAndError().getMessage should be("ERROR") } - "emit failure" in { + "emit failure" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ @@ -605,7 +606,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("err") } - "emit failure for user thrown exception" in { + "emit failure for user thrown exception" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ @@ -624,7 +625,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("exc") } - "emit failure for user thrown exception in onComplete" in { + "emit failure for user thrown exception in onComplete" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ @@ -643,7 +644,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("onUpstreamFinish-exc") } - "emit failure for user thrown exception in onUpstreamFinish 2" in { + "emit failure for user thrown exception in onUpstreamFinish 2" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ @@ -669,7 +670,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("onUpstreamFinish-exc") } - "support finish from onInput" in { + "support finish from onInput" in assertAllStagesStopped { val publisher = PublisherProbe[String] val completionProbe = TestProbe() val p = FlowGraph.closed(out) { implicit b ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala index 1e9c361c88..f05049bbd3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala @@ -9,6 +9,7 @@ import akka.stream.testkit.StreamTestKit.AutoPublisher import akka.stream.testkit.StreamTestKit.OnNext import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.actor.ActorSystem import akka.stream._ import akka.actor.ActorRef @@ -160,7 +161,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { "FlexiRoute" must { - "build simple fair route" in { + "build simple fair route" in assertAllStagesStopped { // we can't know exactly which elements that go to each output, because if subscription/request // from one of the downstream is delayed the elements will be pushed to the other output val s = SubscriberProbe[String] @@ -244,7 +245,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "support finish of downstreams and cancel of upstream" in { + "support finish of downstreams and cancel of upstream" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -259,7 +260,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "support error of outputs" in { + "support error of outputs" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -275,7 +276,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { autoPublisher.subscription.expectCancellation() } - "support error of a specific output" in { + "support error of a specific output" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -297,7 +298,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "emit error for user thrown exception" in { + "emit error for user thrown exception" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -316,7 +317,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { autoPublisher.subscription.expectCancellation() } - "emit error for user thrown exception in onUpstreamFinish" in { + "emit error for user thrown exception in onUpstreamFinish" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -334,7 +335,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectError().getMessage should be("onUpstreamFinish-exc") } - "handle cancel from output" in { + "handle cancel from output" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -357,7 +358,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "handle finish from upstream input" in { + "handle finish from upstream input" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -376,7 +377,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "handle error from upstream input" in { + "handle error from upstream input" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -395,7 +396,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectError().getMessage should be("test err") } - "cancel upstream input when all outputs cancelled" in { + "cancel upstream input when all outputs cancelled" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ @@ -414,7 +415,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { autoPublisher.subscription.expectCancellation() } - "cancel upstream input when all outputs completed" in { + "cancel upstream input when all outputs completed" in assertAllStagesStopped { val fixture = new TestFixture import fixture._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index 2c26920857..d3329f9358 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -8,6 +8,7 @@ import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings, Inlet import scala.concurrent.duration._ import akka.stream.testkit.{ TwoStreamsSetup, AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class GraphMergeSpec extends TwoStreamsSetup { import FlowGraph.Implicits._ @@ -25,7 +26,7 @@ class GraphMergeSpec extends TwoStreamsSetup { "merge" must { - "work in the happy case" in { + "work in the happy case" in assertAllStagesStopped { // Different input sizes (4 and 6) val source1 = Source(0 to 3) val source2 = Source(4 to 9) @@ -93,7 +94,7 @@ class GraphMergeSpec extends TwoStreamsSetup { commonTests() - "work with one immediately completed and one nonempty publisher" in { + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(4) @@ -113,7 +114,7 @@ class GraphMergeSpec extends TwoStreamsSetup { subscriber2.expectComplete() } - "work with one delayed completed and one nonempty publisher" in { + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(4) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala index de04f968eb..52aeb524d9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala @@ -5,6 +5,7 @@ import scala.concurrent.duration._ import akka.stream.{ OverflowStrategy, ActorFlowMaterializerSettings } import akka.stream.ActorFlowMaterializer import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class GraphUnzipSpec extends AkkaSpec { @@ -16,7 +17,7 @@ class GraphUnzipSpec extends AkkaSpec { "A unzip" must { import FlowGraph.Implicits._ - "unzip to two subscribers" in { + "unzip to two subscribers" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[String]() @@ -116,7 +117,7 @@ class GraphUnzipSpec extends AkkaSpec { p1Sub.expectCancellation() } - "work with zip" in { + "work with zip" in assertAllStagesStopped { val c1 = StreamTestKit.SubscriberProbe[(Int, String)]() FlowGraph.closed() { implicit b ⇒ val zip = b.add(Zip[Int, String]()) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala index 8c997fadda..fb619babfa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.StreamTestKit import akka.stream.testkit.TwoStreamsSetup +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream._ class GraphZipSpec extends TwoStreamsSetup { @@ -22,7 +23,7 @@ class GraphZipSpec extends TwoStreamsSetup { "Zip" must { - "work in the happy case" in { + "work in the happy case" in assertAllStagesStopped { val probe = StreamTestKit.SubscriberProbe[(Int, String)]() FlowGraph.closed() { implicit b ⇒ @@ -50,7 +51,7 @@ class GraphZipSpec extends TwoStreamsSetup { commonTests() - "work with one immediately completed and one nonempty publisher" in { + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectSubscriptionAndComplete() @@ -58,7 +59,7 @@ class GraphZipSpec extends TwoStreamsSetup { subscriber2.expectSubscriptionAndComplete() } - "work with one delayed completed and one nonempty publisher" in { + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) subscriber1.expectSubscriptionAndComplete() @@ -66,7 +67,7 @@ class GraphZipSpec extends TwoStreamsSetup { subscriber2.expectSubscriptionAndComplete() } - "work with one immediately failed and one nonempty publisher" in { + "work with one immediately failed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectSubscriptionAndError(TestException) @@ -74,7 +75,7 @@ class GraphZipSpec extends TwoStreamsSetup { subscriber2.expectSubscriptionAndError(TestException) } - "work with one delayed failed and one nonempty publisher" in { + "work with one delayed failed and one nonempty publisher" in assertAllStagesStopped { val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectSubscriptionAndError(TestException) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 69fb778008..efa869f7c8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -13,6 +13,7 @@ import scala.util.Failure import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.ScriptedTest class HeadSinkSpec extends AkkaSpec with ScriptedTest { @@ -24,7 +25,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "A Flow with Sink.head" must { - "yield the first value" in { + "yield the first value" in assertAllStagesStopped { val p = StreamTestKit.PublisherProbe[Int]() val f: Future[Int] = Source(p).map(identity).runWith(Sink.head) val proc = p.expectSubscription @@ -48,7 +49,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { proc.expectCancellation() } - "yield the first error" in { + "yield the first error" in assertAllStagesStopped { val p = StreamTestKit.PublisherProbe[Int]() val f = Source(p).runWith(Sink.head) val proc = p.expectSubscription @@ -59,7 +60,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { f.value.get should be(Failure(ex)) } - "yield NoSuchElementExcption for empty stream" in { + "yield NoSuchElementExcption for empty stream" in assertAllStagesStopped { val p = StreamTestKit.PublisherProbe[Int]() val f = Source(p).runWith(Sink.head) val proc = p.expectSubscription diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index d968b4e48c..eaf3199c3e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import scala.concurrent.duration._ import scala.concurrent.Await @@ -16,7 +17,7 @@ class PublisherSinkSpec extends AkkaSpec { "A PublisherSink" must { - "be unique when created twice" in { + "be unique when created twice" in assertAllStagesStopped { val (pub1, pub2) = FlowGraph.closed(Sink.publisher[Int], Sink.publisher[Int])(Keep.both) { implicit b ⇒ (p1, p2) ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala index ad6ce26c49..bd48ca5875 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala @@ -7,6 +7,7 @@ import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped class SubscriberSinkSpec extends AkkaSpec { @@ -17,7 +18,7 @@ class SubscriberSinkSpec extends AkkaSpec { "A Flow with SubscriberSink" must { - "publish elements to the subscriber" in { + "publish elements to the subscriber" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[Int]() Source(List(1, 2, 3)).to(Sink(c)).run() val s = c.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index dc2984f86e..fab7cd04e3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -7,6 +7,7 @@ import akka.actor.{ ExtendedActorSystem, ActorIdentity, ActorRef, Identify } import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import akka.stream.impl.SubscriptionTimeoutException import akka.stream.testkit._ +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.util.Timeout import scala.concurrent.Await @@ -38,7 +39,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "groupBy" must { - "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in { + "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in assertAllStagesStopped { val publisherProbe = StreamTestKit.PublisherProbe[Int]() val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() @@ -57,14 +58,16 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { // should not break normal usage val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) - s1SubscriberProbe.expectSubscription().request(100) + val s1Subscription = s1SubscriberProbe.expectSubscription() + s1Subscription.request(100) s1SubscriberProbe.expectNext(1) val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) - s2SubscriberProbe.expectSubscription().request(100) + val s2Subscription = s2SubscriberProbe.expectSubscription() + s2Subscription.request(100) s2SubscriberProbe.expectNext(2) val (_, s3) = subscriber.expectNext() @@ -74,9 +77,11 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException ⇒ "expected" } Await.result(f, 300.millis) should equal("expected") + + upstreamSubscription.sendComplete() } - "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in { + "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in assertAllStagesStopped { val publisherProbe = StreamTestKit.PublisherProbe[Int]() val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 1d6a719a48..0eca894e9f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -10,6 +10,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.assertAllStagesStopped import akka.stream.ActorFlowMaterializerSettings class TickSourceSpec extends AkkaSpec { @@ -17,7 +18,7 @@ class TickSourceSpec extends AkkaSpec { implicit val materializer = ActorFlowMaterializer() "A Flow based on tick publisher" must { - "produce ticks" in { + "produce ticks" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[String]() Source(1.second, 500.millis, "tick").to(Sink(c)).run() val sub = c.expectSubscription() @@ -85,7 +86,7 @@ class TickSourceSpec extends AkkaSpec { sub.cancel() } - "be possible to cancel" in { + "be possible to cancel" in assertAllStagesStopped { val c = StreamTestKit.SubscriberProbe[String]() val tickSource = Source(1.second, 500.millis, "tick") val cancellable = tickSource.to(Sink(c)).run() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index d43545c925..87f5051ea9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -27,13 +27,14 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } /** * INTERNAL API */ -private[akka] case class ActorFlowMaterializerImpl(override val system: ActorSystem, - override val settings: ActorFlowMaterializerSettings, - dispatchers: Dispatchers, - supervisor: ActorRef, - flowNameCounter: AtomicLong, - namePrefix: String, - optimizations: Optimizations) +private[akka] case class ActorFlowMaterializerImpl( + val system: ActorSystem, + override val settings: ActorFlowMaterializerSettings, + dispatchers: Dispatchers, + val supervisor: ActorRef, + flowNameCounter: AtomicLong, + namePrefix: String, + optimizations: Optimizations) extends ActorFlowMaterializer { import ActorFlowMaterializerImpl._ import akka.stream.impl.Stages._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 0ae7090d3b..2b1bbe348c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -243,7 +243,7 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings, log.debug("fail due to: {}", e.getMessage) inputBunch.cancel() primaryOutputs.error(e) - context.stop(self) + pump() } override def postStop(): Unit = {