Circuitbreaker spec cleanup (#27560)

Test cleanup:

 * No need to use mockito, replaced with TestProbe (side effect is that it actually also
  makes some test cases more explicit in what they expect)
 * Use matchers to get reasonable failure messages
 * Use types where it makes sense
 * Remove mockito dependency from akka-actor-tests
This commit is contained in:
Johan Andrén 2019-08-28 16:44:07 +02:00 committed by GitHub
parent 5c0f213fba
commit 6fd9d01b19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 140 deletions

View file

@ -5,21 +5,27 @@
package akka.pattern package akka.pattern
import akka.actor.ActorSystem import akka.actor.ActorSystem
import language.postfixOps
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException }
import scala.util.{ Failure, Success, Try }
import akka.testkit._ import akka.testkit._
import org.mockito.ArgumentCaptor
import org.scalatest.BeforeAndAfter import scala.concurrent.Await
import org.scalatestplus.mockito.MockitoSugar import scala.concurrent.ExecutionContext
import org.mockito.Mockito._ import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
import scala.util.Try
object CircuitBreakerSpec { object CircuitBreakerSpec {
class TestException extends RuntimeException class TestException extends RuntimeException
case class CBSuccess(value: FiniteDuration)
case class CBFailure(value: FiniteDuration)
case class CBTimeout(value: FiniteDuration)
class Breaker(val instance: CircuitBreaker)(implicit system: ActorSystem) extends MockitoSugar { class Breaker(val instance: CircuitBreaker)(implicit system: ActorSystem) {
val probe = TestProbe()
val halfOpenLatch = new TestLatch(1) val halfOpenLatch = new TestLatch(1)
val openLatch = new TestLatch(1) val openLatch = new TestLatch(1)
val closedLatch = new TestLatch(1) val closedLatch = new TestLatch(1)
@ -28,25 +34,21 @@ object CircuitBreakerSpec {
val callTimeoutLatch = new TestLatch(1) val callTimeoutLatch = new TestLatch(1)
val callBreakerOpenLatch = new TestLatch(1) val callBreakerOpenLatch = new TestLatch(1)
val callSuccessConsumerMock = mock[Long => Unit]
val callFailureConsumerMock = mock[Long => Unit]
val callTimeoutConsumerMock = mock[Long => Unit]
def apply(): CircuitBreaker = instance def apply(): CircuitBreaker = instance
instance instance
.onClose(closedLatch.countDown()) .onClose(closedLatch.countDown())
.onHalfOpen(halfOpenLatch.countDown()) .onHalfOpen(halfOpenLatch.countDown())
.onOpen(openLatch.countDown()) .onOpen(openLatch.countDown())
.onCallSuccess(value => { .onCallSuccess(value => {
callSuccessConsumerMock(value) probe.ref ! CBSuccess(value.nanos)
callSuccessLatch.countDown() callSuccessLatch.countDown()
}) })
.onCallFailure(value => { .onCallFailure(value => {
callFailureConsumerMock(value) probe.ref ! CBFailure(value.nanos)
callFailureLatch.countDown() callFailureLatch.countDown()
}) })
.onCallTimeout(value => { .onCallTimeout(value => {
callTimeoutConsumerMock(value) probe.ref ! CBTimeout(value.nanos)
callTimeoutLatch.countDown() callTimeoutLatch.countDown()
}) })
.onCallBreakerOpen(callBreakerOpenLatch.countDown()) .onCallBreakerOpen(callBreakerOpenLatch.countDown())
@ -79,10 +81,9 @@ object CircuitBreakerSpec {
} }
} }
class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar { class CircuitBreakerSpec extends AkkaSpec {
import CircuitBreakerSpec.TestException import CircuitBreakerSpec._
implicit def ec = system.dispatcher implicit def ec = system.dispatcher
implicit def s = system
val awaitTimeout = 2.seconds.dilated val awaitTimeout = 2.seconds.dilated
@ -92,8 +93,6 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
def sayHi = "hi" def sayHi = "hi"
def timeCaptor = ArgumentCaptor.forClass(classOf[Long])
"A synchronous circuit breaker that is open" must { "A synchronous circuit breaker that is open" must {
"throw exceptions when called before reset timeout" in { "throw exceptions when called before reset timeout" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb() val breaker = CircuitBreakerSpec.longResetTimeoutCb()
@ -103,8 +102,8 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
val e = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } val e = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
(e.remainingDuration > Duration.Zero) should ===(true) e.remainingDuration should be > Duration.Zero
(e.remainingDuration <= CircuitBreakerSpec.longResetTimeout) should ===(true) e.remainingDuration should be <= CircuitBreakerSpec.longResetTimeout
} }
"transition to half-open on reset timeout" in { "transition to half-open on reset timeout" in {
@ -136,7 +135,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"invoke onCallBreakerOpen when called before reset timeout" in { "invoke onCallBreakerOpen when called before reset timeout" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb() val breaker = longResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
@ -144,21 +143,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"invoke onCallFailure when call results in exception" in { "invoke onCallFailure when call results in exception" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb() val breaker = longResetTimeoutCb()
val captor = timeCaptor
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.callFailureLatch) checkLatch(breaker.callFailureLatch)
verify(breaker.callFailureConsumerMock)(captor.capture()) val failure = breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) failure.value should (be > Duration.Zero and be < longResetTimeout)
captor.getValue < CircuitBreakerSpec.longResetTimeout.toNanos should ===(true)
} }
} }
"A synchronous circuit breaker that is half-open" must { "A synchronous circuit breaker that is half-open" must {
"pass through next call and close on success" in { "pass through next call and close on success" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
assert("hi" == breaker().withSyncCircuitBreaker(sayHi)) assert("hi" == breaker().withSyncCircuitBreaker(sayHi))
@ -167,7 +164,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"pass through next call and close on exception" when { "pass through next call and close on exception" when {
"exception is defined as call succeeded" in { "exception is defined as call succeeded" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -179,7 +176,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"open on exception in call" in { "open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset breaker.openLatch.reset
@ -189,17 +186,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"open on even number" when { "open on even number" when {
"even number is defined as failure" in { "even number is defined as failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset breaker.openLatch.reset
breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) breaker().withSyncCircuitBreaker(2, evenNumberIsFailure)
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
} }
} }
"open on calling fail method" in { "open on calling fail method" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset breaker.openLatch.reset
@ -208,7 +205,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"close on calling success method" in { "close on calling success method" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker().succeed() breaker().succeed()
@ -216,23 +213,21 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"pass through next call and invoke onCallSuccess on success" in { "pass through next call and invoke onCallSuccess on success" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
val captor = timeCaptor
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker.probe.expectMsgType[CBFailure]
breaker().withSyncCircuitBreaker(sayHi) breaker().withSyncCircuitBreaker(sayHi)
checkLatch(breaker.callSuccessLatch) checkLatch(breaker.callSuccessLatch)
verify(breaker.callSuccessConsumerMock)(captor.capture()) val success = breaker.probe.expectMsgType[CBSuccess]
captor.getValue > 0 should ===(true) success.value should (be > Duration.Zero and be < shortResetTimeout)
captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true)
} }
"pass through next call and invoke onCallFailure on failure" in { "pass through next call and invoke onCallFailure on failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
val captor = timeCaptor
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -243,14 +238,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.callFailureLatch) checkLatch(breaker.callFailureLatch)
verify(breaker.callFailureConsumerMock, times(2))(captor.capture()) breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) val failure = breaker.probe.expectMsgType[CBFailure]
captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) failure.value should (be > Duration.Zero and be < shortResetTimeout)
} }
"pass through next call and invoke onCallTimeout on timeout" in { "pass through next call and invoke onCallTimeout on timeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -258,13 +252,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
intercept[TimeoutException] { breaker().withSyncCircuitBreaker(Thread.sleep(200.millis.dilated.toMillis)) } intercept[TimeoutException] { breaker().withSyncCircuitBreaker(Thread.sleep(200.millis.dilated.toMillis)) }
checkLatch(breaker.callTimeoutLatch) checkLatch(breaker.callTimeoutLatch)
verify(breaker.callTimeoutConsumerMock)(captor.capture()) breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) val timeout = breaker.probe.expectMsgType[CBTimeout]
captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).dilated.toNanos should ===(true) timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2).dilated)
} }
"pass through next call and invoke onCallBreakerOpen while executing other" in { "pass through next call and invoke onCallBreakerOpen while executing other" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -276,7 +270,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"pass through next call and invoke onCallSuccess after transition to open state" in { "pass through next call and invoke onCallSuccess after transition to open state" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -288,12 +282,12 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"A synchronous circuit breaker that is closed" must { "A synchronous circuit breaker that is closed" must {
"allow calls through" in { "allow calls through" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
breaker().withSyncCircuitBreaker(sayHi) should ===("hi") breaker().withSyncCircuitBreaker(sayHi) should ===("hi")
} }
"increment failure count on failure" in { "increment failure count on failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
@ -302,9 +296,9 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"increment failure count on even number" when { "increment failure count on even number" when {
"even number is considered failure" in { "even number is considered failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
val result = breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) val result = breaker().withSyncCircuitBreaker(2, evenNumberIsFailure)
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1) breaker().currentFailureCount should ===(1)
@ -313,7 +307,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"increment failure count on fail method" in { "increment failure count on fail method" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
breaker().fail() breaker().fail()
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
@ -321,7 +315,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"reset failure count after success" in { "reset failure count after success" in {
val breaker = CircuitBreakerSpec.multiFailureCb() val breaker = multiFailureCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
intercept[TestException] { intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
@ -334,7 +328,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"reset failure count after exception in call" when { "reset failure count after exception in call" when {
"exception is defined as Success" in { "exception is defined as Success" in {
val breaker = CircuitBreakerSpec.multiFailureCb() val breaker = multiFailureCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
intercept[TestException] { intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
@ -357,7 +351,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"reset failure count after success method" in { "reset failure count after success method" in {
val breaker = CircuitBreakerSpec.multiFailureCb() val breaker = multiFailureCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
intercept[TestException] { intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
@ -369,7 +363,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"throw TimeoutException on callTimeout" in { "throw TimeoutException on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
intercept[TimeoutException] { intercept[TimeoutException] {
breaker().withSyncCircuitBreaker { breaker().withSyncCircuitBreaker {
Thread.sleep(200.millis.dilated.toMillis) Thread.sleep(200.millis.dilated.toMillis)
@ -379,7 +373,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"increment failure count on callTimeout before call finishes" in { "increment failure count on callTimeout before call finishes" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
Future { Future {
breaker().withSyncCircuitBreaker { breaker().withSyncCircuitBreaker {
Thread.sleep(1.second.dilated.toMillis) Thread.sleep(1.second.dilated.toMillis)
@ -391,43 +385,37 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"invoke onCallSuccess if call succeeds" in { "invoke onCallSuccess if call succeeds" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
breaker().withSyncCircuitBreaker(sayHi) breaker().withSyncCircuitBreaker(sayHi)
checkLatch(breaker.callSuccessLatch) checkLatch(breaker.callSuccessLatch)
verify(breaker.callSuccessConsumerMock)(captor.capture()) val success = breaker.probe.expectMsgType[CBSuccess]
captor.getValue > 0 should ===(true) success.value should (be > Duration.Zero and be < shortCallTimeout)
captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true)
} }
"invoke onCallTimeout if call timeouts" in { "invoke onCallTimeout if call timeouts" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
intercept[TimeoutException](breaker().withSyncCircuitBreaker(Thread.sleep(250.millis.dilated.toMillis))) intercept[TimeoutException](breaker().withSyncCircuitBreaker(Thread.sleep(250.millis.dilated.toMillis)))
checkLatch(breaker.callTimeoutLatch) checkLatch(breaker.callTimeoutLatch)
verify(breaker.callTimeoutConsumerMock)(captor.capture()) val timeout = breaker.probe.expectMsgType[CBTimeout]
captor.getValue > 0 should ===(true) timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2))
captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).toNanos should ===(true)
} }
"invoke onCallFailure if call fails" in { "invoke onCallFailure if call fails" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
intercept[TestException](breaker().withSyncCircuitBreaker(throwException)) intercept[TestException](breaker().withSyncCircuitBreaker(throwException))
checkLatch(breaker.callFailureLatch) checkLatch(breaker.callFailureLatch)
verify(breaker.callFailureConsumerMock)(captor.capture()) val failure = breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) failure.value should (be > Duration.Zero and be < shortCallTimeout)
captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true)
} }
"invoke onOpen if call fails and breaker transits to open state" in { "invoke onOpen if call fails and breaker transits to open state" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
intercept[TestException](breaker().withSyncCircuitBreaker(throwException)) intercept[TestException](breaker().withSyncCircuitBreaker(throwException))
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
@ -436,7 +424,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"An asynchronous circuit breaker that is open" must { "An asynchronous circuit breaker that is open" must {
"throw exceptions when called before reset timeout" in { "throw exceptions when called before reset timeout" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb() val breaker = longResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
@ -445,13 +433,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"transition to half-open on reset timeout" in { "transition to half-open on reset timeout" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
} }
"increase the reset timeout after it transits to open again" in { "increase the reset timeout after it transits to open again" in {
val breaker = CircuitBreakerSpec.nonOneFactorCb() val breaker = nonOneFactorCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
@ -469,19 +457,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
val longRemainingDuration = e2.remainingDuration val longRemainingDuration = e2.remainingDuration
(shortRemainingDuration < longRemainingDuration) should ===(true) shortRemainingDuration should be < longRemainingDuration
} }
"invoke onHalfOpen during transition to half-open state" in { "invoke onHalfOpen during transition to half-open state" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) } intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
} }
"invoke onCallBreakerOpen when called before reset timeout" in { "invoke onCallBreakerOpen when called before reset timeout" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb() val breaker = longResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
@ -491,21 +478,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"invoke onCallFailure when call results in exception" in { "invoke onCallFailure when call results in exception" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb() val breaker = longResetTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.callFailureLatch) checkLatch(breaker.callFailureLatch)
verify(breaker.callFailureConsumerMock)(captor.capture()) val failure = breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) failure.value should (be > Duration.Zero and be < longResetTimeout)
captor.getValue < CircuitBreakerSpec.longResetTimeout.toNanos should ===(true)
} }
} }
"An asynchronous circuit breaker that is half-open" must { "An asynchronous circuit breaker that is half-open" must {
"pass through next call and close on success" in { "pass through next call and close on success" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi") Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi")
@ -514,7 +499,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"pass through next call and close on exception" when { "pass through next call and close on exception" when {
"exception is defined as call succeeded" in { "exception is defined as call succeeded" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
val allReturnIsSuccess: Try[String] => Boolean = _ => false val allReturnIsSuccess: Try[String] => Boolean = _ => false
@ -524,7 +509,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"re-open on exception in call" in { "re-open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset breaker.openLatch.reset
@ -534,17 +519,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"re-open on even number" when { "re-open on even number" when {
"even number is defined as failure" in { "even number is defined as failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset breaker.openLatch.reset
Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) Await.result(breaker().withCircuitBreaker(Future(2), evenNumberIsFailure), awaitTimeout)
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
} }
} }
"re-open on async failure" in { "re-open on async failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -554,8 +539,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"pass through next call and invoke onCallSuccess on success" in { "pass through next call and invoke onCallSuccess on success" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -563,14 +547,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().withCircuitBreaker(Future(sayHi)) breaker().withCircuitBreaker(Future(sayHi))
checkLatch(breaker.callSuccessLatch) checkLatch(breaker.callSuccessLatch)
verify(breaker.callSuccessConsumerMock)(captor.capture()) breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) val success = breaker.probe.expectMsgType[CBSuccess]
captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) success.value should (be > Duration.Zero and be < shortResetTimeout)
} }
"pass through next call and invoke onCallFailure on failure" in { "pass through next call and invoke onCallFailure on failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
@ -581,14 +564,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.callFailureLatch) checkLatch(breaker.callFailureLatch)
verify(breaker.callFailureConsumerMock, times(2))(captor.capture()) breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) val failure = breaker.probe.expectMsgType[CBFailure]
captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) failure.value should (be > Duration.Zero and be < shortResetTimeout)
} }
"pass through next call and invoke onCallTimeout on timeout" in { "pass through next call and invoke onCallTimeout on timeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -596,13 +578,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().withCircuitBreaker(Future(Thread.sleep(200.millis.dilated.toMillis))) breaker().withCircuitBreaker(Future(Thread.sleep(200.millis.dilated.toMillis)))
checkLatch(breaker.callTimeoutLatch) checkLatch(breaker.callTimeoutLatch)
verify(breaker.callTimeoutConsumerMock)(captor.capture()) breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) val timeout = breaker.probe.expectMsgType[CBTimeout]
captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).dilated.toNanos should ===(true) timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2).dilated)
} }
"pass through next call and invoke onCallBreakerOpen while executing other" in { "pass through next call and invoke onCallBreakerOpen while executing other" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -613,7 +595,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"pass through next call and invoke onOpen after transition to open state" in { "pass through next call and invoke onOpen after transition to open state" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch) checkLatch(breaker.halfOpenLatch)
@ -625,12 +607,12 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"An asynchronous circuit breaker that is closed" must { "An asynchronous circuit breaker that is closed" must {
"allow calls through" in { "allow calls through" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi") Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi")
} }
"increment failure count on exception" in { "increment failure count on exception" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) } intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) }
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1) breaker().currentFailureCount should ===(1)
@ -638,10 +620,10 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"increment failure count on even number" when { "increment failure count on even number" when {
"even number is considered failure" in { "even number is considered failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
val result = val result =
Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) Await.result(breaker().withCircuitBreaker(Future(2), evenNumberIsFailure), awaitTimeout)
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1) breaker().currentFailureCount should ===(1)
result should ===(2) result should ===(2)
@ -649,14 +631,14 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"increment failure count on async failure" in { "increment failure count on async failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = longCallTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1) breaker().currentFailureCount should ===(1)
} }
"reset failure count after success" in { "reset failure count after success" in {
val breaker = CircuitBreakerSpec.multiFailureCb() val breaker = multiFailureCb()
breaker().withCircuitBreaker(Future(sayHi)) breaker().withCircuitBreaker(Future(sayHi))
for (_ <- 1 to 4) breaker().withCircuitBreaker(Future(throwException)) for (_ <- 1 to 4) breaker().withCircuitBreaker(Future(throwException))
awaitCond(breaker().currentFailureCount == 4, awaitTimeout) awaitCond(breaker().currentFailureCount == 4, awaitTimeout)
@ -666,7 +648,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
"reset failure count after exception in call" when { "reset failure count after exception in call" when {
"exception is defined as Success" in { "exception is defined as Success" in {
val breaker: CircuitBreakerSpec.Breaker = CircuitBreakerSpec.multiFailureCb() val breaker: Breaker = multiFailureCb()
for (_ <- 1 to 4) breaker().withCircuitBreaker(Future(throwException)) for (_ <- 1 to 4) breaker().withCircuitBreaker(Future(throwException))
awaitCond( awaitCond(
@ -686,7 +668,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"increment failure count on callTimeout" in { "increment failure count on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val fut = breaker().withCircuitBreaker(Future { val fut = breaker().withCircuitBreaker(Future {
Thread.sleep(150.millis.dilated.toMillis) Thread.sleep(150.millis.dilated.toMillis)
@ -703,43 +685,37 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
} }
"invoke onCallSuccess if call succeeds" in { "invoke onCallSuccess if call succeeds" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(sayHi)) breaker().withCircuitBreaker(Future(sayHi))
checkLatch(breaker.callSuccessLatch) checkLatch(breaker.callSuccessLatch)
verify(breaker.callSuccessConsumerMock)(captor.capture()) val success = breaker.probe.expectMsgType[CBSuccess]
captor.getValue > 0 should ===(true) success.value should (be > Duration.Zero and be < shortCallTimeout)
captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true)
} }
"invoke onCallTimeout if call timeouts" in { "invoke onCallTimeout if call timeouts" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(Thread.sleep(250.millis.dilated.toMillis))) breaker().withCircuitBreaker(Future(Thread.sleep(250.millis.dilated.toMillis)))
checkLatch(breaker.callTimeoutLatch) checkLatch(breaker.callTimeoutLatch)
verify(breaker.callTimeoutConsumerMock)(captor.capture()) val timeout = breaker.probe.expectMsgType[CBTimeout]
captor.getValue > 0 should ===(true) timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2).dilated)
captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).toNanos should ===(true)
} }
"invoke onCallFailure if call fails" in { "invoke onCallFailure if call fails" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
val captor = timeCaptor
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.callFailureLatch) checkLatch(breaker.callFailureLatch)
verify(breaker.callFailureConsumerMock)(captor.capture()) val failure = breaker.probe.expectMsgType[CBFailure]
captor.getValue > 0 should ===(true) failure.value should (be > Duration.Zero and be < shortCallTimeout)
captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true)
} }
"invoke onOpen if call fails and breaker transits to open state" in { "invoke onOpen if call fails and breaker transits to open state" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = shortCallTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)

View file

@ -175,7 +175,6 @@ object Dependencies {
Test.scalatest.value, Test.scalatest.value,
Test.commonsCodec, Test.commonsCodec,
Test.commonsMath, Test.commonsMath,
Test.mockito,
Test.scalacheck.value, Test.scalacheck.value,
Test.jimfs, Test.jimfs,
Test.dockerClient, Test.dockerClient,
@ -202,7 +201,8 @@ object Dependencies {
Provided.levelDBNative, Provided.levelDBNative,
Test.junit, Test.junit,
Test.scalatest.value, Test.scalatest.value,
Test.commonsIo) Test.commonsIo,
Test.mockito)
val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito) val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)