+htc #16887 add server-side websocket API and add example to TestServer

This commit is contained in:
Johannes Rudolph 2015-04-21 14:54:00 +02:00
parent a58859c77c
commit 2402bedc6a
3 changed files with 102 additions and 11 deletions

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.model.ws
import akka.stream.scaladsl.Source
import akka.util.ByteString
/**
* The ADT for Websocket messages. A message can either be binary or a text message. Each of
* those can either be strict or streamed.
*/
sealed trait Message
sealed trait TextMessage extends Message
object TextMessage {
final case class Strict(text: String) extends TextMessage {
override def toString: String = s"TextMessage.Strict($text)"
}
final case class Streamed(textStream: Source[String, _]) extends TextMessage
}
sealed trait BinaryMessage extends Message
object BinaryMessage {
final case class Strict(data: ByteString) extends BinaryMessage {
override def toString: String = s"BinaryMessage.Strict($data)"
}
final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage
}

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.model.ws
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.http.model.{ HttpHeader, HttpResponse }
/**
* A custom header that will be added to an Websocket upgrade HttpRequest that
* enables a request handler to upgrade this connection to a Websocket connection and
* registers a Websocket handler.
*
* FIXME: needs to be able to choose subprotocols as possibly agreed on in the websocket handshake
*/
trait UpgradeToWebsocket extends HttpHeader {
/**
* The high-level interface to create a Websocket server based on "messages".
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to Websocket and
* use the supplied handler to handle incoming Websocket messages.
*/
def handleMessages(handlerFlow: Flow[Message, Message, Any])(implicit mat: FlowMaterializer): HttpResponse
}

View file

@ -4,12 +4,17 @@
package akka.http
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.model._
import akka.http.model.ws._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Source, Flow }
import com.typesafe.config.{ ConfigFactory, Config }
import HttpMethods._
import scala.concurrent.Await
object TestServer extends App {
val testConf: Config = ConfigFactory.parseString("""
akka.loglevel = INFO
@ -18,18 +23,36 @@ object TestServer extends App {
implicit val system = ActorSystem("ServerTest", testConf)
implicit val fm = ActorFlowMaterializer()
val binding = Http().bindAndHandleSync({
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}, interface = "localhost", port = 8080)
try {
val binding = Http().bindAndHandleSync({
case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) if req.header[UpgradeToWebsocket].isDefined
req.header[UpgradeToWebsocket] match {
case Some(upgrade) upgrade.handleMessages(echoWebsocketService) // needed for running the autobahn test suite
case None HttpResponse(400, entity = "Not a valid websocket request!")
}
case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _)
req.header[UpgradeToWebsocket] match {
case Some(upgrade) upgrade.handleMessages(greeterWebsocketService)
case None HttpResponse(400, entity = "Not a valid websocket request!")
}
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")
case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _)
req.header[UpgradeToWebsocket] match {
case Some(upgrade) upgrade.handleMessages(greeterWebsocketService)
case None HttpResponse(400, entity = "Not a valid websocket request!")
}
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}, interface = "localhost", port = 9001)
println(s"Server online at http://localhost:8080")
println("Press RETURN to stop...")
Console.readLine()
system.shutdown()
Await.result(binding, 1.second) // throws if binding fails
println("Server online at http://localhost:9001")
println("Press RETURN to stop...")
Console.readLine()
} finally {
system.shutdown()
}
////////////// helpers //////////////
@ -45,4 +68,15 @@ object TestServer extends App {
| </ul>
| </body>
|</html>""".stripMargin))
def echoWebsocketService: Flow[Message, Message, Unit] =
Flow[Message] // just let message flow directly to the output
def greeterWebsocketService: Flow[Message, Message, Unit] =
Flow[Message]
.collect {
case TextMessage.Strict(name) TextMessage.Strict(s"Hello '$name'")
case TextMessage.Streamed(nameStream) TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterialized (_ ()))
// ignore binary messages
}
}