From fcc1d1287916fc02caf9b0f531f8d042c5af32f5 Mon Sep 17 00:00:00 2001 From: gosubpl Date: Thu, 25 Aug 2016 23:32:14 +0200 Subject: [PATCH 1/5] fix for infinite timeout disabling timeout infrastructure (#20817) --- .../TimeoutDirectivesExamplesSpec.scala | 6 ++---- .../engine/server/HttpServerBluePrint.scala | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala index 415219fb77..49684d2c8b 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala @@ -25,9 +25,7 @@ private[this] object TimeoutDirectivesTestConfig { akka.stdout-loglevel = ERROR windows-connection-abort-workaround-enabled = auto akka.log-dead-letters = OFF - akka.http.server.request-timeout = 1000s""") - // large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header - // and withRequestTimeout will not work) + akka.http.server.request-timeout = infinite""") } class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf) @@ -36,7 +34,7 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig import system.dispatcher implicit val materializer = ActorMaterializer() - def slowFuture(): Future[String] = Promise[String].future // move to Future.never in Scala 2.12 + def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12 def runRoute(route: Route, routePath: String): HttpResponse = { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 38d6a8b688..97613c2b1c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -85,10 +85,7 @@ private[http] object HttpServerBluePrint { BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings)) def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] = - timeout match { - case x: FiniteDuration ⇒ BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed - case _ ⇒ BidiFlow.identity - } + BidiFlow.fromGraph(new RequestTimeoutSupport(timeout)).reversed /** * Two state stage, either transforms an incoming RequestOutput into a HttpRequest with strict entity and then pushes @@ -249,7 +246,7 @@ private[http] object HttpServerBluePrint { .via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger")) } - class RequestTimeoutSupport(initialTimeout: FiniteDuration) + class RequestTimeoutSupport(initialTimeout: Duration) extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] { private val requestIn = Inlet[HttpRequest]("requestIn") private val requestOut = Outlet[HttpRequest]("requestOut") @@ -307,13 +304,21 @@ private[http] object HttpServerBluePrint { val timeout: Duration, val handler: HttpRequest ⇒ HttpResponse) - private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit], + private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, requestEnd: Future[Unit], trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer) extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ import materializer.executionContext - set { - requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this)) + initialTimeout match { + case timeout: FiniteDuration ⇒ set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this)) + } + case _ ⇒ set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, new Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + }, Duration.Inf, this)) + } } override def apply(request: HttpRequest) = From 7690d1dbbd7b9e83249035e75443701b45be360a Mon Sep 17 00:00:00 2001 From: gosubpl Date: Sat, 27 Aug 2016 07:09:40 +0200 Subject: [PATCH 2/5] filtering out timeout-access header in tests where necessary (#20817) --- .../impl/engine/server/HttpServerSpec.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index b83f6be39e..d367860500 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -40,7 +40,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -141,7 +141,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() shouldEqual + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -205,7 +205,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() shouldEqual + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -218,7 +218,7 @@ class HttpServerSpec extends AkkaSpec( | |mnopqrstuvwx""") - expectRequest() shouldEqual + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/next-strict", @@ -446,7 +446,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -456,7 +456,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -696,7 +696,7 @@ class HttpServerSpec extends AkkaSpec( | |""".stripMarginWithNewline("\r\n")) - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) responses.expectRequest() responses.sendError(new RuntimeException("CRASH BOOM BANG")) @@ -758,7 +758,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -767,7 +767,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) + expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) override def settings: ServerSettings = super.settings.withDefaultHostHeader(Host("example.com")) From ce4e208d85b1e78db99fcf81e1a51220e321479c Mon Sep 17 00:00:00 2001 From: gosubpl Date: Sun, 28 Aug 2016 00:23:40 +0200 Subject: [PATCH 3/5] filtering out timeout-access header in additional tests (#20817) --- .../impl/engine/server/HttpServerSpec.scala | 18 +++++++++--------- .../akka/http/scaladsl/ClientServerSpec.scala | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index d367860500..17c4ffdc20 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -40,7 +40,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -141,7 +141,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -205,7 +205,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -218,7 +218,7 @@ class HttpServerSpec extends AkkaSpec( | |mnopqrstuvwx""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/next-strict", @@ -446,7 +446,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -456,7 +456,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -696,7 +696,7 @@ class HttpServerSpec extends AkkaSpec( | |""".stripMarginWithNewline("\r\n")) - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) responses.expectRequest() responses.sendError(new RuntimeException("CRASH BOOM BANG")) @@ -758,7 +758,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -767,7 +767,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() mapHeaders(_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) override def settings: ServerSettings = super.settings.withDefaultHostHeader(Host("example.com")) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 264c3fbae0..60869305d6 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -447,7 +447,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit val serverInSub = serverIn.expectSubscription() serverInSub.request(1) private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)), - Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() + Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() mapHeaders (_.filterNot(_.is("timeout-access"))) uri shouldEqual Uri(s"http://$hostname:$port/chunked") Await.result(chunkStream.limit(5).runWith(Sink.seq), 100.millis) shouldEqual chunks From 0e09253adf7f4b2b066dd2eb0f8ddb7098de22c0 Mon Sep 17 00:00:00 2001 From: gosubpl Date: Mon, 29 Aug 2016 21:48:32 +0200 Subject: [PATCH 4/5] added DummyCancellable object to avoid allocation (#20817) --- .../http/impl/engine/server/HttpServerBluePrint.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 97613c2b1c..80414e2be3 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -304,6 +304,11 @@ private[http] object HttpServerBluePrint { val timeout: Duration, val handler: HttpRequest ⇒ HttpResponse) + private object DummyCancellable extends Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + } + private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, requestEnd: Future[Unit], trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer) extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ @@ -314,10 +319,7 @@ private[http] object HttpServerBluePrint { requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this)) } case _ ⇒ set { - requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, new Cancellable { - override def isCancelled: Boolean = true - override def cancel(): Boolean = true - }, Duration.Inf, this)) + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this)) } } From 7d7a5e51daee186ba3ce0d2422cafe665ff6eb76 Mon Sep 17 00:00:00 2001 From: gosubpl Date: Tue, 30 Aug 2016 22:18:26 +0200 Subject: [PATCH 5/5] added test showing that timeout is correctly reset for both infinite as well as finite initial request-timeout (#20817) --- .../TimeoutDirectivesExamplesSpec.scala | 51 +++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala index 49684d2c8b..191215dac7 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala @@ -18,7 +18,7 @@ import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } import akka.testkit.AkkaSpec -private[this] object TimeoutDirectivesTestConfig { +private[this] object TimeoutDirectivesInfiniteTimeoutTestConfig { val testConf: Config = ConfigFactory.parseString(""" akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = ERROR @@ -28,7 +28,7 @@ private[this] object TimeoutDirectivesTestConfig { akka.http.server.request-timeout = infinite""") } -class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf) +class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesInfiniteTimeoutTestConfig.testConf) with ScalaFutures with CompileOnlySpec { //#testSetup import system.dispatcher @@ -49,8 +49,9 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig //# + // demonstrates that timeout is correctly set despite infinite initial value of akka.http.server.request-timeout "Request Timeout" should { - "be configurable in routing layer" in { + "be configurable in routing layer despite infinite initial value of request-timeout" in { //#withRequestTimeout-plain val route = path("timeout") { @@ -122,3 +123,47 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig } } + +private[this] object TimeoutDirectivesFiniteTimeoutTestConfig { + val testConf: Config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel = ERROR + akka.stdout-loglevel = ERROR + windows-connection-abort-workaround-enabled = auto + akka.log-dead-letters = OFF + akka.http.server.request-timeout = 1000s""") +} + +class TimeoutDirectivesFiniteTimeoutExamplesSpec extends AkkaSpec(TimeoutDirectivesFiniteTimeoutTestConfig.testConf) + with ScalaFutures with CompileOnlySpec { + import system.dispatcher + implicit val materializer = ActorMaterializer() + + def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12 + + def runRoute(route: Route, routePath: String): HttpResponse = { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val binding = Http().bindAndHandle(route, hostname, port) + + val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/$routePath")).futureValue + + binding.flatMap(_.unbind()).futureValue + + response + } + + // demonstrates that timeout is correctly modified for finite initial values of akka.http.server.request-timeout + "Request Timeout" should { + "be configurable in routing layer for finite initial value of request-timeout" in { + val route = + path("timeout") { + withRequestTimeout(1.seconds) { // modifies the global akka.http.server.request-timeout for this request + val response: Future[String] = slowFuture() // very slow + complete(response) + } + } + runRoute(route, "timeout").status should ===(StatusCodes.ServiceUnavailable) // the timeout response + } + } + +}