From 2c462387af5fdbd34447ed0ed1650322831005b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Mon, 18 Jan 2016 16:58:09 +0100 Subject: [PATCH] =doc #19315 docs for websocket client --- .../javadsl/WebSocketClientExampleTest.java | 158 ++++++++++++++++++ .../http/client-side/websocket-support.rst | 82 ++++++++- .../stream/migration-guide-2.0-2.4-java.rst | 4 +- .../scaladsl/WebSocketClientExampleSpec.scala | 150 +++++++++++++++++ .../http/client-side/connection-level.rst | 2 + .../http/client-side/websocket-support.rst | 79 ++++++++- .../akka/http/scaladsl/model/ws/Message.scala | 3 - 7 files changed, 469 insertions(+), 9 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java create mode 100644 akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala diff --git a/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java b/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java new file mode 100644 index 0000000000..9271183850 --- /dev/null +++ b/akka-docs/rst/java/code/docs/http/javadsl/WebSocketClientExampleTest.java @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2016 Typesafe Inc. + */ +package docs.http.javadsl; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.http.javadsl.Http; +import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.model.headers.Authorization; +import akka.http.javadsl.model.ws.Message; +import akka.http.javadsl.model.ws.TextMessage; +import akka.http.javadsl.model.ws.WebSocketRequest; +import akka.http.javadsl.model.ws.WebSocketUpgradeResponse; +import akka.japi.Pair; +import akka.japi.function.Procedure; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.util.concurrent.CompletionStage; + +@SuppressWarnings("unused") +public class WebSocketClientExampleTest { + + // compile only test + public void testSingleWebSocketRequest() { + //#single-WebSocket-request + ActorSystem system = ActorSystem.create(); + Materializer materializer = ActorMaterializer.create(system); + Http http = Http.get(system); + + // print each incoming text message + // would throw exception on non strict or binary message + Sink> printSink = + Sink.foreach((message) -> + System.out.println("Got message: " + message.asTextMessage().getStrictText()) + ); + + // send this as a message over the WebSocket + Source helloSource = + Source.single(TextMessage.create("hello world")); + + // the CompletionStage is the materialized value of Sink.foreach + // and it is completed when the stream completes + Flow> flow = + Flow.fromSinkAndSourceMat(printSink, helloSource, Keep.left()); + + Pair, CompletionStage> pair = + http.singleWebSocketRequest( + WebSocketRequest.create("ws://echo.websocket.org"), + flow, + materializer + ); + + // The first value in the pair is a CompletionStage that + // completes when the WebSocket request has connected successfully (or failed) + CompletionStage connected = pair.first().thenApply(upgrade -> { + // just like a regular http request we can get 404 NotFound, + // with a response body, that will be available from upgrade.response + if (upgrade.response().status().equals(StatusCodes.OK)) { + return Done.getInstance(); + } else { + throw new RuntimeException("Connection failed: " + upgrade.response().status()); + } + }); + + // the second value is the completion of the sink from above + // in other words, it completes when the WebSocket disconnects + CompletionStage closed = pair.second(); + + // in a real application you would not side effect here + // and handle errors more carefully + connected.thenAccept(done -> System.out.println("Connected")); + closed.thenAccept(done -> System.out.println("Connection closed")); + + //#single-WebSocket-request + } + + // compile time only test + public void testAuthorizedSingleWebSocketRequest() { + Materializer materializer = null; + Http http = null; + + Flow flow = null; + + //#authorized-single-WebSocket-request + http.singleWebSocketRequest( + WebSocketRequest.create("ws://example.com:8080/some/path") + .addHeader(Authorization.basic("johan", "correcthorsebatterystaple")), + flow, + materializer); + //#authorized-single-WebSocket-request + } + + // compile time only test + public void testWebSocketClientFlow() { + //#WebSocket-client-flow + ActorSystem system = ActorSystem.create(); + Materializer materializer = ActorMaterializer.create(system); + Http http = Http.get(system); + + // print each incoming text message + // would throw exception on non strict or binary message + Sink> printSink = + Sink.foreach((message) -> + System.out.println("Got message: " + message.asTextMessage().getStrictText()) + ); + + // send this as a message over the WebSocket + Source helloSource = + Source.single(TextMessage.create("hello world")); + + + Flow> webSocketFlow = + http.webSocketClientFlow(WebSocketRequest.create("ws://echo.websocket.org")); + + + Pair, CompletionStage> pair = + helloSource.viaMat(webSocketFlow, Keep.right()) + .toMat(printSink, Keep.both()) + .run(materializer); + + + // The first value in the pair is a CompletionStage that + // completes when the WebSocket request has connected successfully (or failed) + CompletionStage upgradeCompletion = pair.first(); + + // the second value is the completion of the sink from above + // in other words, it completes when the WebSocket disconnects + CompletionStage closed = pair.second(); + + CompletionStage connected = upgradeCompletion.thenApply(upgrade-> + { + // just like a regular http request we can get 404 NotFound, + // with a response body, that will be available from upgrade.response + if (upgrade.response().status().equals(StatusCodes.OK)) { + return Done.getInstance(); + } else { + throw new RuntimeException(("Connection failed: " + upgrade.response().status())); + } + }); + + // in a real application you would not side effect here + // and handle errors more carefully + connected.thenAccept(done -> System.out.println("Connected")); + closed.thenAccept(done -> System.out.println("Connection closed")); + + //#WebSocket-client-flow + } + + + + } diff --git a/akka-docs/rst/java/http/client-side/websocket-support.rst b/akka-docs/rst/java/http/client-side/websocket-support.rst index 91d0d0aa1c..f8883ad0fa 100644 --- a/akka-docs/rst/java/http/client-side/websocket-support.rst +++ b/akka-docs/rst/java/http/client-side/websocket-support.rst @@ -3,6 +3,84 @@ Client-Side WebSocket Support ============================= -Not yet implemented see 17275_. +Client side WebSocket support is available through ``Http.singleWebSocketRequest`` , +``Http.webSocketClientFlow`` and ``Http.webSocketClientLayer``. + +A WebSocket consists of two streams of messages, incoming messages (a :class:`Sink`) and outgoing messages +(a :class:`Source`) where either may be signalled first; or even be the only direction in which messages flow +during the lifetime of the connection. Therefore a WebSocket connection is modelled as either something you connect a +``Flow`` to or a ``Flow`` that you connect a ``Source`` +and a ``Sink`` to. + +A WebSocket request starts with a regular HTTP request which contains an ``Upgrade`` header (and possibly +other regular HTTP request properties), so in addition to the flow of messages there also is an initial response +from the server, this is modelled with :class:`WebSocketUpgradeResponse`. + +The methods of the WebSocket client API handle the upgrade to WebSocket on connection success and materializes +the connected WebSocket stream. If the connection fails, for example with a ``404 NotFound`` error, this regular +HTTP result can be found in ``WebSocketUpgradeResponse.response`` + + +Message +------- +Messages sent and received over a WebSocket can be either :class:`TextMessage` s or :class:`BinaryMessage` s and each +of those can be either strict (all data in one chunk) or streaming. In typical applications messages will be strict as +WebSockets are usually deployed to communicate using small messages not stream data, the protocol does however +allow this (by not marking the first fragment as final, as described in `rfc 6455 section 5.2`__). + +__ https://tools.ietf.org/html/rfc6455#section-5.2 + +The strict text is available from ``TextMessage.getStrictText`` and strict binary data from +``BinaryMessage.getStrictData``. + +For streaming messages ``BinaryMessage.getStreamedData`` and ``TextMessage.getStreamedText`` is used to access the data. +In these cases the data is provided as a ``Source`` for binary and ``Source`` +for text messages. + + +singleWebSocketRequest +---------------------- +``singleWebSocketRequest`` takes a :class:`WebSocketRequest` and a flow it will connect to the source and +sink of the WebSocket connection. It will trigger the request right away and returns a tuple containing a +``CompletionStage`` and the materialized value from the flow passed to the method. + +The future will succeed when the WebSocket connection has been established or the server returned a regular +HTTP response, or fail if the connection fails with an exception. + +Simple example sending a message and printing any incoming message: + +.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java + :include: single-WebSocket-request + +The websocket request may also include additional headers, like in this example, HTTP Basic Auth: + +.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java + :include: authorized-single-WebSocket-request + + +webSocketClientFlow +------------------- +``webSocketClientFlow`` takes a request, and returns a ``Flow>``. + +The future that is materialized from the flow will succeed when the WebSocket connection has been established or +the server returned a regular HTTP response, or fail if the connection fails with an exception. + +.. note:: + The :class:`Flow` that is returned by this method can only be materialized once. For each request a new + flow must be acquired by calling the method again. + +Simple example sending a message and printing any incoming message: + + +.. includecode:: ../../code/docs/http/javadsl/WebSocketClientExampleTest.java + :include: WebSocket-client-flow + + +webSocketClientLayer +-------------------- +Just like the :ref:`http-client-layer-java` for regular HTTP requests, the WebSocket layer can be used fully detached from the +underlying TCP interface. The same scenarios as described for regular HTTP requests apply here. + +The returned layer forms a ``BidiFlow>``. + -.. _17275: https://github.com/akka/akka/issues/17275 \ No newline at end of file diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index 8867effbc2..c98f82da67 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -22,9 +22,9 @@ the ``PatternCS`` class that provide the ability to interact between Actors and Should you have the need to use Scala Futures with these new Java APIs please use the ``scala-java8-compat`` library that comes as a dependency of Akka. For more -information see `the documentation``_. +information see `the documentation`__. -.. _`the documentation`:: https://github.com/scala/scala-java8-compat +__ https://github.com/scala/scala-java8-compat akka.Done and akka.NotUsed replacing Unit and BoxedUnit ------------------------------------------------------- diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala new file mode 100644 index 0000000000..9ee157c43d --- /dev/null +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/WebSocketClientExampleSpec.scala @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2009-2016 Typesafe Inc. + */ +package docs.http.scaladsl + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.headers.{ BasicHttpCredentials, Authorization } +import org.scalatest.{ Matchers, WordSpec } + +class WebSocketClientExampleSpec extends WordSpec with Matchers { + + "singleWebSocket-request-example" in { + pending // compile-time only test + //#single-WebSocket-request + import akka.{ Done, NotUsed } + import akka.http.scaladsl.Http + import akka.stream.ActorMaterializer + import akka.stream.scaladsl._ + import akka.http.scaladsl.model._ + import akka.http.scaladsl.model.ws._ + + import scala.concurrent.Future + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + import system.dispatcher + + // print each incoming strict text message + val printSink: Sink[Message, Future[Done]] = + Sink.foreach { + case message: TextMessage.Strict => + println(message.text) + } + + val helloSource: Source[Message, NotUsed] = + Source.single(TextMessage("hello world!")) + + // the Future[Done] is the materialized value of Sink.foreach + // and it is completed when the stream completes + val flow: Flow[Message, Message, Future[Done]] = + Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left) + + // upgradeResponse is a Future[WebSocketUpgradeResponse] that + // completes or fails when the connection succeeds or fails + // and closed is a Future[Done] representing the stream completion from above + val (upgradeResponse, closed) = + Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow) + + val connected = upgradeResponse.map { upgrade => + // just like a regular http request we can get 404 NotFound, + // with a response body, that will be available from upgrade.response + if (upgrade.response.status == StatusCodes.OK) { + Done + } else { + throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") + } + } + + // in a real application you would not side effect here + // and handle errors more carefully + connected.onComplete(println) + closed.foreach(_ => println("closed")) + + //#single-WebSocket-request + } + + "authorized-singleWebSocket-request-example" in { + pending // compile-time only test + import akka.NotUsed + import akka.http.scaladsl.Http + import akka.stream.ActorMaterializer + import akka.stream.scaladsl._ + import akka.http.scaladsl.model._ + import akka.http.scaladsl.model.ws._ + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + import collection.immutable.Seq + + val flow: Flow[Message, Message, NotUsed] = ??? + + //#authorized-single-WebSocket-request + val (upgradeResponse, _) = + Http().singleWebSocketRequest( + WebSocketRequest( + "ws://example.com:8080/some/path", + extraHeaders = Seq(Authorization( + BasicHttpCredentials("johan", "correcthorsebatterystaple")))), + flow) + //#authorized-single-WebSocket-request + } + + "WebSocketClient-flow-example" in { + pending // compile-time only test + + //#WebSocket-client-flow + import akka.Done + import akka.http.scaladsl.Http + import akka.stream.ActorMaterializer + import akka.stream.scaladsl._ + import akka.http.scaladsl.model._ + import akka.http.scaladsl.model.ws._ + + import scala.concurrent.Future + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + import system.dispatcher + + // Future[Done] is the materialized value of Sink.foreach, + // emitted when the stream completes + val incoming: Sink[Message, Future[Done]] = + Sink.foreach[Message] { + case message: TextMessage.Strict => + println(message.text) + } + + // send this as a message over the WebSocket + val outgoing = Source.single(TextMessage("hello world!")) + + // flow to use (note: not re-usable!) + val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) + + // the materialized value is a tuple with + // upgradeResponse is a Future[WebSocketUpgradeResponse] that + // completes or fails when the connection succeeds or fails + // and closed is a Future[Done] with the stream completion from the incoming sink + val (upgradeResponse, closed) = + outgoing + .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] + .toMat(incoming)(Keep.both) // also keep the Future[Done] + .run() + + // just like a regular http request we can get 404 NotFound etc. + // that will be available from upgrade.response + val connected = upgradeResponse.flatMap { upgrade => + if (upgrade.response.status == StatusCodes.OK) { + Future.successful(Done) + } else { + throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") + } + } + + // in a real application you would not side effect here + connected.onComplete(println) + closed.foreach(_ => println("closed")) + + //#WebSocket-client-flow + } + +} diff --git a/akka-docs/rst/scala/http/client-side/connection-level.rst b/akka-docs/rst/scala/http/client-side/connection-level.rst index 1f403e88b6..452a633d20 100644 --- a/akka-docs/rst/scala/http/client-side/connection-level.rst +++ b/akka-docs/rst/scala/http/client-side/connection-level.rst @@ -64,6 +64,8 @@ as a more general purpose streaming infrastructure feature. However, akka-stream should soon provide such a feature. +.. _http-client-layer: + Stand-Alone HTTP Layer Usage ---------------------------- diff --git a/akka-docs/rst/scala/http/client-side/websocket-support.rst b/akka-docs/rst/scala/http/client-side/websocket-support.rst index 522a8123c7..a3e54c43d2 100644 --- a/akka-docs/rst/scala/http/client-side/websocket-support.rst +++ b/akka-docs/rst/scala/http/client-side/websocket-support.rst @@ -3,6 +3,81 @@ Client-Side WebSocket Support ============================= -Not yet implemented see 17275_. +Client side WebSocket support is available through ``Http.singleWebSocketRequest`` , +``Http.webSocketClientFlow`` and ``Http.webSocketClientLayer``. + +A WebSocket consists of two streams of messages, incoming messages (a :class:`Sink`) and outgoing messages +(a :class:`Source`) where either may be signalled first; or even be the only direction in which messages flow during +the lifetime of the connection. Therefore a WebSocket connection is modelled as either something you connect a +``Flow[Message, Message, Mat]`` to or a ``Flow[Message, Message, Mat]`` that you connect a ``Source[Message, Mat]`` and +a ``Sink[Message, Mat]`` to. + +A WebSocket request starts with a regular HTTP request which contains an ``Upgrade`` header (and possibly +other regular HTTP request properties), so in addition to the flow of messages there also is an initial response +from the server, this is modelled with :class:`WebSocketUpgradeResponse`. + +The methods of the WebSocket client API handle the upgrade to WebSocket on connection success and materializes +the connected WebSocket stream. If the connection fails, for example with a ``404 NotFound`` error, this regular +HTTP result can be found in ``WebSocketUpgradeResponse.response`` + + +Message +------- +Messages sent and received over a WebSocket can be either :class:`TextMessage` s or :class:`BinaryMessage` s and each +of those has two subtypes :class:`Strict` or :class:`Streaming`. In typical applications messages will be ``Strict`` as +WebSockets are usually deployed to communicate using small messages not stream data, the protocol does however +allow this (by not marking the first fragment as final, as described in `rfc 6455 section 5.2`__). + +__ https://tools.ietf.org/html/rfc6455#section-5.2 + +For such streaming messages :class:`BinaryMessage.Streaming` and :class:`TextMessage.Streaming` will be used. In these cases +the data is provided as a ``Source[ByteString, NotUsed]`` for binary and ``Source[String, NotUsed]`` for text messages. + + +singleWebSocketRequest +---------------------- +``singleWebSocketRequest`` takes a :class:`WebSocketRequest` and a flow it will connect to the source and +sink of the WebSocket connection. It will trigger the request right away and returns a tuple containing the +``Future[WebSocketUpgradeResponse]`` and the materialized value from the flow passed to the method. + +The future will succeed when the WebSocket connection has been established or the server returned a regular +HTTP response, or fail if the connection fails with an exception. + +Simple example sending a message and printing any incoming message: + +.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala + :include: single-WebSocket-request + + +The websocket request may also include additional headers, like in this example, HTTP Basic Auth: + +.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala + :include: authorized-single-WebSocket-request + + +webSocketClientFlow +------------------- +``webSocketClientFlow`` takes a request, and returns a ``Flow[Message, Message, Future[WebSocketUpgradeResponse]]``. + +The future that is materialized from the flow will succeed when the WebSocket connection has been established or +the server returned a regular HTTP response, or fail if the connection fails with an exception. + +.. note:: + The :class:`Flow` that is returned by this method can only be materialized once. For each request a new + flow must be acquired by calling the method again. + +Simple example sending a message and printing any incoming message: + + +.. includecode:: ../../code/docs/http/scaladsl/WebSocketClientExampleSpec.scala + :include: WebSocket-client-flow + + +webSocketClientLayer +-------------------- +Just like the :ref:`http-client-layer` for regular HTTP requests, the WebSocket layer can be used fully detached from the +underlying TCP interface. The same scenarios as described for regular HTTP requests apply here. + +The returned layer forms a ``BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebSocketUpgradeResponse]]``. + -.. _17275: https://github.com/akka/akka/issues/17275 \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala index a8b5622316..82834fe098 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala @@ -13,9 +13,6 @@ import akka.util.ByteString */ sealed trait Message -/** - * A binary - */ sealed trait TextMessage extends Message { /** * The contents of this message as a stream.