=htp fix for infinite timeout disabling timeout infrastructure (#20817) (#21295)

* fix for infinite timeout disabling timeout infrastructure (#20817)

* filtering out timeout-access header in tests where necessary (#20817)

* filtering out timeout-access header in additional tests (#20817)

* added DummyCancellable object to avoid allocation (#20817)

* added test showing that timeout is correctly reset for both infinite as well as finite initial request-timeout (#20817)
This commit is contained in:
Konrad Malawski 2016-08-31 09:49:41 +01:00 committed by GitHub
commit b1d07498e2
4 changed files with 75 additions and 25 deletions

View file

@ -18,25 +18,23 @@ import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
import akka.testkit.AkkaSpec
private[this] object TimeoutDirectivesTestConfig {
private[this] object TimeoutDirectivesInfiniteTimeoutTestConfig {
val testConf: Config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
windows-connection-abort-workaround-enabled = auto
akka.log-dead-letters = OFF
akka.http.server.request-timeout = 1000s""")
// large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header
// and withRequestTimeout will not work)
akka.http.server.request-timeout = infinite""")
}
class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf)
class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesInfiniteTimeoutTestConfig.testConf)
with ScalaFutures with CompileOnlySpec {
//#testSetup
import system.dispatcher
implicit val materializer = ActorMaterializer()
def slowFuture(): Future[String] = Promise[String].future // move to Future.never in Scala 2.12
def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12
def runRoute(route: Route, routePath: String): HttpResponse = {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
@ -51,8 +49,9 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig
//#
// demonstrates that timeout is correctly set despite infinite initial value of akka.http.server.request-timeout
"Request Timeout" should {
"be configurable in routing layer" in {
"be configurable in routing layer despite infinite initial value of request-timeout" in {
//#withRequestTimeout-plain
val route =
path("timeout") {
@ -124,3 +123,47 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig
}
}
private[this] object TimeoutDirectivesFiniteTimeoutTestConfig {
val testConf: Config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
windows-connection-abort-workaround-enabled = auto
akka.log-dead-letters = OFF
akka.http.server.request-timeout = 1000s""")
}
class TimeoutDirectivesFiniteTimeoutExamplesSpec extends AkkaSpec(TimeoutDirectivesFiniteTimeoutTestConfig.testConf)
with ScalaFutures with CompileOnlySpec {
import system.dispatcher
implicit val materializer = ActorMaterializer()
def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12
def runRoute(route: Route, routePath: String): HttpResponse = {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = Http().bindAndHandle(route, hostname, port)
val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/$routePath")).futureValue
binding.flatMap(_.unbind()).futureValue
response
}
// demonstrates that timeout is correctly modified for finite initial values of akka.http.server.request-timeout
"Request Timeout" should {
"be configurable in routing layer for finite initial value of request-timeout" in {
val route =
path("timeout") {
withRequestTimeout(1.seconds) { // modifies the global akka.http.server.request-timeout for this request
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
runRoute(route, "timeout").status should ===(StatusCodes.ServiceUnavailable) // the timeout response
}
}
}

View file

@ -85,10 +85,7 @@ private[http] object HttpServerBluePrint {
BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings))
def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
timeout match {
case x: FiniteDuration BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed
case _ BidiFlow.identity
}
BidiFlow.fromGraph(new RequestTimeoutSupport(timeout)).reversed
/**
* Two state stage, either transforms an incoming RequestOutput into a HttpRequest with strict entity and then pushes
@ -249,7 +246,7 @@ private[http] object HttpServerBluePrint {
.via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger"))
}
class RequestTimeoutSupport(initialTimeout: FiniteDuration)
class RequestTimeoutSupport(initialTimeout: Duration)
extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] {
private val requestIn = Inlet[HttpRequest]("requestIn")
private val requestOut = Outlet[HttpRequest]("requestOut")
@ -307,13 +304,23 @@ private[http] object HttpServerBluePrint {
val timeout: Duration,
val handler: HttpRequest HttpResponse)
private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit],
private object DummyCancellable extends Cancellable {
override def isCancelled: Boolean = true
override def cancel(): Boolean = true
}
private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, requestEnd: Future[Unit],
trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer)
extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest HttpResponse) { self
import materializer.executionContext
set {
requestEnd.fast.map(_ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this))
initialTimeout match {
case timeout: FiniteDuration set {
requestEnd.fast.map(_ new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this))
}
case _ set {
requestEnd.fast.map(_ new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this))
}
}
override def apply(request: HttpRequest) =

View file

@ -40,7 +40,7 @@ class HttpServerSpec extends AkkaSpec(
|
|""")
expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
shutdownBlueprint()
})
@ -141,7 +141,7 @@ class HttpServerSpec extends AkkaSpec(
|
|abcdefghijkl""")
expectRequest() shouldEqual
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual
HttpRequest(
method = POST,
uri = "http://example.com/strict",
@ -205,7 +205,7 @@ class HttpServerSpec extends AkkaSpec(
|
|abcdefghijkl""")
expectRequest() shouldEqual
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual
HttpRequest(
method = POST,
uri = "http://example.com/strict",
@ -218,7 +218,7 @@ class HttpServerSpec extends AkkaSpec(
|
|mnopqrstuvwx""")
expectRequest() shouldEqual
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual
HttpRequest(
method = POST,
uri = "http://example.com/next-strict",
@ -446,7 +446,7 @@ class HttpServerSpec extends AkkaSpec(
|Host: example.com
|
|""")
expectRequest() shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com")))
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com")))
shutdownBlueprint()
})
@ -456,7 +456,7 @@ class HttpServerSpec extends AkkaSpec(
|Host: example.com
|
|""")
expectRequest() shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com")))
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com")))
shutdownBlueprint()
})
@ -696,7 +696,7 @@ class HttpServerSpec extends AkkaSpec(
|
|""".stripMarginWithNewline("\r\n"))
expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
responses.expectRequest()
responses.sendError(new RuntimeException("CRASH BOOM BANG"))
@ -758,7 +758,7 @@ class HttpServerSpec extends AkkaSpec(
|
|""")
expectRequest() shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com")))
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com")))
shutdownBlueprint()
})
@ -767,7 +767,7 @@ class HttpServerSpec extends AkkaSpec(
|
|""")
expectRequest() shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`)
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`)
override def settings: ServerSettings = super.settings.withDefaultHostHeader(Host("example.com"))

View file

@ -447,7 +447,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit
val serverInSub = serverIn.expectSubscription()
serverInSub.request(1)
private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)),
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() mapHeaders (_.filterNot(_.is("timeout-access")))
uri shouldEqual Uri(s"http://$hostname:$port/chunked")
Await.result(chunkStream.limit(5).runWith(Sink.seq), 100.millis) shouldEqual chunks