Fix race in CircuitBreakerMTSpec, see #2823

This commit is contained in:
Patrik Nordwall 2013-01-02 17:03:19 +01:00
parent 0dda2ad361
commit dad40d456a

View file

@ -4,8 +4,9 @@
package akka.pattern package akka.pattern
import akka.testkit._ import akka.testkit._
import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Promise, Future, Await } import scala.concurrent.{ Future, Await }
import scala.annotation.tailrec import scala.annotation.tailrec
class CircuitBreakerMTSpec extends AkkaSpec { class CircuitBreakerMTSpec extends AkkaSpec {
@ -14,6 +15,7 @@ class CircuitBreakerMTSpec extends AkkaSpec {
val callTimeout = 1.second.dilated val callTimeout = 1.second.dilated
val resetTimeout = 2.seconds.dilated val resetTimeout = 2.seconds.dilated
val breaker = new CircuitBreaker(system.scheduler, 5, callTimeout, resetTimeout) val breaker = new CircuitBreaker(system.scheduler, 5, callTimeout, resetTimeout)
val numberOfTestCalls = 100
def openBreaker(): Unit = { def openBreaker(): Unit = {
@tailrec def call(attemptsLeft: Int): Unit = { @tailrec def call(attemptsLeft: Int): Unit = {
@ -26,29 +28,31 @@ class CircuitBreakerMTSpec extends AkkaSpec {
call(10) call(10)
} }
def testCallsWithBreaker(): immutable.IndexedSeq[Future[String]] = {
val aFewActive = new TestLatch(5)
for (_ 1 to numberOfTestCalls) yield breaker.withCircuitBreaker(Future {
aFewActive.countDown()
Await.ready(aFewActive, 5.seconds.dilated)
"succeed"
}) recoverWith {
case _: CircuitBreakerOpenException
aFewActive.countDown()
Future.successful("CBO")
}
}
"allow many calls while in closed state with no errors" in { "allow many calls while in closed state with no errors" in {
val futures = testCallsWithBreaker()
val futures = for (i 1 to 100) yield breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" })
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("succeed") result.toSet must be === Set("succeed")
} }
"transition to open state upon reaching failure limit and fail-fast" in { "transition to open state upon reaching failure limit and fail-fast" in {
openBreaker() openBreaker()
val futures = testCallsWithBreaker()
val futures = for (i 1 to 100) yield breaker.withCircuitBreaker(Future {
Thread.sleep(10); "success"
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("CBO") result.toSet must be === Set("CBO")
} }
@ -58,17 +62,12 @@ class CircuitBreakerMTSpec extends AkkaSpec {
openBreaker() openBreaker()
// breaker should become half-open after a while
Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated)
val futures = for (i 1 to 100) yield breaker.withCircuitBreaker(Future { val futures = testCallsWithBreaker()
Thread.sleep(10); "succeed"
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("succeed", "CBO") result.toSet must be === Set("succeed", "CBO")
} }
@ -76,19 +75,19 @@ class CircuitBreakerMTSpec extends AkkaSpec {
val halfOpenLatch = new TestLatch(1) val halfOpenLatch = new TestLatch(1)
breaker.onHalfOpen(halfOpenLatch.countDown()) breaker.onHalfOpen(halfOpenLatch.countDown())
openBreaker() openBreaker()
Await.ready(halfOpenLatch, 5.seconds.dilated)
Await.ready(breaker.withCircuitBreaker(Future("succeed")), resetTimeout)
val futures = (1 to 100) map { // breaker should become half-open after a while
i Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated)
breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" }) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
}
// one successful call should close the latch
val closedLatch = new TestLatch(1)
breaker.onClose(closedLatch.countDown())
breaker.withCircuitBreaker(Future("succeed"))
Await.ready(closedLatch, 5.seconds.dilated)
val futures = testCallsWithBreaker()
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("succeed") result.toSet must be === Set("succeed")
} }
} }