+htc #16887 add support for WS application-level subprotocol negotiation

This commit is contained in:
Johannes Rudolph 2015-04-22 10:40:07 +02:00
parent cd87dadf54
commit 6fef5d534c
4 changed files with 35 additions and 16 deletions

View file

@ -11,6 +11,7 @@ import akka.parboiled2.util.Base64
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import scala.collection.immutable.Seq
import scala.reflect.ClassTag
/**
@ -70,6 +71,7 @@ private[http] object Handshake {
val version = find[`Sec-WebSocket-Version`]
val origin = find[Origin]
val protocol = find[`Sec-WebSocket-Protocol`]
val supportedProtocols = protocol.toList.flatMap(_.protocols)
val extensions = find[`Sec-WebSocket-Extensions`]
def isValidKey(key: String): Boolean = Base64.rfc2045().decode(key).length == 16
@ -80,8 +82,13 @@ private[http] object Handshake {
key.exists(k isValidKey(k.key))) {
val header = new UpgradeToWebsocketLowLevel {
def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any])(implicit mat: FlowMaterializer): HttpResponse =
buildResponse(key.get, handlerFlow)
def requestedProtocols: Seq[String] = supportedProtocols
def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse = {
require(subprotocol.forall(chosen supportedProtocols.contains(chosen)),
s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]")
buildResponse(key.get, handlerFlow, subprotocol)
}
}
Some(header)
} else None
@ -106,12 +113,13 @@ private[http] object Handshake {
concatenated value to obtain a 20-byte value and base64-
encoding (see Section 4 of [RFC4648]) this 20-byte hash.
*/
def buildResponse(key: `Sec-WebSocket-Key`, handlerFlow: Flow[FrameEvent, FrameEvent, Any])(implicit mat: FlowMaterializer): HttpResponse =
def buildResponse(key: `Sec-WebSocket-Key`, handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse =
HttpResponse(
StatusCodes.SwitchingProtocols,
List(
Upgrade(List(UpgradeProtocol("websocket"))),
Connection(List("upgrade")),
`Sec-WebSocket-Accept`.forKey(key),
UpgradeToWebsocketResponseHeader(handlerFlow)))
subprotocol.map(p `Sec-WebSocket-Protocol`(Seq(p))).toList :::
List(
Upgrade(List(UpgradeProtocol("websocket"))),
Connection(List("upgrade")),
`Sec-WebSocket-Accept`.forKey(key),
UpgradeToWebsocketResponseHeader(handlerFlow)))
}

View file

@ -25,8 +25,8 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe
*
* INTERNAL API (for now)
*/
private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any])(implicit mat: FlowMaterializer): HttpResponse
private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse
override def handleMessages(handlerFlow: Flow[Message, Message, Any])(implicit mat: FlowMaterializer): HttpResponse =
handleFrames(Websocket.handleMessages(handlerFlow))
override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse =
handleFrames(Websocket.handleMessages(handlerFlow), subprotocol)
}

View file

@ -4,6 +4,7 @@
package akka.http.model.ws
import scala.collection.immutable
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
@ -13,16 +14,22 @@ import akka.http.model.{ HttpHeader, HttpResponse }
* A custom header that will be added to an Websocket upgrade HttpRequest that
* enables a request handler to upgrade this connection to a Websocket connection and
* registers a Websocket handler.
*
* FIXME: needs to be able to choose subprotocols as possibly agreed on in the websocket handshake
*/
trait UpgradeToWebsocket extends HttpHeader {
/**
* A sequence of protocols the client accepts.
*
* See http://tools.ietf.org/html/rfc6455#section-1.9
*/
def requestedProtocols: immutable.Seq[String]
/**
* The high-level interface to create a Websocket server based on "messages".
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to Websocket and
* use the supplied handler to handle incoming Websocket messages.
* use the supplied handler to handle incoming Websocket messages. Optionally,
* a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessages(handlerFlow: Flow[Message, Message, Any])(implicit mat: FlowMaterializer): HttpResponse
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse
}

View file

@ -14,6 +14,8 @@ import akka.http.util.Rendering
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import scala.collection.immutable.Seq
class WebsocketDirectivesSpec extends RoutingSpec {
"the handleWebsocketMessages directive" should {
"handle websocket requests" in {
@ -39,7 +41,9 @@ class WebsocketDirectivesSpec extends RoutingSpec {
}
def upgradeToWebsocketHeaderMock: UpgradeToWebsocket =
new InternalCustomHeader("UpgradeToWebsocketMock") with UpgradeToWebsocket {
def handleMessages(handlerFlow: Flow[Message, Message, Any])(implicit mat: FlowMaterializer): HttpResponse =
def requestedProtocols: Seq[String] = Nil
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse =
HttpResponse(StatusCodes.SwitchingProtocols)
}
}