diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index b1899bbeb6..18133d2714 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -181,8 +181,11 @@ object TestSubscriber { /** * Implementation of [[org.reactivestreams.Subscriber]] that allows various assertions. + * + * All timeouts are dilated automatically, for more details about time dilation refer to [[akka.testkit.TestKit]]. */ class ManualProbe[I] private[TestSubscriber] ()(implicit system: ActorSystem) extends Subscriber[I] { + import akka.testkit._ type Self <: ManualProbe[I] @@ -206,12 +209,18 @@ object TestSubscriber { def expectEvent(): SubscriberEvent = probe.expectMsgType[SubscriberEvent] + /** + * Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`). + */ + def expectEvent(max: FiniteDuration): SubscriberEvent = + probe.expectMsgType[SubscriberEvent](max.dilated) + /** * Fluent DSL * * Expect [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`). */ - def expectEvent(event: SubscriberEvent): Self = { // TODO it's more "signal" than event, shall we rename? -- ktoso + def expectEvent(event: SubscriberEvent): Self = { probe.expectMsg(event) self } @@ -492,20 +501,14 @@ object TestSubscriber { val deadline = Deadline.now + atMost val b = immutable.Seq.newBuilder[I] - def checkDeadline(): Unit = { - if (deadline.isOverdue()) - throw new TimeoutException(s"toStrict did not drain the stream within $atMost! Accumulated elements: ${b.result()}") - } - @tailrec def drain(): immutable.Seq[I] = - self.expectEvent() match { + self.expectEvent(deadline.timeLeft) match { case OnError(ex) ⇒ - throw new AssertionError(s"toStrict received OnError($ex) while draining stream! Accumulated elements: ${b.result()}") + // TODO once on JDK7+ this could be made an AssertionError, since it can carry ex in its cause param + throw new AssertionError(s"toStrict received OnError(${ex.getMessage}) while draining stream! Accumulated elements: ${b.result()}") case OnComplete ⇒ - checkDeadline() b.result() case OnNext(i: I @unchecked) ⇒ - checkDeadline() b += i drain() } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala index d5fbedec32..3a941e6705 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala @@ -14,6 +14,6 @@ object TestSink { * A Sink that materialized to a [[TestSubscriber.Probe]]. */ def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]] = - new Sink(scaladsl.TestSink.probe[T]()(system)) + new Sink(scaladsl.TestSink.probe[T](system)) } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala index c1f0cf99e1..5fe07a7a5b 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala @@ -14,6 +14,6 @@ object TestSource { * A Source that materializes to a [[TestPublisher.Probe]]. */ def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]] = - new Source(scaladsl.TestSource.probe[T]()(system)) + new Source(scaladsl.TestSource.probe[T](system)) } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala index a50faf7186..495b4ae58d 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala @@ -18,7 +18,7 @@ object TestSink { /** * A Sink that materialized to a [[TestSubscriber.Probe]]. */ - def probe[T]()(implicit system: ActorSystem): Sink[T, Probe[T]] = + def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] = new Sink[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(Inlet("ProbeSink.in")))) } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala index 2fa3b42703..55c21439fd 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala @@ -19,6 +19,6 @@ object TestSource { /** * A Source that materializes to a [[TestPublisher.Probe]]. */ - def probe[T]()(implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) + def probe[T](implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala index 09c43bb6a8..702689468d 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala @@ -17,7 +17,7 @@ class StreamTestKitSpec extends AkkaSpec { "A TestSink Probe" must { "#toStrict" in { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .toStrict(300.millis) should ===(List(1, 2, 3, 4)) } @@ -33,7 +33,7 @@ class StreamTestKitSpec extends AkkaSpec { case n ⇒ n } } - }).runWith(TestSink.probe()) + }).runWith(TestSink.probe) .toStrict(300.millis) }.getMessage @@ -42,40 +42,40 @@ class StreamTestKitSpec extends AkkaSpec { } "#toStrict when subscription was already obtained" in { - val p = Source(1 to 4).runWith(TestSink.probe()) + val p = Source(1 to 4).runWith(TestSink.probe) p.expectSubscription() p.toStrict(300.millis) should ===(List(1, 2, 3, 4)) } "#expectNextOrError with right element" in { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(4) .expectNextOrError(1, ex) } "#expectNextOrError with right exception" in { - Source.failed[Int](ex).runWith(TestSink.probe()) + Source.failed[Int](ex).runWith(TestSink.probe) .request(4) .expectNextOrError(1, ex) } "#expectNextOrError fail if the next element is not the expected one" in { intercept[AssertionError] { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(4) .expectNextOrError(100, ex) }.getMessage should include("OnNext(1)") } "#expectError" in { - Source.failed[Int](ex).runWith(TestSink.probe()) + Source.failed[Int](ex).runWith(TestSink.probe) .request(1) .expectError() should ===(ex) } "#expectError fail if no error signalled" in { intercept[AssertionError] { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(1) .expectError() }.getMessage should include("OnNext") @@ -83,7 +83,7 @@ class StreamTestKitSpec extends AkkaSpec { "#expectComplete should fail if error signalled" in { intercept[AssertionError] { - Source.failed[Int](ex).runWith(TestSink.probe()) + Source.failed[Int](ex).runWith(TestSink.probe) .request(1) .expectComplete() }.getMessage should include("OnError") @@ -91,35 +91,35 @@ class StreamTestKitSpec extends AkkaSpec { "#expectComplete should fail if next element signalled" in { intercept[AssertionError] { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(1) .expectComplete() }.getMessage should include("OnNext") } "#expectNextOrComplete with right element" in { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(4) .expectNextOrComplete(1) } "#expectNextOrComplete with completion" in { - Source.single(1).runWith(TestSink.probe()) + Source.single(1).runWith(TestSink.probe) .request(4) .expectNextOrComplete(1) .expectNextOrComplete(1337) } "#expectNextN given a number of elements" in { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(4) .expectNextN(4) should ===(List(1, 2, 3, 4)) } "#expectNextN given specific elements" in { - Source(1 to 4).runWith(TestSink.probe()) + Source(1 to 4).runWith(TestSink.probe) .request(4) .expectNextN(4) should ===(List(1, 2, 3, 4)) } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index fe2d3e38a4..b1936e51c1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -172,7 +172,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - val p = SynchronousFileSource(manyLines).runWith(TestSink.probe())(mat) + val p = SynchronousFileSource(manyLines).runWith(TestSink.probe)(mat) mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get @@ -188,7 +188,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { try { val p = SynchronousFileSource(manyLines) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) - .runWith(TestSink.probe())(mat) + .runWith(TestSink.probe)(mat) mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index dbe83b5499..e482d80c04 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -4,14 +4,14 @@ package akka.stream.scaladsl import akka.stream.testkit.scaladsl.TestSink -import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision} +import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision } import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ThreadLocalRandom.{current => random} +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowScanSpec extends AkkaSpec { @@ -41,8 +41,8 @@ class FlowScanSpec extends AkkaSpec { } "emit values promptly" in { - Source.single(1).concat(Source.lazyEmpty).scan(0)(_ + _).grouped(2).runWith(TestSink.probe()) - .toStrict(1.second) should ===(Seq(0, 1)) + val f = Source.single(1).concat(Source.lazyEmpty).scan(0)(_ + _).grouped(2).runWith(Sink.head) + Await.result(f, 1.second) should ===(Seq(0, 1)) } "fail properly" in { @@ -51,7 +51,7 @@ class FlowScanSpec extends AkkaSpec { require(current > 0) old + current }.withAttributes(supervisionStrategy(Supervision.restartingDecider)) - Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe()) + Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe) .toStrict(1.second) should ===(Seq(0, 1, 4, 0, 5, 12)) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index 98d5e8192e..66200d80d0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -300,7 +300,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug emit(Iterator(elem, elem), ctx) } }) - .runWith(TestSink.probe[Int]()) + .runWith(TestSink.probe[Int]) .request(1000) .expectNext(1) .cancel() @@ -444,7 +444,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug terminationEmit(Iterator("byebye"), ctx) } }) - .runWith(TestSink.probe[String]()) + .runWith(TestSink.probe[String]) .request(1) .expectNext("hi1") .request(2)