diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 7b3e166beb..0f71820458 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -5,21 +5,27 @@ package akka.pattern import akka.actor.ActorSystem -import language.postfixOps -import scala.concurrent.duration._ -import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException } -import scala.util.{ Failure, Success, Try } import akka.testkit._ -import org.mockito.ArgumentCaptor -import org.scalatest.BeforeAndAfter -import org.scalatestplus.mockito.MockitoSugar -import org.mockito.Mockito._ + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.Failure +import scala.util.Success +import scala.util.Try object CircuitBreakerSpec { class TestException extends RuntimeException + case class CBSuccess(value: FiniteDuration) + case class CBFailure(value: FiniteDuration) + case class CBTimeout(value: FiniteDuration) - class Breaker(val instance: CircuitBreaker)(implicit system: ActorSystem) extends MockitoSugar { + class Breaker(val instance: CircuitBreaker)(implicit system: ActorSystem) { + val probe = TestProbe() val halfOpenLatch = new TestLatch(1) val openLatch = new TestLatch(1) val closedLatch = new TestLatch(1) @@ -28,25 +34,21 @@ object CircuitBreakerSpec { val callTimeoutLatch = new TestLatch(1) val callBreakerOpenLatch = new TestLatch(1) - val callSuccessConsumerMock = mock[Long => Unit] - val callFailureConsumerMock = mock[Long => Unit] - val callTimeoutConsumerMock = mock[Long => Unit] - def apply(): CircuitBreaker = instance instance .onClose(closedLatch.countDown()) .onHalfOpen(halfOpenLatch.countDown()) .onOpen(openLatch.countDown()) .onCallSuccess(value => { - callSuccessConsumerMock(value) + probe.ref ! CBSuccess(value.nanos) callSuccessLatch.countDown() }) .onCallFailure(value => { - callFailureConsumerMock(value) + probe.ref ! CBFailure(value.nanos) callFailureLatch.countDown() }) .onCallTimeout(value => { - callTimeoutConsumerMock(value) + probe.ref ! CBTimeout(value.nanos) callTimeoutLatch.countDown() }) .onCallBreakerOpen(callBreakerOpenLatch.countDown()) @@ -79,10 +81,9 @@ object CircuitBreakerSpec { } } -class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar { - import CircuitBreakerSpec.TestException +class CircuitBreakerSpec extends AkkaSpec { + import CircuitBreakerSpec._ implicit def ec = system.dispatcher - implicit def s = system val awaitTimeout = 2.seconds.dilated @@ -92,8 +93,6 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar def sayHi = "hi" - def timeCaptor = ArgumentCaptor.forClass(classOf[Long]) - "A synchronous circuit breaker that is open" must { "throw exceptions when called before reset timeout" in { val breaker = CircuitBreakerSpec.longResetTimeoutCb() @@ -103,8 +102,8 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) val e = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } - (e.remainingDuration > Duration.Zero) should ===(true) - (e.remainingDuration <= CircuitBreakerSpec.longResetTimeout) should ===(true) + e.remainingDuration should be > Duration.Zero + e.remainingDuration should be <= CircuitBreakerSpec.longResetTimeout } "transition to half-open on reset timeout" in { @@ -136,7 +135,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "invoke onCallBreakerOpen when called before reset timeout" in { - val breaker = CircuitBreakerSpec.longResetTimeoutCb() + val breaker = longResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.openLatch) intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } @@ -144,21 +143,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "invoke onCallFailure when call results in exception" in { - val breaker = CircuitBreakerSpec.longResetTimeoutCb() - val captor = timeCaptor + val breaker = longResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.callFailureLatch) - verify(breaker.callFailureConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.longResetTimeout.toNanos should ===(true) + val failure = breaker.probe.expectMsgType[CBFailure] + failure.value should (be > Duration.Zero and be < longResetTimeout) } } "A synchronous circuit breaker that is half-open" must { "pass through next call and close on success" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) assert("hi" == breaker().withSyncCircuitBreaker(sayHi)) @@ -167,7 +164,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "pass through next call and close on exception" when { "exception is defined as call succeeded" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) @@ -179,7 +176,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "open on exception in call" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) breaker.openLatch.reset @@ -189,17 +186,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "open on even number" when { "even number is defined as failure" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) breaker.openLatch.reset - breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) + breaker().withSyncCircuitBreaker(2, evenNumberIsFailure) checkLatch(breaker.openLatch) } } "open on calling fail method" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) breaker.openLatch.reset @@ -208,7 +205,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "close on calling success method" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) breaker().succeed() @@ -216,23 +213,21 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "pass through next call and invoke onCallSuccess on success" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() - val captor = timeCaptor + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) + breaker.probe.expectMsgType[CBFailure] breaker().withSyncCircuitBreaker(sayHi) checkLatch(breaker.callSuccessLatch) - verify(breaker.callSuccessConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) + val success = breaker.probe.expectMsgType[CBSuccess] + success.value should (be > Duration.Zero and be < shortResetTimeout) } "pass through next call and invoke onCallFailure on failure" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() - val captor = timeCaptor + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) @@ -243,14 +238,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.callFailureLatch) - verify(breaker.callFailureConsumerMock, times(2))(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) + breaker.probe.expectMsgType[CBFailure] + val failure = breaker.probe.expectMsgType[CBFailure] + failure.value should (be > Duration.Zero and be < shortResetTimeout) } "pass through next call and invoke onCallTimeout on timeout" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) @@ -258,13 +252,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar intercept[TimeoutException] { breaker().withSyncCircuitBreaker(Thread.sleep(200.millis.dilated.toMillis)) } checkLatch(breaker.callTimeoutLatch) - verify(breaker.callTimeoutConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).dilated.toNanos should ===(true) + breaker.probe.expectMsgType[CBFailure] + val timeout = breaker.probe.expectMsgType[CBTimeout] + timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2).dilated) } "pass through next call and invoke onCallBreakerOpen while executing other" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) @@ -276,7 +270,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "pass through next call and invoke onCallSuccess after transition to open state" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) @@ -288,12 +282,12 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "A synchronous circuit breaker that is closed" must { "allow calls through" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() breaker().withSyncCircuitBreaker(sayHi) should ===("hi") } "increment failure count on failure" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() breaker().currentFailureCount should ===(0) intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.openLatch) @@ -302,9 +296,9 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "increment failure count on even number" when { "even number is considered failure" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() breaker().currentFailureCount should ===(0) - val result = breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) + val result = breaker().withSyncCircuitBreaker(2, evenNumberIsFailure) checkLatch(breaker.openLatch) breaker().currentFailureCount should ===(1) @@ -313,7 +307,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "increment failure count on fail method" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() breaker().currentFailureCount should ===(0) breaker().fail() checkLatch(breaker.openLatch) @@ -321,7 +315,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "reset failure count after success" in { - val breaker = CircuitBreakerSpec.multiFailureCb() + val breaker = multiFailureCb() breaker().currentFailureCount should ===(0) intercept[TestException] { val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread @@ -334,7 +328,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "reset failure count after exception in call" when { "exception is defined as Success" in { - val breaker = CircuitBreakerSpec.multiFailureCb() + val breaker = multiFailureCb() breaker().currentFailureCount should ===(0) intercept[TestException] { val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread @@ -357,7 +351,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "reset failure count after success method" in { - val breaker = CircuitBreakerSpec.multiFailureCb() + val breaker = multiFailureCb() breaker().currentFailureCount should ===(0) intercept[TestException] { val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread @@ -369,7 +363,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "throw TimeoutException on callTimeout" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() + val breaker = shortCallTimeoutCb() intercept[TimeoutException] { breaker().withSyncCircuitBreaker { Thread.sleep(200.millis.dilated.toMillis) @@ -379,7 +373,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "increment failure count on callTimeout before call finishes" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() + val breaker = shortCallTimeoutCb() Future { breaker().withSyncCircuitBreaker { Thread.sleep(1.second.dilated.toMillis) @@ -391,43 +385,37 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "invoke onCallSuccess if call succeeds" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() breaker().withSyncCircuitBreaker(sayHi) checkLatch(breaker.callSuccessLatch) - verify(breaker.callSuccessConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true) + val success = breaker.probe.expectMsgType[CBSuccess] + success.value should (be > Duration.Zero and be < shortCallTimeout) } "invoke onCallTimeout if call timeouts" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() intercept[TimeoutException](breaker().withSyncCircuitBreaker(Thread.sleep(250.millis.dilated.toMillis))) checkLatch(breaker.callTimeoutLatch) - verify(breaker.callTimeoutConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).toNanos should ===(true) + val timeout = breaker.probe.expectMsgType[CBTimeout] + timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2)) } "invoke onCallFailure if call fails" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() intercept[TestException](breaker().withSyncCircuitBreaker(throwException)) checkLatch(breaker.callFailureLatch) - verify(breaker.callFailureConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true) + val failure = breaker.probe.expectMsgType[CBFailure] + failure.value should (be > Duration.Zero and be < shortCallTimeout) } "invoke onOpen if call fails and breaker transits to open state" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() + val breaker = shortCallTimeoutCb() intercept[TestException](breaker().withSyncCircuitBreaker(throwException)) checkLatch(breaker.openLatch) @@ -436,7 +424,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "An asynchronous circuit breaker that is open" must { "throw exceptions when called before reset timeout" in { - val breaker = CircuitBreakerSpec.longResetTimeoutCb() + val breaker = longResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.openLatch) @@ -445,13 +433,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "transition to half-open on reset timeout" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) } "increase the reset timeout after it transits to open again" in { - val breaker = CircuitBreakerSpec.nonOneFactorCb() + val breaker = nonOneFactorCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.openLatch) @@ -469,19 +457,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } val longRemainingDuration = e2.remainingDuration - (shortRemainingDuration < longRemainingDuration) should ===(true) - + shortRemainingDuration should be < longRemainingDuration } "invoke onHalfOpen during transition to half-open state" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) } checkLatch(breaker.halfOpenLatch) } "invoke onCallBreakerOpen when called before reset timeout" in { - val breaker = CircuitBreakerSpec.longResetTimeoutCb() + val breaker = longResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.openLatch) @@ -491,21 +478,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "invoke onCallFailure when call results in exception" in { - val breaker = CircuitBreakerSpec.longResetTimeoutCb() - val captor = timeCaptor + val breaker = longResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.callFailureLatch) - verify(breaker.callFailureConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.longResetTimeout.toNanos should ===(true) + val failure = breaker.probe.expectMsgType[CBFailure] + failure.value should (be > Duration.Zero and be < longResetTimeout) } } "An asynchronous circuit breaker that is half-open" must { "pass through next call and close on success" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi") @@ -514,7 +499,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "pass through next call and close on exception" when { "exception is defined as call succeeded" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) val allReturnIsSuccess: Try[String] => Boolean = _ => false @@ -524,7 +509,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "re-open on exception in call" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) breaker.openLatch.reset @@ -534,17 +519,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "re-open on even number" when { "even number is defined as failure" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) breaker.openLatch.reset - Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) + Await.result(breaker().withCircuitBreaker(Future(2), evenNumberIsFailure), awaitTimeout) checkLatch(breaker.openLatch) } } "re-open on async failure" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) @@ -554,8 +539,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "pass through next call and invoke onCallSuccess on success" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() - val captor = timeCaptor + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) @@ -563,14 +547,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().withCircuitBreaker(Future(sayHi)) checkLatch(breaker.callSuccessLatch) - verify(breaker.callSuccessConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) + breaker.probe.expectMsgType[CBFailure] + val success = breaker.probe.expectMsgType[CBSuccess] + success.value should (be > Duration.Zero and be < shortResetTimeout) } "pass through next call and invoke onCallFailure on failure" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() - val captor = timeCaptor + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -581,14 +564,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.callFailureLatch) - verify(breaker.callFailureConsumerMock, times(2))(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortResetTimeout.toNanos should ===(true) + breaker.probe.expectMsgType[CBFailure] + val failure = breaker.probe.expectMsgType[CBFailure] + failure.value should (be > Duration.Zero and be < shortResetTimeout) } "pass through next call and invoke onCallTimeout on timeout" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) @@ -596,13 +578,13 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().withCircuitBreaker(Future(Thread.sleep(200.millis.dilated.toMillis))) checkLatch(breaker.callTimeoutLatch) - verify(breaker.callTimeoutConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).dilated.toNanos should ===(true) + breaker.probe.expectMsgType[CBFailure] + val timeout = breaker.probe.expectMsgType[CBTimeout] + timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2).dilated) } "pass through next call and invoke onCallBreakerOpen while executing other" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) @@ -613,7 +595,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "pass through next call and invoke onOpen after transition to open state" in { - val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + val breaker = shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) @@ -625,12 +607,12 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "An asynchronous circuit breaker that is closed" must { "allow calls through" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi") } "increment failure count on exception" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) } checkLatch(breaker.openLatch) breaker().currentFailureCount should ===(1) @@ -638,10 +620,10 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "increment failure count on even number" when { "even number is considered failure" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() breaker().currentFailureCount should ===(0) val result = - Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) + Await.result(breaker().withCircuitBreaker(Future(2), evenNumberIsFailure), awaitTimeout) checkLatch(breaker.openLatch) breaker().currentFailureCount should ===(1) result should ===(2) @@ -649,14 +631,14 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "increment failure count on async failure" in { - val breaker = CircuitBreakerSpec.longCallTimeoutCb() + val breaker = longCallTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.openLatch) breaker().currentFailureCount should ===(1) } "reset failure count after success" in { - val breaker = CircuitBreakerSpec.multiFailureCb() + val breaker = multiFailureCb() breaker().withCircuitBreaker(Future(sayHi)) for (_ <- 1 to 4) breaker().withCircuitBreaker(Future(throwException)) awaitCond(breaker().currentFailureCount == 4, awaitTimeout) @@ -666,7 +648,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar "reset failure count after exception in call" when { "exception is defined as Success" in { - val breaker: CircuitBreakerSpec.Breaker = CircuitBreakerSpec.multiFailureCb() + val breaker: Breaker = multiFailureCb() for (_ <- 1 to 4) breaker().withCircuitBreaker(Future(throwException)) awaitCond( @@ -686,7 +668,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "increment failure count on callTimeout" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() + val breaker = shortCallTimeoutCb() val fut = breaker().withCircuitBreaker(Future { Thread.sleep(150.millis.dilated.toMillis) @@ -703,43 +685,37 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar } "invoke onCallSuccess if call succeeds" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() breaker().withCircuitBreaker(Future(sayHi)) checkLatch(breaker.callSuccessLatch) - verify(breaker.callSuccessConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true) + val success = breaker.probe.expectMsgType[CBSuccess] + success.value should (be > Duration.Zero and be < shortCallTimeout) } "invoke onCallTimeout if call timeouts" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() breaker().withCircuitBreaker(Future(Thread.sleep(250.millis.dilated.toMillis))) checkLatch(breaker.callTimeoutLatch) - verify(breaker.callTimeoutConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < (CircuitBreakerSpec.shortCallTimeout * 2).toNanos should ===(true) + val timeout = breaker.probe.expectMsgType[CBTimeout] + timeout.value should (be > Duration.Zero and be < (shortCallTimeout * 2).dilated) } "invoke onCallFailure if call fails" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - val captor = timeCaptor + val breaker = shortCallTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.callFailureLatch) - verify(breaker.callFailureConsumerMock)(captor.capture()) - captor.getValue > 0 should ===(true) - captor.getValue < CircuitBreakerSpec.shortCallTimeout.toNanos should ===(true) + val failure = breaker.probe.expectMsgType[CBFailure] + failure.value should (be > Duration.Zero and be < shortCallTimeout) } "invoke onOpen if call fails and breaker transits to open state" in { - val breaker = CircuitBreakerSpec.shortCallTimeoutCb() + val breaker = shortCallTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.openLatch) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d40534a139..4eb8bd884e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -175,7 +175,6 @@ object Dependencies { Test.scalatest.value, Test.commonsCodec, Test.commonsMath, - Test.mockito, Test.scalacheck.value, Test.jimfs, Test.dockerClient, @@ -202,7 +201,8 @@ object Dependencies { Provided.levelDBNative, Test.junit, Test.scalatest.value, - Test.commonsIo) + Test.commonsIo, + Test.mockito) val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)