From c879c036795feb55bfc7aaf6463f3bc7a93938d1 Mon Sep 17 00:00:00 2001 From: Qingwei Date: Mon, 13 Feb 2017 07:27:42 -0500 Subject: [PATCH] 22295 Allow user define what is failure in terms on circuit breaker Revert all unnecessary style changes Add api for java Make exceptionsAsFailure function private Exclude changed method api for binary compatibility check - The changes expand the api by overload, thus is backward compatible --- .../java/akka/pattern/CircuitBreakerTest.java | 17 ++ .../akka/pattern/CircuitBreakerSpec.scala | 124 ++++++++++++++- .../scala/akka/pattern/CircuitBreaker.scala | 150 ++++++++++++++++-- project/MiMa.scala | 8 +- 4 files changed, 277 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java index af60657a2b..a36a320856 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java @@ -13,9 +13,11 @@ import scala.compat.java8.FutureConverters; import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static org.junit.Assert.assertEquals; @@ -39,4 +41,19 @@ public class CircuitBreakerTest extends JUnitSuite { assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); } + @Test + public void useCircuitBreakerWithCompletableFutureAndCustomDefineFailure() throws Exception { + final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS); + final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS); + final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis); + + final BiFunction, Optional, java.lang.Boolean> fn = + (result, err) -> (result.isPresent() && result.get().equals("hello")); + + final CompletableFuture f = new CompletableFuture<>(); + f.complete("hello"); + final CompletionStage res = breaker.callWithCircuitBreakerCS(() -> f, fn); + assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); + assertEquals(1, breaker.currentFailureCount()); + } } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 30e39d2605..a299cf59bd 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -5,13 +5,14 @@ package akka.pattern import akka.actor.ActorSystem +import language.postfixOps +import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException } +import scala.util.{ Try, Success, Failure } import akka.testkit._ import org.mockito.ArgumentCaptor import org.scalatest.BeforeAndAfter import org.scalatest.mockito.MockitoSugar -import scala.concurrent.duration._ -import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException } -import scala.language.postfixOps import org.mockito.Mockito._ object CircuitBreakerSpec { @@ -71,6 +72,11 @@ object CircuitBreakerSpec { def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5)) + + val evenNumberIsFailure: Try[Int] ⇒ Boolean = { + case Success(i) ⇒ i % 2 == 0 + case _ ⇒ true + } } class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar { @@ -159,6 +165,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.closedLatch) } + "pass through next call and close on exception" when { + "exception is defined as call succeeded" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + + val allReturnIsSuccess: Try[String] ⇒ Boolean = _ ⇒ false + + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException, allReturnIsSuccess) } + checkLatch(breaker.closedLatch) + } + } + "open on exception in call" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } @@ -168,6 +187,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) } + "open on even number" when { + "even number is defined as failure" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + breaker.openLatch.reset + breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) + checkLatch(breaker.openLatch) + } + } + "open on calling fail method" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } @@ -270,6 +300,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().currentFailureCount should ===(1) } + "increment failure count on even number" when { + "even number is considered failure" in { + val breaker = CircuitBreakerSpec.longCallTimeoutCb() + breaker().currentFailureCount should ===(0) + val result = breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure) + checkLatch(breaker.openLatch) + + breaker().currentFailureCount should ===(1) + result should ===(2) + } + } + "increment failure count on fail method" in { val breaker = CircuitBreakerSpec.longCallTimeoutCb() breaker().currentFailureCount should ===(0) @@ -290,6 +332,30 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().currentFailureCount should ===(0) } + "reset failure count after exception in call" when { + "exception is defined as Success" in { + val breaker = CircuitBreakerSpec.multiFailureCb() + breaker().currentFailureCount should ===(0) + intercept[TestException] { + val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread + breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" }) + } + breaker().currentFailureCount should ===(1) + + val harmlessException = new TestException + val harmlessExceptionAsSuccess: Try[String] ⇒ Boolean = { + case Success(_) ⇒ false + case Failure(ex) ⇒ ex != harmlessException + } + + intercept[TestException] { + breaker().withSyncCircuitBreaker(throw harmlessException, harmlessExceptionAsSuccess) + } + + breaker().currentFailureCount should ===(0) + } + } + "reset failure count after success method" in { val breaker = CircuitBreakerSpec.multiFailureCb() breaker().currentFailureCount should ===(0) @@ -446,6 +512,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.closedLatch) } + "pass through next call and close on exception" when { + "exception is defined as call succeeded" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + breaker().withCircuitBreaker(Future(throwException)) + checkLatch(breaker.halfOpenLatch) + val allReturnIsSuccess: Try[String] ⇒ Boolean = _ ⇒ false + Await.ready(breaker().withCircuitBreaker(Future(throwException), allReturnIsSuccess), awaitTimeout) + checkLatch(breaker.closedLatch) + } + } + "re-open on exception in call" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -455,6 +532,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) } + "re-open on even number" when { + "even number is defined as failure" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + breaker.openLatch.reset + Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) + checkLatch(breaker.openLatch) + } + } + "re-open on async failure" in { val breaker = CircuitBreakerSpec.shortResetTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -549,6 +637,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar breaker().currentFailureCount should ===(1) } + "increment failure count on even number" when { + "even number is considered failure" in { + val breaker = CircuitBreakerSpec.longCallTimeoutCb() + breaker().currentFailureCount should ===(0) + val result = Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout) + checkLatch(breaker.openLatch) + breaker().currentFailureCount should ===(1) + result should ===(2) + } + } + "increment failure count on async failure" in { val breaker = CircuitBreakerSpec.longCallTimeoutCb() breaker().withCircuitBreaker(Future(throwException)) @@ -565,6 +664,24 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar awaitCond(breaker().currentFailureCount == 0, awaitTimeout) } + "reset failure count after exception in call" when { + "exception is defined as Success" in { + val breaker = CircuitBreakerSpec.multiFailureCb() + breaker().withCircuitBreaker(Future(sayHi)) + for (n ← 1 to 4) breaker().withCircuitBreaker(Future(throwException)) + awaitCond(breaker().currentFailureCount == 4, awaitTimeout) + + val harmlessException = new TestException + val harmlessExceptionAsSuccess: Try[String] ⇒ Boolean = { + case Success(_) ⇒ false + case Failure(ex) ⇒ ex != harmlessException + } + + breaker().withCircuitBreaker(Future(throw harmlessException), harmlessExceptionAsSuccess) + awaitCond(breaker().currentFailureCount == 0, awaitTimeout) + } + } + "increment failure count on callTimeout" in { val breaker = CircuitBreakerSpec.shortCallTimeoutCb() @@ -625,5 +742,4 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar checkLatch(breaker.openLatch) } } - } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index f1137e66ce..7eea2ef2fd 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -3,6 +3,7 @@ */ package akka.pattern +import java.util.Optional import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.Consumer @@ -12,12 +13,13 @@ import akka.util.Unsafe import scala.util.control.NoStackTrace import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList } +import java.util.function.BiFunction import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ import scala.concurrent.TimeoutException import scala.util.control.NonFatal -import scala.util.{ Failure, Success } +import scala.util.{ Failure, Success, Try } import akka.dispatch.ExecutionContexts.sameThreadExecutionContext import scala.compat.java8.FutureConverters @@ -41,6 +43,7 @@ object CircuitBreaker { */ def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext) + /** * Java API: Create a new CircuitBreaker. * @@ -55,6 +58,29 @@ object CircuitBreaker { */ def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = apply(scheduler, maxFailures, callTimeout, resetTimeout) + + private val exceptionAsFailure: Try[_] ⇒ Boolean = { + case _: Success[_] ⇒ false + case _ ⇒ true + } + + private def exceptionAsFailureJava[T]: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] = + new BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] { + override def apply(t: Optional[T], err: Optional[Throwable]) = { + if (err.isPresent) + true + else + false + } + } + + protected def convertJavaFailureFnToScala[T](javaFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Try[T] ⇒ Boolean = { + val failureFnInScala: Try[T] ⇒ Boolean = { + case Success(t) ⇒ javaFn(Optional.of(t), Optional.empty()) + case Failure(err) ⇒ javaFn(Optional.empty(), Optional.of(err)) + } + failureFnInScala + } } /** @@ -151,6 +177,17 @@ class CircuitBreaker( private[this] def currentResetTimeout: FiniteDuration = Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration] + /** + * Wraps invocations of asynchronous calls that need to be protected + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return [[scala.concurrent.Future]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def withCircuitBreaker[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = + currentState.invoke(body, defineFailureFn) + /** * Wraps invocations of asynchronous calls that need to be protected * @@ -159,7 +196,7 @@ class CircuitBreaker( * `scala.concurrent.TimeoutException` if the call timed out * */ - def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = currentState.invoke(body) + def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = currentState.invoke(body, CircuitBreaker.exceptionAsFailure) /** * Java API for [[#withCircuitBreaker]] @@ -168,7 +205,22 @@ class CircuitBreaker( * @return [[scala.concurrent.Future]] containing the call result or a * `scala.concurrent.TimeoutException` if the call timed out */ - def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) + def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = + callWithCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) + + /** + * Java API for [[#withCircuitBreaker]] + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return [[scala.concurrent.Future]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def callWithCircuitBreaker[T](body: Callable[Future[T]], defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Future[T] = { + val failureFnInScala = CircuitBreaker.convertJavaFailureFnToScala(defineFailureFn) + + withCircuitBreaker(body.call, failureFnInScala) + } /** * Java API (8) for [[#withCircuitBreaker]] @@ -178,9 +230,22 @@ class CircuitBreaker( * `scala.concurrent.TimeoutException` if the call timed out */ def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] = + callWithCircuitBreakerCS(body, CircuitBreaker.exceptionAsFailureJava) + + /** + * Java API (8) for [[#withCircuitBreaker]] + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return [[java.util.concurrent.CompletionStage]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def callWithCircuitBreakerCS[T]( + body: Callable[CompletionStage[T]], + defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): CompletionStage[T] = FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] { override def call(): Future[T] = FutureConverters.toScala(body.call()) - })) + }, defineFailureFn)) /** * Wraps invocations of synchronous calls that need to be protected @@ -195,8 +260,26 @@ class CircuitBreaker( * @return The result of the call */ def withSyncCircuitBreaker[T](body: ⇒ T): T = + withSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailure) + + /** + * Wraps invocations of synchronous calls that need to be protected + * + * Calls are run in caller's thread. Because of the synchronous nature of + * this call the `scala.concurrent.TimeoutException` will only be thrown + * after the body has completed. + * + * Throws java.util.concurrent.TimeoutException if the call timed out. + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return The result of the call + */ + def withSyncCircuitBreaker[T](body: ⇒ T, defineFailureFn: Try[T] ⇒ Boolean): T = Await.result( - withCircuitBreaker(try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }), + withCircuitBreaker( + try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }, + defineFailureFn), callTimeout) /** @@ -205,7 +288,20 @@ class CircuitBreaker( * @param body Call needing protected * @return The result of the call */ - def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) + def callWithSyncCircuitBreaker[T](body: Callable[T]): T = + callWithSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) + + /** + * Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. + * + * @param body Call needing protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return The result of the call + */ + def callWithSyncCircuitBreaker[T](body: Callable[T], defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): T = { + val failureFnInScala = CircuitBreaker.convertJavaFailureFnToScala(defineFailureFn) + withSyncCircuitBreaker(body.call, failureFnInScala) + } /** * Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the @@ -581,9 +677,10 @@ class CircuitBreaker( * call timeout is counted as a failed call, otherwise a successful call * * @param body Implementation of the call + * @param defineFailureFn function that define what should be consider failure and thus increase failure count * @return Future containing the result of the call */ - def callThrough[T](body: ⇒ Future[T]): Future[T] = { + def callThrough[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = { def materialize[U](value: ⇒ Future[U]): Future[U] = try value catch { case NonFatal(t) ⇒ Future.failed(t) } @@ -607,12 +704,13 @@ class CircuitBreaker( implicit val ec = sameThreadExecutionContext - p.future.onComplete { - case s: Success[_] ⇒ + p.future.onComplete { fResult ⇒ + if (defineFailureFn(fResult)) { + callFails() + } else { notifyCallSuccessListeners(start) callSucceeds() - case _ ⇒ - callFails() + } } val timeout = scheduler.scheduleOnce(callTimeout) { @@ -635,13 +733,31 @@ class CircuitBreaker( } } + /** + * Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed + * call timeout is counted as a failed call, otherwise a successful call + * + * @param body Implementation of the call + * @return Future containing the result of the call + */ + def callThrough[T](body: ⇒ Future[T]): Future[T] = callThrough(body, CircuitBreaker.exceptionAsFailure) + + /** + * Abstract entry point for all states + * + * @param body Implementation of the call that needs protected + * @param defineFailureFn function that define what should be consider failure and thus increase failure count + * @return Future containing result of protected call + */ + def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] + /** * Abstract entry point for all states * * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - def invoke[T](body: ⇒ Future[T]): Future[T] + def invoke[T](body: ⇒ Future[T]): Future[T] = invoke(body, CircuitBreaker.exceptionAsFailure) /** * Invoked when call succeeds @@ -683,7 +799,8 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = callThrough(body) + override def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = + callThrough(body, defineFailureFn) /** * On successful call, the failure count is reset to 0 @@ -730,8 +847,9 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = - if (compareAndSet(true, false)) callThrough(body) + override def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = + if (compareAndSet(true, false)) + callThrough(body, defineFailureFn) else { notifyCallBreakerOpenListeners() Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future @@ -777,7 +895,7 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = { + override def invoke[T](body: ⇒ Future[T], defineFailureFn: Try[T] ⇒ Boolean): Future[T] = { notifyCallBreakerOpenListeners() Promise.failed[T](new CircuitBreakerOpenException(remainingDuration())).future } diff --git a/project/MiMa.scala b/project/MiMa.scala index 17270dbbaa..c8ba68f0f7 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -115,8 +115,12 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.updatingStateTimeout"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForStateTimeout"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"), - - // #21423 Remove deprecated metrics + + // #22295 Improve Circuit breaker + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.CircuitBreaker#State.callThrough"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.CircuitBreaker#State.invoke"), + + // #21423 Remove deprecated metrics ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.clusterMetrics"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$MetricsTick$"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsCollector"),