!htc #17275 encapsulate Websocket request arguments in new WebsocketRequest class

This commit is contained in:
Johannes Rudolph 2015-10-12 11:34:24 +02:00
parent 08aa903408
commit 00b4eefab5
4 changed files with 47 additions and 27 deletions

View file

@ -4,6 +4,8 @@
package akka.http.impl.engine.ws
import akka.http.scaladsl.model.ws.WebsocketRequest
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
@ -32,12 +34,10 @@ object WebsocketClientBlueprint {
/**
* Returns a WebsocketClientLayer that can be materialized once.
*/
def apply(uri: Uri,
extraHeaders: immutable.Seq[HttpHeader],
subProtocol: Option[String],
def apply(request: WebsocketRequest,
settings: ClientConnectionSettings,
log: LoggingAdapter): Http.WebsocketClientLayer =
(simpleTls.atopMat(handshake(uri, extraHeaders, subProtocol, settings, log))(Keep.right) atop
(simpleTls.atopMat(handshake(request, settings, log))(Keep.right) atop
Websocket.framing atop
Websocket.stack(serverSide = false, maskingRandomFactory = settings.websocketRandomFactory)).reversed
@ -45,16 +45,15 @@ object WebsocketClientBlueprint {
* A bidi flow that injects and inspects the WS handshake and then goes out of the way. This BidiFlow
* can only be materialized once.
*/
def handshake(uri: Uri,
extraHeaders: immutable.Seq[HttpHeader],
subProtocol: Option[String],
def handshake(request: WebsocketRequest,
settings: ClientConnectionSettings,
log: LoggingAdapter): BidiFlow[ByteString, ByteString, ByteString, ByteString, Future[WebsocketUpgradeResponse]] = {
import request._
val result = Promise[WebsocketUpgradeResponse]()
val valve = StreamUtils.OneTimeValve()
val (initialRequest, key) = Handshake.Client.buildRequest(uri, extraHeaders, subProtocol.toList, settings.websocketRandomFactory())
val (initialRequest, key) = Handshake.Client.buildRequest(uri, extraHeaders, subprotocol.toList, settings.websocketRandomFactory())
val hostHeader = Host(uri.authority)
val renderedInitialRequest =
HttpRequestRendererFactory.renderStrict(RequestRenderingContext(initialRequest, hostHeader), settings, log)
@ -86,7 +85,7 @@ object WebsocketClientBlueprint {
case NeedMoreData ctx.pull()
case ResponseStart(status, protocol, headers, entity, close)
val response = HttpResponse(status, headers, protocol = protocol)
Handshake.Client.validateResponse(response, subProtocol.toList, key) match {
Handshake.Client.validateResponse(response, subprotocol.toList, key) match {
case Right(NegotiatedWebsocketSettings(protocol))
result.success(ValidUpgrade(response, protocol))

View file

@ -19,7 +19,7 @@ import akka.http.impl.util.{ ReadTheDocumentationException, Java6Compat, StreamU
import akka.http.impl.engine.ws.WebsocketClientBlueprint
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Host
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.ws.{ WebsocketRequest, Message }
import akka.http.scaladsl.util.FastFuture
import akka.japi
import akka.stream.Materializer
@ -422,25 +422,22 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientLayer(uri: Uri,
extraHeaders: immutable.Seq[HttpHeader] = Nil,
subprotocol: Option[String] = None,
def websocketClientLayer(request: WebsocketRequest,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log): Http.WebsocketClientLayer =
WebsocketClientBlueprint(uri, extraHeaders, subprotocol, settings, log)
WebsocketClientBlueprint(request, settings, log)
/**
* Constructs a flow that once materialized establishes a Websocket connection to the given Uri.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientFlow(uri: Uri,
extraHeaders: immutable.Seq[HttpHeader] = Nil,
subprotocol: Option[String] = None,
def websocketClientFlow(request: WebsocketRequest,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = {
import request.uri
require(uri.isAbsolute, s"Websocket request URI must be absolute but was '$uri'")
val ctx = uri.scheme match {
@ -453,7 +450,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
val host = uri.authority.host.address
val port = uri.effectivePort
websocketClientLayer(uri, extraHeaders, subprotocol, settings, log)
websocketClientLayer(request, settings, log)
.joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, ctx, log))(Keep.left)
}
@ -461,15 +458,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* Runs a single Websocket conversation given a Uri and a flow that represents the client side of the
* Websocket conversation.
*/
def singleWebsocketRequest[T](uri: Uri,
def singleWebsocketRequest[T](request: WebsocketRequest,
clientFlow: Flow[Message, Message, T],
extraHeaders: immutable.Seq[HttpHeader] = Nil,
subprotocol: Option[String] = None,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebsocketUpgradeResponse], T) =
websocketClientFlow(uri, extraHeaders, subprotocol, localAddress, settings, httpsContext, log)
websocketClientFlow(request, localAddress, settings, httpsContext, log)
.joinMat(clientFlow)(Keep.both).run()
/**
@ -695,15 +690,13 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
final case class OutgoingConnection(localAddress: InetSocketAddress, remoteAddress: InetSocketAddress)
/**
* Represents the response to a websocket upgrade request.
* Represents the response to a websocket upgrade request. Can either be [[ValidUpgrade]] or [[InvalidUpgradeResponse]].
*/
sealed trait WebsocketUpgradeResponse {
def response: HttpResponse
}
final case class ValidUpgrade(response: HttpResponse, chosenSubprotocol: Option[String]) extends WebsocketUpgradeResponse
final case class InvalidUpgradeResponse(response: HttpResponse, cause: String) extends WebsocketUpgradeResponse
final case class ValidUpgrade(
response: HttpResponse,
chosenSubprotocol: Option[String]) extends WebsocketUpgradeResponse
/**
* Represents a connection pool to a specific target host and pool configuration.

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl.model.ws
import scala.language.implicitConversions
import scala.collection.immutable
import akka.http.scaladsl.model.{ HttpHeader, Uri }
/**
* Represents a Websocket request.
* @param uri The target URI to connect to.
* @param extraHeaders Extra headers to add to the Websocket request.
* @param subprotocol A Websocket subprotocol if required.
*/
final case class WebsocketRequest(
uri: Uri,
extraHeaders: immutable.Seq[HttpHeader] = Nil,
subprotocol: Option[String] = None)
object WebsocketRequest {
implicit def fromTargetUri(uri: Uri): WebsocketRequest = WebsocketRequest(uri)
implicit def fromTargetUriString(uriString: String): WebsocketRequest = WebsocketRequest(uriString)
}

View file

@ -302,7 +302,9 @@ class WebsocketClientSpec extends FreeSpec with Matchers with WithMaterializerSp
def targetUri: Uri = "ws://example.org/ws"
def clientLayer: Http.WebsocketClientLayer =
Http(system).websocketClientLayer(targetUri, subprotocol = requestedSubProtocol, settings = settings)
Http(system).websocketClientLayer(
WebsocketRequest(targetUri, subprotocol = requestedSubProtocol),
settings = settings)
val (netOut, netIn, response) = {
val netOut = ByteStringSinkProbe()