diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala index 448f0944b2..7863046e8b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala @@ -6,7 +6,7 @@ package akka.http.impl.engine.rendering import akka.http.impl.engine.ws.{ FrameEvent, UpgradeToWebsocketResponseHeader } import akka.http.scaladsl.model.ws.Message -import akka.stream.{ Outlet, Inlet, Attributes, FlowShape } +import akka.stream.{ Outlet, Inlet, Attributes, FlowShape, Graph } import scala.annotation.tailrec import akka.event.LoggingAdapter @@ -253,7 +253,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser sealed trait CloseMode case object DontClose extends CloseMode case object CloseConnection extends CloseMode - case class SwitchToWebsocket(handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]) extends CloseMode + case class SwitchToWebsocket(handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends CloseMode } /** @@ -270,5 +270,5 @@ private[http] sealed trait ResponseRenderingOutput /** INTERNAL API */ private[http] object ResponseRenderingOutput { private[http] case class HttpData(bytes: ByteString) extends ResponseRenderingOutput - private[http] case class SwitchToWebsocket(httpResponseBytes: ByteString, handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]) extends ResponseRenderingOutput + private[http] case class SwitchToWebsocket(httpResponseBytes: ByteString, handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends ResponseRenderingOutput } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index fa3d8ec042..2216cb5b43 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -421,7 +421,7 @@ private[http] object HttpServerBluePrint { /* * Websocket support */ - def switchToWebsocket(handlerFlow: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]): Unit = { + def switchToWebsocket(handlerFlow: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]): Unit = { val frameHandler = handlerFlow match { case Left(frameHandler) ⇒ frameHandler case Right(messageHandler) ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala index e0909b08f8..56b63e798c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala @@ -5,18 +5,14 @@ package akka.http.impl.engine.ws import java.util.Random - import scala.collection.immutable import scala.collection.immutable.Seq import scala.reflect.ClassTag - -import akka.stream.scaladsl.Flow - import akka.http.impl.util._ - import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.ws.{ Message, UpgradeToWebsocket } import akka.http.scaladsl.model._ +import akka.stream.{ Graph, FlowShape } /** * Server-side implementation of the Websocket handshake @@ -95,16 +91,16 @@ private[http] object Handshake { val header = new UpgradeToWebsocketLowLevel { def requestedProtocols: Seq[String] = clientSupportedSubprotocols - def handle(handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]], subprotocol: Option[String]): HttpResponse = { + def handle(handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]], subprotocol: Option[String]): HttpResponse = { require(subprotocol.forall(chosen ⇒ clientSupportedSubprotocols.contains(chosen)), s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]") buildResponse(key.get, handler, subprotocol) } - def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String]): HttpResponse = + def handleFrames(handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any], subprotocol: Option[String]): HttpResponse = handle(Left(handlerFlow), subprotocol) - override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None): HttpResponse = + override def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String] = None): HttpResponse = handle(Right(handlerFlow), subprotocol) } Some(header) @@ -130,7 +126,7 @@ private[http] object Handshake { concatenated value to obtain a 20-byte value and base64- encoding (see Section 4 of [RFC4648]) this 20-byte hash. */ - def buildResponse(key: `Sec-WebSocket-Key`, handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]], subprotocol: Option[String]): HttpResponse = + def buildResponse(key: `Sec-WebSocket-Key`, handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]], subprotocol: Option[String]): HttpResponse = HttpResponse( StatusCodes.SwitchingProtocols, subprotocol.map(p ⇒ `Sec-WebSocket-Protocol`(Seq(p))).toList ::: diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala index 82716d58a0..830e42ef4e 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala @@ -6,7 +6,7 @@ package akka.http.impl.engine.ws import akka.http.scaladsl.model.HttpResponse import akka.http.scaladsl.model.ws.UpgradeToWebsocket -import akka.stream.scaladsl.Flow +import akka.stream.{ Graph, FlowShape } /** * Currently internal API to handle FrameEvents directly. @@ -24,5 +24,5 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe * * INTERNAL API (for now) */ - private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None): HttpResponse + private[http] def handleFrames(handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any], subprotocol: Option[String] = None): HttpResponse } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala index 37970943b9..f1db3dcada 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala @@ -6,13 +6,13 @@ package akka.http.impl.engine.ws import akka.http.scaladsl.model.headers.CustomHeader import akka.http.scaladsl.model.ws.Message -import akka.stream.scaladsl.Flow +import akka.stream.{ Graph, FlowShape } -private[http] final case class UpgradeToWebsocketResponseHeader(handler: Either[Flow[FrameEvent, FrameEvent, Any], Flow[Message, Message, Any]]) +private[http] final case class UpgradeToWebsocketResponseHeader(handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends InternalCustomHeader("UpgradeToWebsocketResponseHeader") private[http] abstract class InternalCustomHeader(val name: String) extends CustomHeader { override def suppressRendering: Boolean = true def value(): String = "" -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala b/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala index 26efac5a19..cf31d1482a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala @@ -7,8 +7,7 @@ package akka.http.impl.util import java.net.InetAddress import java.{ util ⇒ ju, lang ⇒ jl } import akka.japi.Pair -import akka.stream.javadsl -import akka.stream.scaladsl +import akka.stream.{ Graph, FlowShape, javadsl, scaladsl } import scala.collection.immutable import scala.reflect.ClassTag @@ -117,6 +116,16 @@ private[http] object JavaMapping { } } + implicit def graphFlowMapping[JIn, SIn, JOut, SOut, M](implicit inMapping: JavaMapping[JIn, SIn], outMapping: JavaMapping[JOut, SOut]): JavaMapping[Graph[FlowShape[JIn, JOut], M], Graph[FlowShape[SIn, SOut], M]] = + new JavaMapping[Graph[FlowShape[JIn, JOut], M], Graph[FlowShape[SIn, SOut], M]] { + def toScala(javaObject: Graph[FlowShape[JIn, JOut], M]): S = + scaladsl.Flow[SIn].map(inMapping.toJava).viaMat(javaObject)(scaladsl.Keep.right).map(outMapping.toScala) + def toJava(scalaObject: Graph[FlowShape[SIn, SOut], M]): J = + javadsl.Flow.fromGraph { + scaladsl.Flow[JIn].map(inMapping.toScala).viaMat(scalaObject)(scaladsl.Keep.right).map(outMapping.toJava) + } + } + def scalaToJavaAdapterFlow[J, S](implicit mapping: JavaMapping[J, S]): scaladsl.Flow[S, J, Unit] = scaladsl.Flow[S].map(mapping.toJava) def javaToScalaAdapterFlow[J, S](implicit mapping: JavaMapping[J, S]): scaladsl.Flow[J, S, Unit] = diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala index d265eba875..b820573e38 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala @@ -8,7 +8,7 @@ import java.lang.{ Iterable ⇒ JIterable } import akka.http.scaladsl.{ model ⇒ sm } import akka.http.javadsl.model._ import akka.stream.Materializer -import akka.stream.javadsl.{ Sink, Source, Flow } +import akka.stream._ /** * A virtual header that Websocket requests will contain. Use [[UpgradeToWebsocket.handleMessagesWith]] to @@ -26,20 +26,20 @@ trait UpgradeToWebsocket extends sm.HttpHeader { * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards * use the given handlerFlow to handle Websocket messages from the client. */ - def handleMessagesWith(handlerFlow: Flow[Message, Message, _]): HttpResponse + def handleMessagesWith(handlerFlow: Graph[FlowShape[Message, Message], _ <: Any]): HttpResponse /** * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards * use the given handlerFlow to handle Websocket messages from the client. The given subprotocol must be one * of the ones offered by the client. */ - def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String): HttpResponse + def handleMessagesWith(handlerFlow: Graph[FlowShape[Message, Message], _ <: Any], subprotocol: String): HttpResponse /** * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards * use the given inSink to handle Websocket messages from the client and the given outSource to send messages to the client. */ - def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _]): HttpResponse + def handleMessagesWith(inSink: Graph[SinkShape[Message], _ <: Any], outSource: Graph[SourceShape[Message], _ <: Any]): HttpResponse /** * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards @@ -47,5 +47,5 @@ trait UpgradeToWebsocket extends sm.HttpHeader { * * The given subprotocol must be one of the ones offered by the client. */ - def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _], subprotocol: String): HttpResponse + def handleMessagesWith(inSink: Graph[SinkShape[Message], _ <: Any], outSource: Graph[SourceShape[Message], _ <: Any], subprotocol: String): HttpResponse } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala index 5508161834..6456247986 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala @@ -8,9 +8,7 @@ import java.lang.Iterable import akka.http.impl.util.JavaMapping import scala.collection.immutable -import akka.stream -import akka.stream.javadsl -import akka.stream.scaladsl.{ Sink, Source, Flow } +import akka.stream._ import akka.http.javadsl.{ model ⇒ jm } import akka.http.scaladsl.model.HttpResponse @@ -36,7 +34,7 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket { * * Optionally, a subprotocol out of the ones requested by the client can be chosen. */ - def handleMessages(handlerFlow: Flow[Message, Message, Any], + def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String] = None): HttpResponse /** @@ -49,10 +47,10 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket { * * Optionally, a subprotocol out of the ones requested by the client can be chosen. */ - def handleMessagesWithSinkSource(inSink: Sink[Message, Any], - outSource: Source[Message, Any], + def handleMessagesWithSinkSource(inSink: Graph[SinkShape[Message], Any], + outSource: Graph[SourceShape[Message], Any], subprotocol: Option[String] = None): HttpResponse = - handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol) + handleMessages(scaladsl.Flow.fromSinkAndSource(inSink, outSource), subprotocol) import scala.collection.JavaConverters._ @@ -64,29 +62,29 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket { /** * Java API */ - def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _]): HttpResponse = + def handleMessagesWith(handlerFlow: Graph[FlowShape[jm.ws.Message, jm.ws.Message], _ <: Any]): HttpResponse = handleMessages(JavaMapping.toScala(handlerFlow)) /** * Java API */ - def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], subprotocol: String): HttpResponse = + def handleMessagesWith(handlerFlow: Graph[FlowShape[jm.ws.Message, jm.ws.Message], _ <: Any], subprotocol: String): HttpResponse = handleMessages(JavaMapping.toScala(handlerFlow), subprotocol = Some(subprotocol)) /** * Java API */ - def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: javadsl.Source[jm.ws.Message, _]): HttpResponse = + def handleMessagesWith(inSink: Graph[SinkShape[jm.ws.Message], _ <: Any], outSource: Graph[SourceShape[jm.ws.Message], _ <: Any]): HttpResponse = handleMessages(createScalaFlow(inSink, outSource)) /** * Java API */ - def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _], - outSource: javadsl.Source[jm.ws.Message, _], + def handleMessagesWith(inSink: Graph[SinkShape[jm.ws.Message], _ <: Any], + outSource: Graph[SourceShape[jm.ws.Message], _ <: Any], subprotocol: String): HttpResponse = handleMessages(createScalaFlow(inSink, outSource), subprotocol = Some(subprotocol)) - private[this] def createScalaFlow(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: stream.javadsl.Source[jm.ws.Message, _]): Flow[Message, Message, Any] = - JavaMapping.toScala(Flow.fromSinkAndSourceMat(inSink.asScala, outSource.asScala)((_, _) ⇒ ()).asJava) + private[this] def createScalaFlow(inSink: Graph[SinkShape[jm.ws.Message], _ <: Any], outSource: Graph[SourceShape[jm.ws.Message], _ <: Any]): Graph[FlowShape[Message, Message], Unit] = + JavaMapping.toScala(scaladsl.Flow.fromSinkAndSourceMat(inSink, outSource)(scaladsl.Keep.none): Graph[FlowShape[jm.ws.Message, jm.ws.Message], Unit]) } diff --git a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala index d210435771..e050e22eb9 100644 --- a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala +++ b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala @@ -8,9 +8,9 @@ import akka.http.impl.engine.ws.InternalCustomHeader import akka.http.scaladsl.model.headers.{ UpgradeProtocol, Upgrade, `Sec-WebSocket-Protocol` } import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest, Uri } import akka.http.scaladsl.model.ws.{ UpgradeToWebsocket, Message } -import akka.stream.scaladsl.Flow - import scala.collection.immutable +import akka.stream.{ Graph, FlowShape } +import akka.stream.scaladsl.Flow trait WSTestRequestBuilding { self: RouteTest ⇒ def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(): HttpRequest = @@ -18,7 +18,7 @@ trait WSTestRequestBuilding { self: RouteTest ⇒ .addHeader(new InternalCustomHeader("UpgradeToWebsocketTestHeader") with UpgradeToWebsocket { def requestedProtocols: immutable.Seq[String] = subprotocols.toList - def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String]): HttpResponse = { + def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String]): HttpResponse = { clientSideHandler.join(handlerFlow).run() HttpResponse(StatusCodes.SwitchingProtocols, headers =