From 3f8dacfd444ace86abdc334aadadb148a91ed97d Mon Sep 17 00:00:00 2001 From: Stefano Bonetti Date: Tue, 31 May 2016 20:48:19 +0100 Subject: [PATCH] +htp #20198 onCompleteWithBreaker directive (#20402) --- .../routing-dsl/directives/alphabetically.rst | 1 + .../directives/future-directives/index.rst | 1 + .../onCompleteWithBreaker.rst | 19 ++++++ .../FutureDirectivesExamplesSpec.scala | 51 ++++++++++++++-- .../routing-dsl/directives/alphabetically.rst | 3 + .../directives/future-directives/index.rst | 1 + .../onCompleteWithBreaker.rst | 27 +++++++++ .../directives/FutureDirectivesSpec.scala | 59 ++++++++++++++++++- .../javadsl/server/RejectionHandler.scala | 11 ++++ .../akka/http/javadsl/server/Rejections.scala | 10 ++++ .../server/directives/FutureDirectives.scala | 17 ++++++ .../akka/http/scaladsl/server/Rejection.scala | 8 +++ .../scaladsl/server/RejectionHandler.scala | 15 ++++- .../server/directives/FutureDirectives.scala | 30 +++++++++- 14 files changed, 244 insertions(+), 9 deletions(-) create mode 100644 akka-docs/rst/java/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst create mode 100644 akka-docs/rst/scala/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst diff --git a/akka-docs/rst/java/http/routing-dsl/directives/alphabetically.rst b/akka-docs/rst/java/http/routing-dsl/directives/alphabetically.rst index c992c44ff8..2773555667 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/alphabetically.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/alphabetically.rst @@ -89,6 +89,7 @@ Directive Description :ref:`-mapUnmatchedPath-java-` Transforms the ``unmatchedPath`` of the ``RequestContext`` using a ``Uri.Path ⇒ Uri.Path`` function :ref:`-method-java-` Rejects all requests whose HTTP method does not match the given one :ref:`-onComplete-java-` "Unwraps" a ``CompletionStage`` and runs the inner route after future completion with the future's value as an extraction of type ``Try`` +:ref:`-onCompleteWithBreaker-java-` "Unwraps" a ``CompletionStage`` inside a ``CircuitBreaker`` and runs the inner route after future completion with the future's value as an extraction of type ``Try`` :ref:`-onSuccess-java-` "Unwraps" a ``CompletionStage`` and runs the inner route after future completion with the future's value as an extraction of type ``T`` :ref:`-optionalCookie-java-` Extracts the ``HttpCookiePair`` with the given name as an ``Option`` :ref:`-optionalHeaderValue-java-` Extracts an optional HTTP header value using a given ``HttpHeader ⇒ Option`` function diff --git a/akka-docs/rst/java/http/routing-dsl/directives/future-directives/index.rst b/akka-docs/rst/java/http/routing-dsl/directives/future-directives/index.rst index c47037fb07..5aa06d193d 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/future-directives/index.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/future-directives/index.rst @@ -9,6 +9,7 @@ Future directives can be used to run inner routes once the provided ``Future[T]` :maxdepth: 1 onComplete + onCompleteWithBreaker onSuccess completeOrRecoverWith diff --git a/akka-docs/rst/java/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst b/akka-docs/rst/java/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst new file mode 100644 index 0000000000..bcb0876b2f --- /dev/null +++ b/akka-docs/rst/java/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst @@ -0,0 +1,19 @@ +.. _-onCompleteWithBreaker-java-: + +onCompleteWithBreaker +===================== + +Description +----------- +Evaluates its parameter of type ``CompletionStage`` protecting it with the specified ``CircuitBreaker``. +Refer to :ref:`Akka Circuit Breaker` for a detailed description of this pattern. + +If the ``CircuitBreaker`` is open, the request is rejected with a ``CircuitBreakerOpenRejection``. +Note that in this case the request's entity databytes stream is cancelled, and the connection is closed +as a consequence. + +Otherwise, the same behaviour provided by :ref:`-onComplete-java-` is to be expected. + +Example +------- +TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FutureDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FutureDirectivesExamplesSpec.scala index 815d7f8b87..267ba9da2e 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FutureDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FutureDirectivesExamplesSpec.scala @@ -5,17 +5,17 @@ package docs.http.scaladsl.server.directives import java.util.concurrent.TimeUnit -import akka.http.scaladsl.marshalling.ToResponseMarshallable + import docs.http.scaladsl.server.RoutingSpec import scala.concurrent.Future -import scala.util.{ Success, Failure } -import akka.http.scaladsl.server.ExceptionHandler -import akka.actor.{ Actor, Props } +import scala.concurrent.duration._ +import scala.util.{Failure, Success} +import akka.http.scaladsl.server.{CircuitBreakerOpenRejection, ExceptionHandler, Route} import akka.util.Timeout import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.Route import StatusCodes._ +import akka.pattern.CircuitBreaker // format: OFF @@ -54,6 +54,47 @@ class FutureDirectivesExamplesSpec extends RoutingSpec { } } + "onCompleteWithBreaker" in { + def divide(a: Int, b: Int): Future[Int] = Future { + a / b + } + + val resetTimeout = 1.second + val breaker = new CircuitBreaker(system.scheduler, + maxFailures = 1, + callTimeout = 5.seconds, + resetTimeout + ) + + val route = + path("divide" / IntNumber / IntNumber) { (a, b) => + onCompleteWithBreaker(breaker)(divide(a, b)) { + case Success(value) => complete(s"The result was $value") + case Failure(ex) => complete((InternalServerError, s"An error occurred: ${ex.getMessage}")) + } + } + + // tests: + Get("/divide/10/2") ~> route ~> check { + responseAs[String] shouldEqual "The result was 5" + } + + Get("/divide/10/0") ~> Route.seal(route) ~> check { + status shouldEqual InternalServerError + responseAs[String] shouldEqual "An error occurred: / by zero" + } // opens the circuit breaker + + Get("/divide/10/2") ~> route ~> check { + rejection shouldBe a[CircuitBreakerOpenRejection] + } + + Thread.sleep(resetTimeout.toMillis + 200) + + Get("/divide/10/2") ~> route ~> check { + responseAs[String] shouldEqual "The result was 5" + } + } + "onSuccess" in { val route = path("success") { diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/alphabetically.rst b/akka-docs/rst/scala/http/routing-dsl/directives/alphabetically.rst index e528f3296e..f4204a5a67 100644 --- a/akka-docs/rst/scala/http/routing-dsl/directives/alphabetically.rst +++ b/akka-docs/rst/scala/http/routing-dsl/directives/alphabetically.rst @@ -134,6 +134,9 @@ Directive Description :ref:`-method-` Rejects all requests whose HTTP method does not match the given one :ref:`-onComplete-` "Unwraps" a ``Future[T]`` and runs the inner route after future completion with the future's value as an extraction of type ``Try[T]`` +:ref:`-onCompleteWithBreaker-` "Unwraps" a ``Future[T]`` inside a ``CircuitBreaker`` and runs the inner + route after future completion with the future's value as an extraction of + type ``Try[T]`` :ref:`-onSuccess-` "Unwraps" a ``Future[T]`` and runs the inner route after future completion with the future's value as an extraction of type ``T`` :ref:`-optionalCookie-` Extracts the ``HttpCookiePair`` with the given name as an diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/index.rst b/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/index.rst index 55f98bbc45..a8157ea93e 100644 --- a/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/index.rst @@ -9,6 +9,7 @@ Future directives can be used to run inner routes once the provided ``Future[T]` :maxdepth: 1 onComplete + onCompleteWithBreaker onSuccess completeOrRecoverWith diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst b/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst new file mode 100644 index 0000000000..5e4b8d952b --- /dev/null +++ b/akka-docs/rst/scala/http/routing-dsl/directives/future-directives/onCompleteWithBreaker.rst @@ -0,0 +1,27 @@ +.. _-onCompleteWithBreaker-: + +onCompleteWithBreaker +===================== + +Signature +--------- + +.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/FutureDirectives.scala + :snippet: onCompleteWithBreaker + +Description +----------- +Evaluates its parameter of type ``Future[T]`` protecting it with the specified ``CircuitBreaker``. +Refer to :ref:`Akka Circuit Breaker` for a detailed description of this pattern. + +If the ``CircuitBreaker`` is open, the request is rejected with a ``CircuitBreakerOpenRejection``. +Note that in this case the request's entity databytes stream is cancelled, and the connection is closed +as a consequence. + +Otherwise, the same behaviour provided by :ref:`-onComplete-` is to be expected. + +Example +------- + +.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/FutureDirectivesExamplesSpec.scala + :snippet: onCompleteWithBreaker diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FutureDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FutureDirectivesSpec.scala index 35286523d9..6df986d115 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FutureDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FutureDirectivesSpec.scala @@ -6,10 +6,15 @@ package akka.http.scaladsl.server package directives import akka.http.scaladsl.model.StatusCodes +import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } + import scala.concurrent.Future import akka.testkit.EventFilter +import org.scalatest.Inside -class FutureDirectivesSpec extends RoutingSpec { +import scala.concurrent.duration._ + +class FutureDirectivesSpec extends RoutingSpec with Inside { class TestException(msg: String) extends Exception(msg) object TestException extends Exception("XXX") @@ -19,6 +24,12 @@ class FutureDirectivesSpec extends RoutingSpec { case e: TestException ⇒ complete((StatusCodes.InternalServerError, "Oops. " + e)) } + trait TestWithCircuitBreaker { + val breakerResetTimeout = 500.millis + val breaker = new CircuitBreaker(system.scheduler, maxFailures = 1, callTimeout = 10.seconds, breakerResetTimeout) + def openBreaker() = breaker.withCircuitBreaker(Future.failed(new Exception("boom"))) + } + "The `onComplete` directive" should { "unwrap a Future in the success case" in { var i = 0 @@ -50,6 +61,52 @@ class FutureDirectivesSpec extends RoutingSpec { } } + "The `onCompleteWithBreaker` directive" should { + "unwrap a Future in the success case" in new TestWithCircuitBreaker { + var i = 0 + def nextNumber() = { i += 1; i } + val route = onCompleteWithBreaker(breaker)(Future.successful(nextNumber())) { echoComplete } + Get() ~> route ~> check { + responseAs[String] shouldEqual "Success(1)" + } + Get() ~> route ~> check { + responseAs[String] shouldEqual "Success(2)" + } + } + "unwrap a Future in the failure case" in new TestWithCircuitBreaker { + Get() ~> onCompleteWithBreaker(breaker)(Future.failed[String](new RuntimeException("no"))) { echoComplete } ~> check { + responseAs[String] shouldEqual "Failure(java.lang.RuntimeException: no)" + } + } + "fail fast if the circuit breaker is open" in new TestWithCircuitBreaker { + openBreaker() + Get() ~> onCompleteWithBreaker(breaker)(Future.successful(1)) { echoComplete } ~> check { + inside(rejection) { + case CircuitBreakerOpenRejection(_) ⇒ + } + } + } + "stop failing fast when the circuit breaker closes" in new TestWithCircuitBreaker { + openBreaker() + Thread.sleep(breakerResetTimeout.toMillis + 200) + Get() ~> onCompleteWithBreaker(breaker)(Future.successful(1)) { echoComplete } ~> check { + responseAs[String] shouldEqual "Success(1)" + } + } + "catch an exception in the success case" in new TestWithCircuitBreaker { + Get() ~> onCompleteWithBreaker(breaker)(Future.successful("ok")) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when Success(ok)" + } + } + "catch an exception in the failure case" in new TestWithCircuitBreaker { + Get() ~> onCompleteWithBreaker(breaker)(Future.failed[String](new RuntimeException("no"))) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when Failure(java.lang.RuntimeException: no)" + } + } + } + "The `onSuccess` directive" should { "unwrap a Future in the success case" in { Get() ~> onSuccess(Future.successful("yes")) { echoComplete } ~> check { diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/RejectionHandler.scala b/akka-http/src/main/scala/akka/http/javadsl/server/RejectionHandler.scala index 363fc00c8c..b3d3a5df5f 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/RejectionHandler.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/RejectionHandler.scala @@ -56,4 +56,15 @@ class RejectionHandlerBuilder(asScala: server.RejectionHandler.Builder) { asScala.handleNotFound(route.delegate) this } + + /** + * Convenience method for handling rejections created by created by the onCompleteWithBreaker directive. + * Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast. + * + * Use to customise the error response being written instead of the default [[akka.http.javadsl.model.StatusCodes.SERVICE_UNAVAILABLE]] response. + */ + def handleCircuitBreakerOpenRejection(handler: function.Function[CircuitBreakerOpenRejection, Route]): RejectionHandlerBuilder = { + asScala.handleCircuitBreakerOpenRejection(t => handler.apply(t).delegate) + this + } } diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/Rejections.scala b/akka-http/src/main/scala/akka/http/javadsl/server/Rejections.scala index 48a1e09b39..d396444b4f 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/Rejections.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/Rejections.scala @@ -16,8 +16,10 @@ import akka.http.javadsl.model.headers.HttpChallenge import java.util.Optional import java.util.function.{ Function ⇒ JFunction } import java.lang.{ Iterable ⇒ JIterable } + import akka.http.scaladsl import akka.japi.Util +import akka.pattern.CircuitBreakerOpenException import scala.compat.java8.OptionConverters._ import scala.collection.immutable @@ -250,6 +252,14 @@ trait ValidationRejection extends Rejection { def getCause: Optional[Throwable] } +/** + * Rejection created by the `onCompleteWithBreaker` directive. + * Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast. + */ +trait CircuitBreakerOpenRejection extends Rejection { + def cause: CircuitBreakerOpenException +} + /** * A special Rejection that serves as a container for a transformation function on rejections. * It is used by some directives to "cancel" rejections that are added by later directives of a similar type. diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala index 4c83e2a192..8b10bba0af 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala @@ -15,6 +15,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.Try import akka.http.javadsl.server.{ Marshaller, Route } import akka.http.scaladsl.server.directives.{ CompleteOrRecoverWithMagnet, FutureDirectives ⇒ D } +import akka.pattern.CircuitBreaker abstract class FutureDirectives extends FormFieldDirectives { @@ -28,6 +29,22 @@ abstract class FutureDirectives extends FormFieldDirectives { } } + /** + * "Unwraps" a `CompletionStage[T]` and runs the inner route after future + * completion with the future's value as an extraction of type `T` if + * the supplied `CircuitBreaker` is closed. + * + * If the supplied [[CircuitBreaker]] is open the request is rejected + * with a [[akka.http.javadsl.server.CircuitBreakerOpenRejection]]. + * + * @group future + */ + def onCompleteWithBreaker[T](breaker: CircuitBreaker, f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter { + D.onCompleteWithBreaker(breaker)(f.get.toScala.recover(unwrapCompletionException)) { value ⇒ + inner(value).delegate + } + } + /** * "Unwraps" a `CompletionStage` and runs the inner route after stage * completion with the stage's value as an extraction of type `T`. diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Rejection.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Rejection.scala index 25ce11bf7b..31d05b1ab8 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Rejection.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Rejection.scala @@ -19,6 +19,7 @@ import akka.http.impl.util.JavaMapping._ import akka.http.impl.util.JavaMapping.Implicits._ import akka.http.javadsl.server.RoutingJavaMapping import RoutingJavaMapping._ +import akka.pattern.CircuitBreakerOpenException import scala.collection.JavaConverters._ import scala.compat.java8.OptionConverters @@ -256,6 +257,13 @@ final case class TransformationRejection(transform: immutable.Seq[Rejection] ⇒ } } +/** + * Rejection created by the `onCompleteWithBreaker` directive. + * Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast. + */ +final case class CircuitBreakerOpenRejection(cause: CircuitBreakerOpenException) + extends jserver.CircuitBreakerOpenRejection with Rejection + /** * A Throwable wrapping a Rejection. * Can be used for marshalling `Future[T]` or `Try[T]` instances, whose failure side is supposed to trigger a route diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/RejectionHandler.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/RejectionHandler.scala index cb2e4df28d..b8b598ef8c 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/RejectionHandler.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/RejectionHandler.scala @@ -76,6 +76,15 @@ object RejectionHandler { this } + /** + * Convenience method for handling rejections created by created by the onCompleteWithBreaker directive. + * Signals that the request was rejected because the supplied circuit breaker is open and requests are failing fast. + * + * Use to customise the error response being written instead of the default [[ServiceUnavailable]] response. + */ + def handleCircuitBreakerOpenRejection(handler: CircuitBreakerOpenRejection => Route): this.type = + handle { case r: CircuitBreakerOpenRejection => handler(r) } + def result(): RejectionHandler = new BuiltRejectionHandler(cases.result(), notFound, isDefault) } @@ -165,7 +174,11 @@ object RejectionHandler { } .handle { case TooManyRangesRejection(_) ⇒ - complete((RequestedRangeNotSatisfiable, "Request contains too many ranges.")) + complete((RequestedRangeNotSatisfiable, "Request contains too many ranges")) + } + .handle { + case CircuitBreakerOpenRejection(_) ⇒ + complete(ServiceUnavailable) } .handle { case UnsatisfiableRangeRejection(unsatisfiableRanges, actualEntityLength) ⇒ diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FutureDirectives.scala index c0ce5b49fc..73c64e9572 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FutureDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FutureDirectives.scala @@ -5,11 +5,15 @@ package akka.http.scaladsl.server package directives -import scala.concurrent.Future -import scala.util.{ Failure, Success, Try } import akka.http.scaladsl.marshalling.ToResponseMarshaller +import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.util.Tupler import akka.http.scaladsl.util.FastFuture._ +import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } +import akka.stream.scaladsl.Sink + +import scala.concurrent.Future +import scala.util.{ Failure, Success, Try } // format: OFF @@ -19,6 +23,8 @@ import akka.http.scaladsl.util.FastFuture._ */ trait FutureDirectives { + import RouteDirectives._ + /** * "Unwraps" a `Future[T]` and runs the inner route after future * completion with the future's value as an extraction of type `Try[T]`. @@ -31,6 +37,26 @@ trait FutureDirectives { future.fast.transformWith(t ⇒ inner(Tuple1(t))(ctx)) } + /** + * "Unwraps" a `Future[T]` and runs the inner route after future + * completion with the future's value as an extraction of type `T` if + * the supplied `CircuitBreaker` is closed. + * + * If the supplied [[CircuitBreaker]] is open the request is rejected + * with a [[CircuitBreakerOpenRejection]]. + * + * @group future + */ + def onCompleteWithBreaker[T](breaker: CircuitBreaker)(future: ⇒ Future[T]): Directive1[Try[T]] = + onComplete(breaker.withCircuitBreaker(future)).flatMap { + case Failure(ex: CircuitBreakerOpenException) ⇒ + extractRequestContext.flatMap { ctx ⇒ + ctx.request.entity.dataBytes.runWith(Sink.cancelled)(ctx.materializer) + reject(CircuitBreakerOpenRejection(ex)) + } + case x ⇒ provide(x) + } + /** * "Unwraps" a `Future[T]` and runs the inner route after future * completion with the future's value as an extraction of type `T`.