diff --git a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java index af60657a2b..a36a320856 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java @@ -13,9 +13,11 @@ import scala.compat.java8.FutureConverters; import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static org.junit.Assert.assertEquals; @@ -39,4 +41,19 @@ public class CircuitBreakerTest extends JUnitSuite { assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); } + @Test + public void useCircuitBreakerWithCompletableFutureAndCustomDefineFailure() throws Exception { + final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS); + final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS); + final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis); + + final BiFunction, Optional, java.lang.Boolean> fn = + (result, err) -> (result.isPresent() && result.get().equals("hello")); + + final CompletableFuture f = new CompletableFuture<>(); + f.complete("hello"); + final CompletionStage res = breaker.callWithCircuitBreakerCS(() -> f, fn); + assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); + assertEquals(1, breaker.currentFailureCount()); + } } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 30e39d2605..a299cf59bd 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -5,13 +5,14 @@ package akka.pattern import akka.actor.ActorSystem +import language.postfixOps +import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException } +import scala.util.{ Try, Success, Failure } import akka.testkit._ import org.mockito.ArgumentCaptor import org.scalatest.BeforeAndAfter import org.scalatest.mockito.MockitoSugar -import scala.concurrent.duration._ -import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException } -import scala.language.postfixOps import org.mockito.Mockito._ object CircuitBreakerSpec { @@ -71,6 +72,11 @@ object CircuitBreakerSpec { def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5)) + + val evenNumberIsFailure: Try[Int] ⇒ Boolean = { + case Success(i) ⇒ i % 2 == 0 + case _ ⇒ true + } } class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar { @@ -159,6 +165,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.closedLatch) } + "pass through next call and close on exception" when { + "exception is defined as call succeeded" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + + val allReturnIsSuccess: Try[String] ⇒ Boolean = _ ⇒ false + + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException, allReturnIsSuccess) } + checkLatch(breaker.closedLatch) + } + } + "open on exception in call" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } @@ -168,6 +187,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) } + "open on even number" when { + "even number is defined as failure" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + breaker.openLatch.reset + breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) + checkLatch(breaker.openLatch) + } + } + "open on calling fail method" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } @@ -270,6 +300,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().currentFailureCount should ===(1) } + "increment failure count on even number" when { + "even number is considered failure" in { + val breaker = CircuitBreakerSpec.longCallTimeoutCb() + breaker().currentFailureCount should ===(0) + val result = breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) + checkLatch(breaker.openLatch) + + breaker().currentFailureCount should ===(1) + result should ===(2) + } + } + "increment failure count on fail method" in { val breaker = CircuitBreakerSpec.longCallTimeoutCb() breaker().currentFailureCount should ===(0) @@ -290,6 +332,30 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().currentFailureCount should ===(0) } + "reset failure count after exception in call" when { + "exception is defined as Success" in { + val breaker = CircuitBreakerSpec.multiFailureCb() + breaker().currentFailureCount should ===(0) + intercept[TestException] { + val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread + breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" }) + } + breaker().currentFailureCount should ===(1) + + val harmlessException = new TestException + val harmlessExceptionAsSuccess: Try[String] ⇒ Boolean = { + case Success(_) ⇒ false + case Failure(ex) ⇒ ex != harmlessException + } + + intercept[TestException] { + breaker().withSyncCircuitBreaker(throw harmlessException, harmlessExceptionAsSuccess) + } + + breaker().currentFailureCount should ===(0) + } + } + "reset failure count after success method" in { val breaker = CircuitBreakerSpec.multiFailureCb() breaker().currentFailureCount should ===(0) @@ -446,6 +512,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.closedLatch) } + "pass through next call and close on exception" when { + "exception is defined as call succeeded" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + breaker().withCircuitBreaker(Future(throwException)) + checkLatch(breaker.halfOpenLatch) + val allReturnIsSuccess: Try[String] ⇒ Boolean = _ ⇒ false + Await.ready(breaker().withCircuitBreaker(Future(throwException), allReturnIsSuccess), awaitTimeout) + checkLatch(breaker.closedLatch) + } + } + "re-open on exception in call" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -455,6 +532,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) } + "re-open on even number" when { + "even number is defined as failure" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + breaker.openLatch.reset + Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) + checkLatch(breaker.openLatch) + } + } + "re-open on async failure" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -549,6 +637,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().currentFailureCount should ===(1) } + "increment failure count on even number" when { + "even number is considered failure" in { + val breaker = CircuitBreakerSpec.longCallTimeoutCb() + breaker().currentFailureCount should ===(0) + val result = Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) + checkLatch(breaker.openLatch) + breaker().currentFailureCount should ===(1) + result should ===(2) + } + } + "increment failure count on async failure" in { val breaker = CircuitBreakerSpec.longCallTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -565,6 +664,24 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar awaitCond(breaker().currentFailureCount == 0, awaitTimeout) } + "reset failure count after exception in call" when { + "exception is defined as Success" in { + val breaker = CircuitBreakerSpec.multiFailureCb() + breaker().withCircuitBreaker(Future(sayHi)) + for (n ← 1 to 4) breaker().withCircuitBreaker(Future(throwException)) + awaitCond(breaker().currentFailureCount == 4, awaitTimeout) + + val harmlessException = new TestException + val harmlessExceptionAsSuccess: Try[String] ⇒ Boolean = { + case Success(_) ⇒ false + case Failure(ex) ⇒ ex != harmlessException + } + + breaker().withCircuitBreaker(Future(throw harmlessException), harmlessExceptionAsSuccess) + awaitCond(breaker().currentFailureCount == 0, awaitTimeout) + } + } + "increment failure count on callTimeout" in { val breaker = CircuitBreakerSpec.shortCallTimeoutCb() @@ -625,5 +742,4 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) } } - } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index f1137e66ce..7eea2ef2fd 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -3,6 +3,7 @@ */ package akka.pattern +import java.util.Optional import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.Consumer @@ -12,12 +13,13 @@ import akka.util.Unsafe import scala.util.control.NoStackTrace import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList } +import java.util.function.BiFunction import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ import scala.concurrent.TimeoutException import scala.util.control.NonFatal -import scala.util.{ Failure, Success } +import scala.util.{ Failure, Success, Try } import akka.dispatch.ExecutionContexts.sameThreadExecutionContext import scala.compat.java8.FutureConverters @@ -41,6 +43,7 @@ object CircuitBreaker { */ def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext) + /** * Java API: Create a new CircuitBreaker. * @@ -55,6 +58,29 @@ object CircuitBreaker { */ def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = apply(scheduler, maxFailures, callTimeout, resetTimeout) + + private val exceptionAsFailure: Try[_] ⇒ Boolean = { + case _: Success[_] ⇒ false + case _ ⇒ true + } + + private def exceptionAsFailureJava[T]: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] = + new BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] { + override def apply(t: Optional[T], err: Optional[Throwable]) = { + if (err.isPresent) + true + else + false + } + } + + protected def convertJavaFailureFnToScala[T](javaFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Try[T] ⇒ Boolean = { + val failureFnInScala: Try[T] ⇒ Boolean = { + case Success(t) ⇒ javaFn(Optional.of(t), Optional.empty()) + case Failure(err) ⇒ javaFn(Optional.empty(), Optional.of(err)) + } + failureFnInScala + } } /** @@ -151,6 +177,17 @@ class CircuitBreaker( private[this] def currentResetTimeout: FiniteDuration = Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration] + /** + * Wraps invocations of asynchronous calls that need to be protected + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return [[scala.concurrent.Future]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def withCircuitBreaker[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = + currentState.invoke(body, defineFailureFn) + /** * Wraps invocations of asynchronous calls that need to be protected * @@ -159,7 +196,7 @@ class CircuitBreaker( * `scala.concurrent.TimeoutException` if the call timed out * */ - def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = currentState.invoke(body) + def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = currentState.invoke(body, CircuitBreaker.exceptionAsFailure) /** * Java API for [[#withCircuitBreaker]] @@ -168,7 +205,22 @@ class CircuitBreaker( * @return [[scala.concurrent.Future]] containing the call result or a * `scala.concurrent.TimeoutException` if the call timed out */ - def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) + def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = + callWithCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) + + /** + * Java API for [[#withCircuitBreaker]] + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return [[scala.concurrent.Future]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def callWithCircuitBreaker[T](body: Callable[Future[T]], defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Future[T] = { + val failureFnInScala = CircuitBreaker.convertJavaFailureFnToScala(defineFailureFn) + + withCircuitBreaker(body.call, failureFnInScala) + } /** * Java API (8) for [[#withCircuitBreaker]] @@ -178,9 +230,22 @@ class CircuitBreaker( * `scala.concurrent.TimeoutException` if the call timed out */ def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] = + callWithCircuitBreakerCS(body, CircuitBreaker.exceptionAsFailureJava) + + /** + * Java API (8) for [[#withCircuitBreaker]] + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return [[java.util.concurrent.CompletionStage]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def callWithCircuitBreakerCS[T]( + body: Callable[CompletionStage[T]], + defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): CompletionStage[T] = FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] { override def call(): Future[T] = FutureConverters.toScala(body.call()) - })) + }, defineFailureFn)) /** * Wraps invocations of synchronous calls that need to be protected @@ -195,8 +260,26 @@ class CircuitBreaker( * @return The result of the call */ def withSyncCircuitBreaker[T](body: ⇒ T): T = + withSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailure) + + /** + * Wraps invocations of synchronous calls that need to be protected + * + * Calls are run in caller's thread. Because of the synchronous nature of + * this call the `scala.concurrent.TimeoutException` will only be thrown + * after the body has completed. + * + * Throws java.util.concurrent.TimeoutException if the call timed out. + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return The result of the call + */ + def withSyncCircuitBreaker[T](body: ⇒ T, defineFailureFn: Try[T] ⇒ Boolean): T = Await.result( - withCircuitBreaker(try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }), + withCircuitBreaker( + try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }, + defineFailureFn), callTimeout) /** @@ -205,7 +288,20 @@ class CircuitBreaker( * @param body Call needing protected * @return The result of the call */ - def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) + def callWithSyncCircuitBreaker[T](body: Callable[T]): T = + callWithSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) + + /** + * Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return The result of the call + */ + def callWithSyncCircuitBreaker[T](body: Callable[T], defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): T = { + val failureFnInScala = CircuitBreaker.convertJavaFailureFnToScala(defineFailureFn) + withSyncCircuitBreaker(body.call, failureFnInScala) + } /** * Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the @@ -581,9 +677,10 @@ class CircuitBreaker( * call timeout is counted as a failed call, otherwise a successful call * * @param body Implementation of the call + * @param defineFailureFn function that define what should be consider failure and thus increase failure count * @return Future containing the result of the call */ - def callThrough[T](body: ⇒ Future[T]): Future[T] = { + def callThrough[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = { def materialize[U](value: ⇒ Future[U]): Future[U] = try value catch { case NonFatal(t) ⇒ Future.failed(t) } @@ -607,12 +704,13 @@ class CircuitBreaker( implicit val ec = sameThreadExecutionContext - p.future.onComplete { - case s: Success[_] ⇒ + p.future.onComplete { fResult ⇒ + if (defineFailureFn(fResult)) { + callFails() + } else { notifyCallSuccessListeners(start) callSucceeds() - case _ ⇒ - callFails() + } } val timeout = scheduler.scheduleOnce(callTimeout) { @@ -635,13 +733,31 @@ class CircuitBreaker( } } + /** + * Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed + * call timeout is counted as a failed call, otherwise a successful call + * + * @param body Implementation of the call + * @return Future containing the result of the call + */ + def callThrough[T](body: ⇒ Future[T]): Future[T] = callThrough(body, CircuitBreaker.exceptionAsFailure) + + /** + * Abstract entry point for all states + * + * @param body Implementation of the call that needs protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return Future containing result of protected call + */ + def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] + /** * Abstract entry point for all states * * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - def invoke[T](body: ⇒ Future[T]): Future[T] + def invoke[T](body: ⇒ Future[T]): Future[T] = invoke(body, CircuitBreaker.exceptionAsFailure) /** * Invoked when call succeeds @@ -683,7 +799,8 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = callThrough(body) + override def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = + callThrough(body, defineFailureFn) /** * On successful call, the failure count is reset to 0 @@ -730,8 +847,9 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = - if (compareAndSet(true, false)) callThrough(body) + override def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = + if (compareAndSet(true, false)) + callThrough(body, defineFailureFn) else { notifyCallBreakerOpenListeners() Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future @@ -777,7 +895,7 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = { + override def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = { notifyCallBreakerOpenListeners() Promise.failed[T](new CircuitBreakerOpenException(remainingDuration())).future } diff --git a/project/MiMa.scala b/project/MiMa.scala index 17270dbbaa..c8ba68f0f7 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -115,8 +115,12 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.updatingStateTimeout"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForStateTimeout"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"), - - // #21423 Remove deprecated metrics + + // #22295 Improve Circuit breaker + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.CircuitBreaker#State.callThrough"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.CircuitBreaker#State.invoke"), + + // #21423 Remove deprecated metrics ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.clusterMetrics"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$MetricsTick$"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsCollector"),