From 61e44c3bad5f521f619c02c5877fd2ac6e971b16 Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Mon, 21 Feb 2022 17:45:02 +0100 Subject: [PATCH] CircuitBreakerMTSpec simplificaitons (#31153) --- .../akka/pattern/CircuitBreakerMTSpec.scala | 82 +++++++++++-------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala index f60e948d6a..30be6fd713 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -5,9 +5,10 @@ package akka.pattern import scala.collection.immutable -import scala.concurrent.{ Await, Future } import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.concurrent.Future import akka.testkit._ @@ -18,7 +19,6 @@ class CircuitBreakerMTSpec extends AkkaSpec { val resetTimeout = 3.seconds.dilated val maxFailures = 5 def newBreaker = new CircuitBreaker(system.scheduler, maxFailures, callTimeout, resetTimeout) - val numberOfTestCalls = 100 def openBreaker(breaker: CircuitBreaker): Unit = { // returns true if the breaker is open @@ -36,52 +36,69 @@ class CircuitBreakerMTSpec extends AkkaSpec { awaitCond(failingCall()) } - def testCallsWithBreaker(breaker: CircuitBreaker): immutable.IndexedSeq[Future[String]] = { - val aFewActive = new TestLatch(5) - for (_ <- 1 to numberOfTestCalls) - yield breaker - .withCircuitBreaker(Future { - aFewActive.countDown() - Await.ready(aFewActive, 5.seconds.dilated) - "succeed" - }) - .recoverWith { - case _: CircuitBreakerOpenException => - aFewActive.countDown() - Future.successful("CBO") - } + def testCallsWithBreaker(breaker: CircuitBreaker, numberOfCalls: Int): immutable.IndexedSeq[Future[String]] = { + for (_ <- 1 to numberOfCalls) + yield makeCallWithBreaker(breaker) } + def makeCallWithBreaker(breaker: CircuitBreaker): Future[String] = + breaker.withCircuitBreaker(Future.successful("succeed")).recoverWith { + case _: CircuitBreakerOpenException => + Future.successful("CBO") + } + "allow many calls while in closed state with no errors" in { - val futures = testCallsWithBreaker(newBreaker) + val futures = testCallsWithBreaker(newBreaker, 10) val result = Await.result(Future.sequence(futures), 5.second.dilated) - result.size should ===(numberOfTestCalls) + result.size should ===(10) result.toSet should ===(Set("succeed")) } "transition to open state upon reaching failure limit and fail-fast" in { val breaker = newBreaker openBreaker(breaker) - val futures = testCallsWithBreaker(breaker) - val result = Await.result(Future.sequence(futures), 5.second.dilated) - result.size should ===(numberOfTestCalls) - result.toSet should ===(Set("CBO")) + + breaker.isOpen shouldBe true + val call = makeCallWithBreaker(breaker) + val result = Await.result(call, 5.second.dilated) + result shouldBe "CBO" } "allow a single call through in half-open state" in { val breaker = newBreaker val halfOpenLatch = new TestLatch(1) breaker.onHalfOpen(halfOpenLatch.countDown()) - openBreaker(breaker) // breaker should become half-open after a while Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) - val futures = testCallsWithBreaker(breaker) - val result = Await.result(Future.sequence(futures), 5.second.dilated) - result.size should ===(numberOfTestCalls) - result.toSet should ===(Set("succeed", "CBO")) + breaker.isHalfOpen shouldBe true + + val latch = new TestLatch(1) + val firstCall = + breaker.withCircuitBreaker(Future { + // this call closes the CB, + // but only after next call fails and touches the latch + Await.ready(latch, 5.seconds) + "succeed" + }) + + val secondCall = + breaker.withCircuitBreaker(Future.successful("this should have failed")).recoverWith { + case _: CircuitBreakerOpenException => + latch.countDown() + Future.successful("CBO") + } + + val firstResult = Await.result(firstCall, 5.second.dilated) + firstResult shouldBe "succeed" + + val secondResult = Await.result(secondCall, 5.second.dilated) + secondResult shouldBe "CBO" + + breaker.isClosed shouldBe true + } "recover and reset the breaker after the reset timeout" in { @@ -97,13 +114,14 @@ class CircuitBreakerMTSpec extends AkkaSpec { // one successful call should close the latch val closedLatch = new TestLatch(1) breaker.onClose(closedLatch.countDown()) - breaker.withCircuitBreaker(Future("succeed")) + breaker.withCircuitBreaker(Future.successful("succeed")) Await.ready(closedLatch, 5.seconds.dilated) - val futures = testCallsWithBreaker(breaker) - val result = Await.result(Future.sequence(futures), 5.second.dilated) - result.size should ===(numberOfTestCalls) - result.toSet should ===(Set("succeed")) + breaker.isClosed shouldBe true + + val call = makeCallWithBreaker(breaker) + val result = Await.result(call, 5.second.dilated) + result shouldBe "succeed" } } }