=test Add BeenCalledTimesGate in tests to make sure some method should only be called specified times.
This commit is contained in:
parent
b37b5cde89
commit
e784cd7c99
1 changed files with 102 additions and 10 deletions
|
|
@ -24,6 +24,7 @@ import pekko.stream.testkit.TestSubscriber
|
|||
import pekko.stream.testkit.scaladsl.TestSink
|
||||
import pekko.stream.testkit.scaladsl.TestSource
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import scala.annotation.nowarn
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.Promise
|
||||
|
|
@ -35,12 +36,37 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
|
||||
val ex = new Exception("TEST") with NoStackTrace
|
||||
|
||||
object BeenCalledTimesGate {
|
||||
def apply(): BeenCalledTimesGate = new BeenCalledTimesGate(1)
|
||||
|
||||
def apply(nTimes: Int): BeenCalledTimesGate = new BeenCalledTimesGate(nTimes)
|
||||
}
|
||||
|
||||
class BeenCalledTimesGate(nTimes: Int) {
|
||||
private val beenCalled = new AtomicInteger(0)
|
||||
|
||||
def mark(): Unit = beenCalled.updateAndGet(current => {
|
||||
if (current == nTimes) {
|
||||
throw new IllegalStateException(s"Has been called:[$nTimes] times, should not be called anymore.")
|
||||
} else current + 1
|
||||
})
|
||||
|
||||
def ensure(): Unit = if (beenCalled.get() != nTimes) {
|
||||
throw new IllegalStateException(s"Expected to be called:[$nTimes], but only be called:[$beenCalled]")
|
||||
}
|
||||
}
|
||||
|
||||
"A StatefulMap" must {
|
||||
"work in the happy case" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val sinkProb = Source(List(1, 2, 3, 4, 5))
|
||||
.statefulMap(() => 0)((agg, elem) => {
|
||||
(agg + elem, (agg, elem))
|
||||
}, _ => None)
|
||||
},
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.runWith(TestSink.probe[(Int, Int)])
|
||||
sinkProb.expectSubscription().request(6)
|
||||
sinkProb
|
||||
|
|
@ -50,9 +76,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
.expectNext((6, 4))
|
||||
.expectNext((10, 5))
|
||||
.expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"can remember the state when complete" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val sinkProb = Source(1 to 10)
|
||||
.statefulMap(() => List.empty[Int])(
|
||||
(state, elem) => {
|
||||
|
|
@ -63,64 +91,91 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
else
|
||||
(newState, Nil)
|
||||
},
|
||||
state => Some(state.reverse))
|
||||
state => {
|
||||
gate.mark()
|
||||
Some(state.reverse)
|
||||
})
|
||||
.mapConcat(identity)
|
||||
.runWith(TestSink.probe[Int])
|
||||
sinkProb.expectSubscription().request(10)
|
||||
sinkProb.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"be able to resume" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val testSink = Source(List(1, 2, 3, 4, 5))
|
||||
.statefulMap(() => 0)((agg, elem) => {
|
||||
if (elem % 2 == 0)
|
||||
throw ex
|
||||
else
|
||||
(agg + elem, (agg, elem))
|
||||
}, _ => None)
|
||||
},
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
|
||||
.runWith(TestSink.probe[(Int, Int)])
|
||||
|
||||
testSink.expectSubscription().request(5)
|
||||
testSink.expectNext((0, 1)).expectNext((1, 3)).expectNext((4, 5)).expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"be able to restart" in {
|
||||
val gate = BeenCalledTimesGate(2)
|
||||
val testSink = Source(List(1, 2, 3, 4, 5))
|
||||
.statefulMap(() => 0)((agg, elem) => {
|
||||
if (elem % 3 == 0)
|
||||
throw ex
|
||||
else
|
||||
(agg + elem, (agg, elem))
|
||||
}, _ => None)
|
||||
},
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
|
||||
.runWith(TestSink.probe[(Int, Int)])
|
||||
|
||||
testSink.expectSubscription().request(5)
|
||||
testSink.expectNext((0, 1)).expectNext((1, 2)).expectNext((0, 4)).expectNext((4, 5)).expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"be able to stop" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val testSink = Source(List(1, 2, 3, 4, 5))
|
||||
.statefulMap(() => 0)((agg, elem) => {
|
||||
if (elem % 3 == 0)
|
||||
throw ex
|
||||
else
|
||||
(agg + elem, (agg, elem))
|
||||
}, _ => None)
|
||||
},
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
|
||||
.runWith(TestSink.probe[(Int, Int)])
|
||||
|
||||
testSink.expectSubscription().request(5)
|
||||
testSink.expectNext((0, 1)).expectNext((1, 2)).expectError(ex)
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"fail on upstream failure" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val (testSource, testSink) = TestSource
|
||||
.probe[Int]
|
||||
.statefulMap(() => 0)((agg, elem) => {
|
||||
(agg + elem, (agg, elem))
|
||||
}, _ => None)
|
||||
},
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.toMat(TestSink.probe[(Int, Int)])(Keep.both)
|
||||
.run()
|
||||
|
||||
|
|
@ -135,12 +190,20 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
testSink.expectNext((6, 4))
|
||||
testSource.sendError(ex)
|
||||
testSink.expectError(ex)
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"defer upstream failure and remember state" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val (testSource, testSink) = TestSource
|
||||
.probe[Int]
|
||||
.statefulMap(() => 0)((agg, elem) => { (agg + elem, (agg, elem)) }, (state: Int) => Some((state, -1)))
|
||||
.statefulMap(() => 0)((agg, elem) => {
|
||||
(agg + elem, (agg, elem))
|
||||
},
|
||||
(state: Int) => {
|
||||
gate.mark()
|
||||
Some((state, -1))
|
||||
})
|
||||
.toMat(TestSink.probe[(Int, Int)])(Keep.both)
|
||||
.run()
|
||||
|
||||
|
|
@ -156,9 +219,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
testSource.sendError(ex)
|
||||
testSink.expectNext((10, -1))
|
||||
testSink.expectError(ex)
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"cancel upstream when downstream cancel" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val promise = Promise[Done]()
|
||||
val testSource = TestSource
|
||||
.probe[Int]
|
||||
|
|
@ -166,6 +231,7 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
(agg + elem, (agg, elem))
|
||||
},
|
||||
(state: Int) => {
|
||||
gate.mark()
|
||||
promise.complete(Success(Done))
|
||||
Some((state, -1))
|
||||
})
|
||||
|
|
@ -173,9 +239,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
.run()
|
||||
testSource.expectSubscription().expectCancellation()
|
||||
Await.result(promise.future, 3.seconds) shouldBe Done
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"cancel upstream when downstream fail" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val promise = Promise[Done]()
|
||||
val testProb = TestSubscriber.probe[(Int, Int)]()
|
||||
val testSource = TestSource
|
||||
|
|
@ -184,6 +252,7 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
(agg + elem, (agg, elem))
|
||||
},
|
||||
(state: Int) => {
|
||||
gate.mark()
|
||||
promise.complete(Success(Done))
|
||||
Some((state, -1))
|
||||
})
|
||||
|
|
@ -192,9 +261,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
testProb.cancel(ex)
|
||||
testSource.expectCancellationWithCause(ex)
|
||||
Await.result(promise.future, 3.seconds) shouldBe Done
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"call its onComplete callback on abrupt materializer termination" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
@nowarn("msg=deprecated")
|
||||
val mat = ActorMaterializer()
|
||||
val promise = Promise[Done]()
|
||||
|
|
@ -203,6 +274,7 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
.single(1)
|
||||
.statefulMap(() => -1)((_, elem) => (elem, elem),
|
||||
_ => {
|
||||
gate.mark()
|
||||
promise.complete(Success(Done))
|
||||
None
|
||||
})
|
||||
|
|
@ -210,9 +282,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
mat.shutdown()
|
||||
matVal.failed.futureValue shouldBe a[AbruptStageTerminationException]
|
||||
Await.result(promise.future, 3.seconds) shouldBe Done
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"call its onComplete callback when stop" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val promise = Promise[Done]()
|
||||
Source
|
||||
.single(1)
|
||||
|
|
@ -221,16 +295,23 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
(elem, elem)
|
||||
},
|
||||
_ => {
|
||||
gate.mark()
|
||||
promise.complete(Success(Done))
|
||||
None
|
||||
})
|
||||
.runWith(Sink.ignore)
|
||||
Await.result(promise.future, 3.seconds) shouldBe Done
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"be able to be used as zipWithIndex" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
Source(List("A", "B", "C", "D"))
|
||||
.statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None)
|
||||
.statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)),
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.runWith(TestSink.probe[(String, Long)])
|
||||
.request(4)
|
||||
.expectNext(("A", 0L))
|
||||
|
|
@ -238,9 +319,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
.expectNext(("C", 2L))
|
||||
.expectNext(("D", 3L))
|
||||
.expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"be able to be used as bufferUntilChanged" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
val sink = TestSink.probe[List[String]]
|
||||
Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
|
||||
.statefulMap(() => List.empty[String])(
|
||||
|
|
@ -249,7 +332,10 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
case head :: _ if head != elem => (elem :: Nil, buffer)
|
||||
case _ => (elem :: buffer, Nil)
|
||||
},
|
||||
buffer => Some(buffer))
|
||||
buffer => {
|
||||
gate.mark()
|
||||
Some(buffer)
|
||||
})
|
||||
.filter(_.nonEmpty)
|
||||
.alsoTo(Sink.foreach(println))
|
||||
.runWith(sink)
|
||||
|
|
@ -259,9 +345,11 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
.expectNext(List("C", "C", "C"))
|
||||
.expectNext(List("D"))
|
||||
.expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
|
||||
"be able to be used as distinctUntilChanged" in {
|
||||
val gate = BeenCalledTimesGate()
|
||||
Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
|
||||
.statefulMap(() => Option.empty[String])(
|
||||
(lastElement, elem) =>
|
||||
|
|
@ -269,7 +357,10 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
case Some(head) if head == elem => (Some(elem), None)
|
||||
case _ => (Some(elem), Some(elem))
|
||||
},
|
||||
_ => None)
|
||||
_ => {
|
||||
gate.mark()
|
||||
None
|
||||
})
|
||||
.collect { case Some(elem) => elem }
|
||||
.runWith(TestSink.probe[String])
|
||||
.request(4)
|
||||
|
|
@ -278,6 +369,7 @@ class FlowStatefulMapSpec extends StreamSpec {
|
|||
.expectNext("C")
|
||||
.expectNext("D")
|
||||
.expectComplete()
|
||||
gate.ensure()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue