* +doc #20192 explain need of draining entities in server/client HTTP * missing javadsl for Connection header * Update HttpClientExampleDocTest.java
This commit is contained in:
parent
9683e4bc58
commit
60fb163331
20 changed files with 766 additions and 16 deletions
|
|
@ -4,31 +4,137 @@
|
||||||
|
|
||||||
package docs.http.javadsl;
|
package docs.http.javadsl;
|
||||||
|
|
||||||
|
import akka.Done;
|
||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import akka.actor.ActorSystem;
|
|
||||||
import akka.http.javadsl.ConnectHttp;
|
import akka.http.javadsl.ConnectHttp;
|
||||||
import akka.http.javadsl.HostConnectionPool;
|
import akka.http.javadsl.HostConnectionPool;
|
||||||
import akka.japi.Pair;
|
import akka.japi.Pair;
|
||||||
|
|
||||||
import akka.japi.pf.ReceiveBuilder;
|
import akka.japi.pf.ReceiveBuilder;
|
||||||
import akka.stream.Materializer;
|
import akka.stream.Materializer;
|
||||||
|
import akka.util.ByteString;
|
||||||
|
import scala.compat.java8.FutureConverters;
|
||||||
import scala.concurrent.ExecutionContextExecutor;
|
import scala.concurrent.ExecutionContextExecutor;
|
||||||
import scala.concurrent.Future;
|
import scala.concurrent.Future;
|
||||||
import akka.stream.ActorMaterializer;
|
|
||||||
import akka.stream.javadsl.*;
|
import akka.stream.javadsl.*;
|
||||||
import akka.http.javadsl.OutgoingConnection;
|
import akka.http.javadsl.OutgoingConnection;
|
||||||
import akka.http.javadsl.model.*;
|
|
||||||
import akka.http.javadsl.Http;
|
import akka.http.javadsl.Http;
|
||||||
import scala.util.Try;
|
|
||||||
|
|
||||||
import static akka.http.javadsl.ConnectHttp.toHost;
|
import static akka.http.javadsl.ConnectHttp.toHost;
|
||||||
import static akka.pattern.PatternsCS.*;
|
import static akka.pattern.PatternsCS.*;
|
||||||
|
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
|
|
||||||
|
//#manual-entity-consume-example-1
|
||||||
|
import java.io.File;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import akka.stream.ActorMaterializer;
|
||||||
|
import akka.stream.javadsl.Framing;
|
||||||
|
import akka.http.javadsl.model.*;
|
||||||
|
import scala.concurrent.duration.FiniteDuration;
|
||||||
|
import scala.util.Try;
|
||||||
|
//#manual-entity-consume-example-1
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public class HttpClientExampleDocTest {
|
public class HttpClientExampleDocTest {
|
||||||
|
|
||||||
|
HttpResponse responseFromSomewhere() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
void manualEntityComsumeExample() {
|
||||||
|
//#manual-entity-consume-example-1
|
||||||
|
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final HttpResponse response = responseFromSomewhere();
|
||||||
|
|
||||||
|
final Function<ByteString, ByteString> transformEachLine = line -> line /* some transformation here */;
|
||||||
|
|
||||||
|
final int maximumFrameLength = 256;
|
||||||
|
|
||||||
|
response.entity().getDataBytes()
|
||||||
|
.via(Framing.delimiter(ByteString.fromString("\n"), maximumFrameLength, FramingTruncation.ALLOW))
|
||||||
|
.map(transformEachLine::apply)
|
||||||
|
.runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer);
|
||||||
|
//#manual-entity-consume-example-1
|
||||||
|
}
|
||||||
|
|
||||||
|
private
|
||||||
|
//#manual-entity-consume-example-2
|
||||||
|
final class ExamplePerson {
|
||||||
|
final String name;
|
||||||
|
public ExamplePerson(String name) { this.name = name; }
|
||||||
|
}
|
||||||
|
|
||||||
|
public ExamplePerson parse(ByteString line) {
|
||||||
|
return new ExamplePerson(line.utf8String());
|
||||||
|
}
|
||||||
|
//#manual-entity-consume-example-2
|
||||||
|
|
||||||
|
void manualEntityConsumeExample2() {
|
||||||
|
//#manual-entity-consume-example-2
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final HttpResponse response = responseFromSomewhere();
|
||||||
|
|
||||||
|
// toStrict to enforce all data be loaded into memory from the connection
|
||||||
|
final CompletionStage<HttpEntity.Strict> strictEntity = response.entity()
|
||||||
|
.toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), materializer);
|
||||||
|
|
||||||
|
// while API remains the same to consume dataBytes, now they're in memory already:
|
||||||
|
|
||||||
|
final CompletionStage<ExamplePerson> person =
|
||||||
|
strictEntity
|
||||||
|
.thenCompose(strict ->
|
||||||
|
strict.getDataBytes()
|
||||||
|
.runFold(ByteString.empty(), (acc, b) -> acc.concat(b), materializer)
|
||||||
|
.thenApply(this::parse)
|
||||||
|
);
|
||||||
|
|
||||||
|
//#manual-entity-consume-example-2
|
||||||
|
}
|
||||||
|
|
||||||
|
void manualEntityDiscardExample1() {
|
||||||
|
//#manual-entity-discard-example-1
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final HttpResponse response = responseFromSomewhere();
|
||||||
|
|
||||||
|
final HttpMessage.DiscardedEntity discarded = response.discardEntityBytes(materializer);
|
||||||
|
|
||||||
|
discarded.completionStage().whenComplete((done, ex) -> {
|
||||||
|
System.out.println("Entity discarded completely!");
|
||||||
|
});
|
||||||
|
//#manual-entity-discard-example-1
|
||||||
|
}
|
||||||
|
|
||||||
|
void manualEntityDiscardExample2() {
|
||||||
|
//#manual-entity-discard-example-2
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final HttpResponse response = responseFromSomewhere();
|
||||||
|
|
||||||
|
final CompletionStage<Done> discardingComplete = response.entity().getDataBytes().runWith(Sink.ignore(), materializer);
|
||||||
|
|
||||||
|
discardingComplete.whenComplete((done, ex) -> {
|
||||||
|
System.out.println("Entity discarded completely!");
|
||||||
|
});
|
||||||
|
//#manual-entity-discard-example-2
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// compile only test
|
// compile only test
|
||||||
public void testConstructRequest() {
|
public void testConstructRequest() {
|
||||||
//#outgoing-connection-example
|
//#outgoing-connection-example
|
||||||
|
|
|
||||||
|
|
@ -4,26 +4,37 @@
|
||||||
|
|
||||||
package docs.http.javadsl.server;
|
package docs.http.javadsl.server;
|
||||||
|
|
||||||
|
import akka.Done;
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import akka.http.javadsl.ConnectHttp;
|
import akka.http.javadsl.ConnectHttp;
|
||||||
import akka.http.javadsl.Http;
|
import akka.http.javadsl.Http;
|
||||||
import akka.http.javadsl.IncomingConnection;
|
import akka.http.javadsl.IncomingConnection;
|
||||||
import akka.http.javadsl.ServerBinding;
|
import akka.http.javadsl.ServerBinding;
|
||||||
|
import akka.http.javadsl.marshallers.jackson.Jackson;
|
||||||
import akka.http.javadsl.model.*;
|
import akka.http.javadsl.model.*;
|
||||||
|
import akka.http.javadsl.model.headers.Connection;
|
||||||
|
import akka.http.javadsl.server.Route;
|
||||||
|
import akka.http.javadsl.server.Unmarshaller;
|
||||||
import akka.japi.function.Function;
|
import akka.japi.function.Function;
|
||||||
import akka.stream.ActorMaterializer;
|
import akka.stream.ActorMaterializer;
|
||||||
|
import akka.stream.IOResult;
|
||||||
import akka.stream.Materializer;
|
import akka.stream.Materializer;
|
||||||
|
import akka.stream.javadsl.FileIO;
|
||||||
import akka.stream.javadsl.Flow;
|
import akka.stream.javadsl.Flow;
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import akka.util.ByteString;
|
import akka.util.ByteString;
|
||||||
|
import scala.concurrent.ExecutionContextExecutor;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static akka.http.javadsl.server.Directives.*;
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public class HttpServerExampleDocTest {
|
public class HttpServerExampleDocTest {
|
||||||
|
|
||||||
|
|
@ -205,4 +216,113 @@ public class HttpServerExampleDocTest {
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
fullServerExample();
|
fullServerExample();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//#consume-entity-directive
|
||||||
|
class Bid {
|
||||||
|
final String userId;
|
||||||
|
final int bid;
|
||||||
|
|
||||||
|
Bid(String userId, int bid) {
|
||||||
|
this.userId = userId;
|
||||||
|
this.bid = bid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#consume-entity-directive
|
||||||
|
|
||||||
|
void consumeEntityUsingEntityDirective() {
|
||||||
|
//#consume-entity-directive
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final Unmarshaller<HttpEntity, Bid> asBid = Jackson.unmarshaller(Bid.class);
|
||||||
|
|
||||||
|
final Route s = path("bid", () ->
|
||||||
|
put(() ->
|
||||||
|
entity(asBid, bid ->
|
||||||
|
// incoming entity is fully consumed and converted into a Bid
|
||||||
|
complete("The bid was: " + bid)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
//#consume-entity-directive
|
||||||
|
}
|
||||||
|
|
||||||
|
void consumeEntityUsingRawDataBytes() {
|
||||||
|
//#consume-raw-dataBytes
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final Route s =
|
||||||
|
put(() ->
|
||||||
|
path("lines", () ->
|
||||||
|
withoutSizeLimit(() ->
|
||||||
|
extractDataBytes(bytes -> {
|
||||||
|
final CompletionStage<IOResult> res = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer);
|
||||||
|
|
||||||
|
return onComplete(() -> res, ioResult ->
|
||||||
|
// we only want to respond once the incoming data has been handled:
|
||||||
|
complete("Finished writing data :" + ioResult));
|
||||||
|
})
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
//#consume-raw-dataBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
void discardEntityUsingRawBytes() {
|
||||||
|
//#discard-discardEntityBytes
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final Route s =
|
||||||
|
put(() ->
|
||||||
|
path("lines", () ->
|
||||||
|
withoutSizeLimit(() ->
|
||||||
|
extractRequest(r -> {
|
||||||
|
final CompletionStage<Done> res = r.discardEntityBytes(materializer).completionStage();
|
||||||
|
|
||||||
|
return onComplete(() -> res, done ->
|
||||||
|
// we only want to respond once the incoming data has been handled:
|
||||||
|
complete("Finished writing data :" + done));
|
||||||
|
})
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
//#discard-discardEntityBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
void discardEntityManuallyCloseConnections() {
|
||||||
|
//#discard-close-connections
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final ExecutionContextExecutor dispatcher = system.dispatcher();
|
||||||
|
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
final Route s =
|
||||||
|
put(() ->
|
||||||
|
path("lines", () ->
|
||||||
|
withoutSizeLimit(() ->
|
||||||
|
extractDataBytes(bytes -> {
|
||||||
|
// Closing connections, method 1 (eager):
|
||||||
|
// we deem this request as illegal, and close the connection right away:
|
||||||
|
bytes.runWith(Sink.cancelled(), materializer); // "brutally" closes the connection
|
||||||
|
|
||||||
|
// Closing connections, method 2 (graceful):
|
||||||
|
// consider draining connection and replying with `Connection: Close` header
|
||||||
|
// if you want the client to close after this request/reply cycle instead:
|
||||||
|
return respondWithHeader(Connection.create("close"), () ->
|
||||||
|
complete(StatusCodes.FORBIDDEN, "Not allowed!")
|
||||||
|
);
|
||||||
|
})
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
//#discard-close-connections
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,121 @@
|
||||||
|
.. _implications-of-streaming-http-entities-java:
|
||||||
|
|
||||||
|
Implications of the streaming nature of Request/Response Entities
|
||||||
|
-----------------------------------------------------------------
|
||||||
|
|
||||||
|
Akka HTTP is streaming *all the way through*, which means that the back-pressure mechanisms enabled by Akka Streams
|
||||||
|
are exposed through all layers–from the TCP layer, through the HTTP server, all the way up to the user-facing ``HttpRequest``
|
||||||
|
and ``HttpResponse`` and their ``HttpEntity`` APIs.
|
||||||
|
|
||||||
|
This has suprising implications if you are used to non-streaming / not-reactive HTTP clients.
|
||||||
|
Specifically it means that: "*lack of consumption of the HTTP Entity, is signaled as back-pressure to the other
|
||||||
|
side of the connection*". This is a feature, as it allows one only to consume the entity, and back-pressure servers/clients
|
||||||
|
from overwhelming our application, possibly causing un-necessary buffering of the entity in memory.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
Consuming (or discarding) the Entity of a request is mandatory!
|
||||||
|
If *accidentally* left neither consumed or discarded Akka HTTP will
|
||||||
|
asume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms.
|
||||||
|
|
||||||
|
Client-Side handling of streaming HTTP Entities
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Consuming the HTTP Response Entity (Client)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
The most commong use-case of course is consuming the response entity, which can be done via
|
||||||
|
running the underlying ``dataBytes`` Source. This is as simple as running the dataBytes source,
|
||||||
|
(or on the server-side using directives such as
|
||||||
|
|
||||||
|
It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest,
|
||||||
|
for example by framing the incoming chunks, parsing them line-by-line and the connecting the flow into another
|
||||||
|
destination Sink, such as a File or other Akka Streams connector:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-consume-example-1
|
||||||
|
|
||||||
|
however sometimes the need may arise to consume the entire entity as ``Strict`` entity (which means that it is
|
||||||
|
completely loaded into memory). Akka HTTP provides a special ``toStrict(timeout, materializer)`` method which can be used to
|
||||||
|
eagerly consume the entity and make it available in memory:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-consume-example-2
|
||||||
|
|
||||||
|
|
||||||
|
Discarding the HTTP Response Entity (Client)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code),
|
||||||
|
yet as explained above entity still has to be consumed in some way, otherwise we'll be exherting back-pressure on the
|
||||||
|
underlying TCP connection.
|
||||||
|
|
||||||
|
The ``discardEntityBytes`` convenience method serves the purpose of easily discarding the entity if it has no purpose for us.
|
||||||
|
It does so by piping the incoming bytes directly into an ``Sink.ignore``.
|
||||||
|
|
||||||
|
The two snippets below are equivalent, and work the same way on the server-side for incoming HTTP Requests:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-discard-example-1
|
||||||
|
|
||||||
|
Or the equivalent low-level code achieving the same result:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-discard-example-2
|
||||||
|
|
||||||
|
Server-Side handling of streaming HTTP Entities
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Similarily as with the Client-side, HTTP Entities are directly linked to Streams which are fed by the underlying
|
||||||
|
TCP connection. Thus, if request entities remain not consumed, the server will back-pressure the connection, expecting
|
||||||
|
that the user-code will eventually decide what to do with the incoming data.
|
||||||
|
|
||||||
|
Note that some directives force an implicit ``toStrict`` operation, such as ``entity(exampleUnmarshaller, example -> {})`` and similar ones.
|
||||||
|
|
||||||
|
Consuming the HTTP Request Entity (Server)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
The simplest way of consuming the incoming request entity is to simply transform it into an actual domain object,
|
||||||
|
for example by using the :ref:`-entity-java-` directive:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#consume-entity-directive
|
||||||
|
|
||||||
|
Of course you can access the raw dataBytes as well and run the underlying stream, for example piping it into an
|
||||||
|
FileIO Sink, that signals completion via a ``CompletionStage<IoResult>`` once all the data has been written into the file:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#consume-raw-dataBytes
|
||||||
|
|
||||||
|
Discarding the HTTP Request Entity (Server)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
Sometimes, depending on some validation (e.g. checking if given user is allowed to perform uploads or not)
|
||||||
|
you may want to decide to discard the uploaded entity.
|
||||||
|
|
||||||
|
Please note that discarding means that the entire upload will proceed, even though you are not interested in the data
|
||||||
|
being streamed to the server - this may be useful if you are simply not interested in the given entity, however
|
||||||
|
you don't want to abort the entire connection (which we'll demonstrate as well), since there may be more requests
|
||||||
|
pending on the same connection still.
|
||||||
|
|
||||||
|
In order to discard the databytes explicitly you can invoke the ``discardEntityBytes`` bytes of the incoming ``HTTPRequest``:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#discard-discardEntityBytes
|
||||||
|
|
||||||
|
A related concept is *cancelling* the incoming ``entity.getDataBytes()`` stream, which results in Akka HTTP
|
||||||
|
*abruptly closing the connection from the Client*. This may be useful when you detect that the given user should not be allowed to make any
|
||||||
|
uploads at all, and you want to drop the connection (instead of reading and ignoring the incoming data).
|
||||||
|
This can be done by attaching the incoming ``entity.getDataBytes()`` to a ``Sink.cancelled`` which will cancel
|
||||||
|
the entity stream, which in turn will cause the underlying connection to be shut-down by the server –
|
||||||
|
effectively hard-aborting the incoming request:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#discard-close-connections
|
||||||
|
|
||||||
|
Closing connections is also explained in depth in the :ref:`http-closing-connection-low-level-java` section of the docs.
|
||||||
|
|
||||||
|
Pending: Automatic discarding of not used entities
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
Under certin conditions is is possible to detect an entity is very unlikely to be used by the user for a given request,
|
||||||
|
and issue warnings or discard the entity automatically. This advanced feature has not been implemented yet, see the below
|
||||||
|
note and issues for further discussion and ideas.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
An advanced feature code named "auto draining" has been discussed and proposed for Akka HTTP, and we're hoping
|
||||||
|
to implement or help the community implement it.
|
||||||
|
|
||||||
|
You can read more about it in `issue #18716 <https://github.com/akka/akka/issues/18716>`_
|
||||||
|
as well as `issue #18540 <https://github.com/akka/akka/issues/18540>`_ ; as always, contributions are very welcome!
|
||||||
|
|
||||||
|
|
@ -37,6 +37,7 @@ akka-http-jackson
|
||||||
routing-dsl/index
|
routing-dsl/index
|
||||||
client-side/index
|
client-side/index
|
||||||
common/index
|
common/index
|
||||||
|
implications-of-streaming-http-entity
|
||||||
configuration
|
configuration
|
||||||
server-side-https-support
|
server-side-https-support
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -121,6 +121,7 @@ Streaming of HTTP message entities is supported through subclasses of ``HttpEnti
|
||||||
to deal with streamed entities when receiving a request as well as, in many cases, when constructing responses.
|
to deal with streamed entities when receiving a request as well as, in many cases, when constructing responses.
|
||||||
See :ref:`HttpEntity-java` for a description of the alternatives.
|
See :ref:`HttpEntity-java` for a description of the alternatives.
|
||||||
|
|
||||||
|
.. _http-closing-connection-low-level-java:
|
||||||
|
|
||||||
Closing a connection
|
Closing a connection
|
||||||
~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,110 @@
|
||||||
|
|
||||||
package docs.http.scaladsl
|
package docs.http.scaladsl
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
import akka.actor.{ ActorLogging, ActorSystem }
|
import akka.actor.{ ActorLogging, ActorSystem }
|
||||||
|
import akka.http.scaladsl.model.HttpEntity.Strict
|
||||||
|
import akka.http.scaladsl.model.HttpMessage.DiscardedEntity
|
||||||
|
import akka.stream.{ IOResult, Materializer }
|
||||||
|
import akka.stream.scaladsl.{ Framing, Sink }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import docs.CompileOnlySpec
|
import docs.CompileOnlySpec
|
||||||
import org.scalatest.{ Matchers, WordSpec }
|
import org.scalatest.{ Matchers, WordSpec }
|
||||||
|
|
||||||
|
import scala.concurrent.{ ExecutionContextExecutor, Future }
|
||||||
|
|
||||||
class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec {
|
class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec {
|
||||||
|
|
||||||
|
"manual-entity-consume-example-1" in compileOnlySpec {
|
||||||
|
//#manual-entity-consume-example-1
|
||||||
|
import java.io.File
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.scaladsl.Framing
|
||||||
|
import akka.stream.scaladsl.FileIO
|
||||||
|
import akka.http.scaladsl.model._
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val dispatcher = system.dispatcher
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
val response: HttpResponse = ???
|
||||||
|
|
||||||
|
response.entity.dataBytes
|
||||||
|
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
|
||||||
|
.map(transformEachLine)
|
||||||
|
.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))
|
||||||
|
|
||||||
|
def transformEachLine(line: ByteString): ByteString = ???
|
||||||
|
|
||||||
|
//#manual-entity-consume-example-1
|
||||||
|
}
|
||||||
|
|
||||||
|
"manual-entity-consume-example-2" in compileOnlySpec {
|
||||||
|
//#manual-entity-consume-example-2
|
||||||
|
import java.io.File
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.http.scaladsl.model._
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val dispatcher = system.dispatcher
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
case class ExamplePerson(name: String)
|
||||||
|
def parse(line: ByteString): ExamplePerson = ???
|
||||||
|
|
||||||
|
val response: HttpResponse = ???
|
||||||
|
|
||||||
|
// toStrict to enforce all data be loaded into memory from the connection
|
||||||
|
val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)
|
||||||
|
|
||||||
|
// while API remains the same to consume dataBytes, now they're in memory already:
|
||||||
|
val transformedData: Future[ExamplePerson] =
|
||||||
|
strictEntity flatMap { e =>
|
||||||
|
e.dataBytes
|
||||||
|
.runFold(ByteString.empty) { case (acc, b) => acc ++ b }
|
||||||
|
.map(parse)
|
||||||
|
}
|
||||||
|
|
||||||
|
//#manual-entity-consume-example-2
|
||||||
|
}
|
||||||
|
|
||||||
|
"manual-entity-discard-example-1" in compileOnlySpec {
|
||||||
|
//#manual-entity-discard-example-1
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.http.scaladsl.model._
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val dispatcher = system.dispatcher
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)
|
||||||
|
|
||||||
|
val discarded: DiscardedEntity = response1.discardEntityBytes()
|
||||||
|
discarded.future.onComplete { case done => println("Entity discarded completely!") }
|
||||||
|
|
||||||
|
//#manual-entity-discard-example-1
|
||||||
|
}
|
||||||
|
"manual-entity-discard-example-2" in compileOnlySpec {
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.http.scaladsl.model._
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val dispatcher = system.dispatcher
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
//#manual-entity-discard-example-2
|
||||||
|
val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)
|
||||||
|
|
||||||
|
val discardingComplete: Future[Done] = response1.entity.dataBytes.runWith(Sink.ignore)
|
||||||
|
discardingComplete.onComplete { case done => println("Entity discarded completely!") }
|
||||||
|
//#manual-entity-discard-example-2
|
||||||
|
}
|
||||||
|
|
||||||
"outgoing-connection-example" in compileOnlySpec {
|
"outgoing-connection-example" in compileOnlySpec {
|
||||||
//#outgoing-connection-example
|
//#outgoing-connection-example
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
@ -74,6 +171,7 @@ class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
|
import scala.util.{ Failure, Success }
|
||||||
|
|
||||||
implicit val system = ActorSystem()
|
implicit val system = ActorSystem()
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
|
||||||
|
|
@ -5,11 +5,13 @@
|
||||||
package docs.http.scaladsl
|
package docs.http.scaladsl
|
||||||
|
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
|
import akka.http.scaladsl.model.{ RequestEntity, StatusCodes }
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
import akka.testkit.TestActors
|
import akka.testkit.TestActors
|
||||||
import docs.CompileOnlySpec
|
import docs.CompileOnlySpec
|
||||||
import org.scalatest.{ Matchers, WordSpec }
|
import org.scalatest.{ Matchers, WordSpec }
|
||||||
import scala.language.postfixOps
|
|
||||||
|
|
||||||
|
import scala.language.postfixOps
|
||||||
import scala.concurrent.{ ExecutionContext, Future }
|
import scala.concurrent.{ ExecutionContext, Future }
|
||||||
|
|
||||||
class HttpServerExampleSpec extends WordSpec with Matchers
|
class HttpServerExampleSpec extends WordSpec with Matchers
|
||||||
|
|
@ -159,7 +161,7 @@ class HttpServerExampleSpec extends WordSpec with Matchers
|
||||||
val httpEcho = Flow[HttpRequest]
|
val httpEcho = Flow[HttpRequest]
|
||||||
.via(reactToConnectionFailure)
|
.via(reactToConnectionFailure)
|
||||||
.map { request =>
|
.map { request =>
|
||||||
// simple text "echo" response:
|
// simple streaming (!) "echo" response:
|
||||||
HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, request.entity.dataBytes))
|
HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, request.entity.dataBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -195,7 +197,8 @@ class HttpServerExampleSpec extends WordSpec with Matchers
|
||||||
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
|
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
|
||||||
sys.error("BOOM!")
|
sys.error("BOOM!")
|
||||||
|
|
||||||
case _: HttpRequest =>
|
case r: HttpRequest =>
|
||||||
|
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
|
||||||
HttpResponse(404, entity = "Unknown resource!")
|
HttpResponse(404, entity = "Unknown resource!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -237,7 +240,8 @@ class HttpServerExampleSpec extends WordSpec with Matchers
|
||||||
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
|
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
|
||||||
sys.error("BOOM!")
|
sys.error("BOOM!")
|
||||||
|
|
||||||
case _: HttpRequest =>
|
case r: HttpRequest =>
|
||||||
|
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
|
||||||
HttpResponse(404, entity = "Unknown resource!")
|
HttpResponse(404, entity = "Unknown resource!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -554,5 +558,125 @@ class HttpServerExampleSpec extends WordSpec with Matchers
|
||||||
//#actor-interaction
|
//#actor-interaction
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"consume entity using entity directive" in compileOnlySpec {
|
||||||
|
//#consume-entity-directive
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.http.scaladsl.server.Directives._
|
||||||
|
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import spray.json.DefaultJsonProtocol._
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
// needed for the future flatMap/onComplete in the end
|
||||||
|
implicit val executionContext = system.dispatcher
|
||||||
|
|
||||||
|
final case class Bid(userId: String, bid: Int)
|
||||||
|
|
||||||
|
// these are from spray-json
|
||||||
|
implicit val bidFormat = jsonFormat2(Bid)
|
||||||
|
|
||||||
|
val route =
|
||||||
|
path("bid") {
|
||||||
|
put {
|
||||||
|
entity(as[Bid]) { bid =>
|
||||||
|
// incoming entity is fully consumed and converted into a Bid
|
||||||
|
complete("The bid was: " + bid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#consume-entity-directive
|
||||||
|
}
|
||||||
|
|
||||||
|
"consume entity using raw dataBytes to file" in compileOnlySpec {
|
||||||
|
//#consume-raw-dataBytes
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.scaladsl.FileIO
|
||||||
|
import akka.http.scaladsl.server.Directives._
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import java.io.File
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
// needed for the future flatMap/onComplete in the end
|
||||||
|
implicit val executionContext = system.dispatcher
|
||||||
|
|
||||||
|
val route =
|
||||||
|
(put & path("lines")) {
|
||||||
|
withoutSizeLimit {
|
||||||
|
extractDataBytes { bytes =>
|
||||||
|
val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))
|
||||||
|
|
||||||
|
// we only want to respond once the incoming data has been handled:
|
||||||
|
onComplete(finishedWriting) { ioResult =>
|
||||||
|
complete("Finished writing data: " + ioResult)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#consume-raw-dataBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
"drain entity using request#discardEntityBytes" in compileOnlySpec {
|
||||||
|
//#discard-discardEntityBytes
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.scaladsl.FileIO
|
||||||
|
import akka.http.scaladsl.server.Directives._
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.http.scaladsl.model.HttpRequest
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
// needed for the future flatMap/onComplete in the end
|
||||||
|
implicit val executionContext = system.dispatcher
|
||||||
|
|
||||||
|
val route =
|
||||||
|
(put & path("lines")) {
|
||||||
|
withoutSizeLimit {
|
||||||
|
extractRequest { r: HttpRequest =>
|
||||||
|
val finishedWriting = r.discardEntityBytes().future
|
||||||
|
|
||||||
|
// we only want to respond once the incoming data has been handled:
|
||||||
|
onComplete(finishedWriting) { done =>
|
||||||
|
complete("Drained all data from connection... (" + done + ")")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#discard-discardEntityBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
"discard entity manually" in compileOnlySpec {
|
||||||
|
//#discard-close-connections
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.http.scaladsl.server.Directives._
|
||||||
|
import akka.http.scaladsl.model.headers.Connection
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
|
||||||
|
implicit val system = ActorSystem()
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
// needed for the future flatMap/onComplete in the end
|
||||||
|
implicit val executionContext = system.dispatcher
|
||||||
|
|
||||||
|
val route =
|
||||||
|
(put & path("lines")) {
|
||||||
|
withoutSizeLimit {
|
||||||
|
extractDataBytes { data =>
|
||||||
|
// Closing connections, method 1 (eager):
|
||||||
|
// we deem this request as illegal, and close the connection right away:
|
||||||
|
data.runWith(Sink.cancelled) // "brutally" closes the connection
|
||||||
|
|
||||||
|
// Closing connections, method 2 (graceful):
|
||||||
|
// consider draining connection and replying with `Connection: Close` header
|
||||||
|
// if you want the client to close after this request/reply cycle instead:
|
||||||
|
respondWithHeader(Connection("close"))
|
||||||
|
complete(StatusCodes.Forbidden -> "Not allowed!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#discard-close-connections
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,9 @@ class WebSocketExampleSpec extends WordSpec with Matchers with CompileOnlySpec {
|
||||||
case Some(upgrade) => upgrade.handleMessages(greeterWebSocketService)
|
case Some(upgrade) => upgrade.handleMessages(greeterWebSocketService)
|
||||||
case None => HttpResponse(400, entity = "Not a valid websocket request!")
|
case None => HttpResponse(400, entity = "Not a valid websocket request!")
|
||||||
}
|
}
|
||||||
case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
|
case r: HttpRequest =>
|
||||||
|
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
|
||||||
|
HttpResponse(404, entity = "Unknown resource!")
|
||||||
}
|
}
|
||||||
//#websocket-request-handling
|
//#websocket-request-handling
|
||||||
|
|
||||||
|
|
@ -84,6 +86,7 @@ class WebSocketExampleSpec extends WordSpec with Matchers with CompileOnlySpec {
|
||||||
.collect {
|
.collect {
|
||||||
case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
|
case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
|
||||||
// ignore binary messages
|
// ignore binary messages
|
||||||
|
// TODO #20096 in case a Streamed message comes in, we should runWith(Sink.ignore) its data
|
||||||
}
|
}
|
||||||
|
|
||||||
//#websocket-routing
|
//#websocket-routing
|
||||||
|
|
|
||||||
|
|
@ -22,10 +22,10 @@ class RouteDirectivesExamplesSpec extends RoutingSpec {
|
||||||
complete(StatusCodes.OK)
|
complete(StatusCodes.OK)
|
||||||
} ~
|
} ~
|
||||||
path("c") {
|
path("c") {
|
||||||
complete(StatusCodes.Created, "bar")
|
complete(StatusCodes.Created -> "bar")
|
||||||
} ~
|
} ~
|
||||||
path("d") {
|
path("d") {
|
||||||
complete(201, "bar")
|
complete(201 -> "bar")
|
||||||
} ~
|
} ~
|
||||||
path("e") {
|
path("e") {
|
||||||
complete(StatusCodes.Created, List(`Content-Type`(`text/plain(UTF-8)`)), "bar")
|
complete(StatusCodes.Created, List(`Content-Type`(`text/plain(UTF-8)`)), "bar")
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,10 @@ The connection-level API is the lowest-level client-side API Akka HTTP provides.
|
||||||
HTTP connections are opened and closed and how requests are to be send across which connection. As such it offers the
|
HTTP connections are opened and closed and how requests are to be send across which connection. As such it offers the
|
||||||
highest flexibility at the cost of providing the least convenience.
|
highest flexibility at the cost of providing the least convenience.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
It is recommended to first read the :ref:`implications-of-streaming-http-entities` section,
|
||||||
|
as it explains the underlying full-stack streaming concepts, which may be unexpected when coming
|
||||||
|
from a background with non-"streaming first" HTTP Clients.
|
||||||
|
|
||||||
Opening HTTP Connections
|
Opening HTTP Connections
|
||||||
------------------------
|
------------------------
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,10 @@ As opposed to the :ref:`connection-level-api` the host-level API relieves you fr
|
||||||
connections. It autonomously manages a configurable pool of connections to *one particular target endpoint* (i.e.
|
connections. It autonomously manages a configurable pool of connections to *one particular target endpoint* (i.e.
|
||||||
host/port combination).
|
host/port combination).
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
It is recommended to first read the :ref:`implications-of-streaming-http-entities` section,
|
||||||
|
as it explains the underlying full-stack streaming concepts, which may be unexpected when coming
|
||||||
|
from a background with non-"streaming first" HTTP Clients.
|
||||||
|
|
||||||
Requesting a Host Connection Pool
|
Requesting a Host Connection Pool
|
||||||
---------------------------------
|
---------------------------------
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,10 @@ Consuming HTTP-based Services (Client-Side)
|
||||||
All client-side functionality of Akka HTTP, for consuming HTTP-based services offered by other endpoints, is currently
|
All client-side functionality of Akka HTTP, for consuming HTTP-based services offered by other endpoints, is currently
|
||||||
provided by the ``akka-http-core`` module.
|
provided by the ``akka-http-core`` module.
|
||||||
|
|
||||||
|
It is recommended to first read the :ref:`implications-of-streaming-http-entities` section,
|
||||||
|
as it explains the underlying full-stack streaming concepts, which may be unexpected when coming
|
||||||
|
from a background with non-"streaming first" HTTP Clients.
|
||||||
|
|
||||||
Depending on your application's specific needs you can choose from three different API levels:
|
Depending on your application's specific needs you can choose from three different API levels:
|
||||||
|
|
||||||
:ref:`connection-level-api`
|
:ref:`connection-level-api`
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,11 @@ The request-level API is the most convenient way of using Akka HTTP's client-sid
|
||||||
:ref:`host-level-api` to provide you with a simple and easy-to-use way of retrieving HTTP responses from remote servers.
|
:ref:`host-level-api` to provide you with a simple and easy-to-use way of retrieving HTTP responses from remote servers.
|
||||||
Depending on your preference you can pick the flow-based or the future-based variant.
|
Depending on your preference you can pick the flow-based or the future-based variant.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
It is recommended to first read the :ref:`implications-of-streaming-http-entities` section,
|
||||||
|
as it explains the underlying full-stack streaming concepts, which may be unexpected when coming
|
||||||
|
from a background with non-"streaming first" HTTP Clients.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
The request-level API is implemented on top of a connection pool that is shared inside the ActorSystem. A consequence of
|
The request-level API is implemented on top of a connection pool that is shared inside the ActorSystem. A consequence of
|
||||||
using a pool is that long-running requests block a connection while running and starve other requests. Make sure not to use
|
using a pool is that long-running requests block a connection while running and starve other requests. Make sure not to use
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,129 @@
|
||||||
|
.. _implications-of-streaming-http-entities:
|
||||||
|
|
||||||
|
Implications of the streaming nature of Request/Response Entities
|
||||||
|
-----------------------------------------------------------------
|
||||||
|
|
||||||
|
Akka HTTP is streaming *all the way through*, which means that the back-pressure mechanisms enabled by Akka Streams
|
||||||
|
are exposed through all layers–from the TCP layer, through the HTTP server, all the way up to the user-facing ``HttpRequest``
|
||||||
|
and ``HttpResponse`` and their ``HttpEntity`` APIs.
|
||||||
|
|
||||||
|
This has suprising implications if you are used to non-streaming / not-reactive HTTP clients.
|
||||||
|
Specifically it means that: "*lack of consumption of the HTTP Entity, is signaled as back-pressure to the other
|
||||||
|
side of the connection*". This is a feature, as it allows one only to consume the entity, and back-pressure servers/clients
|
||||||
|
from overwhelming our application, possibly causing un-necessary buffering of the entity in memory.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
Consuming (or discarding) the Entity of a request is mandatory!
|
||||||
|
If *accidentally* left neither consumed or discarded Akka HTTP will
|
||||||
|
asume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms.
|
||||||
|
|
||||||
|
Client-Side handling of streaming HTTP Entities
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Consuming the HTTP Response Entity (Client)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
The most commong use-case of course is consuming the response entity, which can be done via
|
||||||
|
running the underlying ``dataBytes`` Source. This is as simple as running the dataBytes source,
|
||||||
|
(or on the server-side using directives such as
|
||||||
|
|
||||||
|
It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest,
|
||||||
|
for example by framing the incoming chunks, parsing them line-by-line and the connecting the flow into another
|
||||||
|
destination Sink, such as a File or other Akka Streams connector:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala
|
||||||
|
:include: manual-entity-consume-example-1
|
||||||
|
|
||||||
|
however sometimes the need may arise to consume the entire entity as ``Strict`` entity (which means that it is
|
||||||
|
completely loaded into memory). Akka HTTP provides a special ``toStrict(timeout)`` method which can be used to
|
||||||
|
eagerly consume the entity and make it available in memory:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala
|
||||||
|
:include: manual-entity-consume-example-2
|
||||||
|
|
||||||
|
|
||||||
|
Discarding the HTTP Response Entity (Client)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code),
|
||||||
|
yet as explained above entity still has to be consumed in some way, otherwise we'll be exherting back-pressure on the
|
||||||
|
underlying TCP connection.
|
||||||
|
|
||||||
|
The ``discardEntityBytes`` convenience method serves the purpose of easily discarding the entity if it has no purpose for us.
|
||||||
|
It does so by piping the incoming bytes directly into an ``Sink.ignore``.
|
||||||
|
|
||||||
|
The two snippets below are equivalent, and work the same way on the server-side for incoming HTTP Requests:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala
|
||||||
|
:include: manual-entity-discard-example-1
|
||||||
|
|
||||||
|
Or the equivalent low-level code achieving the same result:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpClientExampleSpec.scala
|
||||||
|
:include: manual-entity-discard-example-2
|
||||||
|
|
||||||
|
Server-Side handling of streaming HTTP Entities
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Similarily as with the Client-side, HTTP Entities are directly linked to Streams which are fed by the underlying
|
||||||
|
TCP connection. Thus, if request entities remain not consumed, the server will back-pressure the connection, expecting
|
||||||
|
that the user-code will eventually decide what to do with the incoming data.
|
||||||
|
|
||||||
|
Note that some directives force an implicit ``toStrict`` operation, such as ``entity(as[String])`` and similar ones.
|
||||||
|
|
||||||
|
Consuming the HTTP Request Entity (Server)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
The simplest way of consuming the incoming request entity is to simply transform it into an actual domain object,
|
||||||
|
for example by using the :ref:`-entity-` directive:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala
|
||||||
|
:include: consume-entity-directive
|
||||||
|
|
||||||
|
Of course you can access the raw dataBytes as well and run the underlying stream, for example piping it into an
|
||||||
|
FileIO Sink, that signals completion via a ``Future[IoResult]`` once all the data has been written into the file:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala
|
||||||
|
:include: consume-raw-dataBytes
|
||||||
|
|
||||||
|
Discarding the HTTP Request Entity (Server)
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
Sometimes, depending on some validation (e.g. checking if given user is allowed to perform uploads or not)
|
||||||
|
you may want to decide to discard the uploaded entity.
|
||||||
|
|
||||||
|
Please note that discarding means that the entire upload will proceed, even though you are not interested in the data
|
||||||
|
being streamed to the server - this may be useful if you are simply not interested in the given entity, however
|
||||||
|
you don't want to abort the entire connection (which we'll demonstrate as well), since there may be more requests
|
||||||
|
pending on the same connection still.
|
||||||
|
|
||||||
|
In order to discard the databytes explicitly you can invoke the ``discardEntityBytes`` bytes of the incoming ``HTTPRequest``:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala
|
||||||
|
:include: discard-discardEntityBytes
|
||||||
|
|
||||||
|
A related concept is *cancelling* the incoming ``entity.dataBytes`` stream, which results in Akka HTTP
|
||||||
|
*abruptly closing the connection from the Client*. This may be useful when you detect that the given user should not be allowed to make any
|
||||||
|
uploads at all, and you want to drop the connection (instead of reading and ignoring the incoming data).
|
||||||
|
This can be done by attaching the incoming ``entity.dataBytes`` to a ``Sink.cancelled`` which will cancel
|
||||||
|
the entity stream, which in turn will cause the underlying connection to be shut-down by the server –
|
||||||
|
effectively hard-aborting the incoming request:
|
||||||
|
|
||||||
|
.. includecode:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala
|
||||||
|
:include: discard-close-connections
|
||||||
|
|
||||||
|
Closing connections is also explained in depth in the :ref:`http-closing-connection-low-level` section of the docs.
|
||||||
|
|
||||||
|
Pending: Automatic discarding of not used entities
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
Under certin conditions is is possible to detect an entity is very unlikely to be used by the user for a given request,
|
||||||
|
and issue warnings or discard the entity automatically. This advanced feature has not been implemented yet, see the below
|
||||||
|
note and issues for further discussion and ideas.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
An advanced feature code named "auto draining" has been discussed and proposed for Akka HTTP, and we're hoping
|
||||||
|
to implement or help the community implement it.
|
||||||
|
|
||||||
|
You can read more about it in `issue #18716 <https://github.com/akka/akka/issues/18716>`_
|
||||||
|
as well as `issue #18540 <https://github.com/akka/akka/issues/18540>`_ ; as always, contributions are very welcome!
|
||||||
|
|
||||||
|
|
@ -9,6 +9,7 @@ Akka HTTP
|
||||||
introduction
|
introduction
|
||||||
configuration
|
configuration
|
||||||
common/index
|
common/index
|
||||||
|
implications-of-streaming-http-entity
|
||||||
low-level-server-side-api
|
low-level-server-side-api
|
||||||
routing-dsl/index
|
routing-dsl/index
|
||||||
client-side/index
|
client-side/index
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,10 @@ Depending on your needs you can either use the low-level API directly or rely on
|
||||||
:ref:`Routing DSL <http-high-level-server-side-api>` which can make the definition of more complex service logic much
|
:ref:`Routing DSL <http-high-level-server-side-api>` which can make the definition of more complex service logic much
|
||||||
easier.
|
easier.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
It is recommended to read the :ref:`implications-of-streaming-http-entities` section,
|
||||||
|
as it explains the underlying full-stack streaming concepts, which may be unexpected when coming
|
||||||
|
from a background with non-"streaming first" HTTP Servers.
|
||||||
|
|
||||||
Streams and HTTP
|
Streams and HTTP
|
||||||
----------------
|
----------------
|
||||||
|
|
@ -123,6 +127,7 @@ See :ref:`HttpEntity-scala` for a description of the alternatives.
|
||||||
If you rely on the :ref:`http-marshalling-scala` and/or :ref:`http-unmarshalling-scala` facilities provided by
|
If you rely on the :ref:`http-marshalling-scala` and/or :ref:`http-unmarshalling-scala` facilities provided by
|
||||||
Akka HTTP then the conversion of custom types to and from streamed entities can be quite convenient.
|
Akka HTTP then the conversion of custom types to and from streamed entities can be quite convenient.
|
||||||
|
|
||||||
|
.. _http-closing-connection-low-level:
|
||||||
|
|
||||||
Closing a connection
|
Closing a connection
|
||||||
~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,11 @@ defining RESTful web services. It picks up where the low-level API leaves off an
|
||||||
functionality of typical web servers or frameworks, like deconstruction of URIs, content negotiation or
|
functionality of typical web servers or frameworks, like deconstruction of URIs, content negotiation or
|
||||||
static content serving.
|
static content serving.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
It is recommended to read the :ref:`implications-of-streaming-http-entities` section,
|
||||||
|
as it explains the underlying full-stack streaming concepts, which may be unexpected when coming
|
||||||
|
from a background with non-"streaming first" HTTP Servers.
|
||||||
|
|
||||||
.. toctree::
|
.. toctree::
|
||||||
:maxdepth: 1
|
:maxdepth: 1
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http.javadsl.model.headers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Model for the `Connection` header.
|
||||||
|
* Specification: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.10
|
||||||
|
*/
|
||||||
|
public abstract class Connection extends akka.http.scaladsl.model.HttpHeader {
|
||||||
|
public abstract Iterable<String> getTokens();
|
||||||
|
|
||||||
|
public static Connection create(String... directives) {
|
||||||
|
return new akka.http.scaladsl.model.headers.Connection(akka.http.impl.util.Util.convertArray(directives));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -336,7 +336,8 @@ object Connection extends ModeledCompanion[Connection] {
|
||||||
def apply(first: String, more: String*): Connection = apply(immutable.Seq(first +: more: _*))
|
def apply(first: String, more: String*): Connection = apply(immutable.Seq(first +: more: _*))
|
||||||
implicit val tokensRenderer = Renderer.defaultSeqRenderer[String] // cache
|
implicit val tokensRenderer = Renderer.defaultSeqRenderer[String] // cache
|
||||||
}
|
}
|
||||||
final case class Connection(tokens: immutable.Seq[String]) extends RequestResponseHeader {
|
final case class Connection(tokens: immutable.Seq[String]) extends jm.headers.Connection
|
||||||
|
with RequestResponseHeader {
|
||||||
require(tokens.nonEmpty, "tokens must not be empty")
|
require(tokens.nonEmpty, "tokens must not be empty")
|
||||||
import Connection.tokensRenderer
|
import Connection.tokensRenderer
|
||||||
def renderValue[R <: Rendering](r: R): r.type = r ~~ tokens
|
def renderValue[R <: Rendering](r: R): r.type = r ~~ tokens
|
||||||
|
|
|
||||||
|
|
@ -150,6 +150,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte
|
||||||
|
|
||||||
"be able to wrap HttpHeaders with custom typed headers" in {
|
"be able to wrap HttpHeaders with custom typed headers" in {
|
||||||
|
|
||||||
|
// TODO potentially use the integration for Play / Lagom APIs?
|
||||||
// This HTTP model is typed. It uses Akka HTTP types internally, but
|
// This HTTP model is typed. It uses Akka HTTP types internally, but
|
||||||
// no Akka HTTP types are visible to users. This typed model is a
|
// no Akka HTTP types are visible to users. This typed model is a
|
||||||
// model that Play Framework may eventually move to.
|
// model that Play Framework may eventually move to.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue