diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala index 415219fb77..191215dac7 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala @@ -18,25 +18,23 @@ import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } import akka.testkit.AkkaSpec -private[this] object TimeoutDirectivesTestConfig { +private[this] object TimeoutDirectivesInfiniteTimeoutTestConfig { val testConf: Config = ConfigFactory.parseString(""" akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = ERROR akka.stdout-loglevel = ERROR windows-connection-abort-workaround-enabled = auto akka.log-dead-letters = OFF - akka.http.server.request-timeout = 1000s""") - // large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header - // and withRequestTimeout will not work) + akka.http.server.request-timeout = infinite""") } -class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf) +class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesInfiniteTimeoutTestConfig.testConf) with ScalaFutures with CompileOnlySpec { //#testSetup import system.dispatcher implicit val materializer = ActorMaterializer() - def slowFuture(): Future[String] = Promise[String].future // move to Future.never in Scala 2.12 + def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12 def runRoute(route: Route, routePath: String): HttpResponse = { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() @@ -51,8 +49,9 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig //# + // demonstrates that timeout is correctly set despite infinite initial value of akka.http.server.request-timeout "Request Timeout" should { - "be configurable in routing layer" in { + "be configurable in routing layer despite infinite initial value of request-timeout" in { //#withRequestTimeout-plain val route = path("timeout") { @@ -124,3 +123,47 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig } } + +private[this] object TimeoutDirectivesFiniteTimeoutTestConfig { + val testConf: Config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel = ERROR + akka.stdout-loglevel = ERROR + windows-connection-abort-workaround-enabled = auto + akka.log-dead-letters = OFF + akka.http.server.request-timeout = 1000s""") +} + +class TimeoutDirectivesFiniteTimeoutExamplesSpec extends AkkaSpec(TimeoutDirectivesFiniteTimeoutTestConfig.testConf) + with ScalaFutures with CompileOnlySpec { + import system.dispatcher + implicit val materializer = ActorMaterializer() + + def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12 + + def runRoute(route: Route, routePath: String): HttpResponse = { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val binding = Http().bindAndHandle(route, hostname, port) + + val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/$routePath")).futureValue + + binding.flatMap(_.unbind()).futureValue + + response + } + + // demonstrates that timeout is correctly modified for finite initial values of akka.http.server.request-timeout + "Request Timeout" should { + "be configurable in routing layer for finite initial value of request-timeout" in { + val route = + path("timeout") { + withRequestTimeout(1.seconds) { // modifies the global akka.http.server.request-timeout for this request + val response: Future[String] = slowFuture() // very slow + complete(response) + } + } + runRoute(route, "timeout").status should ===(StatusCodes.ServiceUnavailable) // the timeout response + } + } + +} diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 38d6a8b688..80414e2be3 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -85,10 +85,7 @@ private[http] object HttpServerBluePrint { BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings)) def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] = - timeout match { - case x: FiniteDuration ⇒ BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed - case _ ⇒ BidiFlow.identity - } + BidiFlow.fromGraph(new RequestTimeoutSupport(timeout)).reversed /** * Two state stage, either transforms an incoming RequestOutput into a HttpRequest with strict entity and then pushes @@ -249,7 +246,7 @@ private[http] object HttpServerBluePrint { .via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger")) } - class RequestTimeoutSupport(initialTimeout: FiniteDuration) + class RequestTimeoutSupport(initialTimeout: Duration) extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] { private val requestIn = Inlet[HttpRequest]("requestIn") private val requestOut = Outlet[HttpRequest]("requestOut") @@ -307,13 +304,23 @@ private[http] object HttpServerBluePrint { val timeout: Duration, val handler: HttpRequest ⇒ HttpResponse) - private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit], + private object DummyCancellable extends Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + } + + private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, requestEnd: Future[Unit], trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer) extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ import materializer.executionContext - set { - requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this)) + initialTimeout match { + case timeout: FiniteDuration ⇒ set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this)) + } + case _ ⇒ set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this)) + } } override def apply(request: HttpRequest) = diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index b83f6be39e..17c4ffdc20 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -40,7 +40,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -141,7 +141,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -205,7 +205,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -218,7 +218,7 @@ class HttpServerSpec extends AkkaSpec( | |mnopqrstuvwx""") - expectRequest() shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/next-strict", @@ -446,7 +446,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -456,7 +456,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -696,7 +696,7 @@ class HttpServerSpec extends AkkaSpec( | |""".stripMarginWithNewline("\r\n")) - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) responses.expectRequest() responses.sendError(new RuntimeException("CRASH BOOM BANG")) @@ -758,7 +758,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -767,7 +767,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) override def settings: ServerSettings = super.settings.withDefaultHostHeader(Host("example.com")) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 264c3fbae0..60869305d6 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -447,7 +447,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit val serverInSub = serverIn.expectSubscription() serverInSub.request(1) private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)), - Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() + Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() mapHeaders (_.filterNot(_.is("timeout-access"))) uri shouldEqual Uri(s"http://$hostname:$port/chunked") Await.result(chunkStream.limit(5).runWith(Sink.seq), 100.millis) shouldEqual chunks