From dad40d456a859beebc3ad7af9fe4474092013ffe Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 2 Jan 2013 17:03:19 +0100 Subject: [PATCH] Fix race in CircuitBreakerMTSpec, see #2823 --- .../akka/pattern/CircuitBreakerMTSpec.scala | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala index 34cb3d4ef8..a21dd6f22a 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -4,8 +4,9 @@ package akka.pattern import akka.testkit._ +import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.{ Promise, Future, Await } +import scala.concurrent.{ Future, Await } import scala.annotation.tailrec class CircuitBreakerMTSpec extends AkkaSpec { @@ -14,6 +15,7 @@ class CircuitBreakerMTSpec extends AkkaSpec { val callTimeout = 1.second.dilated val resetTimeout = 2.seconds.dilated val breaker = new CircuitBreaker(system.scheduler, 5, callTimeout, resetTimeout) + val numberOfTestCalls = 100 def openBreaker(): Unit = { @tailrec def call(attemptsLeft: Int): Unit = { @@ -26,29 +28,31 @@ class CircuitBreakerMTSpec extends AkkaSpec { call(10) } + def testCallsWithBreaker(): immutable.IndexedSeq[Future[String]] = { + val aFewActive = new TestLatch(5) + for (_ ← 1 to numberOfTestCalls) yield breaker.withCircuitBreaker(Future { + aFewActive.countDown() + Await.ready(aFewActive, 5.seconds.dilated) + "succeed" + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ + aFewActive.countDown() + Future.successful("CBO") + } + } + "allow many calls while in closed state with no errors" in { - - val futures = for (i ← 1 to 100) yield breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" }) - + val futures = testCallsWithBreaker() val result = Await.result(Future.sequence(futures), 5.second.dilated) - - result.size must be(100) + result.size must be(numberOfTestCalls) result.toSet must be === Set("succeed") - } "transition to open state upon reaching failure limit and fail-fast" in { openBreaker() - - val futures = for (i ← 1 to 100) yield breaker.withCircuitBreaker(Future { - Thread.sleep(10); "success" - }) recoverWith { - case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO").future - } - + val futures = testCallsWithBreaker() val result = Await.result(Future.sequence(futures), 5.second.dilated) - - result.size must be(100) + result.size must be(numberOfTestCalls) result.toSet must be === Set("CBO") } @@ -58,17 +62,12 @@ class CircuitBreakerMTSpec extends AkkaSpec { openBreaker() + // breaker should become half-open after a while Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) - val futures = for (i ← 1 to 100) yield breaker.withCircuitBreaker(Future { - Thread.sleep(10); "succeed" - }) recoverWith { - case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO").future - } - + val futures = testCallsWithBreaker() val result = Await.result(Future.sequence(futures), 5.second.dilated) - - result.size must be(100) + result.size must be(numberOfTestCalls) result.toSet must be === Set("succeed", "CBO") } @@ -76,19 +75,19 @@ class CircuitBreakerMTSpec extends AkkaSpec { val halfOpenLatch = new TestLatch(1) breaker.onHalfOpen(halfOpenLatch.countDown()) openBreaker() - Await.ready(halfOpenLatch, 5.seconds.dilated) - Await.ready(breaker.withCircuitBreaker(Future("succeed")), resetTimeout) - val futures = (1 to 100) map { - i ⇒ - breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" }) recoverWith { - case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO").future - } - } + // breaker should become half-open after a while + Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) + // one successful call should close the latch + val closedLatch = new TestLatch(1) + breaker.onClose(closedLatch.countDown()) + breaker.withCircuitBreaker(Future("succeed")) + Await.ready(closedLatch, 5.seconds.dilated) + + val futures = testCallsWithBreaker() val result = Await.result(Future.sequence(futures), 5.second.dilated) - - result.size must be(100) + result.size must be(numberOfTestCalls) result.toSet must be === Set("succeed") } }