Merge pull request #19495 from akka/wip-19427-UpgradeToWebsocket-Flow-RK

#19427 do not require Flow in UpgradeToWebsocket
This commit is contained in:
Konrad Malawski 2016-01-18 13:05:05 +01:00
commit 7dbd7dfc46
9 changed files with 45 additions and 42 deletions

View file

@ -6,7 +6,7 @@ package akka.http.impl.engine.rendering
import akka.http.impl.engine.ws.{ FrameEvent, UpgradeToWebsocketResponseHeader }
import akka.http.scaladsl.model.ws.Message
import akka.stream.{ Outlet, Inlet, Attributes, FlowShape }
import akka.stream.{ Outlet, Inlet, Attributes, FlowShape, Graph }
import scala.annotation.tailrec
import akka.event.LoggingAdapter
@ -253,7 +253,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
sealed trait CloseMode
case object DontClose extends CloseMode
case object CloseConnection extends CloseMode
case class SwitchToWebsocket(handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]) extends CloseMode
case class SwitchToWebsocket(handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends CloseMode
}
/**
@ -270,5 +270,5 @@ private[http] sealed trait ResponseRenderingOutput
/** INTERNAL API */
private[http] object ResponseRenderingOutput {
private[http] case class HttpData(bytes: ByteString) extends ResponseRenderingOutput
private[http] case class SwitchToWebsocket(httpResponseBytes: ByteString, handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]) extends ResponseRenderingOutput
private[http] case class SwitchToWebsocket(httpResponseBytes: ByteString, handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends ResponseRenderingOutput
}

View file

@ -421,7 +421,7 @@ private[http] object HttpServerBluePrint {
/*
* Websocket support
*/
def switchToWebsocket(handlerFlow: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]): Unit = {
def switchToWebsocket(handlerFlow: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]): Unit = {
val frameHandler = handlerFlow match {
case Left(frameHandler) frameHandler
case Right(messageHandler)

View file

@ -5,18 +5,14 @@
package akka.http.impl.engine.ws
import java.util.Random
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.reflect.ClassTag
import akka.stream.scaladsl.Flow
import akka.http.impl.util._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.ws.{ Message, UpgradeToWebsocket }
import akka.http.scaladsl.model._
import akka.stream.{ Graph, FlowShape }
/**
* Server-side implementation of the Websocket handshake
@ -95,16 +91,16 @@ private[http] object Handshake {
val header = new UpgradeToWebsocketLowLevel {
def requestedProtocols: Seq[String] = clientSupportedSubprotocols
def handle(handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]], subprotocol: Option[String]): HttpResponse = {
def handle(handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]], subprotocol: Option[String]): HttpResponse = {
require(subprotocol.forall(chosen clientSupportedSubprotocols.contains(chosen)),
s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]")
buildResponse(key.get, handler, subprotocol)
}
def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String]): HttpResponse =
def handleFrames(handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any], subprotocol: Option[String]): HttpResponse =
handle(Left(handlerFlow), subprotocol)
override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None): HttpResponse =
override def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String] = None): HttpResponse =
handle(Right(handlerFlow), subprotocol)
}
Some(header)
@ -130,7 +126,7 @@ private[http] object Handshake {
concatenated value to obtain a 20-byte value and base64-
encoding (see Section 4 of [RFC4648]) this 20-byte hash.
*/
def buildResponse(key: `Sec-WebSocket-Key`, handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]], subprotocol: Option[String]): HttpResponse =
def buildResponse(key: `Sec-WebSocket-Key`, handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]], subprotocol: Option[String]): HttpResponse =
HttpResponse(
StatusCodes.SwitchingProtocols,
subprotocol.map(p `Sec-WebSocket-Protocol`(Seq(p))).toList :::

View file

@ -6,7 +6,7 @@ package akka.http.impl.engine.ws
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.ws.UpgradeToWebsocket
import akka.stream.scaladsl.Flow
import akka.stream.{ Graph, FlowShape }
/**
* Currently internal API to handle FrameEvents directly.
@ -24,5 +24,5 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe
*
* INTERNAL API (for now)
*/
private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None): HttpResponse
private[http] def handleFrames(handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any], subprotocol: Option[String] = None): HttpResponse
}

View file

@ -6,13 +6,13 @@ package akka.http.impl.engine.ws
import akka.http.scaladsl.model.headers.CustomHeader
import akka.http.scaladsl.model.ws.Message
import akka.stream.scaladsl.Flow
import akka.stream.{ Graph, FlowShape }
private[http] final case class UpgradeToWebsocketResponseHeader(handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]])
private[http] final case class UpgradeToWebsocketResponseHeader(handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]])
extends InternalCustomHeader("UpgradeToWebsocketResponseHeader")
private[http] abstract class InternalCustomHeader(val name: String) extends CustomHeader {
override def suppressRendering: Boolean = true
def value(): String = ""
}
}

View file

@ -7,8 +7,7 @@ package akka.http.impl.util
import java.net.InetAddress
import java.{ util ju, lang jl }
import akka.japi.Pair
import akka.stream.javadsl
import akka.stream.scaladsl
import akka.stream.{ Graph, FlowShape, javadsl, scaladsl }
import scala.collection.immutable
import scala.reflect.ClassTag
@ -117,6 +116,16 @@ private[http] object JavaMapping {
}
}
implicit def graphFlowMapping[JIn, SIn, JOut, SOut, M](implicit inMapping: JavaMapping[JIn, SIn], outMapping: JavaMapping[JOut, SOut]): JavaMapping[Graph[FlowShape[JIn, JOut], M], Graph[FlowShape[SIn, SOut], M]] =
new JavaMapping[Graph[FlowShape[JIn, JOut], M], Graph[FlowShape[SIn, SOut], M]] {
def toScala(javaObject: Graph[FlowShape[JIn, JOut], M]): S =
scaladsl.Flow[SIn].map(inMapping.toJava).viaMat(javaObject)(scaladsl.Keep.right).map(outMapping.toScala)
def toJava(scalaObject: Graph[FlowShape[SIn, SOut], M]): J =
javadsl.Flow.fromGraph {
scaladsl.Flow[JIn].map(inMapping.toScala).viaMat(scalaObject)(scaladsl.Keep.right).map(outMapping.toJava)
}
}
def scalaToJavaAdapterFlow[J, S](implicit mapping: JavaMapping[J, S]): scaladsl.Flow[S, J, Unit] =
scaladsl.Flow[S].map(mapping.toJava)
def javaToScalaAdapterFlow[J, S](implicit mapping: JavaMapping[J, S]): scaladsl.Flow[J, S, Unit] =

View file

@ -8,7 +8,7 @@ import java.lang.{ Iterable ⇒ JIterable }
import akka.http.scaladsl.{ model sm }
import akka.http.javadsl.model._
import akka.stream.Materializer
import akka.stream.javadsl.{ Sink, Source, Flow }
import akka.stream._
/**
* A virtual header that Websocket requests will contain. Use [[UpgradeToWebsocket.handleMessagesWith]] to
@ -26,20 +26,20 @@ trait UpgradeToWebsocket extends sm.HttpHeader {
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given handlerFlow to handle Websocket messages from the client.
*/
def handleMessagesWith(handlerFlow: Flow[Message, Message, _]): HttpResponse
def handleMessagesWith(handlerFlow: Graph[FlowShape[Message, Message], _ <: Any]): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given handlerFlow to handle Websocket messages from the client. The given subprotocol must be one
* of the ones offered by the client.
*/
def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String): HttpResponse
def handleMessagesWith(handlerFlow: Graph[FlowShape[Message, Message], _ <: Any], subprotocol: String): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given inSink to handle Websocket messages from the client and the given outSource to send messages to the client.
*/
def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _]): HttpResponse
def handleMessagesWith(inSink: Graph[SinkShape[Message], _ <: Any], outSource: Graph[SourceShape[Message], _ <: Any]): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
@ -47,5 +47,5 @@ trait UpgradeToWebsocket extends sm.HttpHeader {
*
* The given subprotocol must be one of the ones offered by the client.
*/
def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _], subprotocol: String): HttpResponse
def handleMessagesWith(inSink: Graph[SinkShape[Message], _ <: Any], outSource: Graph[SourceShape[Message], _ <: Any], subprotocol: String): HttpResponse
}

View file

@ -8,9 +8,7 @@ import java.lang.Iterable
import akka.http.impl.util.JavaMapping
import scala.collection.immutable
import akka.stream
import akka.stream.javadsl
import akka.stream.scaladsl.{ Sink, Source, Flow }
import akka.stream._
import akka.http.javadsl.{ model jm }
import akka.http.scaladsl.model.HttpResponse
@ -36,7 +34,7 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket {
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessages(handlerFlow: Flow[Message, Message, Any],
def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any],
subprotocol: Option[String] = None): HttpResponse
/**
@ -49,10 +47,10 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket {
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessagesWithSinkSource(inSink: Sink[Message, Any],
outSource: Source[Message, Any],
def handleMessagesWithSinkSource(inSink: Graph[SinkShape[Message], Any],
outSource: Graph[SourceShape[Message], Any],
subprotocol: Option[String] = None): HttpResponse =
handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)
handleMessages(scaladsl.Flow.fromSinkAndSource(inSink, outSource), subprotocol)
import scala.collection.JavaConverters._
@ -64,29 +62,29 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket {
/**
* Java API
*/
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _]): HttpResponse =
def handleMessagesWith(handlerFlow: Graph[FlowShape[jm.ws.Message, jm.ws.Message], _ <: Any]): HttpResponse =
handleMessages(JavaMapping.toScala(handlerFlow))
/**
* Java API
*/
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], subprotocol: String): HttpResponse =
def handleMessagesWith(handlerFlow: Graph[FlowShape[jm.ws.Message, jm.ws.Message], _ <: Any], subprotocol: String): HttpResponse =
handleMessages(JavaMapping.toScala(handlerFlow), subprotocol = Some(subprotocol))
/**
* Java API
*/
def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: javadsl.Source[jm.ws.Message, _]): HttpResponse =
def handleMessagesWith(inSink: Graph[SinkShape[jm.ws.Message], _ <: Any], outSource: Graph[SourceShape[jm.ws.Message], _ <: Any]): HttpResponse =
handleMessages(createScalaFlow(inSink, outSource))
/**
* Java API
*/
def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _],
outSource: javadsl.Source[jm.ws.Message, _],
def handleMessagesWith(inSink: Graph[SinkShape[jm.ws.Message], _ <: Any],
outSource: Graph[SourceShape[jm.ws.Message], _ <: Any],
subprotocol: String): HttpResponse =
handleMessages(createScalaFlow(inSink, outSource), subprotocol = Some(subprotocol))
private[this] def createScalaFlow(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: stream.javadsl.Source[jm.ws.Message, _]): Flow[Message, Message, Any] =
JavaMapping.toScala(Flow.fromSinkAndSourceMat(inSink.asScala, outSource.asScala)((_, _) ()).asJava)
private[this] def createScalaFlow(inSink: Graph[SinkShape[jm.ws.Message], _ <: Any], outSource: Graph[SourceShape[jm.ws.Message], _ <: Any]): Graph[FlowShape[Message, Message], Unit] =
JavaMapping.toScala(scaladsl.Flow.fromSinkAndSourceMat(inSink, outSource)(scaladsl.Keep.none): Graph[FlowShape[jm.ws.Message, jm.ws.Message], Unit])
}

View file

@ -8,9 +8,9 @@ import akka.http.impl.engine.ws.InternalCustomHeader
import akka.http.scaladsl.model.headers.{ UpgradeProtocol, Upgrade, `Sec-WebSocket-Protocol` }
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest, Uri }
import akka.http.scaladsl.model.ws.{ UpgradeToWebsocket, Message }
import akka.stream.scaladsl.Flow
import scala.collection.immutable
import akka.stream.{ Graph, FlowShape }
import akka.stream.scaladsl.Flow
trait WSTestRequestBuilding { self: RouteTest
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(): HttpRequest =
@ -18,7 +18,7 @@ trait WSTestRequestBuilding { self: RouteTest ⇒
.addHeader(new InternalCustomHeader("UpgradeToWebsocketTestHeader") with UpgradeToWebsocket {
def requestedProtocols: immutable.Seq[String] = subprotocols.toList
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String]): HttpResponse = {
def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String]): HttpResponse = {
clientSideHandler.join(handlerFlow).run()
HttpResponse(StatusCodes.SwitchingProtocols,
headers =