diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/WebsocketExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/WebsocketExampleSpec.scala new file mode 100644 index 0000000000..b25ff5ef3b --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/WebsocketExampleSpec.scala @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package docs.http.scaladsl.server + +import org.scalatest.{ Matchers, WordSpec } + +class WebsocketExampleSpec extends WordSpec with Matchers { + "core-example" in { + pending // compile-time only test + //#websocket-example-using-core + import akka.actor.ActorSystem + import akka.stream.ActorFlowMaterializer + import akka.stream.scaladsl.{ Source, Flow } + import akka.http.scaladsl.Http + import akka.http.scaladsl.model.ws.UpgradeToWebsocket + import akka.http.scaladsl.model.ws.{ TextMessage, Message } + import akka.http.scaladsl.model.{ HttpResponse, Uri, HttpRequest } + import akka.http.scaladsl.model.HttpMethods._ + + implicit val system = ActorSystem() + implicit val materializer = ActorFlowMaterializer() + + // The Greeter WebSocket Service expects a "name" per message and + // returns a greeting message for that name + val greeterWebsocketService = + Flow[Message] + .collect { + // we match but don't actually consume the text message here, + // rather we simply stream it back as the tail of the response + // this means we might start sending the response even before the + // end of the incoming message has been received + case tm: TextMessage ⇒ TextMessage(Source.single("Hello ") ++ tm.textStream) + // ignore binary messages + } + + val bindingFuture = Http().bindAndHandleSync({ + case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _) ⇒ + req.header[UpgradeToWebsocket] match { + case Some(upgrade) ⇒ upgrade.handleMessages(greeterWebsocketService) + case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") + } + case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") + }, interface = "localhost", port = 8080) + //#websocket-example-using-core + + println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") + Console.readLine() + + import system.dispatcher // for the future transformations + bindingFuture + .flatMap(_.unbind()) // trigger unbinding from the port + .onComplete(_ ⇒ system.shutdown()) // and shutdown when done + } + "routing-example" in { + pending // compile-time only test + //#websocket-example-using-routing + import akka.actor.ActorSystem + import akka.stream.ActorFlowMaterializer + import akka.stream.scaladsl.{ Source, Flow } + import akka.http.scaladsl.Http + import akka.http.scaladsl.model.ws.{ TextMessage, Message } + import akka.http.scaladsl.server.Directives + + implicit val system = ActorSystem() + implicit val materializer = ActorFlowMaterializer() + + import Directives._ + + // The Greeter WebSocket Service expects a "name" per message and + // returns a greeting message for that name + val greeterWebsocketService = + Flow[Message] + .collect { + case tm: TextMessage ⇒ TextMessage(Source.single("Hello ") ++ tm.textStream) + // ignore binary messages + } + + val route = + path("ws-greeter") { + get { + handleWebsocketMessages(greeterWebsocketService) + } + } + + val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) + + println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") + Console.readLine() + + import system.dispatcher // for the future transformations + bindingFuture + .flatMap(_.unbind()) // trigger unbinding from the port + .onComplete(_ ⇒ system.shutdown()) // and shutdown when done + //#websocket-example-using-routing + } +} diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/websocket-support.rst b/akka-docs-dev/rst/scala/http/routing-dsl/websocket-support.rst index 123201efd8..3dced95dde 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/websocket-support.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/websocket-support.rst @@ -1,4 +1,89 @@ Server-Side WebSocket Support ============================= -TODO \ No newline at end of file +WebSocket is a protocol that provides a bi-directional channel between browser and webserver usually run over an +upgraded HTTP(S) connection. Data is exchanged in messages whereby a message can either be binary data or unicode text. + +Akka HTTP provides a stream-based implementation of the WebSocket protocol that hides the low-level details of the +underlying binary framing wire-protocol and provides a simple API to implement services using WebSocket. + + +Model +----- + +The basic unit of data exchange in the WebSocket protocol is a message. A message can either be binary message, +i.e. a sequence of octets or a text message, i.e. a sequence of unicode code points. + +Akka HTTP provides a straight-forward model for this abstraction: + +.. includecode:: /../../akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala + :include: message-model + +The data of a message is provided as a stream because WebSocket messages do not have a predefined size and could +(in theory) be infinitely long. However, only one message can be open per direction of the WebSocket connection, +so that many application level protocols will want to make use of the delineation into (small) messages to transport +single application-level data units like "one event" or "one chat message". + +Many messages are small enough to be sent or received in one go. As an opportunity for optimization, the model provides +a ``Strict`` subclass for each kind of message which contains data as a strict, i.e. non-streamed, ``ByteString`` or +``String``. + +For sending data, the ``Strict`` variant is often the natural choice when the complete message has already been +assembled. While receiving data from the network connection the WebSocket implementation tries to create a ``Strict`` +message whenever possible, i.e. when the complete data was received in one chunk. However, the actual chunking +of messages over a network connection and through the various streaming abstraction layers is not deterministic from +the perspective of the application. Therefore application code must be able to handle both streaming and strict messages +and not expect certain messages to be strict. (Particularly, note that tests against ``localhost`` will behave +differently from when data is received over a physical network connection.) + + +Core-level support +------------------ + +On the server-side a request to upgrade the connection to WebSocket is provided through a special header that is added +to a request by the implementation. Whenever a request contains the synthetic +``akka.http.scaldsl.model.ws.UpgradeToWebsocket``-header an HTTP request was a valid WebSocket upgrade request. +Methods on this header can be used to create a response that will upgrade the connection to a WebSocket connection and +install a ``Flow`` to handle WebSocket traffic on this connection. + +The following example shows how to handle a WebSocket request using just the low-level http-core API: + +.. includecode2:: ../../code/docs/http/scaladsl/server/WebsocketExampleSpec.scala + :snippet: websocket-example-using-core + +Handshake ++++++++++ + +HTTP-level details of the WebSocket handshake are hidden from the application. The ``UpgradeToWebsocket`` represents a +valid handshake request. The WebSocket protocol defines a facility to negotiate an application-level sub-protocol for +the WebSocket connection. Use ``UpgradeToWebsocket.requestedProtocols`` to retrieve the protocols suggested by the +client and pass one of the values to ``UpgradeToWebsocket.handleMessages`` or one of the other handling methods to +select a sub-protocol. + +Handling Messages ++++++++++++++++++ + +A message handler is expected to be implemented as a ``Flow[Message, Message, Any]``. For typical request-response +scenarios this fits very well and such a ``Flow`` can be constructed from a simple function by using +``Flow[Message].map`` or ``Flow[Message].mapAsync``. + +There are other typical use-cases, however, like a server-push model where a server message is sent spontaneously, or +true bi-directional use-cases where input and output aren't logically connected. Providing the handler as a ``Flow`` in +these cases seems awkward. A variant of ``UpgradeToWebsocket.handleMessages``, +``UpgradeToWebsocket.handleMessageWithSinkSource`` is provided instead, which allows for supplying a ``Sink[Message]`` +and a ``Source[Message]`` for input and output independently. + +Note that a handler is required to consume the data stream of each message to make place for new messages. Otherwise, +subsequent messages may be stuck and message traffic in this direction will stall. + +Routing support +--------------- + +The routing DSL provides the :ref:`-handleWebsocketMessages-` directive to install a WebSocket handler if the request +was a WebSocket request. Otherwise, the directive rejects the request. + +Complete example +---------------- + +.. includecode2:: ../../code/docs/http/scaladsl/server/WebsocketExampleSpec.scala + :snippet: websocket-example-using-routing \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala index d29c859a79..f44b294b30 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala @@ -7,6 +7,7 @@ package akka.http.scaladsl.model.ws import akka.stream.scaladsl.Source import akka.util.ByteString +//#message-model /** * The ADT for Websocket messages. A message can either be a binary or a text message. */ @@ -21,6 +22,7 @@ trait TextMessage extends Message { */ def textStream: Source[String, _] } +//#message-model object TextMessage { def apply(text: String): Strict = Strict(text) def apply(textStream: Source[String, Any]): TextMessage = @@ -35,12 +37,14 @@ object TextMessage { } final private case class Streamed(textStream: Source[String, _]) extends TextMessage } +//#message-model trait BinaryMessage extends Message { /** * The contents of this message as a stream. */ def dataStream: Source[ByteString, _] } +//#message-model object BinaryMessage { def apply(data: ByteString): Strict = Strict(data) def apply(dataStream: Source[ByteString, Any]): BinaryMessage = diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index 42ff66c297..f2ff09347b 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -42,7 +42,7 @@ object TestServer extends App { path("crash") { complete(sys.error("BOOM!")) } - } + } ~ pathPrefix("inner")(getFromResourceDirectory("someDir")) }, interface = "localhost", port = 8080) println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")