Add Circuit Breakers panel concept (like in Lagom) (#30241)

Extension for looking up circuit breakers by id, telemetry trait for collecting circuit breaker metrics.
This commit is contained in:
Sergey Morgunov 2022-08-23 14:41:09 +03:00 committed by GitHub
parent 837b3c6fdb
commit 13892f7d01
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 449 additions and 33 deletions

View file

@ -14,13 +14,13 @@ import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.actor.ActorSystem
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.testkit._
object CircuitBreakerSpec {
class TestException extends RuntimeException
class AllowException extends RuntimeException
case class CBSuccess(value: FiniteDuration)
case class CBFailure(value: FiniteDuration)
case class CBTimeout(value: FiniteDuration)
@ -35,6 +35,16 @@ object CircuitBreakerSpec {
val callTimeoutLatch = new TestLatch(1)
val callBreakerOpenLatch = new TestLatch(1)
// Re-arms every latch of this helper so that a following test can await fresh events.
def resetAll() =
  Seq(
    halfOpenLatch,
    openLatch,
    closedLatch,
    callSuccessLatch,
    callFailureLatch,
    callTimeoutLatch,
    callBreakerOpenLatch).foreach(_.reset())
def apply(): CircuitBreaker = instance
instance
.onClose(closedLatch.countDown())
@ -65,9 +75,26 @@ object CircuitBreakerSpec {
case Success(i) => i % 2 == 0
case _ => true
}
// Failure predicate that counts every non-successful result as a failure,
// regardless of the exception type.
val anyExceptionIsFailure: Try[Int] => Boolean = outcome =>
  outcome match {
    case _: Success[_] => false
    case _             => true
  }
}
class CircuitBreakerSpec extends AkkaSpec {
class CircuitBreakerSpec extends AkkaSpec("""
akka.circuit-breaker {
identified {
max-failures = 1
call-timeout = 100 ms
reset-timeout = 200 ms
exception-allowlist = [
"akka.pattern.CircuitBreakerSpec$AllowException"
]
}
}
""") {
import CircuitBreakerSpec._
implicit def ec: ExecutionContextExecutor = system.dispatcher
@ -92,6 +119,8 @@ class CircuitBreakerSpec extends AkkaSpec {
def throwException = throw new TestException
def throwAllowException = throw new AllowException
def sayHi = "hi"
"A synchronous circuit breaker that is open" must {
@ -719,4 +748,45 @@ class CircuitBreakerSpec extends AkkaSpec {
checkLatch(breaker.openLatch)
}
}
// Exercises a breaker obtained by id ("identified"), configured through the
// `akka.circuit-breaker.identified` section passed to this spec
// (max-failures = 1, reset-timeout = 200 ms, AllowException in the allowlist).
// NOTE(review): all tests below share the single `cb` instance and appear to rely on
// running in the written order (e.g. "be closed again" follows the half-open test).
"An identified asynchronous circuit breaker" must {
val breaker = new Breaker(CircuitBreaker("identified")(system.asInstanceOf[ExtendedActorSystem]))
val cb = breaker()
"be closed after success result" taggedAs TimingTest in {
breaker.resetAll()
Await.result(cb.withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi")
checkLatch(breaker.callSuccessLatch)
}
"be closed after throw allowable exception" taggedAs TimingTest in {
breaker.resetAll()
// The allowlisted exception still reaches the caller, but is expected to be reported as a successful call.
intercept[AllowException] { Await.result(cb.withCircuitBreaker(Future(throwAllowException)), awaitTimeout) }
checkLatch(breaker.callSuccessLatch)
}
"be open after throw exception and half-open after reset timeout" taggedAs TimingTest in {
breaker.resetAll()
intercept[TestException] { Await.result(cb.withCircuitBreaker(Future(throwException)), awaitTimeout) }
checkLatch(breaker.openLatch)
// Wait longer than the configured 200 ms reset-timeout before expecting the half-open transition.
Thread.sleep(250.millis.dilated(system).toMillis)
checkLatch(breaker.halfOpenLatch)
}
"be closed again after success result" taggedAs TimingTest in {
breaker.resetAll()
Await.result(cb.withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi")
checkLatch(breaker.callSuccessLatch)
checkLatch(breaker.closedLatch)
}
"be open after pass custom failure function and throw allowable exception" taggedAs TimingTest in {
breaker.resetAll()
// An explicitly passed failure function is used here instead of the breaker's default one,
// so the allowlisted exception is expected to open the breaker.
intercept[AllowException] {
Await.result(cb.withCircuitBreaker(Future(throwAllowException), anyExceptionIsFailure), awaitTimeout)
}
checkLatch(breaker.openLatch)
}
}
}

View file

@ -1307,4 +1307,45 @@ akka {
#//#coordinated-shutdown-phases
}
#//#circuit-breaker-default
# Configuration for circuit breakers created with the APIs accepting an id to
# identify or look up the circuit breaker.
# Note: Circuit breakers created without ids are not affected by this configuration.
# A child configuration section with the same name as the circuit breaker identifier
# will be used, with fallback to the `akka.circuit-breaker.default` section.
circuit-breaker {
# Default configuration that is used if a configuration section
# with the circuit breaker identifier is not defined.
default {
# Number of failures before opening the circuit.
max-failures = 10
# Duration of time after which to consider a call a failure.
call-timeout = 10s
# Duration of time in open state after which to attempt to close
# the circuit, by first entering the half-open state.
reset-timeout = 15s
# The upper bound of reset-timeout
max-reset-timeout = 36500d
# Exponential backoff factor, must be >= 1.0
# For details see https://en.wikipedia.org/wiki/Exponential_backoff
exponential-backoff = 1.0
# Additional random delay based on this factor is added to backoff
# For example 0.2 adds up to 20% delay
# In order to skip this additional delay set as 0
random-factor = 0.0
# An allowlist of fully qualified class names of exceptions that the
# CircuitBreaker should not consider failures. By default all exceptions
# are considered failures.
exception-allowlist = []
}
}
#//#circuit-breaker-default
}

View file

@ -5,11 +5,11 @@
package akka.pattern
import java.util.Optional
import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom }
import java.util.concurrent.{ Callable, CompletionException, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
import java.util.function.BiFunction
import java.util.function.Consumer
import scala.annotation.nowarn
import scala.compat.java8.FutureConverters
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.TimeoutException
@ -17,11 +17,10 @@ import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import scala.annotation.nowarn
import akka.AkkaException
import akka.actor.Scheduler
import akka.actor.{ ExtendedActorSystem, Scheduler }
import akka.dispatch.ExecutionContexts.parasitic
import akka.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelemetry }
import akka.util.JavaDurationConverters._
import akka.util.Unsafe
@ -49,6 +48,15 @@ object CircuitBreaker {
resetTimeout: FiniteDuration): CircuitBreaker =
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(parasitic)
/**
* Create or find a CircuitBreaker in registry.
*
* The lookup is delegated to the [[CircuitBreakersRegistry]] extension of the given system,
* which returns the breaker already registered under `id` or creates one from the
* `akka.circuit-breaker` configuration.
*
* @param id Circuit Breaker identifier
* @param system [[ExtendedActorSystem]] that is storing this [[CircuitBreaker]]
* @return the circuit breaker registered under `id`
*/
def apply(id: String)(implicit system: ExtendedActorSystem): CircuitBreaker =
CircuitBreakersRegistry(system).get(id)
/**
* Java API: Create a new CircuitBreaker.
*
@ -88,20 +96,14 @@ object CircuitBreaker {
resetTimeout: java.time.Duration): CircuitBreaker =
apply(scheduler, maxFailures, callTimeout.asScala, resetTimeout.asScala)
private val exceptionAsFailure: Try[_] => Boolean = {
case _: Success[_] => false
case _ => true
}
private def exceptionAsFailureJava[T]: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] =
new BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] {
override def apply(t: Optional[T], err: Optional[Throwable]) = {
if (err.isPresent)
true
else
false
}
}
/**
* Java API: Create or find a CircuitBreaker in registry.
*
* Delegates to the Scala `apply(id)(system)` factory, so the breaker is created
* if no breaker is registered under `id` yet.
*
* @param id Circuit Breaker identifier
* @param system [[ExtendedActorSystem]] that is storing this [[CircuitBreaker]]
* @return the circuit breaker registered under `id`
*/
def lookup(id: String, system: ExtendedActorSystem): CircuitBreaker =
apply(id)(system)
protected def convertJavaFailureFnToScala[T](
javaFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Try[T] => Boolean = {
@ -143,12 +145,34 @@ class CircuitBreaker(
val resetTimeout: FiniteDuration,
maxResetTimeout: FiniteDuration,
exponentialBackoffFactor: Double,
randomFactor: Double)(implicit executor: ExecutionContext)
randomFactor: Double,
val allowExceptions: Set[String],
val telemetry: CircuitBreakerTelemetry)(implicit executor: ExecutionContext)
extends AbstractCircuitBreaker {
require(exponentialBackoffFactor >= 1.0, "exponentialBackoffFactor must be >= 1.0")
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
/**
* Constructor without an exception allowlist and without telemetry.
*
* Delegates to the primary constructor with an empty `allowExceptions` set and
* `CircuitBreakerNoopTelemetry`.
*/
def this(
scheduler: Scheduler,
maxFailures: Int,
callTimeout: FiniteDuration,
resetTimeout: FiniteDuration,
maxResetTimeout: FiniteDuration,
exponentialBackoffFactor: Double,
randomFactor: Double)(implicit executor: ExecutionContext) = {
this(
scheduler,
maxFailures,
callTimeout,
resetTimeout,
maxResetTimeout,
exponentialBackoffFactor,
randomFactor,
Set.empty,
CircuitBreakerNoopTelemetry)(executor)
}
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def this(
executor: ExecutionContext,
@ -216,7 +240,16 @@ class CircuitBreaker(
* @param maxResetTimeout the upper bound of resetTimeout
*/
def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = {
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0, randomFactor)(executor)
new CircuitBreaker(
scheduler,
maxFailures,
callTimeout,
resetTimeout,
maxResetTimeout,
2.0,
randomFactor,
allowExceptions,
telemetry)(executor)
}
/**
@ -243,7 +276,9 @@ class CircuitBreaker(
resetTimeout,
maxResetTimeout,
exponentialBackoffFactor,
randomFactor)(executor)
randomFactor,
allowExceptions,
telemetry)(executor)
}
/**
@ -321,7 +356,7 @@ class CircuitBreaker(
*
*/
def withCircuitBreaker[T](body: => Future[T]): Future[T] =
currentState.invoke(body, CircuitBreaker.exceptionAsFailure)
currentState.invoke(body, failureFn)
/**
* Java API for [[#withCircuitBreaker]].
@ -331,7 +366,7 @@ class CircuitBreaker(
* `scala.concurrent.TimeoutException` if the call timed out
*/
def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] =
callWithCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T])
withCircuitBreaker(body.call)
/**
* Java API for [[#withCircuitBreaker]].
@ -357,7 +392,9 @@ class CircuitBreaker(
* `scala.concurrent.TimeoutException` if the call timed out
*/
def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] =
callWithCircuitBreakerCS(body, CircuitBreaker.exceptionAsFailureJava)
FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] {
override def call(): Future[T] = FutureConverters.toScala(body.call())
}))
/**
* Java API (8) for [[#withCircuitBreaker]].
@ -387,7 +424,7 @@ class CircuitBreaker(
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: => T): T =
withSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailure)
withSyncCircuitBreaker(body, this.failureFn)
/**
* Wraps invocations of synchronous calls that need to be protected.
@ -417,7 +454,7 @@ class CircuitBreaker(
* @return The result of the call
*/
def callWithSyncCircuitBreaker[T](body: Callable[T]): T =
callWithSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T])
withSyncCircuitBreaker(body.call)
/**
* Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out.
@ -742,6 +779,28 @@ class CircuitBreaker(
private val successListeners = new CopyOnWriteArrayList[Consumer[Long]]
// Wire the telemetry implementation into the breaker by registering its callbacks as
// ordinary listeners. Nothing is registered for the no-op telemetry.
if (telemetry != CircuitBreakerNoopTelemetry) {
onOpen(telemetry.onOpen())
onHalfOpen(telemetry.onHalfOpen())
onClose(telemetry.onClose())
onCallBreakerOpen(telemetry.onCallBreakerOpenFailure())
onCallTimeout(telemetry.onCallTimeoutFailure)
onCallFailure(telemetry.onCallFailure)
onCallSuccess(telemetry.onCallSuccess)
}
/**
 * Tells whether `ex` is listed in `allowExceptions`, i.e. must not be counted as a failure.
 * A `CompletionException` is judged by the class of its cause instead of its own class.
 */
