Merge pull request #19871 from ktoso/wip-timeout-http-error-ktoso

=htc #19827 avoid double-push when request timeout triggers
This commit is contained in:
Konrad Malawski 2016-03-08 19:29:35 +01:00
commit ac1888fd66
25 changed files with 595 additions and 71 deletions

View file

@ -1,55 +0,0 @@
/**
* Copyright (C) 2015 Lightbend Inc. <http://www.lightbend.com/>
*/
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class RecipeSeq extends RecipeSpec {
"Recipe for draining a stream into a strict collection" must {
"work" in {
//#draining-to-seq-unsafe
val result = immutable.Seq[Message]("1", "2", "3")
val myData = Source(result)
val unsafe: Future[Seq[Message]] = myData.runWith(Sink.seq) // dangerous!
//#draining-to-seq-unsafe
Await.result(unsafe, 3.seconds) should be(result)
}
"work together with limit(n)" in {
//#draining-to-seq-safe
val result = List("1", "2", "3")
val myData = Source(result)
val max = 100
// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val safe1: Future[immutable.Seq[Message]] = myData.limit(max).runWith(Sink.seq)
//#draining-to-seq-safe
Await.result(safe1, 3.seconds) should be(result)
}
"work together with take(n)" in {
val result = List("1", "2", "3")
val myData = Source(result)
val max = 100
//#draining-to-seq-safe
// OK. Collect up until max-th elements only, then cancel upstream
val safe2: Future[immutable.Seq[Message]] = myData.take(max).runWith(Sink.seq)
//#draining-to-seq-safe
Await.result(safe2, 3.seconds) should be(result)
}
}
}

View file

@ -31,8 +31,8 @@ code with ``git pull``::
git pull origin master
sbt - Simple Build Tool
=======================
sbt
===
Akka is using the excellent `sbt`_ build system. So the first thing you have to
do is to download and install sbt. You can read more about how to do that in the

View file

@ -4,10 +4,10 @@ Sponsors
========
Lightbend
--------
---------
Lightbend is the company behind the Akka Project, Scala Programming Language,
Play Web Framework, Scala IDE, Simple Build Tool and many other open source
Play Web Framework, Scala IDE, sbt and many other open source
projects. It also provides the Lightbend Stack, a full-featured development
stack consisting of AKka, Play and Scala. Learn more at
`lightbend.com <http://www.lightbend.com>`_.

View file

@ -0,0 +1,13 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs
trait CompileOnlySpec {
/**
* Given a block of code... does NOT execute it.
* Useful when writing code samples in tests, which should only be compiled.
*/
def compileOnlySpec(body: => Unit) = ()
}

View file

