diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/FutureDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/FutureDirectivesSpec.scala new file mode 100644 index 0000000000..93c57611f9 --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/FutureDirectivesSpec.scala @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.server +package directives + +import akka.http.model.StatusCodes + +import scala.concurrent.Future + +class FutureDirectivesSpec extends RoutingSpec { + + class TestException(msg: String) extends Exception(msg) + object TestException extends Exception("XXX") + def throwTestException[T](msgPrefix: String): T ⇒ Nothing = t ⇒ throw new TestException(msgPrefix + t) + + implicit val exceptionHandler = ExceptionHandler { + case e: TestException ⇒ complete(StatusCodes.InternalServerError, "Oops. " + e) + } + + "The `onComplete` directive" should { + "unwrap a Future in the success case" in { + var i = 0 + def nextNumber() = { i += 1; i } + val route = onComplete(Future.successful(nextNumber())) { echoComplete } + Get() ~> route ~> check { + responseAs[String] shouldEqual "Success(1)" + } + Get() ~> route ~> check { + responseAs[String] shouldEqual "Success(2)" + } + } + "unwrap a Future in the failure case" in { + Get() ~> onComplete(Future.failed[String](new RuntimeException("no"))) { echoComplete } ~> check { + responseAs[String] shouldEqual "Failure(java.lang.RuntimeException: no)" + } + } + "catch an exception in the success case" in { + Get() ~> onComplete(Future.successful("ok")) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual "Oops. akka.http.server.directives.FutureDirectivesSpec$TestException: EX when Success(ok)" + } + } + "catch an exception in the failure case" in { + Get() ~> onComplete(Future.failed[String](new RuntimeException("no"))) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual "Oops. akka.http.server.directives.FutureDirectivesSpec$TestException: EX when Failure(java.lang.RuntimeException: no)" + } + } + } + + "The `onSuccess` directive" should { + "unwrap a Future in the success case" in { + Get() ~> onSuccess(Future.successful("yes")) { echoComplete } ~> check { + responseAs[String] shouldEqual "yes" + } + } + "propagate the exception in the failure case" in { + Get() ~> onSuccess(Future.failed(TestException)) { echoComplete } ~> check { + status shouldEqual StatusCodes.InternalServerError + } + } + "catch an exception in the success case" in { + Get() ~> onSuccess(Future.successful("ok")) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual "Oops. akka.http.server.directives.FutureDirectivesSpec$TestException: EX when ok" + } + } + "catch an exception in the failure case" in { + Get() ~> onSuccess(Future.failed(TestException)) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual "There was an internal server error." + } + } + } + + "The `completeOrRecoverWith` directive" should { + "complete the request with the Future's value if the future succeeds" in { + Get() ~> completeOrRecoverWith(Future.successful("yes")) { echoComplete } ~> check { + responseAs[String] shouldEqual "yes" + } + } + "don't call the inner route if the Future succeeds" in { + Get() ~> completeOrRecoverWith(Future.successful("ok")) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.OK + responseAs[String] shouldEqual "ok" + } + } + "recover using the inner route if the Future fails" in { + val route = completeOrRecoverWith(Future.failed[String](TestException)) { + case e ⇒ complete(s"Exception occurred: ${e.getMessage}") + } + + Get() ~> route ~> check { + responseAs[String] shouldEqual "Exception occurred: XXX" + } + } + "catch an exception during recovery" in { + Get() ~> completeOrRecoverWith(Future.failed[String](TestException)) { throwTestException("EX when ") } ~> check { + status shouldEqual StatusCodes.InternalServerError + responseAs[String] shouldEqual "Oops. akka.http.server.directives.FutureDirectivesSpec$TestException: EX when akka.http.server.directives.FutureDirectivesSpec$TestException$: XXX" + } + } + } +} diff --git a/akka-http/src/main/scala/akka/http/server/Directives.scala b/akka-http/src/main/scala/akka/http/server/Directives.scala index 416703ae1a..a70e732c5e 100644 --- a/akka-http/src/main/scala/akka/http/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/server/Directives.scala @@ -18,7 +18,7 @@ trait Directives extends RouteConcatenation with ExecutionDirectives //with FileAndResourceDirectives //with FormFieldDirectives - //with FutureDirectives + with FutureDirectives with HeaderDirectives with HostDirectives //with MarshallingDirectives diff --git a/akka-http/src/main/scala/akka/http/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/FutureDirectives.scala new file mode 100644 index 0000000000..168f9d7ead --- /dev/null +++ b/akka-http/src/main/scala/akka/http/server/directives/FutureDirectives.scala @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.server +package directives + +import scala.concurrent.Future +import scala.util.{ Failure, Success, Try } + +import akka.http.util.FastFuture +import FastFuture._ + +import akka.http.marshalling.ToResponseMarshaller +import akka.http.server.util.Tupler + +trait FutureDirectives { + + /** + * "Unwraps" a ``Future[T]`` and runs its inner route after future + * completion with the future's value as an extraction of type ``Try[T]``. + */ + def onComplete[T](future: ⇒ Future[T]): Directive1[Try[T]] = + new Directive1[Try[T]] { + def tapply(f: Tuple1[Try[T]] ⇒ Route): Route = ctx ⇒ + future.fast.transformWith(t ⇒ f(Tuple1(t))(ctx))(ctx.executionContext) + } + + /** + * "Unwraps" a ``Future[T]`` and runs its inner route after future + * completion with the future's value as an extraction of type ``T``. + * If the future fails its failure Throwable is bubbled up to the nearest + * ExceptionHandler. + * If type ``T`` is already a Tuple it is directly expanded into the respective + * number of extractions. + */ + def onSuccess(magnet: OnSuccessMagnet): Directive[magnet.Out] = magnet.get + + /** + * "Unwraps" a ``Future[T]`` and runs its inner route when the future has failed + * with the future's failure exception as an extraction of type ``Throwable``. + * If the future succeeds the request is completed using the values marshaller + * (This directive therefore requires a marshaller for the futures type to be + * implicitly available.) + */ + def completeOrRecoverWith(magnet: CompleteOrRecoverWithMagnet): Directive1[Throwable] = magnet +} + +object FutureDirectives extends FutureDirectives + +trait OnSuccessMagnet { + type Out + def get: Directive[Out] +} + +object OnSuccessMagnet { + implicit def apply[T](future: ⇒ Future[T])(implicit tupler: Tupler[T]) = + new Directive[tupler.Out]()(tupler.OutIsTuple) with OnSuccessMagnet { + type Out = tupler.Out + def get = this + def tapply(f: Out ⇒ Route) = ctx ⇒ future.fast.flatMap(t ⇒ f(tupler(t))(ctx))(ctx.executionContext) + } +} + +trait CompleteOrRecoverWithMagnet extends Directive1[Throwable] + +object CompleteOrRecoverWithMagnet { + implicit def apply[T](future: ⇒ Future[T])(implicit m: ToResponseMarshaller[T]) = + new CompleteOrRecoverWithMagnet { + def tapply(f: Tuple1[Throwable] ⇒ Route) = ctx ⇒ + future.fast.transformWith { + case Success(res) ⇒ ctx.complete(res) + case Failure(error) ⇒ f(Tuple1(error))(ctx) + }(ctx.executionContext) + } +}