From 2402bedc6a7e8bf058b4e49d36e7cba39f39e8ef Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 21 Apr 2015 14:54:00 +0200 Subject: [PATCH] +htc #16887 add server-side websocket API and add example to TestServer --- .../scala/akka/http/model/ws/Message.scala | 29 ++++++++++ .../http/model/ws/UpgradeToWebsocket.scala | 28 ++++++++++ .../src/test/scala/akka/http/TestServer.scala | 56 +++++++++++++++---- 3 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/model/ws/Message.scala create mode 100644 akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala diff --git a/akka-http-core/src/main/scala/akka/http/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/model/ws/Message.scala new file mode 100644 index 0000000000..a8b73116f7 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/model/ws/Message.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.model.ws + +import akka.stream.scaladsl.Source +import akka.util.ByteString + +/** + * The ADT for Websocket messages. A message can either be binary or a text message. Each of + * those can either be strict or streamed. + */ +sealed trait Message +sealed trait TextMessage extends Message +object TextMessage { + final case class Strict(text: String) extends TextMessage { + override def toString: String = s"TextMessage.Strict($text)" + } + final case class Streamed(textStream: Source[String, _]) extends TextMessage +} + +sealed trait BinaryMessage extends Message +object BinaryMessage { + final case class Strict(data: ByteString) extends BinaryMessage { + override def toString: String = s"BinaryMessage.Strict($data)" + } + final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala new file mode 100644 index 0000000000..c63b4ea2a4 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.model.ws + +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Flow + +import akka.http.model.{ HttpHeader, HttpResponse } + +/** + * A custom header that will be added to an Websocket upgrade HttpRequest that + * enables a request handler to upgrade this connection to a Websocket connection and + * registers a Websocket handler. + * + * FIXME: needs to be able to choose subprotocols as possibly agreed on in the websocket handshake + */ +trait UpgradeToWebsocket extends HttpHeader { + /** + * The high-level interface to create a Websocket server based on "messages". + * + * Returns a response to return in a request handler that will signal the + * low-level HTTP implementation to upgrade the connection to Websocket and + * use the supplied handler to handle incoming Websocket messages. + */ + def handleMessages(handlerFlow: Flow[Message, Message, Any])(implicit mat: FlowMaterializer): HttpResponse +} diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index c22be6dee9..07d29179b0 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -4,12 +4,17 @@ package akka.http +import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.http.model._ +import akka.http.model.ws._ import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl.{ Source, Flow } import com.typesafe.config.{ ConfigFactory, Config } import HttpMethods._ +import scala.concurrent.Await + object TestServer extends App { val testConf: Config = ConfigFactory.parseString(""" akka.loglevel = INFO @@ -18,18 +23,36 @@ object TestServer extends App { implicit val system = ActorSystem("ServerTest", testConf) implicit val fm = ActorFlowMaterializer() - val binding = Http().bindAndHandleSync({ - case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ index - case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!") - case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!") - case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") - }, interface = "localhost", port = 8080) + try { + val binding = Http().bindAndHandleSync({ + case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) if req.header[UpgradeToWebsocket].isDefined ⇒ + req.header[UpgradeToWebsocket] match { + case Some(upgrade) ⇒ upgrade.handleMessages(echoWebsocketService) // needed for running the autobahn test suite + case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") + } + case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _) ⇒ + req.header[UpgradeToWebsocket] match { + case Some(upgrade) ⇒ upgrade.handleMessages(greeterWebsocketService) + case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") + } + case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ index + case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!") + case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!") + case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _) ⇒ + req.header[UpgradeToWebsocket] match { + case Some(upgrade) ⇒ upgrade.handleMessages(greeterWebsocketService) + case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") + } + case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") + }, interface = "localhost", port = 9001) - println(s"Server online at http://localhost:8080") - println("Press RETURN to stop...") - Console.readLine() - - system.shutdown() + Await.result(binding, 1.second) // throws if binding fails + println("Server online at http://localhost:9001") + println("Press RETURN to stop...") + Console.readLine() + } finally { + system.shutdown() + } ////////////// helpers ////////////// @@ -45,4 +68,15 @@ object TestServer extends App { | | |""".stripMargin)) + + def echoWebsocketService: Flow[Message, Message, Unit] = + Flow[Message] // just let message flow directly to the output + + def greeterWebsocketService: Flow[Message, Message, Unit] = + Flow[Message] + .collect { + case TextMessage.Strict(name) ⇒ TextMessage.Strict(s"Hello '$name'") + case TextMessage.Streamed(nameStream) ⇒ TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterialized (_ ⇒ ())) + // ignore binary messages + } }