!act #16125 Timeout Calls through CircuitBreaker.

CircuitBreaker used to wait idenfinitly for a call and only compare
duration after the facts.
It will now increment its failure count early and throw a TimeoutException or return a Failure[TimeoutException].
This commit is contained in:
Martin Eigenbrodt 2014-10-28 18:10:32 +01:00
parent c0235ceee6
commit 69387bbc45
3 changed files with 75 additions and 16 deletions

View file

@ -7,6 +7,7 @@ package akka.pattern
import language.postfixOps import language.postfixOps
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.TimeoutException
import akka.testkit._ import akka.testkit._
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import akka.actor.{ ActorSystem, Scheduler } import akka.actor.{ ActorSystem, Scheduler }
@ -119,10 +120,26 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
breaker().currentFailureCount should be(0) breaker().currentFailureCount should be(0)
} }
"increment failure count on callTimeout" in { "throw TimeoutException on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = CircuitBreakerSpec.shortCallTimeoutCb()
breaker().withSyncCircuitBreaker(Thread.sleep(100.millis.dilated.toMillis)) intercept[TimeoutException] {
awaitCond(breaker().currentFailureCount == 1) breaker().withSyncCircuitBreaker {
Thread.sleep(200.millis.dilated.toMillis)
}
}
breaker().currentFailureCount should be(1)
}
"increment failure count on callTimeout before call finishes" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb()
Future {
breaker().withSyncCircuitBreaker {
Thread.sleep(500.millis.dilated.toMillis)
}
}
within(300.millis) {
awaitCond(breaker().currentFailureCount == 1, 100.millis.dilated)
}
} }
} }
@ -201,9 +218,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
"increment failure count on callTimeout" in { "increment failure count on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val breaker = CircuitBreakerSpec.shortCallTimeoutCb()
breaker().withCircuitBreaker(Future { Thread.sleep(100.millis.dilated.toMillis); sayHi })
val fut = breaker().withCircuitBreaker(Future {
Thread.sleep(150.millis.dilated.toMillis);
throwException
})
checkLatch(breaker.openLatch) checkLatch(breaker.openLatch)
breaker().currentFailureCount should be(1) breaker().currentFailureCount should be(1)
// Since the timeout should have happend before the inner code finishes
// we expect a timeout, not TestException
intercept[TimeoutException] {
Await.result(fut, awaitTimeout)
}
} }
} }

View file

@ -11,6 +11,7 @@ import scala.util.control.NoStackTrace
import java.util.concurrent.{ Callable, CopyOnWriteArrayList } import java.util.concurrent.{ Callable, CopyOnWriteArrayList }
import scala.concurrent.{ ExecutionContext, Future, Promise, Await } import scala.concurrent.{ ExecutionContext, Future, Promise, Await }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.TimeoutException
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.Success import scala.util.Success
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
@ -108,7 +109,9 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* *
* @param body Call needing protected * @param body Call needing protected
* @tparam T return type from call * @tparam T return type from call
* @return [[scala.concurrent.Future]] containing the call result * @return [[scala.concurrent.Future]] containing the call result or a
* [[scala.concurrent.TimeoutException]] if the call timed out
*
*/ */
def withCircuitBreaker[T](body: Future[T]): Future[T] = currentState.invoke(body) def withCircuitBreaker[T](body: Future[T]): Future[T] = currentState.invoke(body)
@ -117,17 +120,21 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* *
* @param body Call needing protected * @param body Call needing protected
* @tparam T return type from call * @tparam T return type from call
* @return [[scala.concurrent.Future]] containing the call result * @return [[scala.concurrent.Future]] containing the call result or a
* [[scala.concurrent.TimeoutException]] if the call timed out
*/ */
def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call)
/** /**
* Wraps invocations of synchronous calls that need to be protected * Wraps invocations of synchronous calls that need to be protected
* *
* Calls are run in caller's thread * Calls are run in caller's thread. Because of the synchronous nature of
* this call the [[scala.concurrent.TimeoutException]] will only be thrown
* after the body has completed.
* *
* @param body Call needing protected * @param body Call needing protected
* @tparam T return type from call * @tparam T return type from call
* @throws scala.concurrent.TimeoutException if the call timed out
* @return The result of the call * @return The result of the call
*/ */
def withSyncCircuitBreaker[T](body: T): T = def withSyncCircuitBreaker[T](body: T): T =
@ -140,9 +147,9 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* *
* @param body Call needing protected * @param body Call needing protected
* @tparam T return type from call * @tparam T return type from call
* @throws scala.concurrent.TimeoutException if the call timed out
* @return The result of the call * @return The result of the call
*/ */
def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call)
/** /**
@ -292,13 +299,31 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* @return Future containing the result of the call * @return Future containing the result of the call
*/ */
def callThrough[T](body: Future[T]): Future[T] = { def callThrough[T](body: Future[T]): Future[T] = {
val deadline = callTimeout.fromNow
val bodyFuture = try body catch { case NonFatal(t) Future.failed(t) } def materialize[T](value: Future[T]): Future[T] = try value catch { case NonFatal(t) Future.failed(t) }
bodyFuture.onComplete({
case s: Success[_] if !deadline.isOverdue() callSucceeds() if (callTimeout == Duration.Zero) {
materialize(body)
} else {
val p = Promise[T]()
implicit val ec = sameThreadExecutionContext
p.future.onComplete({
case s: Success[_] callSucceeds()
case _ callFails() case _ callFails()
})(sameThreadExecutionContext) })
bodyFuture
val timeout = scheduler.scheduleOnce(callTimeout) {
p.tryCompleteWith(
Future.failed(new TimeoutException("Circuit Breaker Timed out.")))
}
materialize(body).onComplete { result
p tryComplete result
timeout.cancel
}
p.future
}
} }
/** /**

View file

@ -78,6 +78,14 @@ included ``self.path.parent.name`` to include the cluster type name.
In ``2.4.x`` entries are now children of a ``Shard``, which in turn is a child of the local ``ShardRegion``. To include the shard In ``2.4.x`` entries are now children of a ``Shard``, which in turn is a child of the local ``ShardRegion``. To include the shard
type in the ``persistenceId`` it is now accessed by ``self.path.parent.parent.name`` from each entry. type in the ``persistenceId`` it is now accessed by ``self.path.parent.parent.name`` from each entry.
Circuit Breaker Timeout Change
==============================
In ``2.3.x`` calls protected by the ``CircuitBreaker`` were allowed to run indefinitely and the check to see if the timeout had been exceeded was done after the call had returned.
In ``2.4.x`` the failureCount of the Breaker will be increased as soon as the timeout is reached and a ``Failure[TimeoutException]`` will be returned immediately for asynchronous calls. Synchronous calls will now throw a ``TimeoutException`` after the call is finished.
Removed Deprecated Features Removed Deprecated Features
=========================== ===========================
@ -123,4 +131,3 @@ In order to make cluster routers smarter about when they can start local routees
``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument. ``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument.
In case you have implemented a custom Pool you will have to update the method's signature, In case you have implemented a custom Pool you will have to update the method's signature,
however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic.