From 00b4eefab5f09a24de386c344537d60674c3e8dc Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 12 Oct 2015 11:34:24 +0200 Subject: [PATCH] !htc #17275 encapsulate Websocket request arguments in new `WebsocketRequest` class --- .../engine/ws/WebsocketClientBlueprint.scala | 17 ++++++------ .../main/scala/akka/http/scaladsl/Http.scala | 27 +++++++------------ .../scaladsl/model/ws/WebsocketRequest.scala | 26 ++++++++++++++++++ .../impl/engine/ws/WebsocketClientSpec.scala | 4 ++- 4 files changed, 47 insertions(+), 27 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/WebsocketRequest.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala index b7c2bf36ed..79e2dd026a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.ws +import akka.http.scaladsl.model.ws.WebsocketRequest + import scala.collection.immutable import scala.concurrent.{ Future, Promise } @@ -32,12 +34,10 @@ object WebsocketClientBlueprint { /** * Returns a WebsocketClientLayer that can be materialized once. */ - def apply(uri: Uri, - extraHeaders: immutable.Seq[HttpHeader], - subProtocol: Option[String], + def apply(request: WebsocketRequest, settings: ClientConnectionSettings, log: LoggingAdapter): Http.WebsocketClientLayer = - (simpleTls.atopMat(handshake(uri, extraHeaders, subProtocol, settings, log))(Keep.right) atop + (simpleTls.atopMat(handshake(request, settings, log))(Keep.right) atop Websocket.framing atop Websocket.stack(serverSide = false, maskingRandomFactory = settings.websocketRandomFactory)).reversed @@ -45,16 +45,15 @@ object WebsocketClientBlueprint { * A bidi flow that injects and inspects the WS handshake and then goes out of the way. This BidiFlow * can only be materialized once. */ - def handshake(uri: Uri, - extraHeaders: immutable.Seq[HttpHeader], - subProtocol: Option[String], + def handshake(request: WebsocketRequest, settings: ClientConnectionSettings, log: LoggingAdapter): BidiFlow[ByteString, ByteString, ByteString, ByteString, Future[WebsocketUpgradeResponse]] = { + import request._ val result = Promise[WebsocketUpgradeResponse]() val valve = StreamUtils.OneTimeValve() - val (initialRequest, key) = Handshake.Client.buildRequest(uri, extraHeaders, subProtocol.toList, settings.websocketRandomFactory()) + val (initialRequest, key) = Handshake.Client.buildRequest(uri, extraHeaders, subprotocol.toList, settings.websocketRandomFactory()) val hostHeader = Host(uri.authority) val renderedInitialRequest = HttpRequestRendererFactory.renderStrict(RequestRenderingContext(initialRequest, hostHeader), settings, log) @@ -86,7 +85,7 @@ object WebsocketClientBlueprint { case NeedMoreData ⇒ ctx.pull() case ResponseStart(status, protocol, headers, entity, close) ⇒ val response = HttpResponse(status, headers, protocol = protocol) - Handshake.Client.validateResponse(response, subProtocol.toList, key) match { + Handshake.Client.validateResponse(response, subprotocol.toList, key) match { case Right(NegotiatedWebsocketSettings(protocol)) ⇒ result.success(ValidUpgrade(response, protocol)) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 13430b812d..70117668bb 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -19,7 +19,7 @@ import akka.http.impl.util.{ ReadTheDocumentationException, Java6Compat, StreamU import akka.http.impl.engine.ws.WebsocketClientBlueprint import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Host -import akka.http.scaladsl.model.ws.Message +import akka.http.scaladsl.model.ws.{ WebsocketRequest, Message } import akka.http.scaladsl.util.FastFuture import akka.japi import akka.stream.Materializer @@ -422,25 +422,22 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * * The layer is not reusable and must only be materialized once. */ - def websocketClientLayer(uri: Uri, - extraHeaders: immutable.Seq[HttpHeader] = Nil, - subprotocol: Option[String] = None, + def websocketClientLayer(request: WebsocketRequest, settings: ClientConnectionSettings = ClientConnectionSettings(system), log: LoggingAdapter = system.log): Http.WebsocketClientLayer = - WebsocketClientBlueprint(uri, extraHeaders, subprotocol, settings, log) + WebsocketClientBlueprint(request, settings, log) /** * Constructs a flow that once materialized establishes a Websocket connection to the given Uri. * * The layer is not reusable and must only be materialized once. */ - def websocketClientFlow(uri: Uri, - extraHeaders: immutable.Seq[HttpHeader] = Nil, - subprotocol: Option[String] = None, + def websocketClientFlow(request: WebsocketRequest, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = { + import request.uri require(uri.isAbsolute, s"Websocket request URI must be absolute but was '$uri'") val ctx = uri.scheme match { @@ -453,7 +450,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E val host = uri.authority.host.address val port = uri.effectivePort - websocketClientLayer(uri, extraHeaders, subprotocol, settings, log) + websocketClientLayer(request, settings, log) .joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, ctx, log))(Keep.left) } @@ -461,15 +458,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * Runs a single Websocket conversation given a Uri and a flow that represents the client side of the * Websocket conversation. */ - def singleWebsocketRequest[T](uri: Uri, + def singleWebsocketRequest[T](request: WebsocketRequest, clientFlow: Flow[Message, Message, T], - extraHeaders: immutable.Seq[HttpHeader] = Nil, - subprotocol: Option[String] = None, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebsocketUpgradeResponse], T) = - websocketClientFlow(uri, extraHeaders, subprotocol, localAddress, settings, httpsContext, log) + websocketClientFlow(request, localAddress, settings, httpsContext, log) .joinMat(clientFlow)(Keep.both).run() /** @@ -695,15 +690,13 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { final case class OutgoingConnection(localAddress: InetSocketAddress, remoteAddress: InetSocketAddress) /** - * Represents the response to a websocket upgrade request. + * Represents the response to a websocket upgrade request. Can either be [[ValidUpgrade]] or [[InvalidUpgradeResponse]]. */ sealed trait WebsocketUpgradeResponse { def response: HttpResponse } + final case class ValidUpgrade(response: HttpResponse, chosenSubprotocol: Option[String]) extends WebsocketUpgradeResponse final case class InvalidUpgradeResponse(response: HttpResponse, cause: String) extends WebsocketUpgradeResponse - final case class ValidUpgrade( - response: HttpResponse, - chosenSubprotocol: Option[String]) extends WebsocketUpgradeResponse /** * Represents a connection pool to a specific target host and pool configuration. diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/WebsocketRequest.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/WebsocketRequest.scala new file mode 100644 index 0000000000..e4dd04a02b --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/WebsocketRequest.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.scaladsl.model.ws + +import scala.language.implicitConversions + +import scala.collection.immutable + +import akka.http.scaladsl.model.{ HttpHeader, Uri } + +/** + * Represents a Websocket request. + * @param uri The target URI to connect to. + * @param extraHeaders Extra headers to add to the Websocket request. + * @param subprotocol A Websocket subprotocol if required. + */ +final case class WebsocketRequest( + uri: Uri, + extraHeaders: immutable.Seq[HttpHeader] = Nil, + subprotocol: Option[String] = None) +object WebsocketRequest { + implicit def fromTargetUri(uri: Uri): WebsocketRequest = WebsocketRequest(uri) + implicit def fromTargetUriString(uriString: String): WebsocketRequest = WebsocketRequest(uriString) +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala index c7a8629e43..b4937c9c60 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala @@ -302,7 +302,9 @@ class WebsocketClientSpec extends FreeSpec with Matchers with WithMaterializerSp def targetUri: Uri = "ws://example.org/ws" def clientLayer: Http.WebsocketClientLayer = - Http(system).websocketClientLayer(targetUri, subprotocol = requestedSubProtocol, settings = settings) + Http(system).websocketClientLayer( + WebsocketRequest(targetUri, subprotocol = requestedSubProtocol), + settings = settings) val (netOut, netIn, response) = { val netOut = ByteStringSinkProbe()