private def isIgnoredException(ex: Any): Boolean = {
  def isAllowed(candidate: Any): Boolean = allowExceptions.contains(candidate.getClass.getName)

  if (allowExceptions.isEmpty) false
  else
    ex match {
      case wrapper: CompletionException =>
        val cause = wrapper.getCause
        cause != null && isAllowed(cause)
      case other => isAllowed(other)
    }
}
// Default failure predicate: a failed call counts as a failure unless its exception is allowlisted.
private val failureFn: Try[_] => Boolean = {
  case Failure(cause) => !isIgnoredException(cause)
  case _: Success[_]  => false
  case _              => true
}
/**
* Internal state abstraction
*/
@ -830,7 +889,7 @@ class CircuitBreaker(
timeout.cancel()
case Failure(ex) =>
if (p.tryFailure(ex)) {
notifyCallFailureListeners(start)
if (!isIgnoredException(ex)) notifyCallFailureListeners(start)
}
timeout.cancel()
}(parasitic)
@ -845,7 +904,7 @@ class CircuitBreaker(
* @param body Implementation of the call
* @return Future containing the result of the call
*/
def callThrough[T](body: => Future[T]): Future[T] = callThrough(body, CircuitBreaker.exceptionAsFailure)
def callThrough[T](body: => Future[T]): Future[T] = callThrough(body, failureFn)
/**
* Abstract entry point for all states
@ -862,7 +921,7 @@ class CircuitBreaker(
* @param body Implementation of the call that needs protected
* @return Future containing result of protected call
*/
def invoke[T](body: => Future[T]): Future[T] = invoke(body, CircuitBreaker.exceptionAsFailure)
def invoke[T](body: => Future[T]): Future[T] = invoke(body, failureFn)
/**
* Invoked when call succeeds

View file

@ -0,0 +1,89 @@
/*
* Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration.{ DurationLong, MILLISECONDS }
import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.pattern.internal.CircuitBreakerTelemetryProvider
import akka.util.ccompat.JavaConverters._
/**
* Extension id of the [[CircuitBreakersRegistry]] extension, used to obtain the registry
* instance that belongs to an actor system.
*/
object CircuitBreakersRegistry extends ExtensionId[CircuitBreakersRegistry] with ExtensionIdProvider {
/**
* Is used by Akka to instantiate the Extension identified by this ExtensionId,
* internal use only.
*/
override def createExtension(system: ExtendedActorSystem): CircuitBreakersRegistry =
new CircuitBreakersRegistry(system)
/**
* Returns the canonical ExtensionId for this Extension
*/
override def lookup: ExtensionId[_ <: Extension] = CircuitBreakersRegistry
/**
* Returns an instance of the extension identified by this ExtensionId instance.
* Java API
*/
override def get(system: ActorSystem): CircuitBreakersRegistry = super.get(system)
/**
* Returns an instance of the extension identified by this ExtensionId instance.
* Java API
*/
override def get(system: ClassicActorSystemProvider): CircuitBreakersRegistry = super.get(system)
}
/**
 * A CircuitBreakersRegistry is a central point collecting all circuit breakers in Akka,
 * keyed by circuit breaker id.
 *
 * A breaker is created on the first lookup of its id and reused afterwards. Its settings are
 * read from the `akka.circuit-breaker.<id>` section when that path exists, with the
 * `akka.circuit-breaker.default` section as fallback.
 */
final class CircuitBreakersRegistry(system: ExtendedActorSystem) extends Extension {

  private val breakers = new ConcurrentHashMap[String, CircuitBreaker]
  private val config = system.settings.config.getConfig("akka.circuit-breaker")
  private val defaultBreakerConfig = config.getConfig("default")

  /** Builds a breaker for `id` from its configuration section and starts its telemetry. */
  private def createCircuitBreaker(id: String): CircuitBreaker = {
    val breakerConfig =
      if (!config.hasPath(id)) defaultBreakerConfig
      else config.getConfig(id).withFallback(defaultBreakerConfig)

    // All timeouts are read with millisecond precision.
    def millisAt(path: String) = breakerConfig.getDuration(path, MILLISECONDS).millis

    val maxFailures = breakerConfig.getInt("max-failures")
    val callTimeout = millisAt("call-timeout")
    val resetTimeout = millisAt("reset-timeout")
    val maxResetTimeout = millisAt("max-reset-timeout")
    val exponentialBackoffFactor = breakerConfig.getDouble("exponential-backoff")
    val randomFactor = breakerConfig.getDouble("random-factor")
    val allowExceptions: Set[String] = breakerConfig.getStringList("exception-allowlist").asScala.toSet
    val telemetry = CircuitBreakerTelemetryProvider.start(id, system)

    new CircuitBreaker(
      system.scheduler,
      maxFailures,
      callTimeout,
      resetTimeout,
      maxResetTimeout,
      exponentialBackoffFactor,
      randomFactor,
      allowExceptions,
      telemetry)(system.dispatcher)
  }

  /** Returns the breaker registered under `id`, creating it if it is absent. */
  private[akka] def get(id: String): CircuitBreaker =
    breakers.computeIfAbsent(id, createCircuitBreaker)
}

View file

@ -0,0 +1,157 @@
/*
* Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern.internal
import java.util.{ List => JList }
import akka.actor.ExtendedActorSystem
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.util.ccompat.JavaConverters._
/**
* Service Provider Interface (SPI) for collecting metrics from Circuit Breaker.
*
* Implementations must include a single constructor with two arguments, in this order:
* the Circuit Breaker id (a `String`) and the [[ExtendedActorSystem]].
* To set up your implementation, add a setting in your `application.conf`:
*
* {{{
* akka.circuit-breaker.telemetry.implementations += com.example.MyMetrics
* }}}
*
* When several implementations are configured, every callback is forwarded to each of them.
*/
@InternalStableApi
trait CircuitBreakerTelemetry {
/**
* Invoked when the circuit breaker transitions to the open state.
*/
def onOpen(): Unit
/**
* Invoked when the circuit breaker transitions to the closed state.
*/
def onClose(): Unit
/**
* Invoked when the circuit breaker transitions to the half-open state after reset timeout.
*/
def onHalfOpen(): Unit
/**
* Invoked for each successful call.
*
* @param elapsedNanos the elapsed duration of the call in nanoseconds
*/
def onCallSuccess(elapsedNanos: Long): Unit
/**
* Invoked for each call when the future is completed with exception, except for
* [[scala.concurrent.TimeoutException]] and [[akka.pattern.CircuitBreakerOpenException]]
* that are handled by separate methods.
*
* @param elapsedNanos the elapsed duration of the call in nanoseconds
*/
def onCallFailure(elapsedNanos: Long): Unit
/**
* Invoked for each call when the future is completed with `java.util.concurrent.TimeoutException`
*
* @param elapsedNanos the elapsed duration of the call in nanoseconds
*/
def onCallTimeoutFailure(elapsedNanos: Long): Unit
/**
* Invoked for each call when the future is completed with
* `akka.pattern.CircuitBreakerOpenException`
*/
def onCallBreakerOpenFailure(): Unit
/**
* Called when the circuit breaker is removed, e.g. expired due to inactivity. It is also called
* if the circuit breaker is re-configured, before calling [[CircuitBreakerTelemetryProvider#start]].
*
* NOTE(review): no caller of this method is visible next to this trait; confirm where removal
* or re-configuration of a circuit breaker is triggered.
*/
def stopped(): Unit
}
/**
 * INTERNAL API
 *
 * Creates the telemetry for a circuit breaker from the class names listed under
 * `akka.circuit-breaker.telemetry.implementations`.
 */
@InternalApi private[akka] object CircuitBreakerTelemetryProvider {

  /**
   * Returns the no-op telemetry when nothing is configured, the single configured implementation,
   * or an ensemble that forwards to all configured implementations.
   */
  def start(breakerId: String, system: ExtendedActorSystem): CircuitBreakerTelemetry = {
    val configPath = "akka.circuit-breaker.telemetry.implementations"
    val config = system.settings.config
    if (config.hasPath(configPath)) {
      val telemetryFqcns: JList[String] = config.getStringList(configPath)
      if (telemetryFqcns.isEmpty) CircuitBreakerNoopTelemetry
      else if (telemetryFqcns.size() == 1) create(breakerId, system, telemetryFqcns.get(0))
      else new CircuitBreakerEnsembleTelemetry(telemetryFqcns.asScala.toSeq, breakerId, system)
    } else {
      CircuitBreakerNoopTelemetry
    }
  }

  /**
   * Instantiates `fqcn` through its `(String, ExtendedActorSystem)` constructor.
   * A failed instantiation is rethrown to the caller.
   */
  def create(breakerId: String, system: ExtendedActorSystem, fqcn: String): CircuitBreakerTelemetry = {
    val constructorArgs: List[(Class[_], AnyRef)] =
      List(classOf[String] -> breakerId, classOf[ExtendedActorSystem] -> system)
    system.dynamicAccess.createInstanceFor[CircuitBreakerTelemetry](fqcn, constructorArgs).get
  }
}
/**
 * INTERNAL API
 *
 * Telemetry that ignores every event.
 */
@InternalApi private[akka] object CircuitBreakerNoopTelemetry extends CircuitBreakerTelemetry {
  // State transitions
  override def onOpen(): Unit = {}
  override def onClose(): Unit = {}
  override def onHalfOpen(): Unit = {}

  // Call outcomes
  override def onCallSuccess(elapsedNanos: Long): Unit = {}
  override def onCallFailure(elapsedNanos: Long): Unit = {}
  override def onCallTimeoutFailure(elapsedNanos: Long): Unit = {}
  override def onCallBreakerOpenFailure(): Unit = {}

  // Lifecycle
  override def stopped(): Unit = {}
}
/**
 * INTERNAL API
 *
 * Telemetry that instantiates every configured implementation and forwards each event
 * to all of them, in configuration order.
 */
@InternalApi private[akka] class CircuitBreakerEnsembleTelemetry(
    telemetryFqcns: Seq[String],
    breakerId: String,
    system: ExtendedActorSystem)
    extends CircuitBreakerTelemetry {

  private val telemetries =
    for (fqcn <- telemetryFqcns) yield CircuitBreakerTelemetryProvider.create(breakerId, system, fqcn)

  // Applies one event to every underlying telemetry.
  private def broadcast(event: CircuitBreakerTelemetry => Unit): Unit = telemetries.foreach(event)

  override def onOpen(): Unit = broadcast(_.onOpen())
  override def onClose(): Unit = broadcast(_.onClose())
  override def onHalfOpen(): Unit = broadcast(_.onHalfOpen())
  override def onCallSuccess(elapsedNanos: Long): Unit = broadcast(_.onCallSuccess(elapsedNanos))
  override def onCallFailure(elapsedNanos: Long): Unit = broadcast(_.onCallFailure(elapsedNanos))
  override def onCallTimeoutFailure(elapsedNanos: Long): Unit = broadcast(_.onCallTimeoutFailure(elapsedNanos))
  override def onCallBreakerOpenFailure(): Unit = broadcast(_.onCallBreakerOpenFailure())
  override def stopped(): Unit = broadcast(_.stopped())
}