Docs for half closed client WebSockets #19957

This commit is contained in:
Johan Andrén 2016-05-16 11:10:30 +02:00
parent 192fa56975
commit ab526356dd
4 changed files with 276 additions and 15 deletions

View file

@ -22,6 +22,9 @@ import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@SuppressWarnings("unused")
@ -36,21 +39,21 @@ public class WebSocketClientExampleTest {
// print each incoming text message
// would throw exception on non strict or binary message
Sink<Message, CompletionStage<Done>> printSink =
final Sink<Message, CompletionStage<Done>> printSink =
Sink.foreach((message) ->
System.out.println("Got message: " + message.asTextMessage().getStrictText())
);
// send this as a message over the WebSocket
Source<Message, NotUsed> helloSource =
final Source<Message, NotUsed> helloSource =
Source.single(TextMessage.create("hello world"));
// the CompletionStage<Done> is the materialized value of Sink.foreach
// and it is completed when the stream completes
Flow<Message, Message, CompletionStage<Done>> flow =
final Flow<Message, Message, CompletionStage<Done>> flow =
Flow.fromSinkAndSourceMat(printSink, helloSource, Keep.left());
Pair<CompletionStage<WebSocketUpgradeResponse>, CompletionStage<Done>> pair =
final Pair<CompletionStage<WebSocketUpgradeResponse>, CompletionStage<Done>> pair =
http.singleWebSocketRequest(
WebSocketRequest.create("ws://echo.websocket.org"),
flow,
@ -59,7 +62,7 @@ public class WebSocketClientExampleTest {
// The first value in the pair is a CompletionStage<WebSocketUpgradeResponse> that
// completes when the WebSocket request has connected successfully (or failed)
CompletionStage<Done> connected = pair.first().thenApply(upgrade -> {
final CompletionStage<Done> connected = pair.first().thenApply(upgrade -> {
// just like a regular http request we can get 404 NotFound,
// with a response body, that will be available from upgrade.response
if (upgrade.response().status().equals(StatusCodes.OK)) {
@ -71,7 +74,7 @@ public class WebSocketClientExampleTest {
// the second value is the completion of the sink from above
// in other words, it completes when the WebSocket disconnects
CompletionStage<Done> closed = pair.second();
final CompletionStage<Done> closed = pair.second();
// in a real application you would not side effect here
// and handle errors more carefully
@ -81,6 +84,87 @@ public class WebSocketClientExampleTest {
//#single-WebSocket-request
}
// compile only test
public void halfClosedWebSocketClosingExample() {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Http http = Http.get(system);
//#half-closed-WebSocket-closing
// we may expect to be able to to just tail
// the server websocket output like this
final Flow<Message, Message, NotUsed> flow =
Flow.fromSinkAndSource(
Sink.foreach(System.out::println),
Source.empty());
http.singleWebSocketRequest(
WebSocketRequest.create("ws://example.com:8080/some/path"),
flow,
materializer);
//#half-closed-WebSocket-closing
}
public void halfClosedWebSocketWorkingExample() {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Http http = Http.get(system);
//#half-closed-WebSocket-working
// using Source.maybe materializes into a completable future
// which will allow us to complete the source later
final Flow<Message, Message, CompletableFuture<Optional<Message>>> flow =
Flow.fromSinkAndSourceMat(
Sink.foreach(System.out::println),
Source.maybe(),
Keep.right());
final Pair<CompletionStage<WebSocketUpgradeResponse>, CompletableFuture<Optional<Message>>> pair =
http.singleWebSocketRequest(
WebSocketRequest.create("ws://example.com:8080/some/path"),
flow,
materializer);
// at some later time we want to disconnect
pair.second().complete(Optional.empty());
//#half-closed-WebSocket-working
}
public void halfClosedWebSocketFiniteWorkingExample() {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Http http = Http.get(system);
//#half-closed-WebSocket-finite
// emit "one" and then "two" and then keep the source from completing
final Source<Message, CompletableFuture<Optional<Message>>> source =
Source.from(Arrays.<Message>asList(TextMessage.create("one"), TextMessage.create("two")))
.concatMat(Source.maybe(), Keep.right());
final Flow<Message, Message, CompletableFuture<Optional<Message>>> flow =
Flow.fromSinkAndSourceMat(
Sink.foreach(System.out::println),
source,
Keep.right());
final Pair<CompletionStage<WebSocketUpgradeResponse>, CompletableFuture<Optional<Message>>> pair =
http.singleWebSocketRequest(
WebSocketRequest.create("ws://example.com:8080/some/path"),
flow,
materializer);
// at some later time we want to disconnect
pair.second().complete(Optional.empty());
//#half-closed-WebSocket-finite
}
// compile time only test
public void testAuthorizedSingleWebSocketRequest() {
Materializer materializer = null;

View file

@ -21,6 +21,10 @@ the connected WebSocket stream. If the connection fails, for example with a ``40
HTTP result can be found in ``WebSocketUpgradeResponse.response``
.. note::
Make sure to read and understand the section about :ref:`half-closed-client-websockets-java` as the behavior
when using WebSockets for one-way communication may not be what you would expect.
Message
-------
Messages sent and received over a WebSocket can be either :class:`TextMessage` s or :class:`BinaryMessage` s and each
@ -83,4 +87,42 @@ underlying TCP interface. The same scenarios as described for regular HTTP reque
The returned layer forms a ``BidiFlow<Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage<WebSocketUpgradeResponse>>``.
.. _half-closed-client-websockets-java:
Half-Closed WebSockets
----------------------
The Akka HTTP WebSocket API does not support half-closed connections which means that if the either stream completes the
entire connection is closed (after a "Closing Handshake" has been exchanged or a timeout of 3 seconds has passed).
This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server,
like this:
.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java
:include: half-closed-WebSocket-closing
This will in fact quickly close the connection because of the ``Source.empty`` being completed immediately when the
stream is materialized. To solve this you can make sure to not complete the outgoing source by using for example
``Source.maybe`` like this:
.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java
:include: half-closed-WebSocket-working
This will keep the outgoing source from completing, but without emitting any elements until the ``CompletableFuture`` is manually
completed which makes the ``Source`` complete and the connection to close.
The same problem holds true if emitting a finite number of elements, as soon as the last element is reached the ``Source``
will close and cause the connection to close. To avoid that you can concatenate ``Source.maybe`` to the finite stream:
.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java
:include: half-closed-WebSocket-finite
Scenarios that exist with the two streams in a WebSocket and possible ways to deal with it:
=========================================== ================================================================================
Scenario Possible solution
=========================================== ================================================================================
Two-way communication ``Flow.fromSinkAndSource``, or ``Flow.map`` for a request-response protocol
Infinite incoming stream, no outgoing ``Flow.fromSinkAndSource(someSink, Source.maybe())``
Infinite outgoing stream, no incoming ``Flow.fromSinkAndSource(Sink.ignore(), yourSource)``
=========================================== ================================================================================

View file

@ -4,13 +4,15 @@
package docs.http.scaladsl
import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.{ BasicHttpCredentials, Authorization }
import akka.http.scaladsl.model.headers.{ Authorization, BasicHttpCredentials }
import docs.CompileOnlySpec
import org.scalatest.{ Matchers, WordSpec }
class WebSocketClientExampleSpec extends WordSpec with Matchers {
import scala.concurrent.Promise
"singleWebSocket-request-example" in {
pending // compile-time only test
class WebSocketClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec {
"singleWebSocket-request-example" in compileOnlySpec {
//#single-WebSocket-request
import akka.{ Done, NotUsed }
import akka.http.scaladsl.Http
@ -64,8 +66,103 @@ class WebSocketClientExampleSpec extends WordSpec with Matchers {
//#single-WebSocket-request
}
"authorized-singleWebSocket-request-example" in {
pending // compile-time only test
"half-closed-WebSocket-closing-example" in compileOnlySpec {
import akka.{ Done, NotUsed }
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
//#half-closed-WebSocket-closing-example
// we may expect to be able to to just tail
// the server websocket output like this
val flow: Flow[Message, Message, NotUsed] =
Flow.fromSinkAndSource(
Sink.foreach(println),
Source.empty)
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
//#half-closed-WebSocket-closing-example
}
"half-closed-WebSocket-working-example" in compileOnlySpec {
import akka.{ Done, NotUsed }
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
//#half-closed-WebSocket-working-example
// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach[Message](println),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
// at some later time we want to disconnect
promise.success(None)
//#half-closed-WebSocket-working-example
}
"half-closed-WebSocket-finite-working-example" in compileOnlySpec {
import akka.{ Done, NotUsed }
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
//#half-closed-WebSocket-finite-working-example
// using emit "one" and "two" and then keep the connection open
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach[Message](println),
Source(List(TextMessage("one"), TextMessage("two")))
.concatMat(Source.maybe[Message])(Keep.right))(Keep.right)
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
// at some later time we want to disconnect
promise.success(None)
//#half-closed-WebSocket-finite-working-example
}
"authorized-singleWebSocket-request-example" in compileOnlySpec {
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
@ -88,9 +185,7 @@ class WebSocketClientExampleSpec extends WordSpec with Matchers {
//#authorized-single-WebSocket-request
}
"WebSocketClient-flow-example" in {
pending // compile-time only test
"WebSocketClient-flow-example" in compileOnlySpec {
//#WebSocket-client-flow
import akka.Done
import akka.http.scaladsl.Http

View file

@ -20,6 +20,9 @@ The methods of the WebSocket client API handle the upgrade to WebSocket on conne
the connected WebSocket stream. If the connection fails, for example with a ``404 NotFound`` error, this regular
HTTP result can be found in ``WebSocketUpgradeResponse.response``
.. note::
Make sure to read and understand the section about :ref:`half-closed-client-websockets` as the behavior
when using WebSockets for one-way communication may not be what you would expect.
Message
-------
@ -81,3 +84,40 @@ underlying TCP interface. The same scenarios as described for regular HTTP reque
The returned layer forms a ``BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebSocketUpgradeResponse]]``.
.. _half-closed-client-websockets:
Half-Closed WebSockets
----------------------
The Akka HTTP WebSocket API does not support half-closed connections which means that if the either stream completes the
entire connection is closed (after a "Closing Handshake" has been exchanged or a timeout of 3 seconds has passed).
This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server,
like this:
.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala
:include: half-closed-WebSocket-closing-example
This will in fact quickly close the connection because of the ``Source.empty`` being completed immediately when the
stream is materialized. To solve this you can make sure to not complete the outgoing source by using for example
``Source.maybe`` like this:
.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala
:include: half-closed-WebSocket-working-example
This will keep the outgoing source from completing, but without emitting any elements until the ``Promise`` is manually
completed which makes the ``Source`` complete and the connection to close.
The same problem holds true if emitting a finite number of elements, as soon as the last element is reached the ``Source``
will close and cause the connection to close. To avoid that you can concatenate ``Source.maybe`` to the finite stream:
.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala
:include: half-closed-WebSocket-finite-working-example
Scenarios that exist with the two streams in a WebSocket and possible ways to deal with it:
=========================================== ================================================================================
Scenario Possible solution
=========================================== ================================================================================
Two-way communication ``Flow.fromSinkAndSource``, or ``Flow.map`` for a request-response protocol
Infinite incoming stream, no outgoing ``Flow.fromSinkAndSource(someSink, Source.maybe)``
Infinite outgoing stream, no incoming ``Flow.fromSinkAndSource(Sink.ignore, yourSource)``
=========================================== ================================================================================