diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java index 21d7fcc471..32790670a2 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java @@ -3,6 +3,7 @@ */ package docs.http.javadsl.server.directives; +import akka.NotUsed; import akka.actor.ActorSystem; import akka.dispatch.ExecutionContexts; import akka.event.Logging; @@ -31,14 +32,17 @@ import akka.util.ByteString; import org.junit.Ignore; import org.junit.Test; import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.duration.FiniteDuration; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -785,4 +789,53 @@ public class BasicDirectivesExamplesTest extends JUnitRouteTest { //#extractUnmatchedPath } + @Test + public void testExtractStrictEntity() { + //#extractStrictEntity + final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS); + final Route route = extractStrictEntity(timeout, strict -> + complete(strict.getData().utf8String()) + ); + + // tests: + final Iterator iterator = Arrays.asList( + ByteString.fromString("1"), + ByteString.fromString("2"), + ByteString.fromString("3")).iterator(); + final Source dataBytes = Source.fromIterator(() -> iterator); + testRoute(route).run( + HttpRequest.POST("/") + .withEntity(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, dataBytes)) + ).assertEntity("123"); + //#extractStrictEntity + } + + @Test + public void testToStrictEntity() { + //#toStrictEntity + final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS); + final Route route = toStrictEntity(timeout, () -> + extractRequest(req -> { + if (req.entity() instanceof HttpEntity.Strict) { + final HttpEntity.Strict strict = (HttpEntity.Strict)req.entity(); + return complete("Request entity is strict, data=" + strict.getData().utf8String()); + } else { + return complete("Ooops, request entity is not strict!"); + } + }) + ); + + // tests: + final Iterator iterator = Arrays.asList( + ByteString.fromString("1"), + ByteString.fromString("2"), + ByteString.fromString("3")).iterator(); + final Source dataBytes = Source.fromIterator(() -> iterator); + testRoute(route).run( + HttpRequest.POST("/") + .withEntity(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, dataBytes)) + ).assertEntity("Request entity is strict, data=123"); + //#toStrictEntity + } + } diff --git a/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/extractStrictEntity.rst b/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/extractStrictEntity.rst new file mode 100644 index 0000000000..3b2279c1aa --- /dev/null +++ b/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/extractStrictEntity.rst @@ -0,0 +1,23 @@ +.. _-extractStrictEntity-java-: + +extractStrictEntity +=================== + +Description +----------- + +Extracts the strict http entity as ``HttpEntity.Strict`` from the :class:`RequestContext`. + +A timeout parameter is given and if the stream isn't completed after the timeout, the directive will be failed. + +.. warning:: + + The directive will read the request entity into memory within the size limit(8M by default) and effectively disable streaming. + The size limit can be configured globally with ``akka.http.parsing.max-content-length`` or + overridden by wrapping with :ref:`-withSizeLimit-java-` or :ref:`-withoutSizeLimit-java-` directive. + + +Example +------- + +.. includecode:: ../../../../code/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java#extractStrictEntity diff --git a/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/index.rst b/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/index.rst index 39238f6d99..a212015c69 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/index.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/index.rst @@ -19,6 +19,7 @@ a single value or a tuple of values. * :ref:`-extract-java-` * :ref:`-extractExecutionContext-java-` * :ref:`-extractMaterializer-java-` + * :ref:`-extractStrictEntity-java-` * :ref:`-extractLog-java-` * :ref:`-extractRequest-java-` * :ref:`-extractRequestContext-java-` @@ -41,6 +42,7 @@ Transforming the Request(Context) * :ref:`-withMaterializer-java-` * :ref:`-withLog-java-` * :ref:`-withSettings-java-` + * :ref:`-toStrictEntity-java-` .. _Response Transforming Directives-java: @@ -93,6 +95,7 @@ Alphabetically extract extractExecutionContext extractMaterializer + extractStrictEntity extractLog extractRequest extractRequestContext @@ -117,6 +120,7 @@ Alphabetically provide recoverRejections recoverRejectionsWith + toStrictEntity withExecutionContext withMaterializer withLog diff --git a/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/toStrictEntity.rst b/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/toStrictEntity.rst new file mode 100644 index 0000000000..a1950e6b42 --- /dev/null +++ b/akka-docs/rst/java/http/routing-dsl/directives/basic-directives/toStrictEntity.rst @@ -0,0 +1,23 @@ +.. _-toStrictEntity-java-: + +toStrictEntity +============== + +Description +----------- + +Transforms the request entity to strict entity before it is handled by the inner route. + +A timeout parameter is given and if the stream isn't completed after the timeout, the directive will be failed. + +.. warning:: + + The directive will read the request entity into memory within the size limit(8M by default) and effectively disable streaming. + The size limit can be configured globally with ``akka.http.parsing.max-content-length`` or + overridden by wrapping with :ref:`-withSizeLimit-java-` or :ref:`-withoutSizeLimit-java-` directive. + + +Example +------- + +.. includecode:: ../../../../code/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java#toStrictEntity diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala index 0b5ff6247e..39d3969998 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala @@ -826,5 +826,40 @@ class BasicDirectivesExamplesSpec extends RoutingSpec { } //# } + "extractStrictEntity-example" in { + //#extractStrictEntity-example + import scala.concurrent.duration._ + val route = extractStrictEntity(3.seconds) { entity => + complete(entity.data.utf8String) + } + + // tests: + val dataBytes = Source.fromIterator(() ⇒ Iterator.range(1, 10).map(x ⇒ ByteString(x.toString))) + Post("/", HttpEntity(ContentTypes.`text/plain(UTF-8)`, data = dataBytes)) ~> route ~> check { + responseAs[String] shouldEqual "123456789" + } + //# + } + "toStrictEntity-example" in { + //#toStrictEntity-example + import scala.concurrent.duration._ + val route = toStrictEntity(3.seconds) { + extractRequest { req => + req.entity match { + case strict: HttpEntity.Strict => + complete(s"Request entity is strict, data=${strict.data.utf8String}") + case _ => + complete("Ooops, request entity is not strict!") + } + } + } + + // tests: + val dataBytes = Source.fromIterator(() ⇒ Iterator.range(1, 10).map(x ⇒ ByteString(x.toString))) + Post("/", HttpEntity(ContentTypes.`text/plain(UTF-8)`, data = dataBytes)) ~> route ~> check { + responseAs[String] shouldEqual "Request entity is strict, data=123456789" + } + //# + } } diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/extractStrictEntity.rst b/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/extractStrictEntity.rst new file mode 100644 index 0000000000..29ca0aa174 --- /dev/null +++ b/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/extractStrictEntity.rst @@ -0,0 +1,30 @@ +.. _-extractStrictEntity-: + +extractStrictEntity +=================== + +Signature +--------- + +.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala + :snippet: extractStrictEntity + +Description +----------- + +Extracts the strict http entity as ``HttpEntity.Strict`` from the :class:`RequestContext`. + +A timeout parameter is given and if the stream isn't completed after the timeout, the directive will be failed. + +.. warning:: + + The directive will read the request entity into memory within the size limit(8M by default) and effectively disable streaming. + The size limit can be configured globally with ``akka.http.parsing.max-content-length`` or + overridden by wrapping with :ref:`-withSizeLimit-` or :ref:`-withoutSizeLimit-` directive. + + +Example +------- + +.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala + :snippet: extractStrictEntity-example diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/index.rst b/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/index.rst index 709f7d7b29..012bc536ca 100644 --- a/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/index.rst @@ -20,6 +20,7 @@ a single value or a tuple of values. * :ref:`-extractDataBytes-` * :ref:`-extractExecutionContext-` * :ref:`-extractMaterializer-` + * :ref:`-extractStrictEntity-` * :ref:`-extractLog-` * :ref:`-extractRequest-` * :ref:`-extractRequestContext-` @@ -45,6 +46,7 @@ Transforming the Request(Context) * :ref:`-withMaterializer-` * :ref:`-withLog-` * :ref:`-withSettings-` + * :ref:`-toStrictEntity-` .. _Response Transforming Directives: @@ -98,6 +100,7 @@ Alphabetically extractExecutionContext extractDataBytes extractMaterializer + extractStrictEntity extractLog extractRequest extractRequestContext @@ -124,6 +127,7 @@ Alphabetically recoverRejections recoverRejectionsWith textract + toStrictEntity tprovide withExecutionContext withMaterializer diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/toStrictEntity.rst b/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/toStrictEntity.rst new file mode 100644 index 0000000000..f45a9ac048 --- /dev/null +++ b/akka-docs/rst/scala/http/routing-dsl/directives/basic-directives/toStrictEntity.rst @@ -0,0 +1,30 @@ +.. _-toStrictEntity-: + +toStrictEntity +============== + +Signature +--------- + +.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala + :snippet: toStrictEntity + +Description +----------- + +Transforms the request entity to strict entity before it is handled by the inner route. + +A timeout parameter is given and if the stream isn't completed after the timeout, the directive will be failed. + +.. warning:: + + The directive will read the request entity into memory within the size limit(8M by default) and effectively disable streaming. + The size limit can be configured globally with ``akka.http.parsing.max-content-length`` or + overridden by wrapping with :ref:`-withSizeLimit-` or :ref:`-withoutSizeLimit-` directive. + + +Example +------- + +.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala + :snippet: toStrictEntity-example diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala index 886223d662..f87a9b55f3 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala @@ -16,6 +16,7 @@ import akka.util.ByteString import scala.concurrent.ExecutionContextExecutor import akka.http.impl.model.JavaUri import akka.http.javadsl.model.HttpRequest +import akka.http.javadsl.model.HttpEntity import akka.http.javadsl.model.RequestEntity import akka.http.javadsl.model.Uri import akka.http.javadsl.server._ @@ -38,6 +39,7 @@ import akka.event.LoggingAdapter import akka.http.javadsl.server import scala.compat.java8.FutureConverters._ +import scala.concurrent.duration.FiniteDuration abstract class BasicDirectives { import akka.http.impl.util.JavaMapping.Implicits._ @@ -283,4 +285,29 @@ abstract class BasicDirectives { */ def extractRequestEntity(inner: JFunction[RequestEntity, Route]): Route = extractEntity(inner) + /** + * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * + * Converts the HttpEntity from the [[akka.http.javadsl.server.RequestContext]] into an + * [[akka.http.javadsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the + * entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + */ + def extractStrictEntity(timeout: FiniteDuration, inner: JFunction[HttpEntity.Strict, Route]): Route = RouteAdapter { + D.extractStrictEntity(timeout) { strict ⇒ inner.apply(strict).delegate } + } + + /** + * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * + * Extracts the [[akka.http.javadsl.server.RequestContext]] itself with the strict HTTP entity, + * or fails the route if unable to drain the entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + */ + def toStrictEntity(timeout: FiniteDuration, inner: Supplier[Route]): Route = RouteAdapter { + D.toStrictEntity(timeout) { inner.get.delegate } + } + } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala index 1b5d4fcecc..0b0fd6a99c 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala @@ -8,9 +8,11 @@ package directives import akka.stream.scaladsl.Source import akka.util.ByteString +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, ExecutionContextExecutor } import scala.collection.immutable import akka.event.LoggingAdapter +import akka.stream.impl.ConstantFun.scalaIdentityFunction import akka.stream.Materializer import akka.http.scaladsl.settings.{ RoutingSettings, ParserSettings } import akka.http.scaladsl.server.util.Tuple @@ -18,6 +20,8 @@ import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.model._ import akka.http.scaladsl.util.FastFuture._ +import scala.util.{ Failure, Success } + /** * @groupname basic Basic directives * @groupprio basic 10 @@ -301,6 +305,51 @@ trait BasicDirectives { * @group basic */ def extractDataBytes: Directive1[Source[ByteString, Any]] = BasicDirectives._extractDataBytes + + /** + * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * + * Converts the HttpEntity from the [[akka.http.scaladsl.server.RequestContext]] into an + * [[akka.http.scaladsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the + * entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @group basic + */ + def extractStrictEntity(timeout: FiniteDuration): Directive1[HttpEntity.Strict] = + extract { ctx ⇒ + import ctx.materializer + + ctx.request.entity.toStrict(timeout) + + }.flatMap { entity ⇒ + import FutureDirectives._ + + onComplete(entity).flatMap { + case Success(x) ⇒ provide(x) + case Failure(t) ⇒ StandardRoute(_.fail(t)) + } + } + + /** + * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * + * Extracts the [[akka.http.scaladsl.server.RequestContext]] itself with the strict HTTP entity, + * or fails the route if unable to drain the entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @group basic + */ + def toStrictEntity(timeout: FiniteDuration): Directive0 = + Directive { inner ⇒ ctx ⇒ + import ctx.{ executionContext, materializer } + + ctx.request.entity.toStrict(timeout).flatMap { strictEntity ⇒ + val newCtx = ctx.mapRequest(_.copy(entity = strictEntity)) + inner(())(newCtx) + } + } + } object BasicDirectives extends BasicDirectives { @@ -312,7 +361,7 @@ object BasicDirectives extends BasicDirectives { private val _extractLog: Directive1[LoggingAdapter] = extract(_.log) private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings) private val _extractParserSettings: Directive1[ParserSettings] = extract(_.parserSettings) - private val _extractRequestContext: Directive1[RequestContext] = extract(conforms) + private val _extractRequestContext: Directive1[RequestContext] = extract(scalaIdentityFunction) private val _extractRequestEntity: Directive1[RequestEntity] = extract(_.request.entity) private val _extractDataBytes: Directive1[Source[ByteString, Any]] = extract(_.request.entity.dataBytes) }