diff --git a/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java b/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java index f34f7be433..5377fa75e8 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java @@ -4,31 +4,137 @@ package docs.http.javadsl; +import akka.Done; import akka.actor.AbstractActor; -import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.HostConnectionPool; import akka.japi.Pair; import akka.japi.pf.ReceiveBuilder; import akka.stream.Materializer; +import akka.util.ByteString; +import scala.compat.java8.FutureConverters; import scala.concurrent.ExecutionContextExecutor; import scala.concurrent.Future; -import akka.stream.ActorMaterializer; import akka.stream.javadsl.*; import akka.http.javadsl.OutgoingConnection; -import akka.http.javadsl.model.*; import akka.http.javadsl.Http; -import scala.util.Try; import static akka.http.javadsl.ConnectHttp.toHost; import static akka.pattern.PatternsCS.*; import java.util.concurrent.CompletionStage; +//#manual-entity-consume-example-1 +import java.io.File; +import akka.actor.ActorSystem; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Framing; +import akka.http.javadsl.model.*; +import scala.concurrent.duration.FiniteDuration; +import scala.util.Try; +//#manual-entity-consume-example-1 + @SuppressWarnings("unused") public class HttpClientExampleDocTest { + HttpResponse responseFromSomewhere() { + return null; + } + + void manualEntityComsumeExample() { + //#manual-entity-consume-example-1 + + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final HttpResponse response = responseFromSomewhere(); + + final Function transformEachLine = line -> line /* some transformation here */; + + final int maximumFrameLength = 256; + + response.entity().getDataBytes() + .via(Framing.delimiter(ByteString.fromString("\n"), maximumFrameLength, FramingTruncation.ALLOW)) + .map(transformEachLine::apply) + .runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer); + //#manual-entity-consume-example-1 + } + + private + //#manual-entity-consume-example-2 + final class ExamplePerson { + final String name; + public ExamplePerson(String name) { this.name = name; } + } + + public ExamplePerson parse(ByteString line) { + return new ExamplePerson(line.utf8String()); + } + //#manual-entity-consume-example-2 + + void manualEntityConsumeExample2() { + //#manual-entity-consume-example-2 + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final HttpResponse response = responseFromSomewhere(); + + // toStrict to enforce all data be loaded into memory from the connection + final CompletionStage strictEntity = response.entity() + .toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), materializer); + + // while API remains the same to consume dataBytes, now they're in memory already: + + final CompletionStage person = + strictEntity + .thenCompose(strict -> + strict.getDataBytes() + .runFold(ByteString.empty(), (acc, b) -> acc.concat(b), materializer) + .thenApply(this::parse) + ); + + //#manual-entity-consume-example-2 + } + + void manualEntityDiscardExample1() { + //#manual-entity-discard-example-1 + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final HttpResponse response = responseFromSomewhere(); + + final HttpMessage.DiscardedEntity discarded = response.discardEntityBytes(materializer); + + discarded.completionStage().whenComplete((done, ex) -> { + System.out.println("Entity discarded completely!"); + }); + //#manual-entity-discard-example-1 + } + + void manualEntityDiscardExample2() { + //#manual-entity-discard-example-2 + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final HttpResponse response = responseFromSomewhere(); + + final CompletionStage discardingComplete = response.entity().getDataBytes().runWith(Sink.ignore(), materializer); + + discardingComplete.whenComplete((done, ex) -> { + System.out.println("Entity discarded completely!"); + }); + //#manual-entity-discard-example-2 + } + + // compile only test public void testConstructRequest() { //#outgoing-connection-example diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java index 37a773e5f6..d06236fd5b 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java @@ -4,26 +4,37 @@ package docs.http.javadsl.server; +import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.IncomingConnection; import akka.http.javadsl.ServerBinding; +import akka.http.javadsl.marshallers.jackson.Jackson; import akka.http.javadsl.model.*; +import akka.http.javadsl.model.headers.Connection; +import akka.http.javadsl.server.Route; +import akka.http.javadsl.server.Unmarshaller; import akka.japi.function.Function; import akka.stream.ActorMaterializer; +import akka.stream.IOResult; import akka.stream.Materializer; +import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; +import scala.concurrent.ExecutionContextExecutor; import java.io.BufferedReader; +import java.io.File; import java.io.InputStreamReader; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import static akka.http.javadsl.server.Directives.*; + @SuppressWarnings("unused") public class HttpServerExampleDocTest { @@ -205,4 +216,113 @@ public class HttpServerExampleDocTest { public static void main(String[] args) throws Exception { fullServerExample(); } + + + //#consume-entity-directive + class Bid { + final String userId; + final int bid; + + Bid(String userId, int bid) { + this.userId = userId; + this.bid = bid; + } + } + //#consume-entity-directive + + void consumeEntityUsingEntityDirective() { + //#consume-entity-directive + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final Unmarshaller asBid = Jackson.unmarshaller(Bid.class); + + final Route s = path("bid", () -> + put(() -> + entity(asBid, bid -> + // incoming entity is fully consumed and converted into a Bid + complete("The bid was: " + bid) + ) + ) + ); + //#consume-entity-directive + } + + void consumeEntityUsingRawDataBytes() { + //#consume-raw-dataBytes + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final Route s = + put(() -> + path("lines", () -> + withoutSizeLimit(() -> + extractDataBytes(bytes -> { + final CompletionStage res = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer); + + return onComplete(() -> res, ioResult -> + // we only want to respond once the incoming data has been handled: + complete("Finished writing data :" + ioResult)); + }) + ) + ) + ); + + //#consume-raw-dataBytes + } + + void discardEntityUsingRawBytes() { + //#discard-discardEntityBytes + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final Route s = + put(() -> + path("lines", () -> + withoutSizeLimit(() -> + extractRequest(r -> { + final CompletionStage res = r.discardEntityBytes(materializer).completionStage(); + + return onComplete(() -> res, done -> + // we only want to respond once the incoming data has been handled: + complete("Finished writing data :" + done)); + }) + ) + ) + ); + //#discard-discardEntityBytes + } + + void discardEntityManuallyCloseConnections() { + //#discard-close-connections + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + final Route s = + put(() -> + path("lines", () -> + withoutSizeLimit(() -> + extractDataBytes(bytes -> { + // Closing connections, method 1 (eager): + // we deem this request as illegal, and close the connection right away: + bytes.runWith(Sink.cancelled(), materializer); // "brutally" closes the connection + + // Closing connections, method 2 (graceful): + // consider draining connection and replying with `Connection: Close` header + // if you want the client to close after this request/reply cycle instead: + return respondWithHeader(Connection.create("close"), () -> + complete(StatusCodes.FORBIDDEN, "Not allowed!") + ); + }) + ) + ) + ); + //#discard-close-connections + } + + } diff --git a/akka-docs/rst/java/http/implications-of-streaming-http-entity.rst b/akka-docs/rst/java/http/implications-of-streaming-http-entity.rst new file mode 100644 index 0000000000..b66f66da53 --- /dev/null +++ b/akka-docs/rst/java/http/implications-of-streaming-http-entity.rst @@ -0,0 +1,121 @@ +.. _implications-of-streaming-http-entities-java: + +Implications of the streaming nature of Request/Response Entities +----------------------------------------------------------------- + +Akka HTTP is streaming *all the way through*, which means that the back-pressure mechanisms enabled by Akka Streams +are exposed through all layers–from the TCP layer, through the HTTP server, all the way up to the user-facing ``HttpRequest`` +and ``HttpResponse`` and their ``HttpEntity`` APIs. + +This has suprising implications if you are used to non-streaming / not-reactive HTTP clients. +Specifically it means that: "*lack of consumption of the HTTP Entity, is signaled as back-pressure to the other +side of the connection*". This is a feature, as it allows one only to consume the entity, and back-pressure servers/clients +from overwhelming our application, possibly causing un-necessary buffering of the entity in memory. + +.. warning:: + Consuming (or discarding) the Entity of a request is mandatory! + If *accidentally* left neither consumed or discarded Akka HTTP will + asume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. + +Client-Side handling of streaming HTTP Entities +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Consuming the HTTP Response Entity (Client) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The most commong use-case of course is consuming the response entity, which can be done via +running the underlying ``dataBytes`` Source. This is as simple as running the dataBytes source, +(or on the server-side using directives such as + +It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest, +for example by framing the incoming chunks, parsing them line-by-line and the connecting the flow into another +destination Sink, such as a File or other Akka Streams connector: + +.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-consume-example-1 + +however sometimes the need may arise to consume the entire entity as ``Strict`` entity (which means that it is +completely loaded into memory). Akka HTTP provides a special ``toStrict(timeout, materializer)`` method which can be used to +eagerly consume the entity and make it available in memory: + +.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-consume-example-2 + + +Discarding the HTTP Response Entity (Client) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code), +yet as explained above entity still has to be consumed in some way, otherwise we'll be exherting back-pressure on the +underlying TCP connection. + +The ``discardEntityBytes`` convenience method serves the purpose of easily discarding the entity if it has no purpose for us. +It does so by piping the incoming bytes directly into an ``Sink.ignore``. + +The two snippets below are equivalent, and work the same way on the server-side for incoming HTTP Requests: + +.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-discard-example-1 + +Or the equivalent low-level code achieving the same result: + +.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-discard-example-2 + +Server-Side handling of streaming HTTP Entities +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Similarily as with the Client-side, HTTP Entities are directly linked to Streams which are fed by the underlying +TCP connection. Thus, if request entities remain not consumed, the server will back-pressure the connection, expecting +that the user-code will eventually decide what to do with the incoming data. + +Note that some directives force an implicit ``toStrict`` operation, such as ``entity(exampleUnmarshaller, example -> {})`` and similar ones. + +Consuming the HTTP Request Entity (Server) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The simplest way of consuming the incoming request entity is to simply transform it into an actual domain object, +for example by using the :ref:`-entity-java-` directive: + +.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#consume-entity-directive + +Of course you can access the raw dataBytes as well and run the underlying stream, for example piping it into an +FileIO Sink, that signals completion via a ``CompletionStage`` once all the data has been written into the file: + +.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#consume-raw-dataBytes + +Discarding the HTTP Request Entity (Server) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sometimes, depending on some validation (e.g. checking if given user is allowed to perform uploads or not) +you may want to decide to discard the uploaded entity. + +Please note that discarding means that the entire upload will proceed, even though you are not interested in the data +being streamed to the server - this may be useful if you are simply not interested in the given entity, however +you don't want to abort the entire connection (which we'll demonstrate as well), since there may be more requests +pending on the same connection still. + +In order to discard the databytes explicitly you can invoke the ``discardEntityBytes`` bytes of the incoming ``HTTPRequest``: + +.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#discard-discardEntityBytes + +A related concept is *cancelling* the incoming ``entity.getDataBytes()`` stream, which results in Akka HTTP +*abruptly closing the connection from the Client*. This may be useful when you detect that the given user should not be allowed to make any +uploads at all, and you want to drop the connection (instead of reading and ignoring the incoming data). +This can be done by attaching the incoming ``entity.getDataBytes()`` to a ``Sink.cancelled`` which will cancel +the entity stream, which in turn will cause the underlying connection to be shut-down by the server – +effectively hard-aborting the incoming request: + +.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#discard-close-connections + +Closing connections is also explained in depth in the :ref:`http-closing-connection-low-level-java` section of the docs. + +Pending: Automatic discarding of not used entities +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Under certin conditions is is possible to detect an entity is very unlikely to be used by the user for a given request, +and issue warnings or discard the entity automatically. This advanced feature has not been implemented yet, see the below +note and issues for further discussion and ideas. + +.. note:: + An advanced feature code named "auto draining" has been discussed and proposed for Akka HTTP, and we're hoping + to implement or help the community implement it. + + You can read more about it in `issue #18716 `_ + as well as `issue #18540 `_ ; as always, contributions are very welcome! + diff --git a/akka-docs/rst/java/http/index.rst b/akka-docs/rst/java/http/index.rst index 1a086d69b9..49e63aba62 100644 --- a/akka-docs/rst/java/http/index.rst +++ b/akka-docs/rst/java/http/index.rst @@ -37,6 +37,7 @@ akka-http-jackson routing-dsl/index client-side/index common/index + implications-of-streaming-http-entity configuration server-side-https-support diff --git a/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst b/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst index ff751fca87..4c865ed90f 100644 --- a/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst +++ b/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst @@ -121,6 +121,7 @@ Streaming of HTTP message entities is supported through subclasses of ``HttpEnti to deal with streamed entities when receiving a request as well as, in many cases, when constructing responses. See :ref:`HttpEntity-java` for a description of the alternatives. +.. _http-closing-connection-low-level-java: Closing a connection ~~~~~~~~~~~~~~~~~~~~ diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala index 85603e37ad..817edbe4d4 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala @@ -4,13 +4,110 @@ package docs.http.scaladsl +import akka.Done import akka.actor.{ ActorLogging, ActorSystem } +import akka.http.scaladsl.model.HttpEntity.Strict +import akka.http.scaladsl.model.HttpMessage.DiscardedEntity +import akka.stream.{ IOResult, Materializer } +import akka.stream.scaladsl.{ Framing, Sink } import akka.util.ByteString import docs.CompileOnlySpec import org.scalatest.{ Matchers, WordSpec } +import scala.concurrent.{ ExecutionContextExecutor, Future } + class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec { + "manual-entity-consume-example-1" in compileOnlySpec { + //#manual-entity-consume-example-1 + import java.io.File + import akka.actor.ActorSystem + import akka.stream.ActorMaterializer + import akka.stream.scaladsl.Framing + import akka.stream.scaladsl.FileIO + import akka.http.scaladsl.model._ + + implicit val system = ActorSystem() + implicit val dispatcher = system.dispatcher + implicit val materializer = ActorMaterializer() + + val response: HttpResponse = ??? + + response.entity.dataBytes + .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256)) + .map(transformEachLine) + .runWith(FileIO.toPath(new File("/tmp/example.out").toPath)) + + def transformEachLine(line: ByteString): ByteString = ??? + + //#manual-entity-consume-example-1 + } + + "manual-entity-consume-example-2" in compileOnlySpec { + //#manual-entity-consume-example-2 + import java.io.File + import akka.actor.ActorSystem + import akka.stream.ActorMaterializer + import akka.http.scaladsl.model._ + import scala.concurrent.duration._ + + implicit val system = ActorSystem() + implicit val dispatcher = system.dispatcher + implicit val materializer = ActorMaterializer() + + case class ExamplePerson(name: String) + def parse(line: ByteString): ExamplePerson = ??? + + val response: HttpResponse = ??? + + // toStrict to enforce all data be loaded into memory from the connection + val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds) + + // while API remains the same to consume dataBytes, now they're in memory already: + val transformedData: Future[ExamplePerson] = + strictEntity flatMap { e => + e.dataBytes + .runFold(ByteString.empty) { case (acc, b) => acc ++ b } + .map(parse) + } + + //#manual-entity-consume-example-2 + } + + "manual-entity-discard-example-1" in compileOnlySpec { + //#manual-entity-discard-example-1 + import akka.actor.ActorSystem + import akka.stream.ActorMaterializer + import akka.http.scaladsl.model._ + + implicit val system = ActorSystem() + implicit val dispatcher = system.dispatcher + implicit val materializer = ActorMaterializer() + + val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below) + + val discarded: DiscardedEntity = response1.discardEntityBytes() + discarded.future.onComplete { case done => println("Entity discarded completely!") } + + //#manual-entity-discard-example-1 + } + "manual-entity-discard-example-2" in compileOnlySpec { + import akka.actor.ActorSystem + import akka.stream.ActorMaterializer + import akka.http.scaladsl.model._ + + implicit val system = ActorSystem() + implicit val dispatcher = system.dispatcher + implicit val materializer = ActorMaterializer() + + //#manual-entity-discard-example-2 + val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below) + + val discardingComplete: Future[Done] = response1.entity.dataBytes.runWith(Sink.ignore) + discardingComplete.onComplete { case done => println("Entity discarded completely!") } + //#manual-entity-discard-example-2 + } + "outgoing-connection-example" in compileOnlySpec { //#outgoing-connection-example import akka.actor.ActorSystem @@ -74,6 +171,7 @@ class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec import akka.stream.ActorMaterializer import scala.concurrent.Future + import scala.util.{ Failure, Success } implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala index 6d9e964297..d91d819b59 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala @@ -5,11 +5,13 @@ package docs.http.scaladsl import akka.event.LoggingAdapter +import akka.http.scaladsl.model.{ RequestEntity, StatusCodes } +import akka.stream.scaladsl.Sink import akka.testkit.TestActors import docs.CompileOnlySpec import org.scalatest.{ Matchers, WordSpec } -import scala.language.postfixOps +import scala.language.postfixOps import scala.concurrent.{ ExecutionContext, Future } class HttpServerExampleSpec extends WordSpec with Matchers @@ -159,7 +161,7 @@ class HttpServerExampleSpec extends WordSpec with Matchers val httpEcho = Flow[HttpRequest] .via(reactToConnectionFailure) .map { request => - // simple text "echo" response: + // simple streaming (!) "echo" response: HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, request.entity.dataBytes)) } @@ -195,7 +197,8 @@ class HttpServerExampleSpec extends WordSpec with Matchers case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => sys.error("BOOM!") - case _: HttpRequest => + case r: HttpRequest => + r.discardEntityBytes() // important to drain incoming HTTP Entity stream HttpResponse(404, entity = "Unknown resource!") } @@ -237,7 +240,8 @@ class HttpServerExampleSpec extends WordSpec with Matchers case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => sys.error("BOOM!") - case _: HttpRequest => + case r: HttpRequest => + r.discardEntityBytes() // important to drain incoming HTTP Entity stream HttpResponse(404, entity = "Unknown resource!") } @@ -553,6 +557,126 @@ class HttpServerExampleSpec extends WordSpec with Matchers } //#actor-interaction } + + "consume entity using entity directive" in compileOnlySpec { + //#consume-entity-directive + import akka.actor.ActorSystem + import akka.http.scaladsl.server.Directives._ + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + import akka.stream.ActorMaterializer + import spray.json.DefaultJsonProtocol._ + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + // needed for the future flatMap/onComplete in the end + implicit val executionContext = system.dispatcher + + final case class Bid(userId: String, bid: Int) + + // these are from spray-json + implicit val bidFormat = jsonFormat2(Bid) + + val route = + path("bid") { + put { + entity(as[Bid]) { bid => + // incoming entity is fully consumed and converted into a Bid + complete("The bid was: " + bid) + } + } + } + //#consume-entity-directive + } + + "consume entity using raw dataBytes to file" in compileOnlySpec { + //#consume-raw-dataBytes + import akka.actor.ActorSystem + import akka.stream.scaladsl.FileIO + import akka.http.scaladsl.server.Directives._ + import akka.stream.ActorMaterializer + import java.io.File + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + // needed for the future flatMap/onComplete in the end + implicit val executionContext = system.dispatcher + + val route = + (put & path("lines")) { + withoutSizeLimit { + extractDataBytes { bytes => + val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath)) + + // we only want to respond once the incoming data has been handled: + onComplete(finishedWriting) { ioResult => + complete("Finished writing data: " + ioResult) + } + } + } + } + //#consume-raw-dataBytes + } + + "drain entity using request#discardEntityBytes" in compileOnlySpec { + //#discard-discardEntityBytes + import akka.actor.ActorSystem + import akka.stream.scaladsl.FileIO + import akka.http.scaladsl.server.Directives._ + import akka.stream.ActorMaterializer + import akka.http.scaladsl.model.HttpRequest + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + // needed for the future flatMap/onComplete in the end + implicit val executionContext = system.dispatcher + + val route = + (put & path("lines")) { + withoutSizeLimit { + extractRequest { r: HttpRequest => + val finishedWriting = r.discardEntityBytes().future + + // we only want to respond once the incoming data has been handled: + onComplete(finishedWriting) { done => + complete("Drained all data from connection... (" + done + ")") + } + } + } + } + //#discard-discardEntityBytes + } + + "discard entity manually" in compileOnlySpec { + //#discard-close-connections + import akka.actor.ActorSystem + import akka.stream.scaladsl.Sink + import akka.http.scaladsl.server.Directives._ + import akka.http.scaladsl.model.headers.Connection + import akka.stream.ActorMaterializer + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + // needed for the future flatMap/onComplete in the end + implicit val executionContext = system.dispatcher + + val route = + (put & path("lines")) { + withoutSizeLimit { + extractDataBytes { data => + // Closing connections, method 1 (eager): + // we deem this request as illegal, and close the connection right away: + data.runWith(Sink.cancelled) // "brutally" closes the connection + + // Closing connections, method 2 (graceful): + // consider draining connection and replying with `Connection: Close` header + // if you want the client to close after this request/reply cycle instead: + respondWithHeader(Connection("close")) + complete(StatusCodes.Forbidden -> "Not allowed!") + } + } + } + //#discard-close-connections + } } diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/WebSocketExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/WebSocketExampleSpec.scala index fa552bea27..6317359124 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/WebSocketExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/WebSocketExampleSpec.scala @@ -49,7 +49,9 @@ class WebSocketExampleSpec extends WordSpec with Matchers with CompileOnlySpec { case Some(upgrade) => upgrade.handleMessages(greeterWebSocketService) case None => HttpResponse(400, entity = "Not a valid websocket request!") } - case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!") + case r: HttpRequest => + r.discardEntityBytes() // important to drain incoming HTTP Entity stream + HttpResponse(404, entity = "Unknown resource!") } //#websocket-request-handling @@ -84,6 +86,7 @@ class WebSocketExampleSpec extends WordSpec with Matchers with CompileOnlySpec { .collect { case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream) // ignore binary messages + // TODO #20096 in case a Streamed message comes in, we should runWith(Sink.ignore) its data } //#websocket-routing diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/RouteDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/RouteDirectivesExamplesSpec.scala index be19949343..df10fd21c5 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/RouteDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/RouteDirectivesExamplesSpec.scala @@ -22,10 +22,10 @@ class RouteDirectivesExamplesSpec extends RoutingSpec { complete(StatusCodes.OK) } ~ path("c") { - complete(StatusCodes.Created, "bar") + complete(StatusCodes.Created -> "bar") } ~ path("d") { - complete(201, "bar") + complete(201 -> "bar") } ~ path("e") { complete(StatusCodes.Created, List(`Content-Type`(`text/plain(UTF-8)`)), "bar") diff --git a/akka-docs/rst/scala/http/client-side/connection-level.rst b/akka-docs/rst/scala/http/client-side/connection-level.rst index ab775cd7fe..8f3e1d0468 100644 --- a/akka-docs/rst/scala/http/client-side/connection-level.rst +++ b/akka-docs/rst/scala/http/client-side/connection-level.rst @@ -7,6 +7,10 @@ The connection-level API is the lowest-level client-side API Akka HTTP provides. HTTP connections are opened and closed and how requests are to be send across which connection. As such it offers the highest flexibility at the cost of providing the least convenience. +.. note:: + It is recommended to first read the :ref:`implications-of-streaming-http-entities` section, + as it explains the underlying full-stack streaming concepts, which may be unexpected when coming + from a background with non-"streaming first" HTTP Clients. Opening HTTP Connections ------------------------ @@ -90,4 +94,4 @@ On the client-side the stand-alone HTTP layer forms a ``BidiStage`` that is defi :snippet: client-layer You create an instance of ``Http.ClientLayer`` by calling one of the two overloads of the ``Http().clientLayer`` method, -which also allows for varying degrees of configuration. \ No newline at end of file +which also allows for varying degrees of configuration. diff --git a/akka-docs/rst/scala/http/client-side/host-level.rst b/akka-docs/rst/scala/http/client-side/host-level.rst index 6a5b6ef84b..629af2b26f 100644 --- a/akka-docs/rst/scala/http/client-side/host-level.rst +++ b/akka-docs/rst/scala/http/client-side/host-level.rst @@ -7,6 +7,10 @@ As opposed to the :ref:`connection-level-api` the host-level API relieves you fr connections. It autonomously manages a configurable pool of connections to *one particular target endpoint* (i.e. host/port combination). +.. note:: + It is recommended to first read the :ref:`implications-of-streaming-http-entities` section, + as it explains the underlying full-stack streaming concepts, which may be unexpected when coming + from a background with non-"streaming first" HTTP Clients. Requesting a Host Connection Pool --------------------------------- @@ -153,4 +157,4 @@ Example ------- .. includecode:: ../../code/docs/http/scaladsl/HttpClientExampleSpec.scala - :include: host-level-example \ No newline at end of file + :include: host-level-example diff --git a/akka-docs/rst/scala/http/client-side/index.rst b/akka-docs/rst/scala/http/client-side/index.rst index c0b1f9376e..0a55263911 100644 --- a/akka-docs/rst/scala/http/client-side/index.rst +++ b/akka-docs/rst/scala/http/client-side/index.rst @@ -6,6 +6,10 @@ Consuming HTTP-based Services (Client-Side) All client-side functionality of Akka HTTP, for consuming HTTP-based services offered by other endpoints, is currently provided by the ``akka-http-core`` module. +It is recommended to first read the :ref:`implications-of-streaming-http-entities` section, +as it explains the underlying full-stack streaming concepts, which may be unexpected when coming +from a background with non-"streaming first" HTTP Clients. + Depending on your application's specific needs you can choose from three different API levels: :ref:`connection-level-api` @@ -28,4 +32,4 @@ Akka HTTP will happily handle many thousand concurrent connections to a single o host-level request-level client-https-support - websocket-support \ No newline at end of file + websocket-support diff --git a/akka-docs/rst/scala/http/client-side/request-level.rst b/akka-docs/rst/scala/http/client-side/request-level.rst index 5061b124bc..8f1759b2be 100644 --- a/akka-docs/rst/scala/http/client-side/request-level.rst +++ b/akka-docs/rst/scala/http/client-side/request-level.rst @@ -7,6 +7,11 @@ The request-level API is the most convenient way of using Akka HTTP's client-sid :ref:`host-level-api` to provide you with a simple and easy-to-use way of retrieving HTTP responses from remote servers. Depending on your preference you can pick the flow-based or the future-based variant. +.. note:: + It is recommended to first read the :ref:`implications-of-streaming-http-entities` section, + as it explains the underlying full-stack streaming concepts, which may be unexpected when coming + from a background with non-"streaming first" HTTP Clients. + .. note:: The request-level API is implemented on top of a connection pool that is shared inside the ActorSystem. A consequence of using a pool is that long-running requests block a connection while running and starve other requests. Make sure not to use diff --git a/akka-docs/rst/scala/http/implications-of-streaming-http-entity.rst b/akka-docs/rst/scala/http/implications-of-streaming-http-entity.rst new file mode 100644 index 0000000000..d6a0403eef --- /dev/null +++ b/akka-docs/rst/scala/http/implications-of-streaming-http-entity.rst @@ -0,0 +1,129 @@ +.. _implications-of-streaming-http-entities: + +Implications of the streaming nature of Request/Response Entities +----------------------------------------------------------------- + +Akka HTTP is streaming *all the way through*, which means that the back-pressure mechanisms enabled by Akka Streams +are exposed through all layers–from the TCP layer, through the HTTP server, all the way up to the user-facing ``HttpRequest`` +and ``HttpResponse`` and their ``HttpEntity`` APIs. + +This has suprising implications if you are used to non-streaming / not-reactive HTTP clients. +Specifically it means that: "*lack of consumption of the HTTP Entity, is signaled as back-pressure to the other +side of the connection*". This is a feature, as it allows one only to consume the entity, and back-pressure servers/clients +from overwhelming our application, possibly causing un-necessary buffering of the entity in memory. + +.. warning:: + Consuming (or discarding) the Entity of a request is mandatory! + If *accidentally* left neither consumed or discarded Akka HTTP will + asume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. + +Client-Side handling of streaming HTTP Entities +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Consuming the HTTP Response Entity (Client) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The most commong use-case of course is consuming the response entity, which can be done via +running the underlying ``dataBytes`` Source. This is as simple as running the dataBytes source, +(or on the server-side using directives such as + +It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest, +for example by framing the incoming chunks, parsing them line-by-line and the connecting the flow into another +destination Sink, such as a File or other Akka Streams connector: + +.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala + :include: manual-entity-consume-example-1 + +however sometimes the need may arise to consume the entire entity as ``Strict`` entity (which means that it is +completely loaded into memory). Akka HTTP provides a special ``toStrict(timeout)`` method which can be used to +eagerly consume the entity and make it available in memory: + +.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala + :include: manual-entity-consume-example-2 + + +Discarding the HTTP Response Entity (Client) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code), +yet as explained above entity still has to be consumed in some way, otherwise we'll be exherting back-pressure on the +underlying TCP connection. + +The ``discardEntityBytes`` convenience method serves the purpose of easily discarding the entity if it has no purpose for us. +It does so by piping the incoming bytes directly into an ``Sink.ignore``. + +The two snippets below are equivalent, and work the same way on the server-side for incoming HTTP Requests: + +.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala + :include: manual-entity-discard-example-1 + +Or the equivalent low-level code achieving the same result: + +.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala + :include: manual-entity-discard-example-2 + +Server-Side handling of streaming HTTP Entities +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Similarily as with the Client-side, HTTP Entities are directly linked to Streams which are fed by the underlying +TCP connection. Thus, if request entities remain not consumed, the server will back-pressure the connection, expecting +that the user-code will eventually decide what to do with the incoming data. + +Note that some directives force an implicit ``toStrict`` operation, such as ``entity(as[String])`` and similar ones. + +Consuming the HTTP Request Entity (Server) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The simplest way of consuming the incoming request entity is to simply transform it into an actual domain object, +for example by using the :ref:`-entity-` directive: + +.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :include: consume-entity-directive + +Of course you can access the raw dataBytes as well and run the underlying stream, for example piping it into an +FileIO Sink, that signals completion via a ``Future[IoResult]`` once all the data has been written into the file: + +.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :include: consume-raw-dataBytes + +Discarding the HTTP Request Entity (Server) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sometimes, depending on some validation (e.g. checking if given user is allowed to perform uploads or not) +you may want to decide to discard the uploaded entity. + +Please note that discarding means that the entire upload will proceed, even though you are not interested in the data +being streamed to the server - this may be useful if you are simply not interested in the given entity, however +you don't want to abort the entire connection (which we'll demonstrate as well), since there may be more requests +pending on the same connection still. + +In order to discard the databytes explicitly you can invoke the ``discardEntityBytes`` bytes of the incoming ``HTTPRequest``: + +.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :include: discard-discardEntityBytes + +A related concept is *cancelling* the incoming ``entity.dataBytes`` stream, which results in Akka HTTP +*abruptly closing the connection from the Client*. This may be useful when you detect that the given user should not be allowed to make any +uploads at all, and you want to drop the connection (instead of reading and ignoring the incoming data). +This can be done by attaching the incoming ``entity.dataBytes`` to a ``Sink.cancelled`` which will cancel +the entity stream, which in turn will cause the underlying connection to be shut-down by the server – +effectively hard-aborting the incoming request: + +.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :include: discard-close-connections + +Closing connections is also explained in depth in the :ref:`http-closing-connection-low-level` section of the docs. + +Pending: Automatic discarding of not used entities +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Under certin conditions is is possible to detect an entity is very unlikely to be used by the user for a given request, +and issue warnings or discard the entity automatically. This advanced feature has not been implemented yet, see the below +note and issues for further discussion and ideas. + +.. note:: + An advanced feature code named "auto draining" has been discussed and proposed for Akka HTTP, and we're hoping + to implement or help the community implement it. + + You can read more about it in `issue #18716 `_ + as well as `issue #18540 `_ ; as always, contributions are very welcome! + diff --git a/akka-docs/rst/scala/http/index.rst b/akka-docs/rst/scala/http/index.rst index 0c260ab4ea..570f59eb3e 100644 --- a/akka-docs/rst/scala/http/index.rst +++ b/akka-docs/rst/scala/http/index.rst @@ -9,6 +9,7 @@ Akka HTTP introduction configuration common/index + implications-of-streaming-http-entity low-level-server-side-api routing-dsl/index client-side/index diff --git a/akka-docs/rst/scala/http/low-level-server-side-api.rst b/akka-docs/rst/scala/http/low-level-server-side-api.rst index c3b6bf0aed..30bf2b1620 100644 --- a/akka-docs/rst/scala/http/low-level-server-side-api.rst +++ b/akka-docs/rst/scala/http/low-level-server-side-api.rst @@ -40,6 +40,10 @@ Depending on your needs you can either use the low-level API directly or rely on :ref:`Routing DSL ` which can make the definition of more complex service logic much easier. +.. note:: + It is recommended to read the :ref:`implications-of-streaming-http-entities` section, + as it explains the underlying full-stack streaming concepts, which may be unexpected when coming + from a background with non-"streaming first" HTTP Servers. Streams and HTTP ---------------- @@ -123,6 +127,7 @@ See :ref:`HttpEntity-scala` for a description of the alternatives. If you rely on the :ref:`http-marshalling-scala` and/or :ref:`http-unmarshalling-scala` facilities provided by Akka HTTP then the conversion of custom types to and from streamed entities can be quite convenient. +.. _http-closing-connection-low-level: Closing a connection ~~~~~~~~~~~~~~~~~~~~ diff --git a/akka-docs/rst/scala/http/routing-dsl/index.rst b/akka-docs/rst/scala/http/routing-dsl/index.rst index 10073cd27c..a4e1ee5121 100644 --- a/akka-docs/rst/scala/http/routing-dsl/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/index.rst @@ -8,6 +8,11 @@ defining RESTful web services. It picks up where the low-level API leaves off an functionality of typical web servers or frameworks, like deconstruction of URIs, content negotiation or static content serving. +.. note:: + It is recommended to read the :ref:`implications-of-streaming-http-entities` section, + as it explains the underlying full-stack streaming concepts, which may be unexpected when coming + from a background with non-"streaming first" HTTP Servers. + .. toctree:: :maxdepth: 1 @@ -100,4 +105,4 @@ and split each line before we send it to an actor for further processing: Configuring Server-side HTTPS ----------------------------- -For detailed documentation about configuring and using HTTPS on the server-side refer to :ref:`serverSideHTTPS-scala`. \ No newline at end of file +For detailed documentation about configuring and using HTTPS on the server-side refer to :ref:`serverSideHTTPS-scala`. diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/headers/Connection.java b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/Connection.java new file mode 100644 index 0000000000..429c1856fc --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/Connection.java @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.http.javadsl.model.headers; + +/** + * Model for the `Connection` header. + * Specification: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.10 + */ +public abstract class Connection extends akka.http.scaladsl.model.HttpHeader { + public abstract Iterable getTokens(); + + public static Connection create(String... directives) { + return new akka.http.scaladsl.model.headers.Connection(akka.http.impl.util.Util.convertArray(directives)); + } +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index 15e3844a1e..bf13d2a98e 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -336,7 +336,8 @@ object Connection extends ModeledCompanion[Connection] { def apply(first: String, more: String*): Connection = apply(immutable.Seq(first +: more: _*)) implicit val tokensRenderer = Renderer.defaultSeqRenderer[String] // cache } -final case class Connection(tokens: immutable.Seq[String]) extends RequestResponseHeader { +final case class Connection(tokens: immutable.Seq[String]) extends jm.headers.Connection + with RequestResponseHeader { require(tokens.nonEmpty, "tokens must not be empty") import Connection.tokensRenderer def renderValue[R <: Rendering](r: R): r.type = r ~~ tokens diff --git a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala index 1312acde54..d1af34f168 100644 --- a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala @@ -150,6 +150,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte "be able to wrap HttpHeaders with custom typed headers" in { + // TODO potentially use the integration for Play / Lagom APIs? // This HTTP model is typed. It uses Akka HTTP types internally, but // no Akka HTTP types are visible to users. This typed model is a // model that Play Framework may eventually move to.