parent
af5f84ddac
commit
a10b00ed02
2 changed files with 69 additions and 59 deletions
|
|
@ -784,7 +784,7 @@ private[testkit] object StreamTestKit {
|
|||
def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this)
|
||||
|
||||
def expectRequest(n: Long): Unit = publisherProbe.expectMsg(RequestMore(this, n))
|
||||
def expectRequest(): Long = publisherProbe.expectMsgPF() {
|
||||
def expectRequest(): Long = publisherProbe.expectMsgPF(hint = "expecting request() signal") {
|
||||
case RequestMore(sub, n) if sub eq this ⇒ n
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import akka.stream.testkit.StreamSpec
|
|||
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
|
||||
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.testkit.TestDuration
|
||||
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -21,10 +22,15 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
implicit val mat = ActorMaterializer()
|
||||
import system.dispatcher
|
||||
|
||||
private val shortMinBackoff = 10.millis
|
||||
private val shortMaxBackoff = 20.millis
|
||||
private val minBackoff = 1.second.dilated
|
||||
private val maxBackoff = 3.seconds.dilated
|
||||
|
||||
"A restart with backoff source" should {
|
||||
"run normally" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source.repeat("a")
|
||||
}.runWith(TestSink.probe)
|
||||
|
|
@ -42,7 +48,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"restart on completion" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source(List("a", "b"))
|
||||
}.runWith(TestSink.probe)
|
||||
|
|
@ -60,7 +66,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"restart on failure" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source(List("a", "b", "c"))
|
||||
.map {
|
||||
|
|
@ -82,18 +88,19 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"backoff before restart" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val probe = RestartSource.withBackoff(200.millis, 1.second, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source(List("a", "b"))
|
||||
}.runWith(TestSink.probe)
|
||||
|
||||
probe.requestNext("a")
|
||||
probe.requestNext("b")
|
||||
|
||||
// There should be a delay of at least minBackoff before we receive the element after restart
|
||||
val deadline = (minBackoff - 1.millis).fromNow
|
||||
probe.request(1)
|
||||
// There should be a delay of at least 200ms before we receive the element, wait for 100ms.
|
||||
val deadline = 100.millis.fromNow
|
||||
// But the delay shouldn't be more than 300ms.
|
||||
probe.expectNext(300.milliseconds, "a")
|
||||
|
||||
probe.expectNext("a")
|
||||
deadline.isOverdue() should be(true)
|
||||
|
||||
created.get() should ===(2)
|
||||
|
|
@ -103,29 +110,29 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source(List("a", "b"))
|
||||
}.runWith(TestSink.probe)
|
||||
|
||||
probe.requestNext("a")
|
||||
probe.requestNext("b")
|
||||
// There should be a 200ms delay
|
||||
// There should be minBackoff delay
|
||||
probe.requestNext("a")
|
||||
probe.requestNext("b")
|
||||
probe.request(1)
|
||||
// The probe should now be backing off for 400ms
|
||||
// The probe should now be backing off again with with increased backoff
|
||||
|
||||
// Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the
|
||||
// subsequent 200ms min backoff to pass, so it resets the restart count
|
||||
Thread.sleep(700)
|
||||
// Now wait for the delay to pass, then it will start the new source, we also want to wait for the
|
||||
// subsequent backoff to pass, so it resets the restart count
|
||||
Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis)
|
||||
|
||||
probe.expectNext("a")
|
||||
probe.requestNext("b")
|
||||
|
||||
// We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the
|
||||
// next element within 300ms
|
||||
probe.requestNext(300.milliseconds) should ===("a")
|
||||
// We should have reset, so the restart delay should be back, ie we should receive the
|
||||
// next element within < 2 * minBackoff
|
||||
probe.requestNext(2 * minBackoff - 10.milliseconds) should ===("a")
|
||||
|
||||
created.get() should ===(4)
|
||||
|
||||
|
|
@ -135,7 +142,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
"cancel the currently running source when cancelled" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val promise = Promise[Done]()
|
||||
val probe = RestartSource.withBackoff(10.millis, 2.seconds, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source.repeat("a").watchTermination() { (_, term) ⇒
|
||||
promise.completeWith(term)
|
||||
|
|
@ -154,7 +161,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"not restart the source when cancelled while backing off" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒
|
||||
val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Source.single("a")
|
||||
}.runWith(TestSink.probe)
|
||||
|
|
@ -165,7 +172,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
probe.cancel()
|
||||
|
||||
// Wait to ensure it isn't restarted
|
||||
Thread.sleep(300)
|
||||
Thread.sleep((minBackoff + 100.millis).toMillis)
|
||||
created.get() should ===(1)
|
||||
}
|
||||
}
|
||||
|
|
@ -174,7 +181,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
"run normally" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val result = Promise[Seq[String]]()
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Sink.seq.mapMaterializedValue(result.completeWith)
|
||||
})(Keep.left).run()
|
||||
|
|
@ -191,7 +198,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
"restart on cancellation" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Flow[String].takeWhile(_ != "cancel", inclusive = true)
|
||||
.to(Sink.foreach(queue.sendNext))
|
||||
|
|
@ -215,7 +222,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
"backoff before restart" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Flow[String].takeWhile(_ != "cancel", inclusive = true)
|
||||
.to(Sink.foreach(queue.sendNext))
|
||||
|
|
@ -226,9 +233,9 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
probe.sendNext("cancel")
|
||||
sinkProbe.requestNext("cancel")
|
||||
probe.sendNext("b")
|
||||
val deadline = (minBackoff - 1.millis).fromNow
|
||||
sinkProbe.request(1)
|
||||
val deadline = 100.millis.fromNow
|
||||
sinkProbe.expectNext(300.millis, "b")
|
||||
sinkProbe.expectNext("b")
|
||||
deadline.isOverdue() should be(true)
|
||||
|
||||
created.get() should ===(2)
|
||||
|
|
@ -240,7 +247,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
"reset exponential backoff back to minimum when sink runs for at least minimum backoff without completing" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Flow[String].takeWhile(_ != "cancel", inclusive = true)
|
||||
.to(Sink.foreach(queue.sendNext))
|
||||
|
|
@ -250,26 +257,26 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
sinkProbe.requestNext("a")
|
||||
probe.sendNext("cancel")
|
||||
sinkProbe.requestNext("cancel")
|
||||
// There should be a 200ms delay
|
||||
// There should be a minBackoff delay
|
||||
probe.sendNext("b")
|
||||
sinkProbe.requestNext("b")
|
||||
probe.sendNext("cancel")
|
||||
sinkProbe.requestNext("cancel")
|
||||
sinkProbe.request(1)
|
||||
// The probe should now be backing off for 400ms
|
||||
// The probe should now be backing off for 2 * minBackoff
|
||||
|
||||
// Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the
|
||||
// subsequent 200ms min backoff to pass, so it resets the restart count
|
||||
Thread.sleep(700)
|
||||
// Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the
|
||||
// subsequent minBackoff min backoff to pass, so it resets the restart count
|
||||
Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis)
|
||||
|
||||
probe.sendNext("cancel")
|
||||
sinkProbe.requestNext("cancel")
|
||||
|
||||
// We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the
|
||||
// next element within 300ms
|
||||
// We should have reset, so the restart delay should be back to minBackoff, ie we should definitely receive the
|
||||
// next element within < 2 * minBackoff
|
||||
probe.sendNext("c")
|
||||
sinkProbe.request(1)
|
||||
sinkProbe.expectNext(300.milliseconds, "c")
|
||||
sinkProbe.expectNext((2 * minBackoff) - 10.millis, "c")
|
||||
|
||||
created.get() should ===(4)
|
||||
|
||||
|
|
@ -280,7 +287,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
"not restart the sink when completed while backing off" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒
|
||||
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Flow[String].takeWhile(_ != "cancel", inclusive = true)
|
||||
.to(Sink.foreach(queue.sendNext))
|
||||
|
|
@ -294,7 +301,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
probe.sendComplete()
|
||||
|
||||
// Wait to ensure it isn't restarted
|
||||
Thread.sleep(300)
|
||||
Thread.sleep((minBackoff + 100.millis).toMillis)
|
||||
created.get() should ===(1)
|
||||
|
||||
sinkProbe.cancel()
|
||||
|
|
@ -313,19 +320,22 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Flow.fromSinkAndSource(
|
||||
Flow[String].takeWhile(_ != "cancel").to(Sink.foreach(flowInSource.sendNext).mapMaterializedValue(_.onComplete {
|
||||
case Success(_) ⇒ flowInSource.sendNext("in complete")
|
||||
case Failure(_) ⇒ flowInSource.sendNext("in error")
|
||||
})),
|
||||
flowOutSource.takeWhile(_ != "complete").map {
|
||||
case "error" ⇒ throw TE("error")
|
||||
case other ⇒ other
|
||||
}.watchTermination()((_, term) ⇒
|
||||
term.foreach(_ ⇒ {
|
||||
flowInSource.sendNext("out complete")
|
||||
})
|
||||
)
|
||||
)
|
||||
Flow[String]
|
||||
.takeWhile(_ != "cancel")
|
||||
.to(Sink.foreach(flowInSource.sendNext)
|
||||
.mapMaterializedValue(_.onComplete {
|
||||
case Success(_) ⇒ flowInSource.sendNext("in complete")
|
||||
case Failure(_) ⇒ flowInSource.sendNext("in error")
|
||||
})),
|
||||
flowOutSource
|
||||
.takeWhile(_ != "complete")
|
||||
.map {
|
||||
case "error" ⇒ throw TE("error")
|
||||
case other ⇒ other
|
||||
}.watchTermination()((_, term) ⇒
|
||||
term.foreach(_ ⇒ {
|
||||
flowInSource.sendNext("out complete")
|
||||
})))
|
||||
})(Keep.left).toMat(TestSink.probe[String])(Keep.both).run()
|
||||
|
||||
(created, source, flowInProbe, flowOutProbe, sink)
|
||||
|
|
@ -333,7 +343,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"run normally" in assertAllStagesStopped {
|
||||
val created = new AtomicInteger()
|
||||
val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(10.millis, 20.millis, 0) { () ⇒
|
||||
val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒
|
||||
created.incrementAndGet()
|
||||
Flow[String]
|
||||
})(Keep.left).toMat(TestSink.probe[String])(Keep.both).run()
|
||||
|
|
@ -349,7 +359,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"restart on cancellation" in {
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis)
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff)
|
||||
|
||||
source.sendNext("a")
|
||||
flowInProbe.requestNext("a")
|
||||
|
|
@ -371,7 +381,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"restart on completion" in {
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis)
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff)
|
||||
|
||||
source.sendNext("a")
|
||||
flowInProbe.requestNext("a")
|
||||
|
|
@ -395,7 +405,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"restart on failure" in {
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis)
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff)
|
||||
|
||||
source.sendNext("a")
|
||||
flowInProbe.requestNext("a")
|
||||
|
|
@ -418,7 +428,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"backoff before restart" in {
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(200.millis, 2.seconds)
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(minBackoff, maxBackoff)
|
||||
|
||||
source.sendNext("a")
|
||||
flowInProbe.requestNext("a")
|
||||
|
|
@ -431,16 +441,16 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete")
|
||||
|
||||
source.sendNext("c")
|
||||
val deadline = (minBackoff - 1.millis).fromNow
|
||||
flowInProbe.request(1)
|
||||
val deadline = 100.millis.fromNow
|
||||
flowInProbe.expectNext(300.millis, "c")
|
||||
flowInProbe.expectNext("c")
|
||||
deadline.isOverdue() should be(true)
|
||||
|
||||
created.get() should ===(2)
|
||||
}
|
||||
|
||||
"continue running flow out port after in has been sent completion" in {
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds)
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, maxBackoff)
|
||||
|
||||
source.sendNext("a")
|
||||
flowInProbe.requestNext("a")
|
||||
|
|
@ -464,7 +474,7 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"continue running flow in port after out has been cancelled" in {
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds)
|
||||
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, maxBackoff)
|
||||
|
||||
source.sendNext("a")
|
||||
flowInProbe.requestNext("a")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue