22295 Allow user define what is failure in terms on circuit breaker

Revert all unnecessary style changes

Add api for java

Make exceptionsAsFailure function private

Exclude changed method api for binary compatibility check

- The changes expand the api by overload, thus is backward compatible
This commit is contained in:
Qingwei 2017-02-13 07:27:42 -05:00 committed by Patrik Nordwall
parent f60710b58f
commit c879c03679
4 changed files with 277 additions and 22 deletions

View file

@ -13,9 +13,11 @@ import scala.compat.java8.FutureConverters;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -39,4 +41,19 @@ public class CircuitBreakerTest extends JUnitSuite {
assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds));
} }
@Test
public void useCircuitBreakerWithCompletableFutureAndCustomDefineFailure() throws Exception {
final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS);
final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS);
final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis);
final BiFunction<Optional<String>, Optional<Throwable>, java.lang.Boolean> fn =
(result, err) -> (result.isPresent() && result.get().equals("hello"));
final CompletableFuture<String> f = new CompletableFuture<>();
f.complete("hello");
final CompletionStage<String> res = breaker.callWithCircuitBreakerCS(() -> f, fn);
assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds));
assertEquals(1, breaker.currentFailureCount());
}
} }

View file

@ -5,13 +5,14 @@
package akka.pattern package akka.pattern
import akka.actor.ActorSystem import akka.actor.ActorSystem
import language.postfixOps
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException }
import scala.util.{ Try, Success, Failure }
import akka.testkit._ import akka.testkit._
import org.mockito.ArgumentCaptor import org.mockito.ArgumentCaptor
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import org.scalatest.mockito.MockitoSugar import org.scalatest.mockito.MockitoSugar
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException }
import scala.language.postfixOps
import org.mockito.Mockito._ import org.mockito.Mockito._
object CircuitBreakerSpec { object CircuitBreakerSpec {
@ -71,6 +72,11 @@ object CircuitBreakerSpec {
def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5)) new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5))
val evenNumberIsFailure: Try[Int] Boolean = {
case Success(i) i % 2 == 0
case _ true
}
} }
class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar { class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar {
@ -159,6 +165,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.closedLatch) checkLatch(breaker.closedLatch)
} }
"pass through next call and close on exception" when {
"exception is defined as call succeeded" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
val allReturnIsSuccess: Try[String] Boolean = _ false
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException, allReturnIsSuccess) }
checkLatch(breaker.closedLatch)
}
}
"open on exception in call" in { "open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
@ -168,6 +187,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
} }
"open on even number" when {
"even number is defined as failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset
breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure)
checkLatch(breaker.openLatch)
}
}
"open on calling fail method" in { "open on calling fail method" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
@ -270,6 +300,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().currentFailureCount should ===(1) breaker().currentFailureCount should ===(1)
} }
"increment failure count on even number" when {
"even number is considered failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should ===(0)
val result = breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure)
checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1)
result should ===(2)
}
}
"increment failure count on fail method" in { "increment failure count on fail method" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
@ -290,6 +332,30 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
} }
"reset failure count after exception in call" when {
"exception is defined as Success" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().currentFailureCount should ===(0)
intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" })
}
breaker().currentFailureCount should ===(1)
val harmlessException = new TestException
val harmlessExceptionAsSuccess: Try[String] Boolean = {
case Success(_) false
case Failure(ex) ex != harmlessException
}
intercept[TestException] {
breaker().withSyncCircuitBreaker(throw harmlessException, harmlessExceptionAsSuccess)
}
breaker().currentFailureCount should ===(0)
}
}
"reset failure count after success method" in { "reset failure count after success method" in {
val breaker = CircuitBreakerSpec.multiFailureCb() val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().currentFailureCount should ===(0) breaker().currentFailureCount should ===(0)
@ -446,6 +512,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.closedLatch) checkLatch(breaker.closedLatch)
} }
"pass through next call and close on exception" when {
"exception is defined as call succeeded" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch)
val allReturnIsSuccess: Try[String] Boolean = _ false
Await.ready(breaker().withCircuitBreaker(Future(throwException), allReturnIsSuccess), awaitTimeout)
checkLatch(breaker.closedLatch)
}
}
"re-open on exception in call" in { "re-open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
@ -455,6 +532,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
} }
"re-open on even number" when {
"even number is defined as failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset
Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout)
checkLatch(breaker.openLatch)
}
}
"re-open on async failure" in { "re-open on async failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb() val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
@ -549,6 +637,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().currentFailureCount should ===(1) breaker().currentFailureCount should ===(1)
} }
"increment failure count on even number" when {
"even number is considered failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should ===(0)
val result = Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout)
checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1)
result should ===(2)
}
}
"increment failure count on async failure" in { "increment failure count on async failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb() val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().withCircuitBreaker(Future(throwException)) breaker().withCircuitBreaker(Future(throwException))
@ -565,6 +664,24 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
awaitCond(breaker().currentFailureCount == 0, awaitTimeout) awaitCond(breaker().currentFailureCount == 0, awaitTimeout)
} }
"reset failure count after exception in call" when {
"exception is defined as Success" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().withCircuitBreaker(Future(sayHi))
for (n 1 to 4) breaker().withCircuitBreaker(Future(throwException))
awaitCond(breaker().currentFailureCount == 4, awaitTimeout)
val harmlessException = new TestException
val harmlessExceptionAsSuccess: Try[String] Boolean = {
case Success(_) false
case Failure(ex) ex != harmlessException
}
breaker().withCircuitBreaker(Future(throw harmlessException), harmlessExceptionAsSuccess)
awaitCond(breaker().currentFailureCount == 0, awaitTimeout)
}
}
"increment failure count on callTimeout" in { "increment failure count on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = CircuitBreakerSpec.shortCallTimeoutCb()
@ -625,5 +742,4 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
} }
} }
} }

View file

@ -3,6 +3,7 @@
*/ */
package akka.pattern package akka.pattern
import java.util.Optional
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
import java.util.function.Consumer import java.util.function.Consumer
@ -12,12 +13,13 @@ import akka.util.Unsafe
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList } import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList }
import java.util.function.BiFunction
import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.TimeoutException import scala.concurrent.TimeoutException
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success, Try }
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters
@ -41,6 +43,7 @@ object CircuitBreaker {
*/ */
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext) new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext)
/** /**
* Java API: Create a new CircuitBreaker. * Java API: Create a new CircuitBreaker.
* *
@ -55,6 +58,29 @@ object CircuitBreaker {
*/ */
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
apply(scheduler, maxFailures, callTimeout, resetTimeout) apply(scheduler, maxFailures, callTimeout, resetTimeout)
private val exceptionAsFailure: Try[_] Boolean = {
case _: Success[_] false
case _ true
}
private def exceptionAsFailureJava[T]: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] =
new BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] {
override def apply(t: Optional[T], err: Optional[Throwable]) = {
if (err.isPresent)
true
else
false
}
}
protected def convertJavaFailureFnToScala[T](javaFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Try[T] Boolean = {
val failureFnInScala: Try[T] Boolean = {
case Success(t) javaFn(Optional.of(t), Optional.empty())
case Failure(err) javaFn(Optional.empty(), Optional.of(err))
}
failureFnInScala
}
} }
/** /**
@ -151,6 +177,17 @@ class CircuitBreaker(
private[this] def currentResetTimeout: FiniteDuration = private[this] def currentResetTimeout: FiniteDuration =
Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration] Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration]
/**
* Wraps invocations of asynchronous calls that need to be protected
*
* @param body Call needing protected
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return [[scala.concurrent.Future]] containing the call result or a
* `scala.concurrent.TimeoutException` if the call timed out
*/
def withCircuitBreaker[T](body: Future[T], defineFailureFn: Try[T] Boolean): Future[T] =
currentState.invoke(body, defineFailureFn)
/** /**
* Wraps invocations of asynchronous calls that need to be protected * Wraps invocations of asynchronous calls that need to be protected
* *
@ -159,7 +196,7 @@ class CircuitBreaker(
* `scala.concurrent.TimeoutException` if the call timed out * `scala.concurrent.TimeoutException` if the call timed out
* *
*/ */
def withCircuitBreaker[T](body: Future[T]): Future[T] = currentState.invoke(body) def withCircuitBreaker[T](body: Future[T]): Future[T] = currentState.invoke(body, CircuitBreaker.exceptionAsFailure)
/** /**
* Java API for [[#withCircuitBreaker]] * Java API for [[#withCircuitBreaker]]
@ -168,7 +205,22 @@ class CircuitBreaker(
* @return [[scala.concurrent.Future]] containing the call result or a * @return [[scala.concurrent.Future]] containing the call result or a
* `scala.concurrent.TimeoutException` if the call timed out * `scala.concurrent.TimeoutException` if the call timed out
*/ */
def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] =
callWithCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T])
/**
* Java API for [[#withCircuitBreaker]]
*
* @param body Call needing protected
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return [[scala.concurrent.Future]] containing the call result or a
* `scala.concurrent.TimeoutException` if the call timed out
*/
def callWithCircuitBreaker[T](body: Callable[Future[T]], defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Future[T] = {
val failureFnInScala = CircuitBreaker.convertJavaFailureFnToScala(defineFailureFn)
withCircuitBreaker(body.call, failureFnInScala)
}
/** /**
* Java API (8) for [[#withCircuitBreaker]] * Java API (8) for [[#withCircuitBreaker]]
@ -178,9 +230,22 @@ class CircuitBreaker(
* `scala.concurrent.TimeoutException` if the call timed out * `scala.concurrent.TimeoutException` if the call timed out
*/ */
def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] = def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] =
callWithCircuitBreakerCS(body, CircuitBreaker.exceptionAsFailureJava)
/**
* Java API (8) for [[#withCircuitBreaker]]
*
* @param body Call needing protected
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return [[java.util.concurrent.CompletionStage]] containing the call result or a
* `scala.concurrent.TimeoutException` if the call timed out
*/
def callWithCircuitBreakerCS[T](
body: Callable[CompletionStage[T]],
defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): CompletionStage[T] =
FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] { FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] {
override def call(): Future[T] = FutureConverters.toScala(body.call()) override def call(): Future[T] = FutureConverters.toScala(body.call())
})) }, defineFailureFn))
/** /**
* Wraps invocations of synchronous calls that need to be protected * Wraps invocations of synchronous calls that need to be protected
@ -195,8 +260,26 @@ class CircuitBreaker(
* @return The result of the call * @return The result of the call
*/ */
def withSyncCircuitBreaker[T](body: T): T = def withSyncCircuitBreaker[T](body: T): T =
withSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailure)
/**
* Wraps invocations of synchronous calls that need to be protected
*
* Calls are run in caller's thread. Because of the synchronous nature of
* this call the `scala.concurrent.TimeoutException` will only be thrown
* after the body has completed.
*
* Throws java.util.concurrent.TimeoutException if the call timed out.
*
* @param body Call needing protected
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: T, defineFailureFn: Try[T] Boolean): T =
Await.result( Await.result(
withCircuitBreaker(try Future.successful(body) catch { case NonFatal(t) Future.failed(t) }), withCircuitBreaker(
try Future.successful(body) catch { case NonFatal(t) Future.failed(t) },
defineFailureFn),
callTimeout) callTimeout)
/** /**
@ -205,7 +288,20 @@ class CircuitBreaker(
* @param body Call needing protected * @param body Call needing protected
* @return The result of the call * @return The result of the call
*/ */
def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) def callWithSyncCircuitBreaker[T](body: Callable[T]): T =
callWithSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T])
/**
* Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out.
*
* @param body Call needing protected
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return The result of the call
*/
def callWithSyncCircuitBreaker[T](body: Callable[T], defineFailureFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): T = {
val failureFnInScala = CircuitBreaker.convertJavaFailureFnToScala(defineFailureFn)
withSyncCircuitBreaker(body.call, failureFnInScala)
}
/** /**
* Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the * Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the
@ -581,9 +677,10 @@ class CircuitBreaker(
* call timeout is counted as a failed call, otherwise a successful call * call timeout is counted as a failed call, otherwise a successful call
* *
* @param body Implementation of the call * @param body Implementation of the call
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return Future containing the result of the call * @return Future containing the result of the call
*/ */
def callThrough[T](body: Future[T]): Future[T] = { def callThrough[T](body: Future[T], defineFailureFn: Try[T] Boolean): Future[T] = {
def materialize[U](value: Future[U]): Future[U] = try value catch { case NonFatal(t) Future.failed(t) } def materialize[U](value: Future[U]): Future[U] = try value catch { case NonFatal(t) Future.failed(t) }
@ -607,12 +704,13 @@ class CircuitBreaker(
implicit val ec = sameThreadExecutionContext implicit val ec = sameThreadExecutionContext
p.future.onComplete { p.future.onComplete { fResult
case s: Success[_] if (defineFailureFn(fResult)) {
callFails()
} else {
notifyCallSuccessListeners(start) notifyCallSuccessListeners(start)
callSucceeds() callSucceeds()
case _ }
callFails()
} }
val timeout = scheduler.scheduleOnce(callTimeout) { val timeout = scheduler.scheduleOnce(callTimeout) {
@ -635,13 +733,31 @@ class CircuitBreaker(
} }
} }
/**
* Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
* call timeout is counted as a failed call, otherwise a successful call
*
* @param body Implementation of the call
* @return Future containing the result of the call
*/
def callThrough[T](body: Future[T]): Future[T] = callThrough(body, CircuitBreaker.exceptionAsFailure)
/**
* Abstract entry point for all states
*
* @param body Implementation of the call that needs protected
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return Future containing result of protected call
*/
def invoke[T](body: Future[T], defineFailureFn: Try[T] Boolean): Future[T]
/** /**
* Abstract entry point for all states * Abstract entry point for all states
* *
* @param body Implementation of the call that needs protected * @param body Implementation of the call that needs protected
* @return Future containing result of protected call * @return Future containing result of protected call
*/ */
def invoke[T](body: Future[T]): Future[T] def invoke[T](body: Future[T]): Future[T] = invoke(body, CircuitBreaker.exceptionAsFailure)
/** /**
* Invoked when call succeeds * Invoked when call succeeds
@ -683,7 +799,8 @@ class CircuitBreaker(
* @param body Implementation of the call that needs protected * @param body Implementation of the call that needs protected
* @return Future containing result of protected call * @return Future containing result of protected call
*/ */
override def invoke[T](body: Future[T]): Future[T] = callThrough(body) override def invoke[T](body: Future[T], defineFailureFn: Try[T] Boolean): Future[T] =
callThrough(body, defineFailureFn)
/** /**
* On successful call, the failure count is reset to 0 * On successful call, the failure count is reset to 0
@ -730,8 +847,9 @@ class CircuitBreaker(
* @param body Implementation of the call that needs protected * @param body Implementation of the call that needs protected
* @return Future containing result of protected call * @return Future containing result of protected call
*/ */
override def invoke[T](body: Future[T]): Future[T] = override def invoke[T](body: Future[T], defineFailureFn: Try[T] Boolean): Future[T] =
if (compareAndSet(true, false)) callThrough(body) if (compareAndSet(true, false))
callThrough(body, defineFailureFn)
else { else {
notifyCallBreakerOpenListeners() notifyCallBreakerOpenListeners()
Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future
@ -777,7 +895,7 @@ class CircuitBreaker(
* @param body Implementation of the call that needs protected * @param body Implementation of the call that needs protected
* @return Future containing result of protected call * @return Future containing result of protected call
*/ */
override def invoke[T](body: Future[T]): Future[T] = { override def invoke[T](body: Future[T], defineFailureFn: Try[T] Boolean): Future[T] = {
notifyCallBreakerOpenListeners() notifyCallBreakerOpenListeners()
Promise.failed[T](new CircuitBreakerOpenException(remainingDuration())).future Promise.failed[T](new CircuitBreakerOpenException(remainingDuration())).future
} }

View file

@ -115,8 +115,12 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.updatingStateTimeout"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.updatingStateTimeout"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForStateTimeout"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForStateTimeout"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"),
// #21423 Remove deprecated metrics // #22295 Improve Circuit breaker
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.CircuitBreaker#State.callThrough"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.CircuitBreaker#State.invoke"),
// #21423 Remove deprecated metrics
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.clusterMetrics"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.clusterMetrics"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$MetricsTick$"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$MetricsTick$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsCollector"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsCollector"),