From 69387bbc456caca4693b78f73e056084a26fe09f Mon Sep 17 00:00:00 2001 From: Martin Eigenbrodt Date: Tue, 28 Oct 2014 18:10:32 +0100 Subject: [PATCH] !act #16125 Timeout Calls through CircuitBreaker. CircuitBreaker used to wait idenfinitly for a call and only compare duration after the facts. It will now increment its failure count early and throw a TimeoutException or return a Failure[TimeoutException]. --- .../akka/pattern/CircuitBreakerSpec.scala | 35 ++++++++++++-- .../scala/akka/pattern/CircuitBreaker.scala | 47 ++++++++++++++----- .../project/migration-guide-2.3.x-2.4.x.rst | 9 +++- 3 files changed, 75 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 59bc788be3..9c22391392 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -7,6 +7,7 @@ package akka.pattern import language.postfixOps import scala.concurrent.duration._ +import scala.concurrent.TimeoutException import akka.testkit._ import org.scalatest.BeforeAndAfter import akka.actor.{ ActorSystem, Scheduler } @@ -119,10 +120,26 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { breaker().currentFailureCount should be(0) } - "increment failure count on callTimeout" in { + "throw TimeoutException on callTimeout" in { val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - breaker().withSyncCircuitBreaker(Thread.sleep(100.millis.dilated.toMillis)) - awaitCond(breaker().currentFailureCount == 1) + intercept[TimeoutException] { + breaker().withSyncCircuitBreaker { + Thread.sleep(200.millis.dilated.toMillis) + } + } + breaker().currentFailureCount should be(1) + } + + "increment failure count on callTimeout before call finishes" in { + val breaker = CircuitBreakerSpec.shortCallTimeoutCb() + Future { + breaker().withSyncCircuitBreaker { + Thread.sleep(500.millis.dilated.toMillis) + } + } + within(300.millis) { + awaitCond(breaker().currentFailureCount == 1, 100.millis.dilated) + } } } @@ -201,9 +218,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { "increment failure count on callTimeout" in { val breaker = CircuitBreakerSpec.shortCallTimeoutCb() - breaker().withCircuitBreaker(Future { Thread.sleep(100.millis.dilated.toMillis); sayHi }) + + val fut = breaker().withCircuitBreaker(Future { + Thread.sleep(150.millis.dilated.toMillis); + throwException + }) checkLatch(breaker.openLatch) breaker().currentFailureCount should be(1) + // Since the timeout should have happend before the inner code finishes + // we expect a timeout, not TestException + intercept[TimeoutException] { + Await.result(fut, awaitTimeout) + } + } } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index a12545e430..b7c2022844 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -11,6 +11,7 @@ import scala.util.control.NoStackTrace import java.util.concurrent.{ Callable, CopyOnWriteArrayList } import scala.concurrent.{ ExecutionContext, Future, Promise, Await } import scala.concurrent.duration._ +import scala.concurrent.TimeoutException import scala.util.control.NonFatal import scala.util.Success import akka.dispatch.ExecutionContexts.sameThreadExecutionContext @@ -108,7 +109,9 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * * @param body Call needing protected * @tparam T return type from call - * @return [[scala.concurrent.Future]] containing the call result + * @return [[scala.concurrent.Future]] containing the call result or a + * [[scala.concurrent.TimeoutException]] if the call timed out + * */ def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = currentState.invoke(body) @@ -117,17 +120,21 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * * @param body Call needing protected * @tparam T return type from call - * @return [[scala.concurrent.Future]] containing the call result + * @return [[scala.concurrent.Future]] containing the call result or a + * [[scala.concurrent.TimeoutException]] if the call timed out */ def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) /** * Wraps invocations of synchronous calls that need to be protected * - * Calls are run in caller's thread + * Calls are run in caller's thread. Because of the synchronous nature of + * this call the [[scala.concurrent.TimeoutException]] will only be thrown + * after the body has completed. * * @param body Call needing protected * @tparam T return type from call + * @throws scala.concurrent.TimeoutException if the call timed out * @return The result of the call */ def withSyncCircuitBreaker[T](body: ⇒ T): T = @@ -140,9 +147,9 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * * @param body Call needing protected * @tparam T return type from call + * @throws scala.concurrent.TimeoutException if the call timed out * @return The result of the call */ - def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) /** @@ -292,13 +299,31 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * @return Future containing the result of the call */ def callThrough[T](body: ⇒ Future[T]): Future[T] = { - val deadline = callTimeout.fromNow - val bodyFuture = try body catch { case NonFatal(t) ⇒ Future.failed(t) } - bodyFuture.onComplete({ - case s: Success[_] if !deadline.isOverdue() ⇒ callSucceeds() - case _ ⇒ callFails() - })(sameThreadExecutionContext) - bodyFuture + + def materialize[T](value: ⇒ Future[T]): Future[T] = try value catch { case NonFatal(t) ⇒ Future.failed(t) } + + if (callTimeout == Duration.Zero) { + materialize(body) + } else { + val p = Promise[T]() + + implicit val ec = sameThreadExecutionContext + p.future.onComplete({ + case s: Success[_] ⇒ callSucceeds() + case _ ⇒ callFails() + }) + + val timeout = scheduler.scheduleOnce(callTimeout) { + p.tryCompleteWith( + Future.failed(new TimeoutException("Circuit Breaker Timed out."))) + } + + materialize(body).onComplete { result ⇒ + p tryComplete result + timeout.cancel + } + p.future + } } /** diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 2f200a3143..6b7068e38d 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -78,6 +78,14 @@ included ``self.path.parent.name`` to include the cluster type name. In ``2.4.x`` entries are now children of a ``Shard``, which in turn is a child of the local ``ShardRegion``. To include the shard type in the ``persistenceId`` it is now accessed by ``self.path.parent.parent.name`` from each entry. + +Circuit Breaker Timeout Change +============================== +In ``2.3.x`` calls protected by the ``CircuitBreaker`` were allowed to run indefinitely and the check to see if the timeout had been exceeded was done after the call had returned. + +In ``2.4.x`` the failureCount of the Breaker will be increased as soon as the timeout is reached and a ``Failure[TimeoutException]`` will be returned immediately for asynchronous calls. Synchronous calls will now throw a ``TimeoutException`` after the call is finished. + + Removed Deprecated Features =========================== @@ -123,4 +131,3 @@ In order to make cluster routers smarter about when they can start local routees ``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument. In case you have implemented a custom Pool you will have to update the method's signature, however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. -