From ab526356dd64e7d7d4356aea22c0862faa0ed858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 16 May 2016 11:10:30 +0200 Subject: [PATCH] Docs for half closed client WebSockets #19957 --- .../javadsl/WebSocketClientExampleTest.java | 96 ++++++++++++++- .../http/client-side/websocket-support.rst | 42 +++++++ .../scaladsl/WebSocketClientExampleSpec.scala | 113 ++++++++++++++++-- .../http/client-side/websocket-support.rst | 40 +++++++ 4 files changed, 276 insertions(+), 15 deletions(-) diff --git a/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java b/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java index cb4efb766b..1348c65636 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java @@ -22,6 +22,9 @@ import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @SuppressWarnings("unused") @@ -36,21 +39,21 @@ public class WebSocketClientExampleTest { // print each incoming text message // would throw exception on non strict or binary message - Sink> printSink = + final Sink> printSink = Sink.foreach((message) -> System.out.println("Got message: " + message.asTextMessage().getStrictText()) ); // send this as a message over the WebSocket - Source helloSource = + final Source helloSource = Source.single(TextMessage.create("hello world")); // the CompletionStage is the materialized value of Sink.foreach // and it is completed when the stream completes - Flow> flow = + final Flow> flow = Flow.fromSinkAndSourceMat(printSink, helloSource, Keep.left()); - Pair, CompletionStage> pair = + final Pair, CompletionStage> pair = http.singleWebSocketRequest( WebSocketRequest.create("ws://echo.websocket.org"), flow, @@ -59,7 +62,7 @@ public class WebSocketClientExampleTest { // The first value in the pair is a CompletionStage that // completes when the WebSocket request has connected successfully (or failed) - CompletionStage connected = pair.first().thenApply(upgrade -> { + final CompletionStage connected = pair.first().thenApply(upgrade -> { // just like a regular http request we can get 404 NotFound, // with a response body, that will be available from upgrade.response if (upgrade.response().status().equals(StatusCodes.OK)) { @@ -71,7 +74,7 @@ public class WebSocketClientExampleTest { // the second value is the completion of the sink from above // in other words, it completes when the WebSocket disconnects - CompletionStage closed = pair.second(); + final CompletionStage closed = pair.second(); // in a real application you would not side effect here // and handle errors more carefully @@ -81,6 +84,87 @@ public class WebSocketClientExampleTest { //#single-WebSocket-request } + // compile only test + public void halfClosedWebSocketClosingExample() { + + final ActorSystem system = ActorSystem.create(); + final Materializer materializer = ActorMaterializer.create(system); + final Http http = Http.get(system); + + //#half-closed-WebSocket-closing + + // we may expect to be able to to just tail + // the server websocket output like this + final Flow flow = + Flow.fromSinkAndSource( + Sink.foreach(System.out::println), + Source.empty()); + + http.singleWebSocketRequest( + WebSocketRequest.create("ws://example.com:8080/some/path"), + flow, + materializer); + + //#half-closed-WebSocket-closing + } + + public void halfClosedWebSocketWorkingExample() { + final ActorSystem system = ActorSystem.create(); + final Materializer materializer = ActorMaterializer.create(system); + final Http http = Http.get(system); + + //#half-closed-WebSocket-working + + // using Source.maybe materializes into a completable future + // which will allow us to complete the source later + final Flow>> flow = + Flow.fromSinkAndSourceMat( + Sink.foreach(System.out::println), + Source.maybe(), + Keep.right()); + + final Pair, CompletableFuture>> pair = + http.singleWebSocketRequest( + WebSocketRequest.create("ws://example.com:8080/some/path"), + flow, + materializer); + + // at some later time we want to disconnect + pair.second().complete(Optional.empty()); + //#half-closed-WebSocket-working + } + + public void halfClosedWebSocketFiniteWorkingExample() { + final ActorSystem system = ActorSystem.create(); + final Materializer materializer = ActorMaterializer.create(system); + final Http http = Http.get(system); + + //#half-closed-WebSocket-finite + + // emit "one" and then "two" and then keep the source from completing + final Source>> source = + Source.from(Arrays.asList(TextMessage.create("one"), TextMessage.create("two"))) + .concatMat(Source.maybe(), Keep.right()); + + final Flow>> flow = + Flow.fromSinkAndSourceMat( + Sink.foreach(System.out::println), + source, + Keep.right()); + + final Pair, CompletableFuture>> pair = + http.singleWebSocketRequest( + WebSocketRequest.create("ws://example.com:8080/some/path"), + flow, + materializer); + + // at some later time we want to disconnect + pair.second().complete(Optional.empty()); + //#half-closed-WebSocket-finite + } + + + // compile time only test public void testAuthorizedSingleWebSocketRequest() { Materializer materializer = null; diff --git a/akka-docs/rst/java/http/client-side/websocket-support.rst b/akka-docs/rst/java/http/client-side/websocket-support.rst index c636c80c0c..5331ff754c 100644 --- a/akka-docs/rst/java/http/client-side/websocket-support.rst +++ b/akka-docs/rst/java/http/client-side/websocket-support.rst @@ -21,6 +21,10 @@ the connected WebSocket stream. If the connection fails, for example with a ``40 HTTP result can be found in ``WebSocketUpgradeResponse.response`` +.. note:: + Make sure to read and understand the section about :ref:`half-closed-client-websockets-java` as the behavior + when using WebSockets for one-way communication may not be what you would expect. + Message ------- Messages sent and received over a WebSocket can be either :class:`TextMessage` s or :class:`BinaryMessage` s and each @@ -83,4 +87,42 @@ underlying TCP interface. The same scenarios as described for regular HTTP reque The returned layer forms a ``BidiFlow>``. +.. _half-closed-client-websockets-java: + + +Half-Closed WebSockets +---------------------- +The Akka HTTP WebSocket API does not support half-closed connections which means that if the either stream completes the +entire connection is closed (after a "Closing Handshake" has been exchanged or a timeout of 3 seconds has passed). +This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server, +like this: + +.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java + :include: half-closed-WebSocket-closing + +This will in fact quickly close the connection because of the ``Source.empty`` being completed immediately when the +stream is materialized. To solve this you can make sure to not complete the outgoing source by using for example +``Source.maybe`` like this: + +.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java + :include: half-closed-WebSocket-working + +This will keep the outgoing source from completing, but without emitting any elements until the ``CompletableFuture`` is manually +completed which makes the ``Source`` complete and the connection to close. + +The same problem holds true if emitting a finite number of elements, as soon as the last element is reached the ``Source`` +will close and cause the connection to close. To avoid that you can concatenate ``Source.maybe`` to the finite stream: + +.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java + :include: half-closed-WebSocket-finite + +Scenarios that exist with the two streams in a WebSocket and possible ways to deal with it: + +=========================================== ================================================================================ +Scenario Possible solution +=========================================== ================================================================================ +Two-way communication ``Flow.fromSinkAndSource``, or ``Flow.map`` for a request-response protocol +Infinite incoming stream, no outgoing ``Flow.fromSinkAndSource(someSink, Source.maybe())`` +Infinite outgoing stream, no incoming ``Flow.fromSinkAndSource(Sink.ignore(), yourSource)`` +=========================================== ================================================================================ diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala index 18731f5d85..67baf6628a 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala @@ -4,13 +4,15 @@ package docs.http.scaladsl import akka.actor.ActorSystem -import akka.http.scaladsl.model.headers.{ BasicHttpCredentials, Authorization } +import akka.http.scaladsl.model.headers.{ Authorization, BasicHttpCredentials } +import docs.CompileOnlySpec import org.scalatest.{ Matchers, WordSpec } -class WebSocketClientExampleSpec extends WordSpec with Matchers { +import scala.concurrent.Promise - "singleWebSocket-request-example" in { - pending // compile-time only test +class WebSocketClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec { + + "singleWebSocket-request-example" in compileOnlySpec { //#single-WebSocket-request import akka.{ Done, NotUsed } import akka.http.scaladsl.Http @@ -64,8 +66,103 @@ class WebSocketClientExampleSpec extends WordSpec with Matchers { //#single-WebSocket-request } - "authorized-singleWebSocket-request-example" in { - pending // compile-time only test + "half-closed-WebSocket-closing-example" in compileOnlySpec { + import akka.{ Done, NotUsed } + import akka.http.scaladsl.Http + import akka.stream.ActorMaterializer + import akka.stream.scaladsl._ + import akka.http.scaladsl.model._ + import akka.http.scaladsl.model.ws._ + + import scala.concurrent.Future + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + import system.dispatcher + + //#half-closed-WebSocket-closing-example + + // we may expect to be able to to just tail + // the server websocket output like this + val flow: Flow[Message, Message, NotUsed] = + Flow.fromSinkAndSource( + Sink.foreach(println), + Source.empty) + + Http().singleWebSocketRequest( + WebSocketRequest("ws://example.com:8080/some/path"), + flow) + + //#half-closed-WebSocket-closing-example + } + + "half-closed-WebSocket-working-example" in compileOnlySpec { + import akka.{ Done, NotUsed } + import akka.http.scaladsl.Http + import akka.stream.ActorMaterializer + import akka.stream.scaladsl._ + import akka.http.scaladsl.model._ + import akka.http.scaladsl.model.ws._ + + import scala.concurrent.Future + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + import system.dispatcher + + //#half-closed-WebSocket-working-example + + // using Source.maybe materializes into a promise + // which will allow us to complete the source later + val flow: Flow[Message, Message, Promise[Option[Message]]] = + Flow.fromSinkAndSourceMat( + Sink.foreach[Message](println), + Source.maybe[Message])(Keep.right) + + val (upgradeResponse, promise) = + Http().singleWebSocketRequest( + WebSocketRequest("ws://example.com:8080/some/path"), + flow) + + // at some later time we want to disconnect + promise.success(None) + //#half-closed-WebSocket-working-example + } + + "half-closed-WebSocket-finite-working-example" in compileOnlySpec { + import akka.{ Done, NotUsed } + import akka.http.scaladsl.Http + import akka.stream.ActorMaterializer + import akka.stream.scaladsl._ + import akka.http.scaladsl.model._ + import akka.http.scaladsl.model.ws._ + + import scala.concurrent.Future + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + import system.dispatcher + + //#half-closed-WebSocket-finite-working-example + + // using emit "one" and "two" and then keep the connection open + val flow: Flow[Message, Message, Promise[Option[Message]]] = + Flow.fromSinkAndSourceMat( + Sink.foreach[Message](println), + Source(List(TextMessage("one"), TextMessage("two"))) + .concatMat(Source.maybe[Message])(Keep.right))(Keep.right) + + val (upgradeResponse, promise) = + Http().singleWebSocketRequest( + WebSocketRequest("ws://example.com:8080/some/path"), + flow) + + // at some later time we want to disconnect + promise.success(None) + //#half-closed-WebSocket-finite-working-example + } + + "authorized-singleWebSocket-request-example" in compileOnlySpec { import akka.NotUsed import akka.http.scaladsl.Http import akka.stream.ActorMaterializer @@ -88,9 +185,7 @@ class WebSocketClientExampleSpec extends WordSpec with Matchers { //#authorized-single-WebSocket-request } - "WebSocketClient-flow-example" in { - pending // compile-time only test - + "WebSocketClient-flow-example" in compileOnlySpec { //#WebSocket-client-flow import akka.Done import akka.http.scaladsl.Http diff --git a/akka-docs/rst/scala/http/client-side/websocket-support.rst b/akka-docs/rst/scala/http/client-side/websocket-support.rst index a3e54c43d2..79f88e4002 100644 --- a/akka-docs/rst/scala/http/client-side/websocket-support.rst +++ b/akka-docs/rst/scala/http/client-side/websocket-support.rst @@ -20,6 +20,9 @@ The methods of the WebSocket client API handle the upgrade to WebSocket on conne the connected WebSocket stream. If the connection fails, for example with a ``404 NotFound`` error, this regular HTTP result can be found in ``WebSocketUpgradeResponse.response`` +.. note:: + Make sure to read and understand the section about :ref:`half-closed-client-websockets` as the behavior + when using WebSockets for one-way communication may not be what you would expect. Message ------- @@ -81,3 +84,40 @@ underlying TCP interface. The same scenarios as described for regular HTTP reque The returned layer forms a ``BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebSocketUpgradeResponse]]``. +.. _half-closed-client-websockets: + +Half-Closed WebSockets +---------------------- +The Akka HTTP WebSocket API does not support half-closed connections which means that if the either stream completes the +entire connection is closed (after a "Closing Handshake" has been exchanged or a timeout of 3 seconds has passed). +This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server, +like this: + +.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala + :include: half-closed-WebSocket-closing-example + +This will in fact quickly close the connection because of the ``Source.empty`` being completed immediately when the +stream is materialized. To solve this you can make sure to not complete the outgoing source by using for example +``Source.maybe`` like this: + +.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala + :include: half-closed-WebSocket-working-example + +This will keep the outgoing source from completing, but without emitting any elements until the ``Promise`` is manually +completed which makes the ``Source`` complete and the connection to close. + +The same problem holds true if emitting a finite number of elements, as soon as the last element is reached the ``Source`` +will close and cause the connection to close. To avoid that you can concatenate ``Source.maybe`` to the finite stream: + +.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala + :include: half-closed-WebSocket-finite-working-example + +Scenarios that exist with the two streams in a WebSocket and possible ways to deal with it: + +=========================================== ================================================================================ +Scenario Possible solution +=========================================== ================================================================================ +Two-way communication ``Flow.fromSinkAndSource``, or ``Flow.map`` for a request-response protocol +Infinite incoming stream, no outgoing ``Flow.fromSinkAndSource(someSink, Source.maybe)`` +Infinite outgoing stream, no incoming ``Flow.fromSinkAndSource(Sink.ignore, yourSource)`` +=========================================== ================================================================================ \ No newline at end of file