+htp #20198 onCompleteWithBreaker directive (#20402)

This commit is contained in:
Stefano Bonetti 2016-05-31 20:48:19 +01:00 committed by Konrad Malawski
parent f07041091f
commit 3f8dacfd44
14 changed files with 244 additions and 9 deletions

View file

@ -89,6 +89,7 @@ Directive Description
:ref:`-mapUnmatchedPath-java-` Transforms the ``unmatchedPath`` of the ``RequestContext`` using a ``Uri.Path ⇒ Uri.Path`` function
:ref:`-method-java-` Rejects all requests whose HTTP method does not match the given one
:ref:`-onComplete-java-` "Unwraps" a ``CompletionStage<T>`` and runs the inner route after future completion with the future's value as an extraction of type ``Try<T>``
:ref:`-onCompleteWithBreaker-java-` "Unwraps" a ``CompletionStage<T>`` inside a ``CircuitBreaker`` and runs the inner route after future completion with the future's value as an extraction of type ``Try<T>``
:ref:`-onSuccess-java-` "Unwraps" a ``CompletionStage<T>`` and runs the inner route after future completion with the future's value as an extraction of type ``T``
:ref:`-optionalCookie-java-` Extracts the ``HttpCookiePair`` with the given name as an ``Option<HttpCookiePair>``
:ref:`-optionalHeaderValue-java-` Extracts an optional HTTP header value using a given ``HttpHeader ⇒ Option<T>`` function

View file

@ -9,6 +9,7 @@ Future directives can be used to run inner routes once the provided ``Future[T]`
:maxdepth: 1
onComplete
onCompleteWithBreaker
onSuccess
completeOrRecoverWith

View file

@ -0,0 +1,19 @@
.. _-onCompleteWithBreaker-java-:
onCompleteWithBreaker
=====================
Description
-----------
Evaluates its parameter of type ``CompletionStage<T>`` protecting it with the specified ``CircuitBreaker``.
Refer to :ref:`Akka Circuit Breaker<circuit-breaker>` for a detailed description of this pattern.
If the ``CircuitBreaker`` is open, the request is rejected with a ``CircuitBreakerOpenRejection``.
Note that in this case the request's entity databytes stream is cancelled, and the connection is closed
as a consequence.
Otherwise, the same behaviour provided by :ref:`-onComplete-java-` is to be expected.
Example
-------
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.

View file

@ -5,17 +5,17 @@
package docs.http.scaladsl.server.directives
import java.util.concurrent.TimeUnit
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import docs.http.scaladsl.server.RoutingSpec
import scala.concurrent.Future
import scala.util.{ Success, Failure }
import akka.http.scaladsl.server.ExceptionHandler
import akka.actor.{ Actor, Props }
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.http.scaladsl.server.{CircuitBreakerOpenRejection, ExceptionHandler, Route}
import akka.util.Timeout
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import StatusCodes._
import akka.pattern.CircuitBreaker
// format: OFF
@ -54,6 +54,47 @@ class FutureDirectivesExamplesSpec extends RoutingSpec {
}
}
"onCompleteWithBreaker" in {
def divide(a: Int, b: Int): Future[Int] = Future {
a / b
}
val resetTimeout = 1.second
val breaker = new CircuitBreaker(system.scheduler,
maxFailures = 1,
callTimeout = 5.seconds,
resetTimeout
)
val route =
path("divide" / IntNumber / IntNumber) { (a, b) =>
onCompleteWithBreaker(breaker)(divide(a, b)) {
case Success(value) => complete(s"The result was $value")
case Failure(ex) => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}
// tests:
Get("/divide/10/2") ~> route ~> check {
responseAs[String] shouldEqual "The result was 5"
}
Get("/divide/10/0") ~> Route.seal(route) ~> check {
status shouldEqual InternalServerError
responseAs[String] shouldEqual "An error occurred: / by zero"
} // opens the circuit breaker
Get("/divide/10/2") ~> route ~> check {
rejection shouldBe a[CircuitBreakerOpenRejection]
}
Thread.sleep(resetTimeout.toMillis + 200)
Get("/divide/10/2") ~> route ~> check {
responseAs[String] shouldEqual "The result was 5"
}
}
"onSuccess" in {
val route =
path("success") {

View file

@ -134,6 +134,9 @@ Directive Description
:ref:`-method-` Rejects all requests whose HTTP method does not match the given one
:ref:`-onComplete-` "Unwraps" a ``Future[T]`` and runs the inner route after future completion
with the future's value as an extraction of type ``Try[T]``
:ref:`-onCompleteWithBreaker-` "Unwraps" a ``Future[T]`` inside a ``CircuitBreaker`` and runs the inner
route after future completion with the future's value as an extraction of
type ``Try[T]``
:ref:`-onSuccess-` "Unwraps" a ``Future[T]`` and runs the inner route after future completion
with the future's value as an extraction of type ``T``
:ref:`-optionalCookie-` Extracts the ``HttpCookiePair`` with the given name as an

View file

@ -9,6 +9,7 @@ Future directives can be used to run inner routes once the provided ``Future[T]`
:maxdepth: 1
onComplete
onCompleteWithBreaker
onSuccess
completeOrRecoverWith

