diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/StreamTestKit.scala new file mode 100644 index 0000000000..4467bcc831 --- /dev/null +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/StreamTestKit.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.testkit.javadsl + +import akka.stream.Materializer +import akka.stream.impl.PhasedFusingActorMaterializer +import akka.stream.testkit.scaladsl + +object StreamTestKit { + + /** + * Assert that there are no stages running under a given materializer. + * Usually this assertion is run after a test-case to check that all of the + * stages have terminated successfully. + */ + def assertAllStagesStopped(mat: Materializer): Unit = + mat match { + case impl: PhasedFusingActorMaterializer ⇒ + scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor) + case _ ⇒ + } +} diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala new file mode 100644 index 0000000000..5dd46dc394 --- /dev/null +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.testkit.scaladsl + +import akka.actor.{ ActorRef, ActorSystem } +import akka.annotation.InternalApi +import akka.stream.Materializer +import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } +import akka.testkit.TestProbe + +import scala.concurrent.duration._ + +object StreamTestKit { + + /** + * Asserts that after the given code block is ran, no stages are left over + * that were created by the given materializer. + * + * This assertion is useful to check that all of the stages have + * terminated successfully. + */ + def assertAllStagesStopped[T](block: ⇒ T)(implicit materializer: Materializer): T = + materializer match { + case impl: PhasedFusingActorMaterializer ⇒ + stopAllChildren(impl.system, impl.supervisor) + val result = block + assertNoChildren(impl.system, impl.supervisor) + result + case _ ⇒ block + } + + /** INTERNAL API */ + @InternalApi private[testkit] def stopAllChildren(sys: ActorSystem, supervisor: ActorRef): Unit = { + val probe = TestProbe()(sys) + probe.send(supervisor, StreamSupervisor.StopChildren) + probe.expectMsg(StreamSupervisor.StoppedChildren) + } + + /** INTERNAL API */ + @InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = { + val probe = TestProbe()(sys) + probe.within(5.seconds) { + var children = Set.empty[ActorRef] + try probe.awaitAssert { + supervisor.tell(StreamSupervisor.GetChildren, probe.ref) + children = probe.expectMsgType[StreamSupervisor.Children].children + assert( + children.isEmpty, + s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") + } + catch { + case ex: Throwable ⇒ + children.foreach(_ ! StreamSupervisor.PrintDebugDump) + throw ex + } + } + } + +} + diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala index c1af4c2ce3..67ed4d5417 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala @@ -10,6 +10,7 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit._ +import akka.stream.testkit.StreamTestKit.ProbeSink /** * Factory methods for test sinks. @@ -20,6 +21,6 @@ object TestSink { * A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]]. */ def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] = - Sink.fromGraph[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(Inlet("ProbeSink.in")))) + Sink.fromGraph[T, TestSubscriber.Probe[T]](new ProbeSink(none, SinkShape(Inlet("ProbeSink.in")))) } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala index 58cf2816f3..6f76bf0c09 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala @@ -8,6 +8,7 @@ import akka.stream._ import akka.stream.Attributes.none import akka.stream.scaladsl._ import akka.stream.testkit._ +import akka.stream.testkit.StreamTestKit.ProbeSource import akka.actor.ActorSystem @@ -19,6 +20,6 @@ object TestSource { /** * A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]]. */ - def probe[T](implicit system: ActorSystem) = Source.fromGraph[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) + def probe[T](implicit system: ActorSystem) = Source.fromGraph[T, TestPublisher.Probe[T]](new ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala index fcd86cafcb..c945cc188f 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala @@ -9,7 +9,7 @@ import akka.stream.scaladsl._ import org.reactivestreams.Publisher import scala.collection.immutable import scala.util.control.NoStackTrace -import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.AkkaSpec abstract class BaseTwoStreamsSetup extends AkkaSpec { diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala index 1bdc3e3fe8..f81acae593 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala @@ -7,7 +7,7 @@ package akka.stream.testkit import akka.stream.scaladsl.{ Sink, Source } import akka.stream.testkit.TestPublisher._ import akka.stream.testkit.TestSubscriber._ -import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import org.reactivestreams.Subscription import akka.testkit.AkkaSpec diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala index d4a681b4a4..3ec3039a33 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala @@ -20,32 +20,6 @@ object Utils { case class TE(message: String) extends RuntimeException(message) with NoStackTrace - def assertAllStagesStopped[T](block: ⇒ T)(implicit materializer: Materializer): T = - materializer match { - case impl: PhasedFusingActorMaterializer ⇒ - val probe = TestProbe()(impl.system) - probe.send(impl.supervisor, StreamSupervisor.StopChildren) - probe.expectMsg(StreamSupervisor.StoppedChildren) - val result = block - probe.within(5.seconds) { - var children = Set.empty[ActorRef] - try probe.awaitAssert { - impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - children = probe.expectMsgType[StreamSupervisor.Children].children - assert( - children.isEmpty, - s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") - } - catch { - case ex: Throwable ⇒ - children.foreach(_ ! StreamSupervisor.PrintDebugDump) - throw ex - } - } - result - case _ ⇒ block - } - def assertDispatcher(ref: ActorRef, dispatcher: String): Unit = ref match { case r: ActorRefWithCell ⇒ if (r.underlying.props.dispatcher != dispatcher) diff --git a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java index 206a9a8d67..11095e799b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java @@ -4,6 +4,9 @@ package akka.stream; +import akka.stream.testkit.javadsl.StreamTestKit; +import org.junit.After; +import org.junit.Before; import org.scalatest.junit.JUnitSuite; import akka.actor.ActorSystem; @@ -11,11 +14,23 @@ import akka.testkit.AkkaJUnitActorSystemResource; public abstract class StreamTest extends JUnitSuite { final protected ActorSystem system; - final protected ActorMaterializer materializer; + final private ActorMaterializerSettings settings; + + protected ActorMaterializer materializer; protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) { system = actorSystemResource.getSystem(); - ActorMaterializerSettings settings = ActorMaterializerSettings.create(system); + settings = ActorMaterializerSettings.create(system); + } + + @Before + public void setUp() { materializer = ActorMaterializer.create(settings, system); } + + @After + public void tearDown() { + StreamTestKit.assertAllStagesStopped(materializer); + materializer.shutdown(); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 518e12110d..6a33288fb4 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -8,6 +8,7 @@ import akka.Done; import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.Cancellable; +import akka.actor.Status; import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; @@ -446,6 +447,7 @@ public class SourceTest extends StreamTest { probe.expectNoMessage(Duration.ofMillis(200)); probe.expectMsgEquals("tick"); probe.expectNoMessage(Duration.ofMillis(200)); + cancellable.cancel(); } @Test @@ -547,6 +549,7 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals(1); ref.tell(2, ActorRef.noSender()); probe.expectMsgEquals(2); + ref.tell(new Status.Success("ok"), ActorRef.noSender()); } @Test diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java index cd849d42d0..54f67499a9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java @@ -90,6 +90,8 @@ public class TcpTest extends StreamTest { for (int i = 0; i < testInput.size(); i ++) { assertEquals(testInput.get(i).head(), result[i]); } + + b.unbind(); } @Test @@ -111,6 +113,7 @@ public class TcpTest extends StreamTest { if (e.getCause() instanceof BindFailedException) {} // all good else throw new AssertionError("failed", e); // expected + b.unbind(); } catch (Exception e) { throw new AssertionError("failed", e); } diff --git a/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala b/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala index ec34d89555..aed92b472f 100644 --- a/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.duration._ import scala.concurrent.Await diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 199e2d7d5c..add121485b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -9,6 +9,7 @@ import akka.stream.{ ClosedShape, ActorMaterializer, ActorMaterializerSettings, import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.impl.ReactiveStreamsCompliance import akka.testkit.TestEvent.Mute import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } @@ -410,7 +411,7 @@ class ActorPublisherSpec extends StreamSpec(ActorPublisherSpec.config) with Impl "be able to define a subscription-timeout, after which it should shut down" in { implicit val materializer = ActorMaterializer() - Utils.assertAllStagesStopped { + assertAllStagesStopped { val timeout = 150.millis val a = system.actorOf(timeoutingProps(testActor, timeout)) val pub = ActorPublisher(a) diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index 2a054864fa..24ddb80585 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -9,6 +9,7 @@ import akka.stream.scaladsl.{ Source, Flow } import akka.stream.scaladsl.Sink import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.TestProbe import org.reactivestreams.{ Publisher, Subscriber } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index 47cbdbae0e..b40461a075 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -10,7 +10,7 @@ import akka.stream.testkit.StreamSpec import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ -import akka.stream.testkit.Utils.assertAllStagesStopped +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.impl.fusing._ import org.scalatest.concurrent.ScalaFutures diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala index db18b361a4..49908ebf25 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala @@ -9,6 +9,7 @@ import java.util.concurrent.TimeoutException import akka.Done import akka.stream.scaladsl._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream._ import org.scalatest.{ Matchers, WordSpecLike } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index a7ca047cb7..7c4dcc5a2f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -13,6 +13,7 @@ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.scaladsl._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.testkit.{ EventFilter, TestLatch } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala index 06078b6425..47e764c03a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala @@ -6,6 +6,7 @@ package akka.stream.impl.fusing import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala index d4c1f3f086..1a7a213c8b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala @@ -10,6 +10,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.{ Attributes, Inlet, SinkShape, ActorMaterializer } import akka.stream.stage.{ InHandler, AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.{ Await, Promise, Future } import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 8e82dc5dff..60d639295d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -16,6 +16,7 @@ import akka.stream.impl.StreamSupervisor.Children import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream._ import akka.util.ByteString import com.google.common.jimfs.{ Configuration, Jimfs } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index f2b9577f19..10f9978e7f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -16,6 +16,7 @@ import akka.stream.impl.StreamSupervisor.Children import akka.stream.io.FileSourceSpec.Settings import akka.stream.scaladsl.{ FileIO, Keep, Sink } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestDuration diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index b4d4c99d61..67afc17326 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -15,6 +15,7 @@ import akka.stream.impl.io.InputStreamSinkStage import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.scaladsl.{ Keep, Source, StreamConverters } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit._ import akka.testkit.TestProbe diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index 6496509aed..f909f7147e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -10,6 +10,7 @@ import java.util.concurrent.CountDownLatch import akka.stream.scaladsl.{ Sink, StreamConverters } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.util.ByteString diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala index 67f2e71be9..3f1881f5f6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -9,6 +9,7 @@ import java.io.OutputStream import akka.stream.scaladsl.{ Source, StreamConverters } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.{ AbruptIOTerminationException, ActorMaterializer, ActorMaterializerSettings } import akka.testkit.TestProbe import akka.util.ByteString diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala index 845167d9ba..69397fe041 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -16,6 +16,7 @@ import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.OutputStreamSourceStage import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestProbe diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index eaca8cbf07..a9957fc5be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -15,6 +15,7 @@ import akka.stream._ import akka.stream.scaladsl.Tcp.{ IncomingConnection, ServerBinding } import akka.stream.scaladsl.{ Flow, _ } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe } import akka.testkit.SocketUtil.temporaryServerAddress diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 873ebcff85..6def5591e7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -24,6 +24,7 @@ import akka.stream.scaladsl._ import akka.stream.stage._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.util.ByteString import javax.net.ssl._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 6591b55125..6e62ba698f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -9,6 +9,7 @@ import akka.actor.{ Actor, ActorRef, Props, Status } import akka.stream.ActorMaterializer import akka.stream.Attributes.inputBuffer import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.testkit.TestProbe diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index 207760364f..575fdc09f0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream.ActorMaterializer import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl._ import akka.actor.Actor import akka.actor.ActorRef diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 9f9fee4710..a2a6b356c1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -9,6 +9,7 @@ import akka.stream.{ Attributes, ActorMaterializer, OverflowStrategy } import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.actor.PoisonPill import akka.actor.Status diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index 9da281849c..3bea5a4bfb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.util.ByteString import akka.stream._ import scala.concurrent.Await diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala index 66af06ae5d..1437ae724b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ActorAttributes.supervisionStrategy import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.Supervision.resumingDecider import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.testkit.{ TestActors, TestProbe } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index cf676d870c..ae46731751 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -11,6 +11,7 @@ import akka.stream.{ BufferOverflowException, ActorMaterializer, ActorMaterializ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class FlowBufferSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index dca9423dfb..ed60ed55da 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class FlowConcatAllSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index 40905ad769..6b7d5274fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber } import org.reactivestreams.Publisher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 0a02ee129c..c40c9cbd10 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -8,6 +8,7 @@ import akka.Done import akka.stream.Attributes._ import akka.stream.OverflowStrategies.EmitEarly import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala index fd3af25616..5c7eae7718 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala @@ -8,7 +8,8 @@ import akka.stream._ import scala.concurrent.Await import scala.concurrent.duration._ import akka.stream.testkit.scaladsl.TestSink -import akka.stream.testkit.{ StreamSpec, Utils } +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.scaladsl.StreamTestKit._ class FlowDetacherSpec extends StreamSpec { @@ -16,14 +17,14 @@ class FlowDetacherSpec extends StreamSpec { "A Detacher" must { - "pass through all elements" in Utils.assertAllStagesStopped { + "pass through all elements" in assertAllStagesStopped { Source(1 to 100) .detach .runWith(Sink.seq) .futureValue should ===(1 to 100) } - "pass through failure" in Utils.assertAllStagesStopped { + "pass through failure" in assertAllStagesStopped { val ex = new Exception("buh") val result = Source(1 to 100) .map(x ⇒ if (x == 50) throw ex else x) @@ -35,7 +36,7 @@ class FlowDetacherSpec extends StreamSpec { } - "emit the last element when completed without demand" in Utils.assertAllStagesStopped { + "emit the last element when completed without demand" in assertAllStagesStopped { Source.single(42) .detach .runWith(TestSink.probe) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala index b41b7059b9..9ec3aa626f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream.ActorAttributes._ import akka.stream.Supervision._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala index 35d7a18aea..e0df7e7c4a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala @@ -10,6 +10,7 @@ import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random } import akka.stream.ActorAttributes._ import akka.stream.Supervision._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala index c621ab489b..4d71b84851 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -7,7 +7,8 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } import akka.stream._ -import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent._ import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala index 810791f9ab..16b0accebb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala @@ -10,6 +10,7 @@ import akka.stream.ActorMaterializer import akka.stream.Supervision.{ restartingDecider, resumingDecider } import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import org.scalatest.concurrent.PatienceConfiguration.Timeout diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index 32a69e961e..db0cd6ff3c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.Await import scala.util.control.NoStackTrace import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index fbfbdc3b74..a5245ec2c1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -8,6 +8,7 @@ import scala.util.control.NoStackTrace import akka.stream.ActorMaterializer import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 072ebb67d5..34d816fdcd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class FlowFromFutureSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 20b2ca5620..41975bfd28 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -20,6 +20,7 @@ import akka.stream.Supervision.resumingDecider import akka.stream.impl.fusing.GroupBy import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import org.reactivestreams.Publisher import org.scalatest.concurrent.PatienceConfiguration.Timeout import akka.stream.testkit.scaladsl.TestSource diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index b35f4bd5fe..f6e13235b7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -11,6 +11,7 @@ import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, ThrottleMode } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.TimingTest import akka.util.ConstantFun diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala index 2c8cfdf14b..32197693c7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala @@ -5,7 +5,8 @@ package akka.stream.scaladsl import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestPublisher, Utils } +import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestPublisher } +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.Await import scala.concurrent.duration._ @@ -18,13 +19,13 @@ class FlowIdleInjectSpec extends StreamSpec { "keepAlive" must { - "not emit additional elements if upstream is fast enough" in Utils.assertAllStagesStopped { + "not emit additional elements if upstream is fast enough" in assertAllStagesStopped { Await.result( Source(1 to 10).keepAlive(1.second, () ⇒ 0).grouped(1000).runWith(Sink.head), 3.seconds) should ===(1 to 10) } - "emit elements periodically after silent periods" in Utils.assertAllStagesStopped { + "emit elements periodically after silent periods" in assertAllStagesStopped { val sourceWithIdleGap = Source(1 to 5) ++ Source(6 to 10).initialDelay(2.second) val result = Await.result( diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala index a25a7e4a80..e3c0cbb706 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala @@ -6,7 +6,8 @@ package akka.stream.scaladsl import java.util.concurrent.TimeoutException import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.testkit.{ StreamSpec, Utils, TestSubscriber } +import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.Await import scala.concurrent.duration._ @@ -19,13 +20,13 @@ class FlowInitialDelaySpec extends StreamSpec { "Flow initialDelay" must { - "work with zero delay" in Utils.assertAllStagesStopped { + "work with zero delay" in assertAllStagesStopped { Await.result( Source(1 to 10).initialDelay(Duration.Zero).grouped(100).runWith(Sink.head), 1.second) should ===(1 to 10) } - "delay elements by the specified time but not more" in Utils.assertAllStagesStopped { + "delay elements by the specified time but not more" in assertAllStagesStopped { a[TimeoutException] shouldBe thrownBy { Await.result( Source(1 to 10).initialDelay(2.seconds).initialTimeout(1.second).runWith(Sink.ignore), @@ -37,7 +38,7 @@ class FlowInitialDelaySpec extends StreamSpec { 2.seconds) } - "properly ignore timer while backpressured" in Utils.assertAllStagesStopped { + "properly ignore timer while backpressured" in assertAllStagesStopped { val probe = TestSubscriber.probe[Int]() Source(1 to 10).initialDelay(0.5.second).runWith(Sink.fromSubscriber(probe)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala index 73024ebfaf..00a895e1db 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import org.reactivestreams.Publisher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index bb4aaa4ed9..744a022e38 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.EventFilter class FlowIteratorSpec extends AbstractFlowIteratorSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala index 3d6d64a786..70ae6a9543 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowShape, OverflowStrategy } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl._ import com.typesafe.config.ConfigFactory import org.scalatest.time._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala index 5f52b16081..e4f522c717 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala @@ -8,7 +8,8 @@ import akka.Done import akka.stream.testkit.StreamSpec import akka.stream.{ ActorMaterializer, ClosedShape, KillSwitches } import akka.stream.testkit.scaladsl.{ TestSink, TestSource } -import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.stream.testkit.Utils.TE import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index f1ad878f97..b4f63ccc5f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -12,6 +12,7 @@ import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.Supervision.resumingDecider import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.{ TestLatch, TestProbe } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index 7d87759d27..05a3fa1022 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.TestLatch import akka.testkit.TestProbe import akka.stream.ActorAttributes.supervisionStrategy diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala index c489d69005..376b08e861 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import scala.util.control.NoStackTrace diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala index 07edcc68f0..348444557d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index deb8b354a3..a6759055b1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import org.reactivestreams.Publisher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala index b0ddf0f87b..ee461f91a3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.TestProbe class FlowOnCompleteSpec extends StreamSpec with ScriptedTest { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 0a76e2206f..3cbe7e8947 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -11,6 +11,7 @@ import scala.util.control.NoStackTrace import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class FlowPrefixAndTailSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala index dcbbd9d4be..fe16c3a6b2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.util.control.NoStackTrace diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala index 5b134a3a76..a763c427f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -9,6 +9,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink import akka.stream._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.util.control.NoStackTrace diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala index 262a708c02..9566961702 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.Await import scala.util.control.NoStackTrace import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 1e870658c7..21353d910c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -9,6 +9,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.collection.immutable import scala.concurrent.Await diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala index 57660b9043..eee143db8d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ import org.scalacheck.Gen diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index eed48ff9a4..bc79f01093 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -10,6 +10,7 @@ import akka.NotUsed import akka.actor._ import akka.stream.impl._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream._ import akka.testkit.TestDuration diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala index 87e42ff07e..29e8e6acf4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala @@ -10,6 +10,7 @@ import akka.stream.Supervision.resumingDecider import akka.stream.impl.SubscriptionTimeoutException import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import org.reactivestreams.Publisher import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 3a415dcfbb..91df8a676a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -9,6 +9,7 @@ import akka.stream._ import akka.stream.Supervision.resumingDecider import akka.stream.impl.SubscriptionTimeoutException import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink import org.reactivestreams.Publisher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala index d5b8f976df..5d6d4bef75 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream.ActorAttributes._ import akka.stream.Supervision._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala index 95a8743f67..b850282704 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import akka.stream.ActorMaterializer import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class FlowTakeWithinSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala index e2a0e3cf40..53d941fedd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ThrottleMode.{ Enforcing, Shaping } import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.util.ByteString import scala.concurrent.Await import scala.concurrent.duration._ @@ -27,7 +28,7 @@ class FlowThrottleSpec extends StreamSpec { ByteString(new Random().shuffle(0 to 255).take(length).map(_.toByte).toArray) "Throttle for single cost elements" must { - "work for the happy case" in Utils.assertAllStagesStopped { + "work for the happy case" in assertAllStagesStopped { //Source(1 to 5).throttle(1, 100.millis, 0, Shaping) Source(1 to 5).throttle(19, 1000.millis, -1, Shaping) .runWith(TestSink.probe[Int]) @@ -36,7 +37,7 @@ class FlowThrottleSpec extends StreamSpec { .expectComplete() } - "accept very high rates" in Utils.assertAllStagesStopped { + "accept very high rates" in assertAllStagesStopped { Source(1 to 5).throttle(1, 1.nanos, 0, Shaping) .runWith(TestSink.probe[Int]) .request(5) @@ -44,7 +45,7 @@ class FlowThrottleSpec extends StreamSpec { .expectComplete() } - "accept very low rates" in Utils.assertAllStagesStopped { + "accept very low rates" in assertAllStagesStopped { Source(1 to 5).throttle(1, 100.days, 1, Shaping) .runWith(TestSink.probe[Int]) .request(5) @@ -53,7 +54,7 @@ class FlowThrottleSpec extends StreamSpec { .cancel() // We won't wait 100 days, sorry } - "work if there are two throttles in different streams" in Utils.assertAllStagesStopped { + "work if there are two throttles in different streams" in assertAllStagesStopped { val sharedThrottle = Flow[Int].throttle(1, 1.day, 1, Enforcing) // If there is accidental shared state then we would not be able to pass through the single element @@ -70,7 +71,7 @@ class FlowThrottleSpec extends StreamSpec { } - "emit single element per tick" in Utils.assertAllStagesStopped { + "emit single element per tick" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() @@ -89,7 +90,7 @@ class FlowThrottleSpec extends StreamSpec { downstream.expectComplete() } - "not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped { + "not send downstream if upstream does not emit element" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() Source.fromPublisher(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink.fromSubscriber(downstream)) @@ -105,13 +106,13 @@ class FlowThrottleSpec extends StreamSpec { upstream.sendComplete() } - "cancel when downstream cancels" in Utils.assertAllStagesStopped { + "cancel when downstream cancels" in assertAllStagesStopped { val downstream = TestSubscriber.probe[Int]() Source(1 to 10).throttle(1, 300.millis, 0, Shaping).runWith(Sink.fromSubscriber(downstream)) downstream.cancel() } - "send elements downstream as soon as time comes" in Utils.assertAllStagesStopped { + "send elements downstream as soon as time comes" in assertAllStagesStopped { val probe = Source(1 to 10).throttle(2, 750.millis, 0, Shaping).runWith(TestSink.probe[Int]) .request(5) probe.receiveWithin(900.millis) should be(Seq(1, 2)) @@ -122,7 +123,7 @@ class FlowThrottleSpec extends StreamSpec { .cancel() } - "burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped { + "burst according to its maximum if enough time passed" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() Source.fromPublisher(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink.fromSubscriber(downstream)) @@ -139,7 +140,7 @@ class FlowThrottleSpec extends StreamSpec { downstream.cancel() } - "burst some elements if have enough time" in Utils.assertAllStagesStopped { + "burst some elements if have enough time" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() Source.fromPublisher(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink.fromSubscriber(downstream)) @@ -160,7 +161,7 @@ class FlowThrottleSpec extends StreamSpec { downstream.cancel() } - "throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped { + "throw exception when exceeding throughtput in enforced mode" in assertAllStagesStopped { Await.result( Source(1 to 5).throttle(1, 200.millis, 5, Enforcing).runWith(Sink.seq), 2.seconds) should ===(1 to 5) // Burst is 5 so this will not fail @@ -172,7 +173,7 @@ class FlowThrottleSpec extends StreamSpec { } } - "properly combine shape and throttle modes" in Utils.assertAllStagesStopped { + "properly combine shape and throttle modes" in assertAllStagesStopped { Source(1 to 5).throttle(1, 100.millis, 5, Shaping) .throttle(1, 100.millis, 5, Enforcing) .runWith(TestSink.probe[Int]) @@ -183,7 +184,7 @@ class FlowThrottleSpec extends StreamSpec { } "Throttle for various cost elements" must { - "work for happy case" in Utils.assertAllStagesStopped { + "work for happy case" in assertAllStagesStopped { Source(1 to 5).throttle(1, 100.millis, 0, (_) ⇒ 1, Shaping) .runWith(TestSink.probe[Int]) .request(5) @@ -191,7 +192,7 @@ class FlowThrottleSpec extends StreamSpec { .expectComplete() } - "emit elements according to cost" in Utils.assertAllStagesStopped { + "emit elements according to cost" in assertAllStagesStopped { val list = (1 to 4).map(_ * 2).map(genByteString) Source(list).throttle(2, 200.millis, 0, _.length, Shaping) .runWith(TestSink.probe[ByteString]) @@ -206,7 +207,7 @@ class FlowThrottleSpec extends StreamSpec { .expectComplete() } - "not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped { + "not send downstream if upstream does not emit element" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() Source.fromPublisher(upstream).throttle(2, 300.millis, 0, identity, Shaping).runWith(Sink.fromSubscriber(downstream)) @@ -222,13 +223,13 @@ class FlowThrottleSpec extends StreamSpec { upstream.sendComplete() } - "cancel when downstream cancels" in Utils.assertAllStagesStopped { + "cancel when downstream cancels" in assertAllStagesStopped { val downstream = TestSubscriber.probe[Int]() Source(1 to 10).throttle(2, 200.millis, 0, identity, Shaping).runWith(Sink.fromSubscriber(downstream)) downstream.cancel() } - "send elements downstream as soon as time comes" in Utils.assertAllStagesStopped { + "send elements downstream as soon as time comes" in assertAllStagesStopped { val probe = Source(1 to 10).throttle(4, 500.millis, 0, _ ⇒ 2, Shaping).runWith(TestSink.probe[Int]) .request(5) probe.receiveWithin(600.millis) should be(Seq(1, 2)) @@ -239,7 +240,7 @@ class FlowThrottleSpec extends StreamSpec { .cancel() } - "burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped { + "burst according to its maximum if enough time passed" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() Source.fromPublisher(upstream).throttle(2, 400.millis, 5, (_) ⇒ 1, Shaping).runWith(Sink.fromSubscriber(downstream)) @@ -260,7 +261,7 @@ class FlowThrottleSpec extends StreamSpec { downstream.cancel() } - "burst some elements if have enough time" in Utils.assertAllStagesStopped { + "burst some elements if have enough time" in assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() Source.fromPublisher(upstream).throttle(2, 400.millis, 5, (e) ⇒ if (e < 9) 1 else 20, Shaping).runWith(Sink.fromSubscriber(downstream)) @@ -281,7 +282,7 @@ class FlowThrottleSpec extends StreamSpec { downstream.cancel() } - "throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped { + "throw exception when exceeding throughtput in enforced mode" in assertAllStagesStopped { Await.result( Source(1 to 4).throttle(2, 200.millis, 10, identity, Enforcing).runWith(Sink.seq), 2.seconds) should ===(1 to 4) // Burst is 10 so this will not fail @@ -293,7 +294,7 @@ class FlowThrottleSpec extends StreamSpec { } } - "properly combine shape and enforce modes" in Utils.assertAllStagesStopped { + "properly combine shape and enforce modes" in assertAllStagesStopped { Source(1 to 5).throttle(2, 200.millis, 0, identity, Shaping) .throttle(1, 100.millis, 5, Enforcing) .runWith(TestSink.probe[Int]) @@ -302,7 +303,7 @@ class FlowThrottleSpec extends StreamSpec { .expectComplete() } - "handle rate calculation function exception" in Utils.assertAllStagesStopped { + "handle rate calculation function exception" in assertAllStagesStopped { val ex = new RuntimeException with NoStackTrace Source(1 to 5).throttle(2, 200.millis, 0, (_) ⇒ { throw ex }, Shaping) .throttle(1, 100.millis, 5, Enforcing) @@ -311,7 +312,7 @@ class FlowThrottleSpec extends StreamSpec { .expectError(ex) } - "work for real scenario with automatic burst size" taggedAs TimingTest in Utils.assertAllStagesStopped { + "work for real scenario with automatic burst size" taggedAs TimingTest in assertAllStagesStopped { val startTime = System.nanoTime() val counter1 = new AtomicInteger val timestamp1 = new AtomicLong(System.nanoTime()) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala index 6826ca3b81..840751ea8e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.actor.{ Actor, PoisonPill, Props } import akka.stream.ActorMaterializer import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.testkit.TestActors diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala index 17109664a7..dfc354d1a6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala @@ -9,6 +9,7 @@ import akka.pattern.pipe import akka.stream._ import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import scala.util.control.NoStackTrace diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala index 6769fffb1c..6457d81628 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.Done import akka.stream.ActorMaterializer import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala index dda5167560..86552ad9f6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import org.reactivestreams.Publisher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala index 45eb409404..08be7b5296 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.{ StreamSpec, TestSubscriber } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala index 37d383b70e..2dbdd9e275 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala @@ -8,8 +8,9 @@ import java.util.concurrent.{ CompletableFuture, TimeUnit } import akka.stream._ import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue } -import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.Utils.TE import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.TestLatch import scala.concurrent.{ Await, Future, Promise } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 6135487e09..1d1d9c9538 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -11,6 +11,7 @@ import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class GraphBalanceSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index 2488870454..78b47864cb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration._ import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class GraphBroadcastSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index 9086a9324c..58fd40d093 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -9,6 +9,7 @@ import scala.concurrent.{ Promise } import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class GraphConcatSpec extends TwoStreamsSetup { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala index 6dc9428d28..f90f48ab6e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSource diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index 9168ec1f4f..c1378714c7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class GraphMergeSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala index 4dad5994d6..25a5fd8339 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream.testkit._ import akka.stream.{ OverflowStrategy, ActorMaterializer, ActorMaterializerSettings, ClosedShape } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala index fc9f5a7837..e70b4b722e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala @@ -15,6 +15,7 @@ import scala.concurrent.duration._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ object GraphStageTimersSpec { case object TestSingleTimer diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala index 345fdf2f54..c8c84535d8 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import akka.stream.{ ClosedShape, OverflowStrategy, ActorMaterializerSettings, ActorMaterializer } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class GraphUnzipSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala index 552fe8e163..037df9d3c2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import org.reactivestreams.Publisher import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala index 86c0402e71..3a5cd8d505 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala index c07b93af0c..dc92c16f6d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream._ import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala index 2ac744db73..b6709aebe2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream._ import scala.concurrent.Await diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 6f49515706..8a22ea047b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class HeadSinkSpec extends StreamSpec with ScriptedTest { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala index e1bf7ee378..cb24b667f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala @@ -6,8 +6,9 @@ package akka.stream.scaladsl import akka.stream.{ ActorMaterializer, KillSwitches, ThrottleMode } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } -import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.Utils.TE import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.EventFilter import scala.collection.immutable diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala index 0027368749..fb85fd56d8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class LastSinkSpec extends StreamSpec with ScriptedTest { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala index 93f1fc375a..ba1579ed38 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.Done import akka.stream.ActorMaterializer import akka.stream.testkit.{ StreamSpec, TestSubscriber } -import akka.stream.testkit.Utils.assertAllStagesStopped +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.DefaultTimeout import org.scalatest.concurrent.ScalaFutures diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala index 0d4f4f7e5c..d365c83b23 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala @@ -9,6 +9,7 @@ import akka.stream._ import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue } import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.{ StreamSpec, TestPublisher } import org.scalatest.concurrent.ScalaFutures diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index 56dceafdab..0c9ca44d3e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -14,6 +14,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic } import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import scala.concurrent.{ Await, Future, Promise } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index c9d6affefb..a1b1f452fb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -9,7 +9,8 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.Done import akka.stream.impl.LazySource import akka.stream.stage.{ GraphStage, GraphStageLogic } -import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape } import akka.testkit.DefaultTimeout diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala index dd60b2a88b..2768d718f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala @@ -5,7 +5,8 @@ package akka.stream.scaladsl import akka.stream.{ AbruptStageTerminationException, ActorMaterializer } -import akka.stream.testkit.{ StreamSpec, TestSubscriber, Utils } +import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.DefaultTimeout import scala.concurrent.duration._ @@ -17,7 +18,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { "The Maybe Source" must { - "complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped { + "complete materialized future with None when stream cancels" in assertAllStagesStopped { val neverSource = Source.maybe[Int] val pubSink = Sink.asPublisher[Int](false) @@ -34,7 +35,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { f.future.futureValue shouldEqual None } - "allow external triggering of empty completion" in Utils.assertAllStagesStopped { + "allow external triggering of empty completion" in assertAllStagesStopped { val neverSource = Source.maybe[Int].filter(_ ⇒ false) val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } @@ -46,7 +47,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { counterFuture.futureValue shouldEqual 0 } - "allow external triggering of empty completion when there was no demand" in Utils.assertAllStagesStopped { + "allow external triggering of empty completion when there was no demand" in assertAllStagesStopped { val probe = TestSubscriber.probe[Int]() val promise = Source.maybe[Int].to(Sink.fromSubscriber(probe)).run() @@ -56,7 +57,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { probe.expectComplete() } - "allow external triggering of non-empty completion" in Utils.assertAllStagesStopped { + "allow external triggering of non-empty completion" in assertAllStagesStopped { val neverSource = Source.maybe[Int] val counterSink = Sink.head[Int] @@ -68,7 +69,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { counterFuture.futureValue shouldEqual 6 } - "allow external triggering of onError" in Utils.assertAllStagesStopped { + "allow external triggering of onError" in assertAllStagesStopped { val neverSource = Source.maybe[Int] val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } @@ -80,7 +81,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { counterFuture.failed.futureValue.getMessage should include("Boom") } - "complete materialized future when materializer is shutdown" in Utils.assertAllStagesStopped { + "complete materialized future when materializer is shutdown" in assertAllStagesStopped { val mat = ActorMaterializer() val neverSource = Source.maybe[Int] val pubSink = Sink.asPublisher[Int](false) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index 635e4fbf7d..b9db197685 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.duration._ import scala.concurrent.Await diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index da32e6b622..6f20a5234c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -9,6 +9,7 @@ import akka.pattern.pipe import akka.stream.Attributes.inputBuffer import akka.stream.{ ActorMaterializer, StreamDetachedException } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import scala.concurrent.{ Await, Promise } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index ddb8cba602..c2b60189da 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -10,6 +10,7 @@ import akka.pattern.pipe import akka.stream._ import akka.stream.impl.QueueSource import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber } import akka.testkit.TestProbe diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index e385724fc0..9ecf269b70 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -8,8 +8,9 @@ import java.util.concurrent.atomic.AtomicInteger import akka.stream.scaladsl.RestartWithBackoffFlow.Delay import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } -import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.Utils.TE import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy } import akka.testkit.{ DefaultTimeout, TestDuration } import akka.{ Done, NotUsed } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala index 5950a90d4d..8abb2500d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala @@ -11,6 +11,7 @@ import akka.stream._ import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.StreamSupervisor.Children import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSource import akka.util.ByteString @@ -43,13 +44,13 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) { javaSource.count() should ===(0) } - "work with endless stream" in Utils.assertAllStagesStopped { + "work with endless stream" in assertAllStagesStopped { val javaSource = Source.repeat(1).runWith(StreamConverters.asJavaStream()) javaSource.limit(10).count() should ===(10) javaSource.close() } - "allow overriding the dispatcher using Attributes" in Utils.assertAllStagesStopped { + "allow overriding the dispatcher using Attributes" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) @@ -62,7 +63,7 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) { } finally shutdown(sys) } - "work in separate IO dispatcher" in Utils.assertAllStagesStopped { + "work in separate IO dispatcher" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala index 70033246e3..aeda660d86 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ActorAttributes._ import akka.stream.Supervision._ import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.{ TestLatch, TestProbe } import scala.concurrent.Await diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 105377140b..df94676d3a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -12,6 +12,7 @@ import java.util.stream.{ Collector, Collectors } import akka.stream._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.DefaultTimeout diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala index f267ded5d1..5ab1be39a6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala @@ -8,6 +8,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ class SubscriberSinkSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index 35ec8fe54a..374068478d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -8,6 +8,7 @@ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.impl.SubscriptionTimeoutException import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 028805afe9..62fb631fce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.TimingTest class TickSourceSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala index 8463b41cee..a7f778e6c0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala @@ -11,6 +11,7 @@ import akka.actor.ActorSystem import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.{ ActorMaterializer, _ } import akka.testkit.TestLatch diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala index 771d985d95..2173f6919e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala @@ -15,6 +15,7 @@ import akka.stream.Supervision._ import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.{ ActorMaterializer, _ }