diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java new file mode 100644 index 0000000000..b5aeb28d8f --- /dev/null +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package docs.http.javadsl.server.directives; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.http.javadsl.ConnectHttp; +import akka.http.javadsl.Http; +import akka.http.javadsl.ServerBinding; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.http.javadsl.model.StatusCode; +import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.server.AllDirectives; +import akka.http.javadsl.server.Route; +import akka.http.scaladsl.TestUtils; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Flow; +import akka.testkit.TestKit; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import scala.Tuple2; +import scala.Tuple3; +import scala.concurrent.duration.Duration; +import scala.runtime.BoxedUnit; + +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.*; + +public class TimeoutDirectivesExamplesTest extends AllDirectives { + //#testSetup + private final Config testConf = ConfigFactory.parseString("akka.loggers = [\"akka.testkit.TestEventListener\"]\n" + + "akka.loglevel = ERROR\n" + + "akka.stdout-loglevel = ERROR\n" + + "windows-connection-abort-workaround-enabled = auto\n" + + "akka.log-dead-letters = OFF\n" + + "akka.http.server.request-timeout = 1000s"); + // large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header + // and withRequestTimeout will not work) + + private final ActorSystem system = ActorSystem.create("TimeoutDirectivesExamplesTest", testConf); + + private final ActorMaterializer materializer = ActorMaterializer.create(system); + + private final Http http = Http.get(system); + + private CompletionStage shutdown(CompletionStage binding) { + return binding.thenAccept(b -> { + System.out.println(String.format("Unbinding from %s", b.localAddress())); + + final CompletionStage unbound = b.unbind(); + try { + unbound.toCompletableFuture().get(3, TimeUnit.SECONDS); // block... + } catch (TimeoutException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + private Optional runRoute(ActorSystem system, ActorMaterializer materializer, Route route, String routePath) { + final Tuple3 inetaddrHostAndPort = TestUtils.temporaryServerHostnameAndPort("127.0.0.1"); + Tuple2 hostAndPort = new Tuple2<>( + inetaddrHostAndPort._2(), + (Integer) inetaddrHostAndPort._3() + ); + + final Flow routeFlow = route.flow(system, materializer); + final CompletionStage binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(hostAndPort._1(), hostAndPort._2()), materializer); + + final CompletionStage responseCompletionStage = http.singleRequest(HttpRequest.create("http://" + hostAndPort._1() + ":" + hostAndPort._2() + "/" + routePath), materializer); + + CompletableFuture responseFuture = responseCompletionStage.toCompletableFuture(); + + Optional responseOptional; + try { + responseOptional = Optional.of(responseFuture.get(3, TimeUnit.SECONDS)); // patienceConfig + } catch (Exception e) { + responseOptional = Optional.empty(); + } + + shutdown(binding); + + return responseOptional; + } + //# + + @After + public void shutDown() { + TestKit.shutdownActorSystem(system, Duration.create(1, TimeUnit.SECONDS), false); + } + + @Test + public void testRequestTimeoutIsConfigurable() { + //#withRequestTimeout-plain + final Duration timeout = Duration.create(1, TimeUnit.SECONDS); + CompletionStage slowFuture = new CompletableFuture<>(); + + final Route route = path("timeout", () -> + withRequestTimeout(timeout, () -> { + return completeOKWithFutureString(slowFuture); // very slow + }) + ); + + // test: + StatusCode statusCode = runRoute(system, materializer, route, "timeout").get().status(); + assert (StatusCodes.SERVICE_UNAVAILABLE.equals(statusCode)); + //# + } + + @Test + public void testRequestWithoutTimeoutCancelsTimeout() { + //#withoutRequestTimeout-1 + CompletionStage slowFuture = new CompletableFuture<>(); + + final Route route = path("timeout", () -> + withoutRequestTimeout(() -> { + return completeOKWithFutureString(slowFuture); // very slow + }) + ); + + // test: + Boolean receivedReply = runRoute(system, materializer, route, "timeout").isPresent(); + assert (!receivedReply); // timed-out + //# + } + + @Test + public void testRequestTimeoutAllowsCustomResponse() { + //#withRequestTimeout-with-handler + final Duration timeout = Duration.create(1, TimeUnit.MILLISECONDS); + CompletionStage slowFuture = new CompletableFuture<>(); + + HttpResponse enhanceYourCalmResponse = HttpResponse.create() + .withStatus(StatusCodes.ENHANCE_YOUR_CALM) + .withEntity("Unable to serve response within time limit, please enhance your calm."); + + final Route route = path("timeout", () -> + withRequestTimeout(timeout, (request) -> enhanceYourCalmResponse, () -> { + return completeOKWithFutureString(slowFuture); // very slow + }) + ); + + // test: + StatusCode statusCode = runRoute(system, materializer, route, "timeout").get().status(); + assert (StatusCodes.ENHANCE_YOUR_CALM.equals(statusCode)); + //# + } + + // make it compile only to avoid flaking in slow builds + @Ignore("Compile only test") + @Test + public void testRequestTimeoutCustomResponseCanBeAddedSeparately() { + //#withRequestTimeoutResponse + final Duration timeout = Duration.create(100, TimeUnit.MILLISECONDS); + CompletionStage slowFuture = new CompletableFuture<>(); + + HttpResponse enhanceYourCalmResponse = HttpResponse.create() + .withStatus(StatusCodes.ENHANCE_YOUR_CALM) + .withEntity("Unable to serve response within time limit, please enhance your calm."); + + final Route route = path("timeout", () -> + withRequestTimeout(timeout, () -> + // racy! for a very short timeout like 1.milli you can still get 503 + withRequestTimeoutResponse((request) -> enhanceYourCalmResponse, () -> { + return completeOKWithFutureString(slowFuture); // very slow + })) + ); + + // test: + StatusCode statusCode = runRoute(system, materializer, route, "timeout").get().status(); + assert (StatusCodes.ENHANCE_YOUR_CALM.equals(statusCode)); + //# + } +} diff --git a/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeout.rst b/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeout.rst index 43fe7c2376..985e433fc4 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeout.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeout.rst @@ -33,8 +33,10 @@ For more information about various timeouts in Akka HTTP see :ref:`http-timeouts Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. +.. includecode2:: ../../../../code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java + :snippet: withRequestTimeout-plain With setting the handler at the same time: -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. +.. includecode2:: ../../../../code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java + :snippet: withRequestTimeout-with-handler \ No newline at end of file diff --git a/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeoutResponse.rst b/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeoutResponse.rst index cff7040784..dfb27824a4 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeoutResponse.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withRequestTimeoutResponse.rst @@ -23,4 +23,5 @@ To learn more about various timeouts in Akka HTTP and how to configure them see Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. +.. includecode2:: ../../../../code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java + :snippet: withRequestTimeoutResponse \ No newline at end of file diff --git a/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withoutRequestTimeout.rst b/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withoutRequestTimeout.rst index 271489b739..8533ec5b5f 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withoutRequestTimeout.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/timeout-directives/withoutRequestTimeout.rst @@ -20,4 +20,5 @@ For more information about various timeouts in Akka HTTP see :ref:`http-timeouts Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. +.. includecode2:: ../../../../code/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java + :snippet: withoutRequestTimeout \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala index 3ea141bce4..415219fb77 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala @@ -5,24 +5,65 @@ package docs.http.scaladsl.server.directives import akka.http.scaladsl.model.{ HttpResponse, StatusCodes } -import akka.http.scaladsl.server.RoutingSpec +import akka.http.scaladsl.server.Route import docs.CompileOnlySpec - +import akka.http.scaladsl.{ Http, TestUtils } +import akka.http.scaladsl.server.Directives._ +import akka.stream.ActorMaterializer +import akka.http.scaladsl.model.HttpEntity._ +import akka.http.scaladsl.model._ +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } +import akka.testkit.AkkaSpec -class TimeoutDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec { +private[this] object TimeoutDirectivesTestConfig { + val testConf: Config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel = ERROR + akka.stdout-loglevel = ERROR + windows-connection-abort-workaround-enabled = auto + akka.log-dead-letters = OFF + akka.http.server.request-timeout = 1000s""") + // large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header + // and withRequestTimeout will not work) +} + +class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf) + with ScalaFutures with CompileOnlySpec { + //#testSetup + import system.dispatcher + implicit val materializer = ActorMaterializer() + + def slowFuture(): Future[String] = Promise[String].future // move to Future.never in Scala 2.12 + + def runRoute(route: Route, routePath: String): HttpResponse = { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val binding = Http().bindAndHandle(route, hostname, port) + + val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/$routePath")).futureValue + + binding.flatMap(_.unbind()).futureValue + + response + } + + //# "Request Timeout" should { - "be configurable in routing layer" in compileOnlySpec { + "be configurable in routing layer" in { //#withRequestTimeout-plain val route = path("timeout") { - withRequestTimeout(3.seconds) { + withRequestTimeout(1.seconds) { // modifies the global akka.http.server.request-timeout for this request val response: Future[String] = slowFuture() // very slow complete(response) } } + + // check + runRoute(route, "timeout").status should ===(StatusCodes.ServiceUnavailable) // the timeout response //# } "without timeout" in compileOnlySpec { @@ -34,14 +75,16 @@ class TimeoutDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec { complete(response) } } + + // no check as there is no time-out, the future would time out failing the test //# } - "allow mapping the response while setting the timeout" in compileOnlySpec { + "allow mapping the response while setting the timeout" in { //#withRequestTimeout-with-handler val timeoutResponse = HttpResponse( StatusCodes.EnhanceYourCalm, - entity = "Unable to serve response within time limit, please enchance your calm.") + entity = "Unable to serve response within time limit, please enhance your calm.") val route = path("timeout") { @@ -51,28 +94,33 @@ class TimeoutDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec { complete(response) } } + + // check + runRoute(route, "timeout").status should ===(StatusCodes.EnhanceYourCalm) // the timeout response //# } + // make it compile only to avoid flaking in slow builds "allow mapping the response" in compileOnlySpec { //#withRequestTimeoutResponse val timeoutResponse = HttpResponse( StatusCodes.EnhanceYourCalm, - entity = "Unable to serve response within time limit, please enchance your calm.") + entity = "Unable to serve response within time limit, please enhance your calm.") val route = path("timeout") { - withRequestTimeout(1.milli) { + withRequestTimeout(100.milli) { // racy! for a very short timeout like 1.milli you can still get 503 withRequestTimeoutResponse(request => timeoutResponse) { val response: Future[String] = slowFuture() // very slow complete(response) } } } + + // check + runRoute(route, "timeout").status should ===(StatusCodes.EnhanceYourCalm) // the timeout response //# } } - def slowFuture(): Future[String] = Promise[String].future - } diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java index 4ccb14c1e5..33f2e98664 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java @@ -26,11 +26,18 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv final Route index = path("", () -> withRequestTimeout(timeout, this::mkTimeoutResponse, () -> { - silentSleep(5000); // too long, trigger failure + silentSleep(5000); // too long, but note that this will NOT activate withRequestTimeout, see below return complete(index()); }) ); + final Route requestTimeout = path("timeout", () -> + withRequestTimeout(timeout, this::mkTimeoutResponse, () -> { + // here timeout will work + return completeOKWithFutureString(neverEndingFuture(index())); + }) + ); + final Function, Optional> handleAuth = (maybeCreds) -> { if (maybeCreds.isPresent() && maybeCreds.get().verify("pa$$word")) // some secure hash + check return Optional.of(maybeCreds.get().identifier()); @@ -58,7 +65,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv return get(() -> - index.orElse(secure).orElse(ping).orElse(crash).orElse(inner) + index.orElse(secure).orElse(ping).orElse(crash).orElse(inner).orElse(requestTimeout) ); } @@ -70,10 +77,14 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv } } + private CompletableFuture neverEndingFuture(String futureContent) { + return new CompletableFuture<>().thenApply((string) -> futureContent); + } + private HttpResponse mkTimeoutResponse(HttpRequest request) { return HttpResponse.create() .withStatus(StatusCodes.ENHANCE_YOUR_CALM) - .withEntity("Unable to serve response within time limit, please enchance your calm."); + .withEntity("Unable to serve response within time limit, please enhance your calm."); } private String index() { @@ -85,6 +96,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv "
  • /ping
  • \n" + "
  • /secure Use any username and '<username>-password' as credentials
  • \n" + "
  • /crash
  • \n" + + "
  • /timeout Demonstrates timeout
  • \n" + " \n" + " \n" + " \n"; diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/TimeoutDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/TimeoutDirectives.scala index 68ca4b9f52..b558630b30 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/TimeoutDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/TimeoutDirectives.scala @@ -43,4 +43,15 @@ abstract class TimeoutDirectives extends WebSocketDirectives { D.withoutRequestTimeout { inner.get.delegate } } + /** + * Tries to set a new request timeout handler, which produces the timeout response for a + * given request. Note that the handler must produce the response synchronously and shouldn't block! + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + def withRequestTimeoutResponse(timeoutHandler: JFunction[HttpRequest, HttpResponse], inner: Supplier[Route]): RouteAdapter = RouteAdapter { + D.withRequestTimeoutResponse(in ⇒ timeoutHandler(in.asJava).asScala) { inner.get.delegate } + } + }