diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 663bfeecac..4717f10ba3 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -784,7 +784,7 @@ private[testkit] object StreamTestKit { def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this) def expectRequest(n: Long): Unit = publisherProbe.expectMsg(RequestMore(this, n)) - def expectRequest(): Long = publisherProbe.expectMsgPF() { + def expectRequest(): Long = publisherProbe.expectMsgPF(hint = "expecting request() signal") { case RequestMore(sub, n) if sub eq this ⇒ n } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index ccbfddc9b2..3793a8cf1f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -11,6 +11,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.testkit.DefaultTimeout +import akka.testkit.TestDuration import scala.concurrent.Promise import scala.concurrent.duration._ @@ -21,10 +22,15 @@ class RestartSpec extends StreamSpec with DefaultTimeout { implicit val mat = ActorMaterializer() import system.dispatcher + private val shortMinBackoff = 10.millis + private val shortMaxBackoff = 20.millis + private val minBackoff = 1.second.dilated + private val maxBackoff = 3.seconds.dilated + "A restart with backoff source" should { "run normally" in assertAllStagesStopped { val created = new AtomicInteger() - val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒ + val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Source.repeat("a") }.runWith(TestSink.probe) @@ -42,7 +48,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "restart on completion" in assertAllStagesStopped { val created = new AtomicInteger() - val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒ + val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Source(List("a", "b")) }.runWith(TestSink.probe) @@ -60,7 +66,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "restart on failure" in assertAllStagesStopped { val created = new AtomicInteger() - val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒ + val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Source(List("a", "b", "c")) .map { @@ -82,18 +88,19 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "backoff before restart" in assertAllStagesStopped { val created = new AtomicInteger() - val probe = RestartSource.withBackoff(200.millis, 1.second, 0) { () ⇒ + val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Source(List("a", "b")) }.runWith(TestSink.probe) probe.requestNext("a") probe.requestNext("b") + + // There should be a delay of at least minBackoff before we receive the element after restart + val deadline = (minBackoff - 1.millis).fromNow probe.request(1) - // There should be a delay of at least 200ms before we receive the element, wait for 100ms. - val deadline = 100.millis.fromNow - // But the delay shouldn't be more than 300ms. - probe.expectNext(300.milliseconds, "a") + + probe.expectNext("a") deadline.isOverdue() should be(true) created.get() should ===(2) @@ -103,29 +110,29 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped { val created = new AtomicInteger() - val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Source(List("a", "b")) }.runWith(TestSink.probe) probe.requestNext("a") probe.requestNext("b") - // There should be a 200ms delay + // There should be minBackoff delay probe.requestNext("a") probe.requestNext("b") probe.request(1) - // The probe should now be backing off for 400ms + // The probe should now be backing off again with with increased backoff - // Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the - // subsequent 200ms min backoff to pass, so it resets the restart count - Thread.sleep(700) + // Now wait for the delay to pass, then it will start the new source, we also want to wait for the + // subsequent backoff to pass, so it resets the restart count + Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis) probe.expectNext("a") probe.requestNext("b") - // We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the - // next element within 300ms - probe.requestNext(300.milliseconds) should ===("a") + // We should have reset, so the restart delay should be back, ie we should receive the + // next element within < 2 * minBackoff + probe.requestNext(2 * minBackoff - 10.milliseconds) should ===("a") created.get() should ===(4) @@ -135,7 +142,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "cancel the currently running source when cancelled" in assertAllStagesStopped { val created = new AtomicInteger() val promise = Promise[Done]() - val probe = RestartSource.withBackoff(10.millis, 2.seconds, 0) { () ⇒ + val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Source.repeat("a").watchTermination() { (_, term) ⇒ promise.completeWith(term) @@ -154,7 +161,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "not restart the source when cancelled while backing off" in assertAllStagesStopped { val created = new AtomicInteger() - val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Source.single("a") }.runWith(TestSink.probe) @@ -165,7 +172,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { probe.cancel() // Wait to ensure it isn't restarted - Thread.sleep(300) + Thread.sleep((minBackoff + 100.millis).toMillis) created.get() should ===(1) } } @@ -174,7 +181,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "run normally" in assertAllStagesStopped { val created = new AtomicInteger() val result = Promise[Seq[String]]() - val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒ + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Sink.seq.mapMaterializedValue(result.completeWith) })(Keep.left).run() @@ -191,7 +198,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "restart on cancellation" in assertAllStagesStopped { val created = new AtomicInteger() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() - val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒ + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Flow[String].takeWhile(_ != "cancel", inclusive = true) .to(Sink.foreach(queue.sendNext)) @@ -215,7 +222,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "backoff before restart" in assertAllStagesStopped { val created = new AtomicInteger() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() - val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Flow[String].takeWhile(_ != "cancel", inclusive = true) .to(Sink.foreach(queue.sendNext)) @@ -226,9 +233,9 @@ class RestartSpec extends StreamSpec with DefaultTimeout { probe.sendNext("cancel") sinkProbe.requestNext("cancel") probe.sendNext("b") + val deadline = (minBackoff - 1.millis).fromNow sinkProbe.request(1) - val deadline = 100.millis.fromNow - sinkProbe.expectNext(300.millis, "b") + sinkProbe.expectNext("b") deadline.isOverdue() should be(true) created.get() should ===(2) @@ -240,7 +247,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "reset exponential backoff back to minimum when sink runs for at least minimum backoff without completing" in assertAllStagesStopped { val created = new AtomicInteger() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() - val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Flow[String].takeWhile(_ != "cancel", inclusive = true) .to(Sink.foreach(queue.sendNext)) @@ -250,26 +257,26 @@ class RestartSpec extends StreamSpec with DefaultTimeout { sinkProbe.requestNext("a") probe.sendNext("cancel") sinkProbe.requestNext("cancel") - // There should be a 200ms delay + // There should be a minBackoff delay probe.sendNext("b") sinkProbe.requestNext("b") probe.sendNext("cancel") sinkProbe.requestNext("cancel") sinkProbe.request(1) - // The probe should now be backing off for 400ms + // The probe should now be backing off for 2 * minBackoff - // Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the - // subsequent 200ms min backoff to pass, so it resets the restart count - Thread.sleep(700) + // Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the + // subsequent minBackoff min backoff to pass, so it resets the restart count + Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis) probe.sendNext("cancel") sinkProbe.requestNext("cancel") - // We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the - // next element within 300ms + // We should have reset, so the restart delay should be back to minBackoff, ie we should definitely receive the + // next element within < 2 * minBackoff probe.sendNext("c") sinkProbe.request(1) - sinkProbe.expectNext(300.milliseconds, "c") + sinkProbe.expectNext((2 * minBackoff) - 10.millis, "c") created.get() should ===(4) @@ -280,7 +287,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "not restart the sink when completed while backing off" in assertAllStagesStopped { val created = new AtomicInteger() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() - val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Flow[String].takeWhile(_ != "cancel", inclusive = true) .to(Sink.foreach(queue.sendNext)) @@ -294,7 +301,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { probe.sendComplete() // Wait to ensure it isn't restarted - Thread.sleep(300) + Thread.sleep((minBackoff + 100.millis).toMillis) created.get() should ===(1) sinkProbe.cancel() @@ -313,19 +320,22 @@ class RestartSpec extends StreamSpec with DefaultTimeout { val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ created.incrementAndGet() Flow.fromSinkAndSource( - Flow[String].takeWhile(_ != "cancel").to(Sink.foreach(flowInSource.sendNext).mapMaterializedValue(_.onComplete { - case Success(_) ⇒ flowInSource.sendNext("in complete") - case Failure(_) ⇒ flowInSource.sendNext("in error") - })), - flowOutSource.takeWhile(_ != "complete").map { - case "error" ⇒ throw TE("error") - case other ⇒ other - }.watchTermination()((_, term) ⇒ - term.foreach(_ ⇒ { - flowInSource.sendNext("out complete") - }) - ) - ) + Flow[String] + .takeWhile(_ != "cancel") + .to(Sink.foreach(flowInSource.sendNext) + .mapMaterializedValue(_.onComplete { + case Success(_) ⇒ flowInSource.sendNext("in complete") + case Failure(_) ⇒ flowInSource.sendNext("in error") + })), + flowOutSource + .takeWhile(_ != "complete") + .map { + case "error" ⇒ throw TE("error") + case other ⇒ other + }.watchTermination()((_, term) ⇒ + term.foreach(_ ⇒ { + flowInSource.sendNext("out complete") + }))) })(Keep.left).toMat(TestSink.probe[String])(Keep.both).run() (created, source, flowInProbe, flowOutProbe, sink) @@ -333,7 +343,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { "run normally" in assertAllStagesStopped { val created = new AtomicInteger() - val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(10.millis, 20.millis, 0) { () ⇒ + val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ created.incrementAndGet() Flow[String] })(Keep.left).toMat(TestSink.probe[String])(Keep.both).run() @@ -349,7 +359,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { } "restart on cancellation" in { - val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis) + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff) source.sendNext("a") flowInProbe.requestNext("a") @@ -371,7 +381,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { } "restart on completion" in { - val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis) + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff) source.sendNext("a") flowInProbe.requestNext("a") @@ -395,7 +405,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { } "restart on failure" in { - val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis) + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff) source.sendNext("a") flowInProbe.requestNext("a") @@ -418,7 +428,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { } "backoff before restart" in { - val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(200.millis, 2.seconds) + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(minBackoff, maxBackoff) source.sendNext("a") flowInProbe.requestNext("a") @@ -431,16 +441,16 @@ class RestartSpec extends StreamSpec with DefaultTimeout { Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete") source.sendNext("c") + val deadline = (minBackoff - 1.millis).fromNow flowInProbe.request(1) - val deadline = 100.millis.fromNow - flowInProbe.expectNext(300.millis, "c") + flowInProbe.expectNext("c") deadline.isOverdue() should be(true) created.get() should ===(2) } "continue running flow out port after in has been sent completion" in { - val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds) + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, maxBackoff) source.sendNext("a") flowInProbe.requestNext("a") @@ -464,7 +474,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout { } "continue running flow in port after out has been cancelled" in { - val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds) + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, maxBackoff) source.sendNext("a") flowInProbe.requestNext("a")