View file

@ -0,0 +1,27 @@
.. _-onCompleteWithBreaker-:
onCompleteWithBreaker
=====================
Signature
---------
.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/FutureDirectives.scala
:snippet: onCompleteWithBreaker
Description
-----------
Evaluates its parameter of type ``Future[T]`` protecting it with the specified ``CircuitBreaker``.
Refer to :ref:`Akka Circuit Breaker<circuit-breaker>` for a detailed description of this pattern.
If the ``CircuitBreaker`` is open, the request is rejected with a ``CircuitBreakerOpenRejection``.
Note that in this case the request's entity databytes stream is cancelled, and the connection is closed
as a consequence.
Otherwise, the same behaviour provided by :ref:`-onComplete-` is to be expected.
Example
-------
.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/FutureDirectivesExamplesSpec.scala
:snippet: onCompleteWithBreaker

View file

@ -6,10 +6,15 @@ package akka.http.scaladsl.server
package directives
import akka.http.scaladsl.model.StatusCodes
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import scala.concurrent.Future
import akka.testkit.EventFilter
import org.scalatest.Inside
class FutureDirectivesSpec extends RoutingSpec {
import scala.concurrent.duration._
class FutureDirectivesSpec extends RoutingSpec with Inside {
class TestException(msg: String) extends Exception(msg)
object TestException extends Exception("XXX")
@ -19,6 +24,12 @@ class FutureDirectivesSpec extends RoutingSpec {
case e: TestException complete((StatusCodes.InternalServerError, "Oops. " + e))
}
trait TestWithCircuitBreaker {
val breakerResetTimeout = 500.millis
val breaker = new CircuitBreaker(system.scheduler, maxFailures = 1, callTimeout = 10.seconds, breakerResetTimeout)
def openBreaker() = breaker.withCircuitBreaker(Future.failed(new Exception("boom")))
}
"The `onComplete` directive" should {
"unwrap a Future in the success case" in {
var i = 0
@ -50,6 +61,52 @@ class FutureDirectivesSpec extends RoutingSpec {
}
}
"The `onCompleteWithBreaker` directive" should {
"unwrap a Future in the success case" in new TestWithCircuitBreaker {
var i = 0
def nextNumber() = { i += 1; i }
val route = onCompleteWithBreaker(breaker)(Future.successful(nextNumber())) { echoComplete }
Get() ~> route ~> check {
responseAs[String] shouldEqual "Success(1)"
}
Get() ~> route ~> check {
responseAs[String] shouldEqual "Success(2)"
}
}
"unwrap a Future in the failure case" in new TestWithCircuitBreaker {
Get() ~> onCompleteWithBreaker(breaker)(Future.failed[String](new RuntimeException("no"))) { echoComplete } ~> check {
responseAs[String] shouldEqual "Failure(java.lang.RuntimeException: no)"
}
}
"fail fast if the circuit breaker is open" in new TestWithCircuitBreaker {
openBreaker()
Get() ~> onCompleteWithBreaker(breaker)(Future.successful(1)) { echoComplete } ~> check {
inside(rejection) {
case CircuitBreakerOpenRejection(_)
}
}
}
"stop failing fast when the circuit breaker closes" in new TestWithCircuitBreaker {
openBreaker()
Thread.sleep(breakerResetTimeout.toMillis + 200)
Get() ~> onCompleteWithBreaker(breaker)(Future.successful(1)) { echoComplete } ~> check {
responseAs[String] shouldEqual "Success(1)"
}
}
"catch an exception in the success case" in new TestWithCircuitBreaker {
Get() ~> onCompleteWithBreaker(breaker)(Future.successful("ok")) { throwTestException("EX when ") } ~> check {
status shouldEqual StatusCodes.InternalServerError
responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when Success(ok)"
}
}
"catch an exception in the failure case" in new TestWithCircuitBreaker {
Get() ~> onCompleteWithBreaker(breaker)(Future.failed[String](new RuntimeException("no"))) { throwTestException("EX when ") } ~> check {
status shouldEqual StatusCodes.InternalServerError
responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when Failure(java.lang.RuntimeException: no)"
}
}
}
"The `onSuccess` directive" should {
"unwrap a Future in the success case" in {
Get() ~> onSuccess(Future.successful("yes")) { echoComplete } ~> check {

View file

@ -56,4 +56,15 @@ class RejectionHandlerBuilder(asScala: server.RejectionHandler.Builder) {
asScala.handleNotFound(route.delegate)
this
}
/**
* Convenience method for handling rejections created by created by the onCompleteWithBreaker directive.
* Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast.
*
* Use to customise the error response being written instead of the default [[akka.http.javadsl.model.StatusCodes.SERVICE_UNAVAILABLE]] response.
*/
def handleCircuitBreakerOpenRejection(handler: function.Function[CircuitBreakerOpenRejection, Route]): RejectionHandlerBuilder = {
asScala.handleCircuitBreakerOpenRejection(t => handler.apply(t).delegate)
this
}
}

View file

@ -16,8 +16,10 @@ import akka.http.javadsl.model.headers.HttpChallenge
import java.util.Optional
import java.util.function.{ Function JFunction }
import java.lang.{ Iterable JIterable }
import akka.http.scaladsl
import akka.japi.Util
import akka.pattern.CircuitBreakerOpenException
import scala.compat.java8.OptionConverters._
import scala.collection.immutable
@ -250,6 +252,14 @@ trait ValidationRejection extends Rejection {
def getCause: Optional[Throwable]
}
/**
* Rejection created by the `onCompleteWithBreaker` directive.
* Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast.
*/
trait CircuitBreakerOpenRejection extends Rejection {
def cause: CircuitBreakerOpenException
}
/**
* A special Rejection that serves as a container for a transformation function on rejections.
* It is used by some directives to "cancel" rejections that are added by later directives of a similar type.

View file

@ -15,6 +15,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try
import akka.http.javadsl.server.{ Marshaller, Route }
import akka.http.scaladsl.server.directives.{ CompleteOrRecoverWithMagnet, FutureDirectives D }
import akka.pattern.CircuitBreaker
abstract class FutureDirectives extends FormFieldDirectives {
@ -28,6 +29,22 @@ abstract class FutureDirectives extends FormFieldDirectives {
}
}
/**
* "Unwraps" a `CompletionStage[T]` and runs the inner route after future
* completion with the future's value as an extraction of type `T` if
* the supplied `CircuitBreaker` is closed.
*
* If the supplied [[CircuitBreaker]] is open the request is rejected
* with a [[akka.http.javadsl.server.CircuitBreakerOpenRejection]].
*
* @group future
*/
def onCompleteWithBreaker[T](breaker: CircuitBreaker, f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter {
D.onCompleteWithBreaker(breaker)(f.get.toScala.recover(unwrapCompletionException)) { value
inner(value).delegate
}
}
/**
* "Unwraps" a `CompletionStage<T>` and runs the inner route after stage
* completion with the stage's value as an extraction of type `T`.

View file

@ -19,6 +19,7 @@ import akka.http.impl.util.JavaMapping._
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.javadsl.server.RoutingJavaMapping
import RoutingJavaMapping._
import akka.pattern.CircuitBreakerOpenException
import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters
@ -256,6 +257,13 @@ final case class TransformationRejection(transform: immutable.Seq[Rejection] ⇒
}
}
/**
* Rejection created by the `onCompleteWithBreaker` directive.
* Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast.
*/
final case class CircuitBreakerOpenRejection(cause: CircuitBreakerOpenException)
extends jserver.CircuitBreakerOpenRejection with Rejection
/**
* A Throwable wrapping a Rejection.
* Can be used for marshalling `Future[T]` or `Try[T]` instances, whose failure side is supposed to trigger a route

View file

@ -76,6 +76,15 @@ object RejectionHandler {
this
}
/**
* Convenience method for handling rejections created by created by the onCompleteWithBreaker directive.
* Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast.
*
* Use to customise the error response being written instead of the default [[ServiceUnavailable]] response.
*/
def handleCircuitBreakerOpenRejection(handler: CircuitBreakerOpenRejection => Route): this.type =
handle { case r: CircuitBreakerOpenRejection => handler(r) }
def result(): RejectionHandler =
new BuiltRejectionHandler(cases.result(), notFound, isDefault)
}
@ -165,7 +174,11 @@ object RejectionHandler {
}
.handle {
case TooManyRangesRejection(_)
complete((RequestedRangeNotSatisfiable, "Request contains too many ranges."))
complete((RequestedRangeNotSatisfiable, "Request contains too many ranges"))
}
.handle {
case CircuitBreakerOpenRejection(_)
complete(ServiceUnavailable)
}
.handle {
case UnsatisfiableRangeRejection(unsatisfiableRanges, actualEntityLength)

View file

@ -5,11 +5,15 @@
package akka.http.scaladsl.server
package directives
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.util.Tupler
import akka.http.scaladsl.util.FastFuture._
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
// format: OFF
@ -19,6 +23,8 @@ import akka.http.scaladsl.util.FastFuture._
*/
trait FutureDirectives {
import RouteDirectives._
/**
* "Unwraps" a `Future[T]` and runs the inner route after future
* completion with the future's value as an extraction of type `Try[T]`.
@ -31,6 +37,26 @@ trait FutureDirectives {
future.fast.transformWith(t inner(Tuple1(t))(ctx))
}
/**
* "Unwraps" a `Future[T]` and runs the inner route after future
* completion with the future's value as an extraction of type `T` if
* the supplied `CircuitBreaker` is closed.
*
* If the supplied [[CircuitBreaker]] is open the request is rejected
* with a [[CircuitBreakerOpenRejection]].
*
* @group future
*/
def onCompleteWithBreaker[T](breaker: CircuitBreaker)(future: Future[T]): Directive1[Try[T]] =
onComplete(breaker.withCircuitBreaker(future)).flatMap {
case Failure(ex: CircuitBreakerOpenException)
extractRequestContext.flatMap { ctx
ctx.request.entity.dataBytes.runWith(Sink.cancelled)(ctx.materializer)
reject(CircuitBreakerOpenRejection(ex))
}
case x provide(x)
}
/**
* "Unwraps" a `Future[T]` and runs the inner route after future
* completion with the future's value as an extraction of type `T`.