Expose assertAllStagesStopped

This commit is contained in:
Martynas Mickevičius 2018-06-15 18:16:58 +03:00
parent 28746a4cfe
commit 766944a592
No known key found for this signature in database
GPG key ID: E735DF276C508071
107 changed files with 263 additions and 85 deletions

View file

@ -0,0 +1,24 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.testkit.javadsl
import akka.stream.Materializer
import akka.stream.impl.PhasedFusingActorMaterializer
import akka.stream.testkit.scaladsl
object StreamTestKit {
/**
* Assert that there are no stages running under a given materializer.
* Usually this assertion is run after a test-case to check that all of the
* stages have terminated successfully.
*/
def assertAllStagesStopped(mat: Materializer): Unit =
mat match {
case impl: PhasedFusingActorMaterializer
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor)
case _
}
}

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.testkit.scaladsl
import akka.actor.{ ActorRef, ActorSystem }
import akka.annotation.InternalApi
import akka.stream.Materializer
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.testkit.TestProbe
import scala.concurrent.duration._
object StreamTestKit {
/**
* Asserts that after the given code block is ran, no stages are left over
* that were created by the given materializer.
*
* This assertion is useful to check that all of the stages have
* terminated successfully.
*/
def assertAllStagesStopped[T](block: T)(implicit materializer: Materializer): T =
materializer match {
case impl: PhasedFusingActorMaterializer
stopAllChildren(impl.system, impl.supervisor)
val result = block
assertNoChildren(impl.system, impl.supervisor)
result
case _ block
}
/** INTERNAL API */
@InternalApi private[testkit] def stopAllChildren(sys: ActorSystem, supervisor: ActorRef): Unit = {
val probe = TestProbe()(sys)
probe.send(supervisor, StreamSupervisor.StopChildren)
probe.expectMsg(StreamSupervisor.StoppedChildren)
}
/** INTERNAL API */
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = {
val probe = TestProbe()(sys)
probe.within(5.seconds) {
var children = Set.empty[ActorRef]
try probe.awaitAssert {
supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
children = probe.expectMsgType[StreamSupervisor.Children].children
assert(
children.isEmpty,
s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]")
}
catch {
case ex: Throwable
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
throw ex
}
}
}
}

View file

@ -10,6 +10,7 @@ import akka.stream._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.StreamTestKit.ProbeSink
/** /**
* Factory methods for test sinks. * Factory methods for test sinks.
@ -20,6 +21,6 @@ object TestSink {
* A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]]. * A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]].
*/ */
def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] = def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] =
Sink.fromGraph[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(Inlet("ProbeSink.in")))) Sink.fromGraph[T, TestSubscriber.Probe[T]](new ProbeSink(none, SinkShape(Inlet("ProbeSink.in"))))
} }

View file

@ -8,6 +8,7 @@ import akka.stream._
import akka.stream.Attributes.none import akka.stream.Attributes.none
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.StreamTestKit.ProbeSource
import akka.actor.ActorSystem import akka.actor.ActorSystem
@ -19,6 +20,6 @@ object TestSource {
/** /**
* A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]]. * A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]].
*/ */
def probe[T](implicit system: ActorSystem) = Source.fromGraph[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) def probe[T](implicit system: ActorSystem) = Source.fromGraph[T, TestPublisher.Probe[T]](new ProbeSource(none, SourceShape(Outlet("ProbeSource.out"))))
} }

View file

@ -9,7 +9,7 @@ import akka.stream.scaladsl._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import scala.collection.immutable import scala.collection.immutable
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
abstract class BaseTwoStreamsSetup extends AkkaSpec { abstract class BaseTwoStreamsSetup extends AkkaSpec {

View file

@ -7,7 +7,7 @@ package akka.stream.testkit
import akka.stream.scaladsl.{ Sink, Source } import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.TestPublisher._ import akka.stream.testkit.TestPublisher._
import akka.stream.testkit.TestSubscriber._ import akka.stream.testkit.TestSubscriber._
import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import org.reactivestreams.Subscription import org.reactivestreams.Subscription
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec

View file

@ -20,32 +20,6 @@ object Utils {
case class TE(message: String) extends RuntimeException(message) with NoStackTrace case class TE(message: String) extends RuntimeException(message) with NoStackTrace
def assertAllStagesStopped[T](block: T)(implicit materializer: Materializer): T =
materializer match {
case impl: PhasedFusingActorMaterializer
val probe = TestProbe()(impl.system)
probe.send(impl.supervisor, StreamSupervisor.StopChildren)
probe.expectMsg(StreamSupervisor.StoppedChildren)
val result = block
probe.within(5.seconds) {
var children = Set.empty[ActorRef]
try probe.awaitAssert {
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
children = probe.expectMsgType[StreamSupervisor.Children].children
assert(
children.isEmpty,
s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]")
}
catch {
case ex: Throwable
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
throw ex
}
}
result
case _ block
}
def assertDispatcher(ref: ActorRef, dispatcher: String): Unit = ref match { def assertDispatcher(ref: ActorRef, dispatcher: String): Unit = ref match {
case r: ActorRefWithCell case r: ActorRefWithCell
if (r.underlying.props.dispatcher != dispatcher) if (r.underlying.props.dispatcher != dispatcher)

View file

@ -4,6 +4,9 @@
package akka.stream; package akka.stream;
import akka.stream.testkit.javadsl.StreamTestKit;
import org.junit.After;
import org.junit.Before;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
@ -11,11 +14,23 @@ import akka.testkit.AkkaJUnitActorSystemResource;
public abstract class StreamTest extends JUnitSuite { public abstract class StreamTest extends JUnitSuite {
final protected ActorSystem system; final protected ActorSystem system;
final protected ActorMaterializer materializer; final private ActorMaterializerSettings settings;
protected ActorMaterializer materializer;
protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) { protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) {
system = actorSystemResource.getSystem(); system = actorSystemResource.getSystem();
ActorMaterializerSettings settings = ActorMaterializerSettings.create(system); settings = ActorMaterializerSettings.create(system);
}
@Before
public void setUp() {
materializer = ActorMaterializer.create(settings, system); materializer = ActorMaterializer.create(settings, system);
} }
@After
public void tearDown() {
StreamTestKit.assertAllStagesStopped(materializer);
materializer.shutdown();
}
} }

View file

@ -8,6 +8,7 @@ import akka.Done;
import akka.NotUsed; import akka.NotUsed;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Cancellable; import akka.actor.Cancellable;
import akka.actor.Status;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.function.*; import akka.japi.function.*;
import akka.japi.pf.PFBuilder; import akka.japi.pf.PFBuilder;
@ -446,6 +447,7 @@ public class SourceTest extends StreamTest {
probe.expectNoMessage(Duration.ofMillis(200)); probe.expectNoMessage(Duration.ofMillis(200));
probe.expectMsgEquals("tick"); probe.expectMsgEquals("tick");
probe.expectNoMessage(Duration.ofMillis(200)); probe.expectNoMessage(Duration.ofMillis(200));
cancellable.cancel();
} }
@Test @Test
@ -547,6 +549,7 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(1); probe.expectMsgEquals(1);
ref.tell(2, ActorRef.noSender()); ref.tell(2, ActorRef.noSender());
probe.expectMsgEquals(2); probe.expectMsgEquals(2);
ref.tell(new Status.Success("ok"), ActorRef.noSender());
} }
@Test @Test

View file

@ -90,6 +90,8 @@ public class TcpTest extends StreamTest {
for (int i = 0; i < testInput.size(); i ++) { for (int i = 0; i < testInput.size(); i ++) {
assertEquals(testInput.get(i).head(), result[i]); assertEquals(testInput.get(i).head(), result[i]);
} }
b.unbind();
} }
@Test @Test
@ -111,6 +113,7 @@ public class TcpTest extends StreamTest {
if (e.getCause() instanceof BindFailedException) {} // all good if (e.getCause() instanceof BindFailedException) {} // all good
else throw new AssertionError("failed", e); else throw new AssertionError("failed", e);
// expected // expected
b.unbind();
} catch (Exception e) { } catch (Exception e) {
throw new AssertionError("failed", e); throw new AssertionError("failed", e);
} }

View file

@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.{ ClosedShape, ActorMaterializer }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await

View file

@ -9,6 +9,7 @@ import akka.stream.{ ClosedShape, ActorMaterializer, ActorMaterializerSettings,
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.impl.ReactiveStreamsCompliance
import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.Mute
import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } import akka.testkit.{ EventFilter, ImplicitSender, TestProbe }
@ -410,7 +411,7 @@ class ActorPublisherSpec extends StreamSpec(ActorPublisherSpec.config) with Impl
"be able to define a subscription-timeout, after which it should shut down" in { "be able to define a subscription-timeout, after which it should shut down" in {
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
Utils.assertAllStagesStopped { assertAllStagesStopped {
val timeout = 150.millis val timeout = 150.millis
val a = system.actorOf(timeoutingProps(testActor, timeout)) val a = system.actorOf(timeoutingProps(testActor, timeout))
val pub = ActorPublisher(a) val pub = ActorPublisher(a)

View file

@ -9,6 +9,7 @@ import akka.stream.scaladsl.{ Source, Flow }
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.TestProbe import akka.testkit.TestProbe
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }

View file

@ -10,7 +10,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream._ import akka.stream._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.testkit.Utils.assertAllStagesStopped import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.impl.fusing._ import akka.stream.impl.fusing._
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeoutException
import akka.Done import akka.Done
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream._ import akka.stream._
import org.scalatest.{ Matchers, WordSpecLike } import org.scalatest.{ Matchers, WordSpecLike }

View file

@ -13,6 +13,7 @@ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.testkit.{ EventFilter, TestLatch } import akka.testkit.{ EventFilter, TestLatch }

View file

@ -6,6 +6,7 @@ package akka.stream.impl.fusing
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit { class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit {

View file

@ -10,6 +10,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.{ Attributes, Inlet, SinkShape, ActorMaterializer } import akka.stream.{ Attributes, Inlet, SinkShape, ActorMaterializer }
import akka.stream.stage.{ InHandler, AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue } import akka.stream.stage.{ InHandler, AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.{ Await, Promise, Future } import scala.concurrent.{ Await, Promise, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -16,6 +16,7 @@ import akka.stream.impl.StreamSupervisor.Children
import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream._ import akka.stream._
import akka.util.ByteString import akka.util.ByteString
import com.google.common.jimfs.{ Configuration, Jimfs } import com.google.common.jimfs.{ Configuration, Jimfs }

View file

@ -16,6 +16,7 @@ import akka.stream.impl.StreamSupervisor.Children
import akka.stream.io.FileSourceSpec.Settings import akka.stream.io.FileSourceSpec.Settings
import akka.stream.scaladsl.{ FileIO, Keep, Sink } import akka.stream.scaladsl.{ FileIO, Keep, Sink }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestDuration import akka.testkit.TestDuration

View file

@ -15,6 +15,7 @@ import akka.stream.impl.io.InputStreamSinkStage
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.scaladsl.{ Keep, Source, StreamConverters } import akka.stream.scaladsl.{ Keep, Source, StreamConverters }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSource
import akka.stream.testkit._ import akka.stream.testkit._
import akka.testkit.TestProbe import akka.testkit.TestProbe

View file

@ -10,6 +10,7 @@ import java.util.concurrent.CountDownLatch
import akka.stream.scaladsl.{ Sink, StreamConverters } import akka.stream.scaladsl.{ Sink, StreamConverters }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString import akka.util.ByteString

View file

@ -9,6 +9,7 @@ import java.io.OutputStream
import akka.stream.scaladsl.{ Source, StreamConverters } import akka.stream.scaladsl.{ Source, StreamConverters }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.{ AbruptIOTerminationException, ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ AbruptIOTerminationException, ActorMaterializer, ActorMaterializerSettings }
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.util.ByteString import akka.util.ByteString

View file

@ -16,6 +16,7 @@ import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.io.OutputStreamSourceStage import akka.stream.impl.io.OutputStreamSourceStage
import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } import akka.stream.scaladsl.{ Keep, Sink, StreamConverters }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestProbe import akka.testkit.TestProbe

View file

@ -15,6 +15,7 @@ import akka.stream._
import akka.stream.scaladsl.Tcp.{ IncomingConnection, ServerBinding } import akka.stream.scaladsl.Tcp.{ IncomingConnection, ServerBinding }
import akka.stream.scaladsl.{ Flow, _ } import akka.stream.scaladsl.{ Flow, _ }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe } import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe }
import akka.testkit.SocketUtil.temporaryServerAddress import akka.testkit.SocketUtil.temporaryServerAddress

View file

@ -24,6 +24,7 @@ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.util.ByteString import akka.util.ByteString
import javax.net.ssl._ import javax.net.ssl._

View file

@ -9,6 +9,7 @@ import akka.actor.{ Actor, ActorRef, Props, Status }
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.Attributes.inputBuffer import akka.stream.Attributes.inputBuffer
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.testkit.TestProbe import akka.testkit.TestProbe

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorRef import akka.actor.ActorRef

View file

@ -9,6 +9,7 @@ import akka.stream.{ Attributes, ActorMaterializer, OverflowStrategy }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.Status import akka.actor.Status

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.util.ByteString import akka.util.ByteString
import akka.stream._ import akka.stream._
import scala.concurrent.Await import scala.concurrent.Await

View file

@ -11,6 +11,7 @@ import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.Supervision.resumingDecider import akka.stream.Supervision.resumingDecider
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.testkit.{ TestActors, TestProbe } import akka.testkit.{ TestActors, TestProbe }

View file

@ -11,6 +11,7 @@ import akka.stream.{ BufferOverflowException, ActorMaterializer, ActorMaterializ
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class FlowBufferSpec extends StreamSpec { class FlowBufferSpec extends StreamSpec {

View file

@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class FlowConcatAllSpec extends StreamSpec { class FlowConcatAllSpec extends StreamSpec {

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber } import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber }
import org.reactivestreams.Publisher import org.reactivestreams.Publisher

View file

@ -8,6 +8,7 @@ import akka.Done
import akka.stream.Attributes._ import akka.stream.Attributes._
import akka.stream.OverflowStrategies.EmitEarly import akka.stream.OverflowStrategies.EmitEarly
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream._ import akka.stream._

View file

@ -8,7 +8,8 @@ import akka.stream._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, Utils } import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.StreamTestKit._
class FlowDetacherSpec extends StreamSpec { class FlowDetacherSpec extends StreamSpec {
@ -16,14 +17,14 @@ class FlowDetacherSpec extends StreamSpec {
"A Detacher" must { "A Detacher" must {
"pass through all elements" in Utils.assertAllStagesStopped { "pass through all elements" in assertAllStagesStopped {
Source(1 to 100) Source(1 to 100)
.detach .detach
.runWith(Sink.seq) .runWith(Sink.seq)
.futureValue should ===(1 to 100) .futureValue should ===(1 to 100)
} }
"pass through failure" in Utils.assertAllStagesStopped { "pass through failure" in assertAllStagesStopped {
val ex = new Exception("buh") val ex = new Exception("buh")
val result = Source(1 to 100) val result = Source(1 to 100)
.map(x if (x == 50) throw ex else x) .map(x if (x == 50) throw ex else x)
@ -35,7 +36,7 @@ class FlowDetacherSpec extends StreamSpec {
} }
"emit the last element when completed without demand" in Utils.assertAllStagesStopped { "emit the last element when completed without demand" in assertAllStagesStopped {
Source.single(42) Source.single(42)
.detach .detach
.runWith(TestSink.probe) .runWith(TestSink.probe)

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream.ActorAttributes._ import akka.stream.ActorAttributes._
import akka.stream.Supervision._ import akka.stream.Supervision._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._

View file

@ -10,6 +10,7 @@ import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random }
import akka.stream.ActorAttributes._ import akka.stream.ActorAttributes._
import akka.stream.Supervision._ import akka.stream.Supervision._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._

View file

@ -7,7 +7,8 @@ package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream._ import akka.stream._
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent._ import scala.concurrent._
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -10,6 +10,7 @@ import akka.stream.ActorMaterializer
import akka.stream.Supervision.{ restartingDecider, resumingDecider } import akka.stream.Supervision.{ restartingDecider, resumingDecider }
import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout

View file

@ -11,6 +11,7 @@ import scala.concurrent.Await
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -8,6 +8,7 @@ import scala.util.control.NoStackTrace
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -11,6 +11,7 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class FlowFromFutureSpec extends StreamSpec { class FlowFromFutureSpec extends StreamSpec {

View file

@ -20,6 +20,7 @@ import akka.stream.Supervision.resumingDecider
import akka.stream.impl.fusing.GroupBy import akka.stream.impl.fusing.GroupBy
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout
import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSource

View file

@ -11,6 +11,7 @@ import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, ThrottleMode } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, ThrottleMode }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.TimingTest import akka.testkit.TimingTest
import akka.util.ConstantFun import akka.util.ConstantFun

View file

@ -5,7 +5,8 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestPublisher, Utils } import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestPublisher }
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -18,13 +19,13 @@ class FlowIdleInjectSpec extends StreamSpec {
"keepAlive" must { "keepAlive" must {
"not emit additional elements if upstream is fast enough" in Utils.assertAllStagesStopped { "not emit additional elements if upstream is fast enough" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 10).keepAlive(1.second, () 0).grouped(1000).runWith(Sink.head), Source(1 to 10).keepAlive(1.second, () 0).grouped(1000).runWith(Sink.head),
3.seconds) should ===(1 to 10) 3.seconds) should ===(1 to 10)
} }
"emit elements periodically after silent periods" in Utils.assertAllStagesStopped { "emit elements periodically after silent periods" in assertAllStagesStopped {
val sourceWithIdleGap = Source(1 to 5) ++ Source(6 to 10).initialDelay(2.second) val sourceWithIdleGap = Source(1 to 5) ++ Source(6 to 10).initialDelay(2.second)
val result = Await.result( val result = Await.result(

View file

@ -6,7 +6,8 @@ package akka.stream.scaladsl
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ StreamSpec, Utils, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -19,13 +20,13 @@ class FlowInitialDelaySpec extends StreamSpec {
"Flow initialDelay" must { "Flow initialDelay" must {
"work with zero delay" in Utils.assertAllStagesStopped { "work with zero delay" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 10).initialDelay(Duration.Zero).grouped(100).runWith(Sink.head), Source(1 to 10).initialDelay(Duration.Zero).grouped(100).runWith(Sink.head),
1.second) should ===(1 to 10) 1.second) should ===(1 to 10)
} }
"delay elements by the specified time but not more" in Utils.assertAllStagesStopped { "delay elements by the specified time but not more" in assertAllStagesStopped {
a[TimeoutException] shouldBe thrownBy { a[TimeoutException] shouldBe thrownBy {
Await.result( Await.result(
Source(1 to 10).initialDelay(2.seconds).initialTimeout(1.second).runWith(Sink.ignore), Source(1 to 10).initialDelay(2.seconds).initialTimeout(1.second).runWith(Sink.ignore),
@ -37,7 +38,7 @@ class FlowInitialDelaySpec extends StreamSpec {
2.seconds) 2.seconds)
} }
"properly ignore timer while backpressured" in Utils.assertAllStagesStopped { "properly ignore timer while backpressured" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]() val probe = TestSubscriber.probe[Int]()
Source(1 to 10).initialDelay(0.5.second).runWith(Sink.fromSubscriber(probe)) Source(1 to 10).initialDelay(0.5.second).runWith(Sink.fromSubscriber(probe))

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher

View file

@ -11,6 +11,7 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.EventFilter import akka.testkit.EventFilter
class FlowIteratorSpec extends AbstractFlowIteratorSpec { class FlowIteratorSpec extends AbstractFlowIteratorSpec {

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowShape, OverflowStrategy } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowShape, OverflowStrategy }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.time._ import org.scalatest.time._

View file

@ -8,7 +8,8 @@ import akka.Done
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.{ ActorMaterializer, ClosedShape, KillSwitches } import akka.stream.{ ActorMaterializer, ClosedShape, KillSwitches }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.Utils.TE
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -12,6 +12,7 @@ import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.Supervision.resumingDecider import akka.stream.Supervision.resumingDecider
import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.{ TestLatch, TestProbe } import akka.testkit.{ TestLatch, TestProbe }

View file

@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.TestLatch import akka.testkit.TestLatch
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.stream.ActorAttributes.supervisionStrategy import akka.stream.ActorAttributes.supervisionStrategy

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher

View file

@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.TestProbe import akka.testkit.TestProbe
class FlowOnCompleteSpec extends StreamSpec with ScriptedTest { class FlowOnCompleteSpec extends StreamSpec with ScriptedTest {

View file

@ -11,6 +11,7 @@ import scala.util.control.NoStackTrace
import akka.stream._ import akka.stream._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class FlowPrefixAndTailSpec extends StreamSpec { class FlowPrefixAndTailSpec extends StreamSpec {

View file

@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace

View file

@ -9,6 +9,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream._ import akka.stream._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace

View file

@ -10,6 +10,7 @@ import scala.concurrent.Await
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -9,6 +9,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision } import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit._ import akka.stream.testkit._
import org.scalacheck.Gen import org.scalacheck.Gen

View file

@ -10,6 +10,7 @@ import akka.NotUsed
import akka.actor._ import akka.actor._
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream._ import akka.stream._
import akka.testkit.TestDuration import akka.testkit.TestDuration

View file

@ -10,6 +10,7 @@ import akka.stream.Supervision.resumingDecider
import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -9,6 +9,7 @@ import akka.stream._
import akka.stream.Supervision.resumingDecider import akka.stream.Supervision.resumingDecider
import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import org.reactivestreams.Publisher import org.reactivestreams.Publisher

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream.ActorAttributes._ import akka.stream.ActorAttributes._
import akka.stream.Supervision._ import akka.stream.Supervision._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class FlowTakeWithinSpec extends StreamSpec { class FlowTakeWithinSpec extends StreamSpec {

View file

@ -11,6 +11,7 @@ import akka.stream.ThrottleMode.{ Enforcing, Shaping }
import akka.stream._ import akka.stream._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.util.ByteString import akka.util.ByteString
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -27,7 +28,7 @@ class FlowThrottleSpec extends StreamSpec {
ByteString(new Random().shuffle(0 to 255).take(length).map(_.toByte).toArray) ByteString(new Random().shuffle(0 to 255).take(length).map(_.toByte).toArray)
"Throttle for single cost elements" must { "Throttle for single cost elements" must {
"work for the happy case" in Utils.assertAllStagesStopped { "work for the happy case" in assertAllStagesStopped {
//Source(1 to 5).throttle(1, 100.millis, 0, Shaping) //Source(1 to 5).throttle(1, 100.millis, 0, Shaping)
Source(1 to 5).throttle(19, 1000.millis, -1, Shaping) Source(1 to 5).throttle(19, 1000.millis, -1, Shaping)
.runWith(TestSink.probe[Int]) .runWith(TestSink.probe[Int])
@ -36,7 +37,7 @@ class FlowThrottleSpec extends StreamSpec {
.expectComplete() .expectComplete()
} }
"accept very high rates" in Utils.assertAllStagesStopped { "accept very high rates" in assertAllStagesStopped {
Source(1 to 5).throttle(1, 1.nanos, 0, Shaping) Source(1 to 5).throttle(1, 1.nanos, 0, Shaping)
.runWith(TestSink.probe[Int]) .runWith(TestSink.probe[Int])
.request(5) .request(5)
@ -44,7 +45,7 @@ class FlowThrottleSpec extends StreamSpec {
.expectComplete() .expectComplete()
} }
"accept very low rates" in Utils.assertAllStagesStopped { "accept very low rates" in assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.days, 1, Shaping) Source(1 to 5).throttle(1, 100.days, 1, Shaping)
.runWith(TestSink.probe[Int]) .runWith(TestSink.probe[Int])
.request(5) .request(5)
@ -53,7 +54,7 @@ class FlowThrottleSpec extends StreamSpec {
.cancel() // We won't wait 100 days, sorry .cancel() // We won't wait 100 days, sorry
} }
"work if there are two throttles in different streams" in Utils.assertAllStagesStopped { "work if there are two throttles in different streams" in assertAllStagesStopped {
val sharedThrottle = Flow[Int].throttle(1, 1.day, 1, Enforcing) val sharedThrottle = Flow[Int].throttle(1, 1.day, 1, Enforcing)
// If there is accidental shared state then we would not be able to pass through the single element // If there is accidental shared state then we would not be able to pass through the single element
@ -70,7 +71,7 @@ class FlowThrottleSpec extends StreamSpec {
} }
"emit single element per tick" in Utils.assertAllStagesStopped { "emit single element per tick" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
@ -89,7 +90,7 @@ class FlowThrottleSpec extends StreamSpec {
downstream.expectComplete() downstream.expectComplete()
} }
"not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped { "not send downstream if upstream does not emit element" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source.fromPublisher(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink.fromSubscriber(downstream)) Source.fromPublisher(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink.fromSubscriber(downstream))
@ -105,13 +106,13 @@ class FlowThrottleSpec extends StreamSpec {
upstream.sendComplete() upstream.sendComplete()
} }
"cancel when downstream cancels" in Utils.assertAllStagesStopped { "cancel when downstream cancels" in assertAllStagesStopped {
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source(1 to 10).throttle(1, 300.millis, 0, Shaping).runWith(Sink.fromSubscriber(downstream)) Source(1 to 10).throttle(1, 300.millis, 0, Shaping).runWith(Sink.fromSubscriber(downstream))
downstream.cancel() downstream.cancel()
} }
"send elements downstream as soon as time comes" in Utils.assertAllStagesStopped { "send elements downstream as soon as time comes" in assertAllStagesStopped {
val probe = Source(1 to 10).throttle(2, 750.millis, 0, Shaping).runWith(TestSink.probe[Int]) val probe = Source(1 to 10).throttle(2, 750.millis, 0, Shaping).runWith(TestSink.probe[Int])
.request(5) .request(5)
probe.receiveWithin(900.millis) should be(Seq(1, 2)) probe.receiveWithin(900.millis) should be(Seq(1, 2))
@ -122,7 +123,7 @@ class FlowThrottleSpec extends StreamSpec {
.cancel() .cancel()
} }
"burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped { "burst according to its maximum if enough time passed" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source.fromPublisher(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink.fromSubscriber(downstream)) Source.fromPublisher(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink.fromSubscriber(downstream))
@ -139,7 +140,7 @@ class FlowThrottleSpec extends StreamSpec {
downstream.cancel() downstream.cancel()
} }
"burst some elements if have enough time" in Utils.assertAllStagesStopped { "burst some elements if have enough time" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source.fromPublisher(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink.fromSubscriber(downstream)) Source.fromPublisher(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink.fromSubscriber(downstream))
@ -160,7 +161,7 @@ class FlowThrottleSpec extends StreamSpec {
downstream.cancel() downstream.cancel()
} }
"throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped { "throw exception when exceeding throughtput in enforced mode" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 5).throttle(1, 200.millis, 5, Enforcing).runWith(Sink.seq), Source(1 to 5).throttle(1, 200.millis, 5, Enforcing).runWith(Sink.seq),
2.seconds) should ===(1 to 5) // Burst is 5 so this will not fail 2.seconds) should ===(1 to 5) // Burst is 5 so this will not fail
@ -172,7 +173,7 @@ class FlowThrottleSpec extends StreamSpec {
} }
} }
"properly combine shape and throttle modes" in Utils.assertAllStagesStopped { "properly combine shape and throttle modes" in assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.millis, 5, Shaping) Source(1 to 5).throttle(1, 100.millis, 5, Shaping)
.throttle(1, 100.millis, 5, Enforcing) .throttle(1, 100.millis, 5, Enforcing)
.runWith(TestSink.probe[Int]) .runWith(TestSink.probe[Int])
@ -183,7 +184,7 @@ class FlowThrottleSpec extends StreamSpec {
} }
"Throttle for various cost elements" must { "Throttle for various cost elements" must {
"work for happy case" in Utils.assertAllStagesStopped { "work for happy case" in assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.millis, 0, (_) 1, Shaping) Source(1 to 5).throttle(1, 100.millis, 0, (_) 1, Shaping)
.runWith(TestSink.probe[Int]) .runWith(TestSink.probe[Int])
.request(5) .request(5)
@ -191,7 +192,7 @@ class FlowThrottleSpec extends StreamSpec {
.expectComplete() .expectComplete()
} }
"emit elements according to cost" in Utils.assertAllStagesStopped { "emit elements according to cost" in assertAllStagesStopped {
val list = (1 to 4).map(_ * 2).map(genByteString) val list = (1 to 4).map(_ * 2).map(genByteString)
Source(list).throttle(2, 200.millis, 0, _.length, Shaping) Source(list).throttle(2, 200.millis, 0, _.length, Shaping)
.runWith(TestSink.probe[ByteString]) .runWith(TestSink.probe[ByteString])
@ -206,7 +207,7 @@ class FlowThrottleSpec extends StreamSpec {
.expectComplete() .expectComplete()
} }
"not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped { "not send downstream if upstream does not emit element" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source.fromPublisher(upstream).throttle(2, 300.millis, 0, identity, Shaping).runWith(Sink.fromSubscriber(downstream)) Source.fromPublisher(upstream).throttle(2, 300.millis, 0, identity, Shaping).runWith(Sink.fromSubscriber(downstream))
@ -222,13 +223,13 @@ class FlowThrottleSpec extends StreamSpec {
upstream.sendComplete() upstream.sendComplete()
} }
"cancel when downstream cancels" in Utils.assertAllStagesStopped { "cancel when downstream cancels" in assertAllStagesStopped {
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source(1 to 10).throttle(2, 200.millis, 0, identity, Shaping).runWith(Sink.fromSubscriber(downstream)) Source(1 to 10).throttle(2, 200.millis, 0, identity, Shaping).runWith(Sink.fromSubscriber(downstream))
downstream.cancel() downstream.cancel()
} }
"send elements downstream as soon as time comes" in Utils.assertAllStagesStopped { "send elements downstream as soon as time comes" in assertAllStagesStopped {
val probe = Source(1 to 10).throttle(4, 500.millis, 0, _ 2, Shaping).runWith(TestSink.probe[Int]) val probe = Source(1 to 10).throttle(4, 500.millis, 0, _ 2, Shaping).runWith(TestSink.probe[Int])
.request(5) .request(5)
probe.receiveWithin(600.millis) should be(Seq(1, 2)) probe.receiveWithin(600.millis) should be(Seq(1, 2))
@ -239,7 +240,7 @@ class FlowThrottleSpec extends StreamSpec {
.cancel() .cancel()
} }
"burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped { "burst according to its maximum if enough time passed" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source.fromPublisher(upstream).throttle(2, 400.millis, 5, (_) 1, Shaping).runWith(Sink.fromSubscriber(downstream)) Source.fromPublisher(upstream).throttle(2, 400.millis, 5, (_) 1, Shaping).runWith(Sink.fromSubscriber(downstream))
@ -260,7 +261,7 @@ class FlowThrottleSpec extends StreamSpec {
downstream.cancel() downstream.cancel()
} }
"burst some elements if have enough time" in Utils.assertAllStagesStopped { "burst some elements if have enough time" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]() val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]() val downstream = TestSubscriber.probe[Int]()
Source.fromPublisher(upstream).throttle(2, 400.millis, 5, (e) if (e < 9) 1 else 20, Shaping).runWith(Sink.fromSubscriber(downstream)) Source.fromPublisher(upstream).throttle(2, 400.millis, 5, (e) if (e < 9) 1 else 20, Shaping).runWith(Sink.fromSubscriber(downstream))
@ -281,7 +282,7 @@ class FlowThrottleSpec extends StreamSpec {
downstream.cancel() downstream.cancel()
} }
"throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped { "throw exception when exceeding throughtput in enforced mode" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 4).throttle(2, 200.millis, 10, identity, Enforcing).runWith(Sink.seq), Source(1 to 4).throttle(2, 200.millis, 10, identity, Enforcing).runWith(Sink.seq),
2.seconds) should ===(1 to 4) // Burst is 10 so this will not fail 2.seconds) should ===(1 to 4) // Burst is 10 so this will not fail
@ -293,7 +294,7 @@ class FlowThrottleSpec extends StreamSpec {
} }
} }
"properly combine shape and enforce modes" in Utils.assertAllStagesStopped { "properly combine shape and enforce modes" in assertAllStagesStopped {
Source(1 to 5).throttle(2, 200.millis, 0, identity, Shaping) Source(1 to 5).throttle(2, 200.millis, 0, identity, Shaping)
.throttle(1, 100.millis, 5, Enforcing) .throttle(1, 100.millis, 5, Enforcing)
.runWith(TestSink.probe[Int]) .runWith(TestSink.probe[Int])
@ -302,7 +303,7 @@ class FlowThrottleSpec extends StreamSpec {
.expectComplete() .expectComplete()
} }
"handle rate calculation function exception" in Utils.assertAllStagesStopped { "handle rate calculation function exception" in assertAllStagesStopped {
val ex = new RuntimeException with NoStackTrace val ex = new RuntimeException with NoStackTrace
Source(1 to 5).throttle(2, 200.millis, 0, (_) { throw ex }, Shaping) Source(1 to 5).throttle(2, 200.millis, 0, (_) { throw ex }, Shaping)
.throttle(1, 100.millis, 5, Enforcing) .throttle(1, 100.millis, 5, Enforcing)
@ -311,7 +312,7 @@ class FlowThrottleSpec extends StreamSpec {
.expectError(ex) .expectError(ex)
} }
"work for real scenario with automatic burst size" taggedAs TimingTest in Utils.assertAllStagesStopped { "work for real scenario with automatic burst size" taggedAs TimingTest in assertAllStagesStopped {
val startTime = System.nanoTime() val startTime = System.nanoTime()
val counter1 = new AtomicInteger val counter1 = new AtomicInteger
val timestamp1 = new AtomicLong(System.nanoTime()) val timestamp1 = new AtomicLong(System.nanoTime())

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.actor.{ Actor, PoisonPill, Props } import akka.actor.{ Actor, PoisonPill, Props }
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.testkit.TestActors import akka.testkit.TestActors

View file

@ -9,6 +9,7 @@ import akka.pattern.pipe
import akka.stream._ import akka.stream._
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.Done import akka.Done
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber }
import org.reactivestreams.Publisher import org.reactivestreams.Publisher

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestSubscriber }

View file

@ -8,8 +8,9 @@ import java.util.concurrent.{ CompletableFuture, TimeUnit }
import akka.stream._ import akka.stream._
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue } import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.Utils.TE
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.TestLatch import akka.testkit.TestLatch
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }

View file

@ -11,6 +11,7 @@ import akka.stream._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class GraphBalanceSpec extends StreamSpec { class GraphBalanceSpec extends StreamSpec {

View file

@ -11,6 +11,7 @@ import scala.concurrent.duration._
import akka.stream._ import akka.stream._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class GraphBroadcastSpec extends StreamSpec { class GraphBroadcastSpec extends StreamSpec {

View file

@ -9,6 +9,7 @@ import scala.concurrent.{ Promise }
import akka.stream._ import akka.stream._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class GraphConcatSpec extends TwoStreamsSetup { class GraphConcatSpec extends TwoStreamsSetup {

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream._ import akka.stream._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSource

View file

@ -11,6 +11,7 @@ import scala.concurrent.duration._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class GraphMergeSpec extends TwoStreamsSetup { class GraphMergeSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._ import GraphDSL.Implicits._

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.{ OverflowStrategy, ActorMaterializer, ActorMaterializerSettings, ClosedShape } import akka.stream.{ OverflowStrategy, ActorMaterializer, ActorMaterializerSettings, ClosedShape }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -15,6 +15,7 @@ import scala.concurrent.duration._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
object GraphStageTimersSpec { object GraphStageTimersSpec {
case object TestSingleTimer case object TestSingleTimer

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
import akka.stream.{ ClosedShape, OverflowStrategy, ActorMaterializerSettings, ActorMaterializer } import akka.stream.{ ClosedShape, OverflowStrategy, ActorMaterializerSettings, ActorMaterializer }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class GraphUnzipSpec extends StreamSpec { class GraphUnzipSpec extends StreamSpec {

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import akka.stream._ import akka.stream._
import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream._ import akka.stream._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream._ import akka.stream._
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream._ import akka.stream._
import scala.concurrent.Await import scala.concurrent.Await

View file

@ -10,6 +10,7 @@ import scala.concurrent.duration._
import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class HeadSinkSpec extends StreamSpec with ScriptedTest { class HeadSinkSpec extends StreamSpec with ScriptedTest {

View file

@ -6,8 +6,9 @@ package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, KillSwitches, ThrottleMode } import akka.stream.{ ActorMaterializer, KillSwitches, ThrottleMode }
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.EventFilter import akka.testkit.EventFilter
import scala.collection.immutable import scala.collection.immutable

View file

@ -11,6 +11,7 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
class LastSinkSpec extends StreamSpec with ScriptedTest { class LastSinkSpec extends StreamSpec with ScriptedTest {

View file

@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.Done import akka.Done
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.testkit.Utils.assertAllStagesStopped import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures

View file

@ -9,6 +9,7 @@ import akka.stream._
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue } import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue }
import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.{ StreamSpec, TestPublisher }
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures

View file

@ -14,6 +14,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic }
import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }

View file

@ -9,7 +9,8 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.Done import akka.Done
import akka.stream.impl.LazySource import akka.stream.impl.LazySource
import akka.stream.stage.{ GraphStage, GraphStageLogic } import akka.stream.stage.{ GraphStage, GraphStageLogic }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape } import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape }
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout

View file

@ -5,7 +5,8 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.{ AbruptStageTerminationException, ActorMaterializer } import akka.stream.{ AbruptStageTerminationException, ActorMaterializer }
import akka.stream.testkit.{ StreamSpec, TestSubscriber, Utils } import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -17,7 +18,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
"The Maybe Source" must { "The Maybe Source" must {
"complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped { "complete materialized future with None when stream cancels" in assertAllStagesStopped {
val neverSource = Source.maybe[Int] val neverSource = Source.maybe[Int]
val pubSink = Sink.asPublisher[Int](false) val pubSink = Sink.asPublisher[Int](false)
@ -34,7 +35,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
f.future.futureValue shouldEqual None f.future.futureValue shouldEqual None
} }
"allow external triggering of empty completion" in Utils.assertAllStagesStopped { "allow external triggering of empty completion" in assertAllStagesStopped {
val neverSource = Source.maybe[Int].filter(_ false) val neverSource = Source.maybe[Int].filter(_ false)
val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 } val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 }
@ -46,7 +47,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
counterFuture.futureValue shouldEqual 0 counterFuture.futureValue shouldEqual 0
} }
"allow external triggering of empty completion when there was no demand" in Utils.assertAllStagesStopped { "allow external triggering of empty completion when there was no demand" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]() val probe = TestSubscriber.probe[Int]()
val promise = Source.maybe[Int].to(Sink.fromSubscriber(probe)).run() val promise = Source.maybe[Int].to(Sink.fromSubscriber(probe)).run()
@ -56,7 +57,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
probe.expectComplete() probe.expectComplete()
} }
"allow external triggering of non-empty completion" in Utils.assertAllStagesStopped { "allow external triggering of non-empty completion" in assertAllStagesStopped {
val neverSource = Source.maybe[Int] val neverSource = Source.maybe[Int]
val counterSink = Sink.head[Int] val counterSink = Sink.head[Int]
@ -68,7 +69,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
counterFuture.futureValue shouldEqual 6 counterFuture.futureValue shouldEqual 6
} }
"allow external triggering of onError" in Utils.assertAllStagesStopped { "allow external triggering of onError" in assertAllStagesStopped {
val neverSource = Source.maybe[Int] val neverSource = Source.maybe[Int]
val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 } val counterSink = Sink.fold[Int, Int](0) { (acc, _) acc + 1 }
@ -80,7 +81,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout {
counterFuture.failed.futureValue.getMessage should include("Boom") counterFuture.failed.futureValue.getMessage should include("Boom")
} }
"complete materialized future when materializer is shutdown" in Utils.assertAllStagesStopped { "complete materialized future when materializer is shutdown" in assertAllStagesStopped {
val mat = ActorMaterializer() val mat = ActorMaterializer()
val neverSource = Source.maybe[Int] val neverSource = Source.maybe[Int]
val pubSink = Sink.asPublisher[Int](false) val pubSink = Sink.asPublisher[Int](false)

View file

@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.{ ClosedShape, ActorMaterializer }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await

View file

@ -9,6 +9,7 @@ import akka.pattern.pipe
import akka.stream.Attributes.inputBuffer import akka.stream.Attributes.inputBuffer
import akka.stream.{ ActorMaterializer, StreamDetachedException } import akka.stream.{ ActorMaterializer, StreamDetachedException }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import scala.concurrent.{ Await, Promise } import scala.concurrent.{ Await, Promise }

View file

@ -10,6 +10,7 @@ import akka.pattern.pipe
import akka.stream._ import akka.stream._
import akka.stream.impl.QueueSource import akka.stream.impl.QueueSource
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber } import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber }
import akka.testkit.TestProbe import akka.testkit.TestProbe

View file

@ -8,8 +8,9 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy } import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy }
import akka.testkit.{ DefaultTimeout, TestDuration } import akka.testkit.{ DefaultTimeout, TestDuration }
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }

View file

@ -11,6 +11,7 @@ import akka.stream._
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSource
import akka.util.ByteString import akka.util.ByteString
@ -43,13 +44,13 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
javaSource.count() should ===(0) javaSource.count() should ===(0)
} }
"work with endless stream" in Utils.assertAllStagesStopped { "work with endless stream" in assertAllStagesStopped {
val javaSource = Source.repeat(1).runWith(StreamConverters.asJavaStream()) val javaSource = Source.repeat(1).runWith(StreamConverters.asJavaStream())
javaSource.limit(10).count() should ===(10) javaSource.limit(10).count() should ===(10)
javaSource.close() javaSource.close()
} }
"allow overriding the dispatcher using Attributes" in Utils.assertAllStagesStopped { "allow overriding the dispatcher using Attributes" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys) val materializer = ActorMaterializer()(sys)
@ -62,7 +63,7 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
} finally shutdown(sys) } finally shutdown(sys)
} }
"work in separate IO dispatcher" in Utils.assertAllStagesStopped { "work in separate IO dispatcher" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys) val materializer = ActorMaterializer()(sys)

Some files were not shown because too many files have changed in this diff Show more