CircuitBreakerMTSpec simplificaitons (#31153)

This commit is contained in:
Renato Cavalcanti 2022-02-21 17:45:02 +01:00 committed by GitHub
parent 2050667585
commit 61e44c3bad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,9 +5,10 @@
package akka.pattern package akka.pattern
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.Future
import akka.testkit._ import akka.testkit._
@ -18,7 +19,6 @@ class CircuitBreakerMTSpec extends AkkaSpec {
val resetTimeout = 3.seconds.dilated val resetTimeout = 3.seconds.dilated
val maxFailures = 5 val maxFailures = 5
def newBreaker = new CircuitBreaker(system.scheduler, maxFailures, callTimeout, resetTimeout) def newBreaker = new CircuitBreaker(system.scheduler, maxFailures, callTimeout, resetTimeout)
val numberOfTestCalls = 100
def openBreaker(breaker: CircuitBreaker): Unit = { def openBreaker(breaker: CircuitBreaker): Unit = {
// returns true if the breaker is open // returns true if the breaker is open
@ -36,52 +36,69 @@ class CircuitBreakerMTSpec extends AkkaSpec {
awaitCond(failingCall()) awaitCond(failingCall())
} }
def testCallsWithBreaker(breaker: CircuitBreaker): immutable.IndexedSeq[Future[String]] = { def testCallsWithBreaker(breaker: CircuitBreaker, numberOfCalls: Int): immutable.IndexedSeq[Future[String]] = {
val aFewActive = new TestLatch(5) for (_ <- 1 to numberOfCalls)
for (_ <- 1 to numberOfTestCalls) yield makeCallWithBreaker(breaker)
yield breaker
.withCircuitBreaker(Future {
aFewActive.countDown()
Await.ready(aFewActive, 5.seconds.dilated)
"succeed"
})
.recoverWith {
case _: CircuitBreakerOpenException =>
aFewActive.countDown()
Future.successful("CBO")
}
} }
def makeCallWithBreaker(breaker: CircuitBreaker): Future[String] =
breaker.withCircuitBreaker(Future.successful("succeed")).recoverWith {
case _: CircuitBreakerOpenException =>
Future.successful("CBO")
}
"allow many calls while in closed state with no errors" in { "allow many calls while in closed state with no errors" in {
val futures = testCallsWithBreaker(newBreaker) val futures = testCallsWithBreaker(newBreaker, 10)
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size should ===(numberOfTestCalls) result.size should ===(10)
result.toSet should ===(Set("succeed")) result.toSet should ===(Set("succeed"))
} }
"transition to open state upon reaching failure limit and fail-fast" in { "transition to open state upon reaching failure limit and fail-fast" in {
val breaker = newBreaker val breaker = newBreaker
openBreaker(breaker) openBreaker(breaker)
val futures = testCallsWithBreaker(breaker)
val result = Await.result(Future.sequence(futures), 5.second.dilated) breaker.isOpen shouldBe true
result.size should ===(numberOfTestCalls) val call = makeCallWithBreaker(breaker)
result.toSet should ===(Set("CBO")) val result = Await.result(call, 5.second.dilated)
result shouldBe "CBO"
} }
"allow a single call through in half-open state" in { "allow a single call through in half-open state" in {
val breaker = newBreaker val breaker = newBreaker
val halfOpenLatch = new TestLatch(1) val halfOpenLatch = new TestLatch(1)
breaker.onHalfOpen(halfOpenLatch.countDown()) breaker.onHalfOpen(halfOpenLatch.countDown())
openBreaker(breaker) openBreaker(breaker)
// breaker should become half-open after a while // breaker should become half-open after a while
Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated)
val futures = testCallsWithBreaker(breaker) breaker.isHalfOpen shouldBe true
val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size should ===(numberOfTestCalls) val latch = new TestLatch(1)
result.toSet should ===(Set("succeed", "CBO")) val firstCall =
breaker.withCircuitBreaker(Future {
// this call closes the CB,
// but only after next call fails and touches the latch
Await.ready(latch, 5.seconds)
"succeed"
})
val secondCall =
breaker.withCircuitBreaker(Future.successful("this should have failed")).recoverWith {
case _: CircuitBreakerOpenException =>
latch.countDown()
Future.successful("CBO")
}
val firstResult = Await.result(firstCall, 5.second.dilated)
firstResult shouldBe "succeed"
val secondResult = Await.result(secondCall, 5.second.dilated)
secondResult shouldBe "CBO"
breaker.isClosed shouldBe true
} }
"recover and reset the breaker after the reset timeout" in { "recover and reset the breaker after the reset timeout" in {
@ -97,13 +114,14 @@ class CircuitBreakerMTSpec extends AkkaSpec {
// one successful call should close the latch // one successful call should close the latch
val closedLatch = new TestLatch(1) val closedLatch = new TestLatch(1)
breaker.onClose(closedLatch.countDown()) breaker.onClose(closedLatch.countDown())
breaker.withCircuitBreaker(Future("succeed")) breaker.withCircuitBreaker(Future.successful("succeed"))
Await.ready(closedLatch, 5.seconds.dilated) Await.ready(closedLatch, 5.seconds.dilated)
val futures = testCallsWithBreaker(breaker) breaker.isClosed shouldBe true
val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size should ===(numberOfTestCalls) val call = makeCallWithBreaker(breaker)
result.toSet should ===(Set("succeed")) val result = Await.result(call, 5.second.dilated)
result shouldBe "succeed"
} }
} }
} }