@ -12,19 +12,19 @@ import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import akka.testkit.TestActors
import docs.CompileOnlySpec
import org.scalatest.{ Matchers, WordSpec }
import scala.io.StdIn
import scala.language.postfixOps
import scala.concurrent.{ ExecutionContext, Future }
class HttpServerExampleSpec extends WordSpec with Matchers {
class HttpServerExampleSpec extends WordSpec with Matchers
with CompileOnlySpec {
// never actually called
val log: LoggingAdapter = null
def compileOnlySpec(body: => Unit) = ()
"binding-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

View file

@ -6,6 +6,9 @@ package docs.http.scaladsl.server
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.testkit.ScalatestRouteTest
import docs.CompileOnlySpec
import org.scalatest.{ Matchers, WordSpec }
abstract class RoutingSpec extends WordSpec with Matchers with Directives with ScalatestRouteTest
abstract class RoutingSpec extends WordSpec with Matchers
with Directives with ScalatestRouteTest
with CompileOnlySpec

View file

@ -260,6 +260,7 @@ class BasicDirectivesExamplesSpec extends RoutingSpec {
//#1mapResponse-advanced
trait ApiRoutes {
protected def system: ActorSystem
private val log = Logging(system, "ApiRoutes")
private val NullJsonEntity = HttpEntity(ContentTypes.`application/json`, "{}")
@ -800,5 +801,4 @@ class BasicDirectivesExamplesSpec extends RoutingSpec {
//#
}
private def compileOnlySpec(block: => Unit) = pending
}

View file

@ -102,5 +102,4 @@ class FileAndResourceDirectivesExamplesSpec extends RoutingSpec {
}
}
private def compileOnlySpec(block: => Unit) = pending
}

View file

@ -0,0 +1,78 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.http.scaladsl.server.directives
import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
import akka.http.scaladsl.server.RoutingSpec
import docs.CompileOnlySpec
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
class TimeoutDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec {
"Request Timeout" should {
"be configurable in routing layer" in compileOnlySpec {
//#withRequestTimeout-plain
val route =
path("timeout") {
withRequestTimeout(3.seconds) {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
//#
}
"without timeout" in compileOnlySpec {
//#withoutRequestTimeout-1
val route =
path("timeout") {
withoutRequestTimeout {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
//#
}
"allow mapping the response while setting the timeout" in compileOnlySpec {
//#withRequestTimeout-with-handler
val timeoutResponse = HttpResponse(StatusCodes.EnhanceYourCalm,
entity = "Unable to serve response within time limit, please enchance your calm.")
val route =
path("timeout") {
// updates timeout and handler at
withRequestTimeout(1.milli, request => timeoutResponse) {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
//#
}
"allow mapping the response" in compileOnlySpec {
pending // compile only spec since requires actuall Http server to be run
//#withRequestTimeoutResponse
val timeoutResponse = HttpResponse(StatusCodes.EnhanceYourCalm,
entity = "Unable to serve response within time limit, please enchance your calm.")
val route =
path("timeout") {
withRequestTimeout(1.milli) {
withRequestTimeoutResponse(request => timeoutResponse) {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
}
//#
}
}
def slowFuture(): Future[String] = Promise[String].future
}

View file

@ -65,7 +65,12 @@ Timeouts
Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded
as a more general purpose streaming infrastructure feature.
However, akka-stream should soon provide such a feature.
It should be noted that Akka Streams provide various timeout functionality so any API that uses a streams can benefit
from the stream stages such as ``idleTimeout``, ``completionTimeout``, ``initialTimeout`` and even ``throttle``.
To learn more about these refer to their documentation in Akka Streams (and Scala Doc).
For more details about timeout support in Akka HTTP in general refer to :ref:`http-timeouts`.
.. _http-client-layer:

View file

@ -19,3 +19,4 @@ which are specific to one side only.
de-coding
json-support
xml-support
timeouts

View file

@ -0,0 +1,76 @@
.. _http-timeouts:
Akka HTTP Timeouts
==================
Akka HTTP comes with a variety of built-in timeout mechanisms to protect your servers from malicious attacks or
programming mistakes. Some of these are simply configuration options (which may be overriden in code) while others
are left to the streaming APIs and are easily implementable as patterns in user-code directly.
Common timeouts
---------------
Idle timeouts
^^^^^^^^^^^^^
The ``idle-timeout`` is a global setting which sets the maximum inactivity time of a given connection.
In other words, if a connection is open but no request/response is being written to it for over ``idle-timeout`` time,
the connection will be automatically closed.
The setting works the same way for all connections, be it server-side or client-side, and it's configurable
independently for each of those using the following keys::
akka.http.server.idle-timeout
akka.http.client.idle-timeout
akka.http.http-connection-pool.idle-timeout
akka.http.http-connection-pool.client.idle-timeout
.. note::
For the connection pooled client side the idle period is counted only when the pool has no pending requests waiting.
Server timeouts
---------------
.. _request-timeout:
Request timeout
^^^^^^^^^^^^^^^
Request timeouts are a mechanism that limits the maximum time it may take to produce an ``HttpResponse`` from a route.
If that deadline is not met the server will automatically inject a Service Unavailable HTTP response and close the connection
to prevent it from leaking and staying around indefinitely (for example if by programming error a Future would never complete,
never sending the real response otherwise).
The default ``HttpResponse`` that is written when a request timeout is exceeded looks like this:
.. includecode2:: /../../akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala
:snippet: default-request-timeout-httpresponse
A default request timeout is applied globally to all routes and can be configured using the
``akka.http.server.request-timeout`` setting (which defaults to 20 seconds).
.. note::
Please note that if multiple requests (``R1,R2,R3,...``) were sent by a client (see "HTTP pipelining")
using the same connection and the ``n-th`` request triggers a request timeout the server will reply with an Http Response
and close the connection, leaving the ``(n+1)-th`` (and subsequent requests on the same connection) unhandled.
The request timeout can be configured at run-time for a given route using the any of the :ref:`TimeoutDirectives`.
Bind timeout
^^^^^^^^^^^^
The bind timeout is the time period within which the TCP binding process must be completed (using any of the ``Http().bind*`` methods).
It can be configured using the ``akka.http.server.bind-timeout`` setting.
Client timeouts
---------------
Connecting timeout
^^^^^^^^^^^^^^^^^^
The connecting timeout is the time period within which the TCP connecting process must be completed.
Tweaking it should rarely be required, but it allows erroring out the connection in case a connection
is unable to be established for a given amount of time.
it can be configured using the ``akka.http.client.connecting-timeout`` setting.

View file

@ -210,10 +210,14 @@ Directive Description
:ref:`-tprovide-` Injects a given tuple of values into a directive
:ref:`-uploadedFile-` Streams one uploaded file from a multipart request to a file on disk
:ref:`-validate-` Checks a given condition before running its inner route
:ref:`-withoutRequestTimeout-` Disables :ref:`request timeouts <request-timeout>` for a given route.
:ref:`-withExecutionContext-` Runs its inner route with the given alternative ``ExecutionContext``
:ref:`-withMaterializer-` Runs its inner route with the given alternative ``Materializer``
:ref:`-withLog-` Runs its inner route with the given alternative ``LoggingAdapter``
:ref:`-withRangeSupport-` Adds ``Accept-Ranges: bytes`` to responses to GET requests, produces partial
responses if the initial request contained a valid ``Range`` header
:ref:`-withRequestTimeout-` Configures the :ref:`request timeouts <request-timeout>` for a given route.
:ref:`-withRequestTimeoutResponse-` Prepares the ``HttpResponse`` that is emitted if a request timeout is triggered.
``RequestContext => RequestContext`` function
:ref:`-withSettings-` Runs its inner route with the given alternative ``RoutingSettings``
=========================================== ============================================================================

View file

@ -74,6 +74,9 @@ Directives creating or transforming the response
:ref:`BasicDirectives` and :ref:`MiscDirectives`
Directives handling or transforming response properties.
:ref:`TimeoutDirectives`
Configure request timeouts and automatic timeout responses.
List of predefined directives by trait
--------------------------------------
@ -104,3 +107,4 @@ List of predefined directives by trait
scheme-directives/index
security-directives/index
websocket-directives/index
timeout-directives/index

View file

@ -0,0 +1,11 @@
.. _TimeoutDirectives:
TimeoutDirectives
=================
.. toctree::
:maxdepth: 1
withRequestTimeout
withoutRequestTimeout
withRequestTimeoutResponse

View file

@ -0,0 +1,49 @@
.. _-withRequestTimeout-:
withRequestTimeout
==================
Signature
---------
.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/TimeoutDirectives.scala
:snippet: withRequestTimeout
Description
-----------
This directive enables "late" (during request processing) control over the :ref:`request-timeout` feature in Akka HTTP.
The timeout can be either loosened or made more tight using this directive, however one should be aware that it is
inherently racy (which may especially show with very tight timeouts) since a timeout may already have been triggered
when this directive executes.
In case of pipelined HTTP requests (multiple requests being accepted on the same connection before sending the first response)
a the request timeout failure of the ``n-th`` request *will shut down the connection* causing the already enqueued requests
to be dropped. This is by-design, as the request timeout feature serves as a "safety net" in case of programming errors
(e.g. a Future that never completes thus potentially blocking the entire connection forever) or malicious attacks on the server.
Optionally, a timeout handler may be provided in which is called when a time-out is triggered and must produce an
``HttpResponse`` that will be sent back to the client instead of the "too late" response (in case it'd ever arrive).
See also :ref:`-withRequestTimeoutResponse-` if only looking to customise the timeout response without changing the timeout itself.
.. warning::
Please note that setting the timeout from within a directive is inherently racy (as the "point in time from which
we're measuring the timeout" is already in the past (the moment we started handling the request), so if the existing
timeout already was triggered before your directive had the chance to change it, an timeout may still be logged.
It is recommended to use a larger statically configured timeout (think of it as a "safety net" against programming errors
or malicious attackers) and if needed tighten it using the directives not the other way around.
For more information about various timeouts in Akka HTTP see :ref:`http-timeouts`.
Example
-------
.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala
:snippet: withRequestTimeout-plain
With setting the handler at the same time:
.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala
:snippet: withRequestTimeout-with-handler

View file

@ -0,0 +1,34 @@
.. _-withRequestTimeoutResponse-:
withRequestTimeoutResponse
==========================
Signature
---------
.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/TimeoutDirectives.scala
:snippet: withRequestTimeoutResponse
Description
-----------
Allows customising the ``HttpResponse`` that will be sent to clients in case of a :ref:`request-timeout`.
See also :ref:`-withRequestTimeout-` or :ref:`-withoutRequestTimeout-` if interested in dynamically changing the timeout
for a given route instead.
.. warning::
Please note that setting handler is inherently racy as the timeout is measured from starting to handle the request
to its deadline, thus if the timeout triggers before the ``withRequestTimeoutResponse`` executed it would have emitted
the default timeout HttpResponse.
In practice this can only be a problem with very tight timeouts, so with default settings
of request timeouts being measured in seconds it shouldn't be a problem in reality (though certainly a possibility still).
To learn more about various timeouts in Akka HTTP and how to configure them see :ref:`http-timeouts`.
Example
-------
.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala
:snippet: withRequestTimeoutResponse

View file

@ -0,0 +1,31 @@
.. _-withoutRequestTimeout-:
withoutRequestTimeout
=====================
Signature
---------
.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/TimeoutDirectives.scala
:snippet: withoutRequestTimeout
Description
-----------
This directive enables "late" (during request processing) control over the :ref:`request-timeout` feature in Akka HTTP.
It is not recommended to turn off request timeouts using this method as it is inherently racy and disabling request timeouts
basically turns off the safety net against programming mistakes that it provides.
.. warning::
Please note that setting the timeout from within a directive is inherently racy (as the "point in time from which
we're measuring the timeout" is already in the past (the moment we started handling the request), so if the existing
timeout already was triggered before your directive had the chance to change it, an timeout may still be logged.
For more information about various timeouts in Akka HTTP see :ref:`http-timeouts`.
Example
-------
.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala
:snippet: withoutRequestTimeout

View file

@ -277,7 +277,7 @@ private[http] object HttpServerBluePrint {
override def onUpstreamFailure(ex: Throwable) = fail(requestOut, ex)
def emitTimeoutResponse(response: (TimeoutAccess, HttpResponse)) =
if (openTimeouts.head eq response._1) {
emit(responseOut, response._2, () complete(responseOut))
emit(responseOut, response._2, () completeStage())
} // else the application response arrived after we scheduled the timeout response, which is close but ok
})
// TODO: provide and use default impl for simply connecting an input and an output port as we do here
@ -315,8 +315,11 @@ private[http] object HttpServerBluePrint {
requestEnd.fast.map(_ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this))
}
override def apply(request: HttpRequest) = HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " +
override def apply(request: HttpRequest) =
//#default-request-timeout-httpresponse
HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " +
"to produce a timely response to your request.\r\nPlease try again in a short while!")
//#default-request-timeout-httpresponse
def clear(): Unit = // best effort timeout cancellation
get.fast.foreach(setup if (setup.scheduledTask ne null) setup.scheduledTask.cancel())

View file

@ -0,0 +1,71 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.{ BindException, Socket }
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.event.Logging
import akka.event.Logging.LogEvent
import akka.http.impl.util._
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.stream.{ OverflowStrategy, ActorMaterializer, BindFailedException, StreamTcpException }
import akka.testkit.{ TestProbe, EventFilter }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
import scala.util.{ Success, Try }
class TightRequestTimeoutSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
val testConf: Config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
windows-connection-abort-workaround-enabled = auto
akka.log-dead-letters = OFF
akka.http.server.request-timeout = 10ms""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
implicit val materializer = ActorMaterializer()
implicit val patience = PatienceConfig(3.seconds)
"Tight request timeout" should {
"not cause double push error caused by the late response attemting to push" in {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val slowHandler = Flow[HttpRequest].map(_ HttpResponse()).delay(500.millis, OverflowStrategy.backpressure)
val binding = Http().bindAndHandle(slowHandler, hostname, port)
val p = TestProbe()
system.eventStream.subscribe(p.ref, classOf[Logging.Error])
val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/")).futureValue
response.status should ===(StatusCodes.ServiceUnavailable) // the timeout response
p.expectMsgPF(hint = "Expected truncation error") {
case Logging.Error(_, _, _, msg: String) if msg contains "Inner stream finished before inputs completed." ()
}
p.expectNoMsg(1.second) // here the double push might happen
binding.flatMap(_.unbind()).futureValue
}
}
}

View file

@ -0,0 +1,49 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.server
import akka.actor.ActorSystem
import akka.http.scaladsl.{ Http, TestUtils }
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.{ HttpResponse, HttpRequest }
import akka.stream.ActorMaterializer
import akka.testkit.AkkaSpec
import org.scalatest.concurrent.{ IntegrationPatience, ScalaFutures }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.duration._
import scala.concurrent.Await
/** INTERNAL API - not (yet?) ready for public consuption */
private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers with BeforeAndAfterAll
with Directives with RequestBuilding
with ScalaFutures with IntegrationPatience {
implicit val system = ActorSystem(AkkaSpec.getCallerName(getClass))
implicit val mat = ActorMaterializer()
import system.dispatcher
override protected def afterAll(): Unit = {
Await.ready(system.terminate(), 3.seconds)
}
implicit class DSL(request: HttpRequest) {
def ~!>(route: Route) = new Prepped(request, route)
}
final case class Prepped(request: HttpRequest, route: Route)
implicit class Checking(p: Prepped) {
def ~!>(checking: HttpResponse Unit) = {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = Http().bindAndHandle(p.route, host, port)
try {
val targetUri = p.request.uri.withHost(host).withPort(port).withScheme("http")
val response = Http().singleRequest(p.request.withUri(targetUri)).futureValue
checking(response)
} finally binding.flatMap(_.unbind()).futureValue
}
}
}

View file

@ -5,11 +5,13 @@
package akka.http.scaladsl.server
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse }
import akka.http.scaladsl.server.directives.Credentials
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import scala.concurrent.duration._
object TestServer extends App {
val testConf: Config = ConfigFactory.parseString("""
@ -31,7 +33,11 @@ object TestServer extends App {
val bindingFuture = Http().bindAndHandle({
get {
path("") {
withRequestTimeout(1.milli, _ HttpResponse(StatusCodes.EnhanceYourCalm,
entity = "Unable to serve response within time limit, please enchance your calm.")) {
Thread.sleep(1000)
complete(index)
}
} ~
path("secure") {
authenticateBasicPF("My very secure site", auth) { user

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.server.directives
import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
import akka.http.scaladsl.server.IntegrationRoutingSpec
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
class TimeoutDirectivesSpec extends IntegrationRoutingSpec {
"Request Timeout" should {
"be configurable in routing layer" in {
val route = path("timeout") {
withRequestTimeout(3.seconds) {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
Get("/timeout") ~!> route ~!> { response
import response._
status should ===(StatusCodes.ServiceUnavailable)
}
}
}
"allow mapping the response" in {
val timeoutResponse = HttpResponse(StatusCodes.EnhanceYourCalm,
entity = "Unable to serve response within time limit, please enchance your calm.")
val route =
path("timeout") {
withRequestTimeout(500.millis) {
withRequestTimeoutResponse(request timeoutResponse) {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
} ~
path("equivalent") {
// updates timeout and handler at
withRequestTimeout(500.millis, request timeoutResponse) {
val response: Future[String] = slowFuture() // very slow
complete(response)
}
}
Get("/timeout") ~!> route ~!> { response
import response._
status should ===(StatusCodes.EnhanceYourCalm)
}
}
def slowFuture(): Future[String] = Promise[String].future
}

View file

@ -23,6 +23,7 @@ trait Directives extends RouteConcatenation
with MethodDirectives
with MiscDirectives
with ParameterDirectives
with TimeoutDirectives
with PathDirectives
with RangeDirectives
with RespondWithDirectives

View file

@ -0,0 +1,79 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.server.directives
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.`Timeout-Access`
import akka.http.scaladsl.server.{ Directive, Directive0 }
import scala.concurrent.duration.Duration
trait TimeoutDirectives {
def withoutRequestTimeout: Directive0 =
withRequestTimeout(Duration.Inf)
/**
* Tries to set a new request timeout and handler (if provided) at the same time.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
def withRequestTimeout(timeout: Duration): Directive0 =
withRequestTimeout(timeout, None)
/**
* Tries to set a new request timeout and handler (if provided) at the same time.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*
* @param handler optional custom "timeout response" function. If left None, the default timeout HttpResponse will be used.
*/
def withRequestTimeout(timeout: Duration, handler: HttpRequest HttpResponse): Directive0 =
withRequestTimeout(timeout, Some(handler))
/**
* Tries to set a new request timeout and handler (if provided) at the same time.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*
* @param handler optional custom "timeout response" function. If left None, the default timeout HttpResponse will be used.
*/
def withRequestTimeout(timeout: Duration, handler: Option[HttpRequest HttpResponse]): Directive0 =
Directive { inner
ctx
ctx.request.header[`Timeout-Access`] match {
case Some(t)
handler match {
case Some(h) t.timeoutAccess.update(timeout, h)
case _ t.timeoutAccess.updateTimeout(timeout)
}
case _ ctx.log.warning("withRequestTimeout was used in route however no request-timeout is set!")
}
inner()(ctx)
}
/**
* Tries to set a new request timeout handler, which produces the timeout response for a
* given request. Note that the handler must produce the response synchronously and shouldn't block!
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
def withRequestTimeoutResponse(handler: HttpRequest HttpResponse): Directive0 =
Directive { inner
ctx
ctx.request.header[`Timeout-Access`] match {
case Some(t) t.timeoutAccess.updateHandler(handler)
case _ ctx.log.warning("withRequestTimeoutResponse was used in route however no request-timeout is set!")
}
inner()(ctx)
}
}
object TimeoutDirectives extends TimeoutDirectives