From 13892f7d01e2b0b0a4fbb7e20b3bed3bafee3dde Mon Sep 17 00:00:00 2001 From: Sergey Morgunov Date: Tue, 23 Aug 2022 14:41:09 +0300 Subject: [PATCH] Add Circuit Breakers panel concept (like in Lagom) (#30241) Extension for looking up circuit breakers by id, telemetry trait for collecting circuit breaker metrics. --- .../akka/pattern/CircuitBreakerSpec.scala | 76 ++++++++- akka-actor/src/main/resources/reference.conf | 41 +++++ .../scala/akka/pattern/CircuitBreaker.scala | 119 +++++++++---- .../pattern/CircuitBreakersRegistry.scala | 89 ++++++++++ .../internal/CircuitBreakerTelemetry.scala | 157 ++++++++++++++++++ 5 files changed, 449 insertions(+), 33 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala create mode 100644 akka-actor/src/main/scala/akka/pattern/internal/CircuitBreakerTelemetry.scala diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 156112fee9..837a96988d 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -14,13 +14,13 @@ import scala.language.postfixOps import scala.util.Failure import scala.util.Success import scala.util.Try - -import akka.actor.ActorSystem +import akka.actor.{ ActorSystem, ExtendedActorSystem } import akka.testkit._ object CircuitBreakerSpec { class TestException extends RuntimeException + class AllowException extends RuntimeException case class CBSuccess(value: FiniteDuration) case class CBFailure(value: FiniteDuration) case class CBTimeout(value: FiniteDuration) @@ -35,6 +35,16 @@ object CircuitBreakerSpec { val callTimeoutLatch = new TestLatch(1) val callBreakerOpenLatch = new TestLatch(1) + def resetAll() = { + halfOpenLatch.reset() + openLatch.reset() + closedLatch.reset() + callSuccessLatch.reset() + callFailureLatch.reset() + callTimeoutLatch.reset() + callBreakerOpenLatch.reset() + } + def apply(): CircuitBreaker = instance instance .onClose(closedLatch.countDown()) @@ -65,9 +75,26 @@ object CircuitBreakerSpec { case Success(i) => i % 2 == 0 case _ => true } + + val anyExceptionIsFailure: Try[Int] => Boolean = { + case Success(_) => false + case _ => true + } } -class CircuitBreakerSpec extends AkkaSpec { +class CircuitBreakerSpec extends AkkaSpec(""" + akka.circuit-breaker { + identified { + max-failures = 1 + call-timeout = 100 ms + reset-timeout = 200 ms + exception-allowlist = [ + "akka.pattern.CircuitBreakerSpec$AllowException" + ] + } + } + """) { + import CircuitBreakerSpec._ implicit def ec: ExecutionContextExecutor = system.dispatcher @@ -92,6 +119,8 @@ class CircuitBreakerSpec extends AkkaSpec { def throwException = throw new TestException + def throwAllowException = throw new AllowException + def sayHi = "hi" "A synchronous circuit breaker that is open" must { @@ -719,4 +748,45 @@ class CircuitBreakerSpec extends AkkaSpec { checkLatch(breaker.openLatch) } } + + "An identified asynchronous circuit breaker" must { + + val breaker = new Breaker(CircuitBreaker("identified")(system.asInstanceOf[ExtendedActorSystem])) + val cb = breaker() + + "be closed after success result" taggedAs TimingTest in { + breaker.resetAll() + Await.result(cb.withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi") + checkLatch(breaker.callSuccessLatch) + } + + "be closed after throw allowable exception" taggedAs TimingTest in { + breaker.resetAll() + intercept[AllowException] { Await.result(cb.withCircuitBreaker(Future(throwAllowException)), awaitTimeout) } + checkLatch(breaker.callSuccessLatch) + } + + "be open after throw exception and half-open after reset timeout" taggedAs TimingTest in { + breaker.resetAll() + intercept[TestException] { Await.result(cb.withCircuitBreaker(Future(throwException)), awaitTimeout) } + checkLatch(breaker.openLatch) + Thread.sleep(250.millis.dilated(system).toMillis) + checkLatch(breaker.halfOpenLatch) + } + + "be closed again after success result" taggedAs TimingTest in { + breaker.resetAll() + Await.result(cb.withCircuitBreaker(Future(sayHi)), awaitTimeout) should ===("hi") + checkLatch(breaker.callSuccessLatch) + checkLatch(breaker.closedLatch) + } + + "be open after pass custom failure function and throw allowable exception" taggedAs TimingTest in { + breaker.resetAll() + intercept[AllowException] { + Await.result(cb.withCircuitBreaker(Future(throwAllowException), anyExceptionIsFailure), awaitTimeout) + } + checkLatch(breaker.openLatch) + } + } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 2a09484dab..906d064a83 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -1307,4 +1307,45 @@ akka { #//#coordinated-shutdown-phases } + #//#circuit-breaker-default + # Configuration for circuit breakers created with the APIs accepting an id to + # identify or look up the circuit breaker. + # Note: Circuit breakers created without ids are not affected by this configuration. + # A child configuration section with the same name as the circuit breaker identifier + # will be used, with fallback to the `akka.circuit-breaker.default` section. + circuit-breaker { + + # Default configuration that is used if a configuration section + # with the circuit breaker identifier is not defined. + default { + # Number of failures before opening the circuit. + max-failures = 10 + + # Duration of time after which to consider a call a failure. + call-timeout = 10s + + # Duration of time in open state after which to attempt to close + # the circuit, by first entering the half-open state. + reset-timeout = 15s + + # The upper bound of reset-timeout + max-reset-timeout = 36500d + + # Exponential backoff + # For details see https://en.wikipedia.org/wiki/Exponential_backoff + exponential-backoff = 1.0 + + # Additional random delay based on this factor is added to backoff + # For example 0.2 adds up to 20% delay + # In order to skip this additional delay set as 0 + random-factor = 0.0 + + # A allowlist of fqcn of Exceptions that the CircuitBreaker + # should not consider failures. By default all exceptions are + # considered failures. + exception-allowlist = [] + } + } + #//#circuit-breaker-default + } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 944f14cb3b..95e4bd11d7 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -5,11 +5,11 @@ package akka.pattern import java.util.Optional -import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom } +import java.util.concurrent.{ Callable, CompletionException, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.BiFunction import java.util.function.Consumer - +import scala.annotation.nowarn import scala.compat.java8.FutureConverters import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.TimeoutException @@ -17,11 +17,10 @@ import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } import scala.util.control.NoStackTrace import scala.util.control.NonFatal - -import scala.annotation.nowarn import akka.AkkaException -import akka.actor.Scheduler +import akka.actor.{ ExtendedActorSystem, Scheduler } import akka.dispatch.ExecutionContexts.parasitic +import akka.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelemetry } import akka.util.JavaDurationConverters._ import akka.util.Unsafe @@ -49,6 +48,15 @@ object CircuitBreaker { resetTimeout: FiniteDuration): CircuitBreaker = new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(parasitic) + /** + * Create or find a CircuitBreaker in registry. + * + * @param id Circuit Breaker identifier + * @param system [[ExtendedActorSystem]] that is storing this [[CircuitBreaker]] + */ + def apply(id: String)(implicit system: ExtendedActorSystem): CircuitBreaker = + CircuitBreakersRegistry(system).get(id) + /** * Java API: Create a new CircuitBreaker. * @@ -88,20 +96,14 @@ object CircuitBreaker { resetTimeout: java.time.Duration): CircuitBreaker = apply(scheduler, maxFailures, callTimeout.asScala, resetTimeout.asScala) - private val exceptionAsFailure: Try[_] => Boolean = { - case _: Success[_] => false - case _ => true - } - - private def exceptionAsFailureJava[T]: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] = - new BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean] { - override def apply(t: Optional[T], err: Optional[Throwable]) = { - if (err.isPresent) - true - else - false - } - } + /** + * Java API: Lookup a CircuitBreaker in registry. + * + * @param id Circuit Breaker identifier + * @param system [[ExtendedActorSystem]] that is storing this [[CircuitBreaker]] + */ + def lookup(id: String, system: ExtendedActorSystem): CircuitBreaker = + apply(id)(system) protected def convertJavaFailureFnToScala[T]( javaFn: BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]): Try[T] => Boolean = { @@ -143,12 +145,34 @@ class CircuitBreaker( val resetTimeout: FiniteDuration, maxResetTimeout: FiniteDuration, exponentialBackoffFactor: Double, - randomFactor: Double)(implicit executor: ExecutionContext) + randomFactor: Double, + val allowExceptions: Set[String], + val telemetry: CircuitBreakerTelemetry)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { require(exponentialBackoffFactor >= 1.0, "exponentialBackoffFactor must be >= 1.0") require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") + def this( + scheduler: Scheduler, + maxFailures: Int, + callTimeout: FiniteDuration, + resetTimeout: FiniteDuration, + maxResetTimeout: FiniteDuration, + exponentialBackoffFactor: Double, + randomFactor: Double)(implicit executor: ExecutionContext) = { + this( + scheduler, + maxFailures, + callTimeout, + resetTimeout, + maxResetTimeout, + exponentialBackoffFactor, + randomFactor, + Set.empty, + CircuitBreakerNoopTelemetry)(executor) + } + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def this( executor: ExecutionContext, @@ -216,7 +240,16 @@ class CircuitBreaker( * @param maxResetTimeout the upper bound of resetTimeout */ def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = { - new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0, randomFactor)(executor) + new CircuitBreaker( + scheduler, + maxFailures, + callTimeout, + resetTimeout, + maxResetTimeout, + 2.0, + randomFactor, + allowExceptions, + telemetry)(executor) } /** @@ -243,7 +276,9 @@ class CircuitBreaker( resetTimeout, maxResetTimeout, exponentialBackoffFactor, - randomFactor)(executor) + randomFactor, + allowExceptions, + telemetry)(executor) } /** @@ -321,7 +356,7 @@ class CircuitBreaker( * */ def withCircuitBreaker[T](body: => Future[T]): Future[T] = - currentState.invoke(body, CircuitBreaker.exceptionAsFailure) + currentState.invoke(body, failureFn) /** * Java API for [[#withCircuitBreaker]]. @@ -331,7 +366,7 @@ class CircuitBreaker( * `scala.concurrent.TimeoutException` if the call timed out */ def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = - callWithCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) + withCircuitBreaker(body.call) /** * Java API for [[#withCircuitBreaker]]. @@ -357,7 +392,9 @@ class CircuitBreaker( * `scala.concurrent.TimeoutException` if the call timed out */ def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] = - callWithCircuitBreakerCS(body, CircuitBreaker.exceptionAsFailureJava) + FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] { + override def call(): Future[T] = FutureConverters.toScala(body.call()) + })) /** * Java API (8) for [[#withCircuitBreaker]]. @@ -387,7 +424,7 @@ class CircuitBreaker( * @return The result of the call */ def withSyncCircuitBreaker[T](body: => T): T = - withSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailure) + withSyncCircuitBreaker(body, this.failureFn) /** * Wraps invocations of synchronous calls that need to be protected. @@ -417,7 +454,7 @@ class CircuitBreaker( * @return The result of the call */ def callWithSyncCircuitBreaker[T](body: Callable[T]): T = - callWithSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) + withSyncCircuitBreaker(body.call) /** * Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. @@ -742,6 +779,28 @@ class CircuitBreaker( private val successListeners = new CopyOnWriteArrayList[Consumer[Long]] + if (telemetry != CircuitBreakerNoopTelemetry) { + onOpen(telemetry.onOpen()) + onHalfOpen(telemetry.onHalfOpen()) + onClose(telemetry.onClose()) + onCallBreakerOpen(telemetry.onCallBreakerOpenFailure()) + onCallTimeout(telemetry.onCallTimeoutFailure) + onCallFailure(telemetry.onCallFailure) + onCallSuccess(telemetry.onCallSuccess) + } + + private def isIgnoredException(ex: Any): Boolean = + allowExceptions.nonEmpty && (ex match { + case ce: CompletionException => ce.getCause != null && allowExceptions.contains(ce.getCause.getClass.getName) + case _ => allowExceptions.contains(ex.getClass.getName) + }) + + private val failureFn: Try[_] => Boolean = { + case _: Success[_] => false + case Failure(t) if isIgnoredException(t) => false + case _ => true + } + /** * Internal state abstraction */ @@ -830,7 +889,7 @@ class CircuitBreaker( timeout.cancel() case Failure(ex) => if (p.tryFailure(ex)) { - notifyCallFailureListeners(start) + if (!isIgnoredException(ex)) notifyCallFailureListeners(start) } timeout.cancel() }(parasitic) @@ -845,7 +904,7 @@ class CircuitBreaker( * @param body Implementation of the call * @return Future containing the result of the call */ - def callThrough[T](body: => Future[T]): Future[T] = callThrough(body, CircuitBreaker.exceptionAsFailure) + def callThrough[T](body: => Future[T]): Future[T] = callThrough(body, failureFn) /** * Abstract entry point for all states @@ -862,7 +921,7 @@ class CircuitBreaker( * @param body Implementation of the call that needs protected * @return Future containing result of protected call */ - def invoke[T](body: => Future[T]): Future[T] = invoke(body, CircuitBreaker.exceptionAsFailure) + def invoke[T](body: => Future[T]): Future[T] = invoke(body, failureFn) /** * Invoked when call succeeds diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala new file mode 100644 index 0000000000..3765140341 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package akka.pattern + +import java.util.concurrent.ConcurrentHashMap +import scala.concurrent.duration.{ DurationLong, MILLISECONDS } +import akka.actor.{ + ActorSystem, + ClassicActorSystemProvider, + ExtendedActorSystem, + Extension, + ExtensionId, + ExtensionIdProvider +} +import akka.pattern.internal.CircuitBreakerTelemetryProvider +import akka.util.ccompat.JavaConverters._ + +/** + * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread + */ +object CircuitBreakersRegistry extends ExtensionId[CircuitBreakersRegistry] with ExtensionIdProvider { + + /** + * Is used by Akka to instantiate the Extension identified by this ExtensionId, + * internal use only. + */ + override def createExtension(system: ExtendedActorSystem): CircuitBreakersRegistry = + new CircuitBreakersRegistry(system) + + /** + * Returns the canonical ExtensionId for this Extension + */ + override def lookup: ExtensionId[_ <: Extension] = CircuitBreakersRegistry + + /** + * Returns an instance of the extension identified by this ExtensionId instance. + * Java API + */ + override def get(system: ActorSystem): CircuitBreakersRegistry = super.get(system) + + /** + * Returns an instance of the extension identified by this ExtensionId instance. + * Java API + */ + override def get(system: ClassicActorSystemProvider): CircuitBreakersRegistry = super.get(system) +} + +/** + * A CircuitBreakersPanel is a central point collecting all circuit breakers in Akka. + */ +final class CircuitBreakersRegistry(system: ExtendedActorSystem) extends Extension { + + private val breakers = new ConcurrentHashMap[String, CircuitBreaker] + + private val config = system.settings.config.getConfig("akka.circuit-breaker") + private val defaultBreakerConfig = config.getConfig("default") + + private def createCircuitBreaker(id: String): CircuitBreaker = { + val breakerConfig = + if (config.hasPath(id)) config.getConfig(id).withFallback(defaultBreakerConfig) + else defaultBreakerConfig + + val maxFailures = breakerConfig.getInt("max-failures") + val callTimeout = breakerConfig.getDuration("call-timeout", MILLISECONDS).millis + val resetTimeout = breakerConfig.getDuration("reset-timeout", MILLISECONDS).millis + val maxResetTimeout = breakerConfig.getDuration("max-reset-timeout", MILLISECONDS).millis + val exponentialBackoffFactor = breakerConfig.getDouble("exponential-backoff") + val randomFactor = breakerConfig.getDouble("random-factor") + + val allowExceptions: Set[String] = breakerConfig.getStringList("exception-allowlist").asScala.toSet + + val telemetry = CircuitBreakerTelemetryProvider.start(id, system) + new CircuitBreaker( + system.scheduler, + maxFailures, + callTimeout, + resetTimeout, + maxResetTimeout, + exponentialBackoffFactor, + randomFactor, + allowExceptions, + telemetry)(system.dispatcher) + } + + private[akka] def get(id: String): CircuitBreaker = + breakers.computeIfAbsent(id, createCircuitBreaker) +} diff --git a/akka-actor/src/main/scala/akka/pattern/internal/CircuitBreakerTelemetry.scala b/akka-actor/src/main/scala/akka/pattern/internal/CircuitBreakerTelemetry.scala new file mode 100644 index 0000000000..61b051c3fe --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/internal/CircuitBreakerTelemetry.scala @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package akka.pattern.internal + +import java.util.{ List => JList } + +import akka.actor.ExtendedActorSystem +import akka.annotation.{ InternalApi, InternalStableApi } +import akka.util.ccompat.JavaConverters._ + +/** + * Service Provider Interface (SPI) for collecting metrics from Circuit Breaker. + * + * Implementations must include a single constructor with two arguments: Circuit Breaker id + * and [[ExtendedActorSystem]]. To setup your implementation, add a setting in your `application.conf`: + * + * {{{ + * akka.circuit-breaker.telemetry.implementations += com.example.MyMetrics + * }}} + */ +@InternalStableApi +trait CircuitBreakerTelemetry { + + /** + * Invoked when the circuit breaker transitions to the open state. + */ + def onOpen(): Unit + + /** + * Invoked when the circuit breaker transitions to the close state. + */ + def onClose(): Unit + + /** + * Invoked when the circuit breaker transitions to the half-open state after reset timeout. + */ + def onHalfOpen(): Unit + + /** + * Invoked for each successful call. + * + * @param elapsedNanos the elapsed duration of the call in nanoseconds + */ + def onCallSuccess(elapsedNanos: Long): Unit + + /** + * Invoked for each call when the future is completed with exception, except for + * [[scala.concurrent.TimeoutException]] and [[akka.pattern.CircuitBreakerOpenException]] + * that are handled by separate methods. + * + * @param elapsedNanos the elapsed duration of the call in nanoseconds + */ + def onCallFailure(elapsedNanos: Long): Unit + + /** + * Invoked for each call when the future is completed with `java.util.concurrent.TimeoutException` + * + * @param elapsedNanos the elapsed duration of the call in nanoseconds + */ + def onCallTimeoutFailure(elapsedNanos: Long): Unit + + /** + * Invoked for each call when the future is completed with + * `akka.pattern.CircuitBreakerOpenException` + */ + def onCallBreakerOpenFailure(): Unit + + /** + * Called when the circuit breaker is removed, e.g. expired due to inactivity. It is also called + * if the circuit breaker is re-configured, before calling [[CircuitBreakerTelemetryProvider#start]]. + */ + def stopped(): Unit +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object CircuitBreakerTelemetryProvider { + def start(breakerId: String, system: ExtendedActorSystem): CircuitBreakerTelemetry = { + val configPath = "akka.circuit-breaker.telemetry.implementations" + if (!system.settings.config.hasPath(configPath)) { + CircuitBreakerNoopTelemetry + } else { + val telemetryFqcns: JList[String] = system.settings.config.getStringList(configPath) + + telemetryFqcns.size() match { + case 0 => + CircuitBreakerNoopTelemetry + case 1 => + val fqcn = telemetryFqcns.get(0) + create(breakerId, system, fqcn) + case _ => + new CircuitBreakerEnsembleTelemetry(telemetryFqcns.asScala.toSeq, breakerId, system) + } + } + } + + def create(breakerId: String, system: ExtendedActorSystem, fqcn: String): CircuitBreakerTelemetry = { + system.dynamicAccess + .createInstanceFor[CircuitBreakerTelemetry]( + fqcn, + List(classOf[String] -> breakerId, classOf[ExtendedActorSystem] -> system)) + .get + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object CircuitBreakerNoopTelemetry extends CircuitBreakerTelemetry { + override def onOpen(): Unit = () + + override def onClose(): Unit = () + + override def onHalfOpen(): Unit = () + + override def onCallSuccess(elapsedNanos: Long): Unit = () + + override def onCallFailure(elapsedNanos: Long): Unit = () + + override def onCallTimeoutFailure(elapsedNanos: Long): Unit = () + + override def onCallBreakerOpenFailure(): Unit = () + + override def stopped(): Unit = () +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class CircuitBreakerEnsembleTelemetry( + telemetryFqcns: Seq[String], + breakerId: String, + system: ExtendedActorSystem) + extends CircuitBreakerTelemetry { + + private val telemetries = telemetryFqcns.map(fqcn => CircuitBreakerTelemetryProvider.create(breakerId, system, fqcn)) + + override def onOpen(): Unit = telemetries.foreach(_.onOpen()) + + override def onClose(): Unit = telemetries.foreach(_.onClose()) + + override def onHalfOpen(): Unit = telemetries.foreach(_.onHalfOpen()) + + override def onCallSuccess(elapsedNanos: Long): Unit = telemetries.foreach(_.onCallSuccess(elapsedNanos)) + + override def onCallFailure(elapsedNanos: Long): Unit = telemetries.foreach(_.onCallFailure(elapsedNanos)) + + override def onCallTimeoutFailure(elapsedNanos: Long): Unit = + telemetries.foreach(_.onCallTimeoutFailure(elapsedNanos)) + + override def onCallBreakerOpenFailure(): Unit = telemetries.foreach(_.onCallBreakerOpenFailure()) + + override def stopped(): Unit = telemetries.foreach(_.stopped()) +}