diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala index d4db4e37ec..d738cfcdf6 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala @@ -84,7 +84,7 @@ private[http] object Handshake { val header = new UpgradeToWebsocketLowLevel { def requestedProtocols: Seq[String] = supportedProtocols - def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse = { + def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String]): HttpResponse = { require(subprotocol.forall(chosen ⇒ supportedProtocols.contains(chosen)), s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]") buildResponse(key.get, handlerFlow, subprotocol) @@ -113,7 +113,7 @@ private[http] object Handshake { concatenated value to obtain a 20-byte value and base64- encoding (see Section 4 of [RFC4648]) this 20-byte hash. */ - def buildResponse(key: `Sec-WebSocket-Key`, handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse = + def buildResponse(key: `Sec-WebSocket-Key`, handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String]): HttpResponse = HttpResponse( StatusCodes.SwitchingProtocols, subprotocol.map(p ⇒ `Sec-WebSocket-Protocol`(Seq(p))).toList ::: diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala index 8c192276c6..c42ef2de0c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala @@ -25,8 +25,8 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe * * INTERNAL API (for now) */ - private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse + private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None): HttpResponse - override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse = + override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None): HttpResponse = handleFrames(Websocket.handleMessages(handlerFlow), subprotocol) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala index f75fdd7a44..f74ea88c7d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketsResponseHeader.scala @@ -8,7 +8,7 @@ import akka.http.scaladsl.model.headers.CustomHeader import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow -private[http] case class UpgradeToWebsocketResponseHeader(handlerFlow: Flow[FrameEvent, FrameEvent, Any])(implicit val mat: FlowMaterializer) +private[http] case class UpgradeToWebsocketResponseHeader(handlerFlow: Flow[FrameEvent, FrameEvent, Any]) extends InternalCustomHeader("UpgradeToWebsocketResponseHeader") { } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala index ff973ab5c8..8c76744ef6 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala @@ -8,7 +8,7 @@ import java.lang.{ Iterable ⇒ JIterable } import akka.http.scaladsl.{ model ⇒ sm } import akka.http.javadsl.model._ import akka.stream.FlowMaterializer -import akka.stream.javadsl.Flow +import akka.stream.javadsl.{ Sink, Source, Flow } /** * A virtual header that Websocket requests will contain. Use [[UpgradeToWebsocket.handleMessagesWith]] to @@ -26,12 +26,26 @@ trait UpgradeToWebsocket extends sm.HttpHeader { * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards * use the given handlerFlow to handle Websocket messages from the client. */ - def handleMessagesWith(handlerFlow: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse + def handleMessagesWith(handlerFlow: Flow[Message, Message, _]): HttpResponse /** * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards * use the given handlerFlow to handle Websocket messages from the client. The given subprotocol must be one * of the ones offered by the client. */ - def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse + def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String): HttpResponse + + /** + * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards + * use the given inSink to handle Websocket messages from the client and the given outSource to send messages to the client. + */ + def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _]): HttpResponse + + /** + * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards + * use the given inSink to handle Websocket messages from the client and the given outSource to send messages to the client. + * + * The given subprotocol must be one of the ones offered by the client. + */ + def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _], subprotocol: String): HttpResponse } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala index fd519a54f9..689eaa7eec 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala @@ -15,9 +15,9 @@ object Websocket { * handle the Websocket message stream. If the request wasn't a Websocket request a response with status code 400 is * returned. */ - def handleWebsocketRequestWith(request: HttpRequest, handler: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse = + def handleWebsocketRequestWith(request: HttpRequest, handler: Flow[Message, Message, _]): HttpResponse = request.asScala.header[UpgradeToWebsocket] match { - case Some(header) ⇒ header.handleMessagesWith(handler, materializer) + case Some(header) ⇒ header.handleMessagesWith(handler) case None ⇒ HttpResponse.create().withStatus(StatusCodes.BAD_REQUEST).withEntity("Expected websocket request") } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala index 8b30c0e423..7b2cbfe5ad 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/UpgradeToWebsocket.scala @@ -7,8 +7,8 @@ package akka.http.scaladsl.model.ws import java.lang.Iterable import scala.collection.immutable import akka.stream -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.javadsl +import akka.stream.scaladsl.{ Sink, Source, Flow } import akka.http.javadsl.{ model ⇒ jm } import akka.http.scaladsl.model.HttpResponse @@ -30,19 +30,63 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket { * * Returns a response to return in a request handler that will signal the * low-level HTTP implementation to upgrade the connection to Websocket and - * use the supplied handler to handle incoming Websocket messages. Optionally, - * a subprotocol out of the ones requested by the client can be chosen. + * use the supplied handler to handle incoming Websocket messages. + * + * Optionally, a subprotocol out of the ones requested by the client can be chosen. */ - def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse + def handleMessages(handlerFlow: Flow[Message, Message, Any], + subprotocol: Option[String] = None): HttpResponse + + /** + * The high-level interface to create a Websocket server based on "messages". + * + * Returns a response to return in a request handler that will signal the + * low-level HTTP implementation to upgrade the connection to Websocket and + * use the supplied inSink to consume messages received from the client and + * the supplied outSource to produce message to sent to the client. + * + * Optionally, a subprotocol out of the ones requested by the client can be chosen. + */ + def handleMessagesWithSinkSource(inSink: Sink[Message, Any], + outSource: Source[Message, Any], + subprotocol: Option[String] = None): HttpResponse = + handleMessages(Flow.wrap(inSink, outSource)((_, _) ⇒ ()), subprotocol) import scala.collection.JavaConverters._ - def getRequestedProtocols(): Iterable[String] = requestedProtocols.asJava - def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], materializer: FlowMaterializer): HttpResponse = - handleMessages(adaptJavaFlow(handlerFlow))(materializer) - def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse = - handleMessages(adaptJavaFlow(handlerFlow), subprotocol = Some(subprotocol))(materializer) + /** + * Java API + */ + def getRequestedProtocols(): Iterable[String] = requestedProtocols.asJava + + /** + * Java API + */ + def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _]): HttpResponse = + handleMessages(adaptJavaFlow(handlerFlow)) + + /** + * Java API + */ + def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], subprotocol: String): HttpResponse = + handleMessages(adaptJavaFlow(handlerFlow), subprotocol = Some(subprotocol)) + + /** + * Java API + */ + def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: javadsl.Source[jm.ws.Message, _]): HttpResponse = + handleMessages(createScalaFlow(inSink, outSource)) + + /** + * Java API + */ + def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _], + outSource: javadsl.Source[jm.ws.Message, _], + subprotocol: String): HttpResponse = + handleMessages(createScalaFlow(inSink, outSource), subprotocol = Some(subprotocol)) private[this] def adaptJavaFlow(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _]): Flow[Message, Message, Any] = Flow[Message].map(jm.ws.Message.adapt).via(handlerFlow.asScala).map(_.asScala) + private[this] def createScalaFlow(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: stream.javadsl.Source[jm.ws.Message, _]): Flow[Message, Message, Any] = + adaptJavaFlow(Flow.wrap(inSink.asScala, outSource.asScala)((_, _) ⇒ ()).asJava) } diff --git a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java index dbd7a52a81..3635a7bf6b 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java @@ -39,9 +39,9 @@ public class JavaTestServer { System.out.println("Handling request to " + request.getUri()); if (request.getUri().path().equals("/")) - return Websocket.handleWebsocketRequestWith(request, echoMessages(), materializer); + return Websocket.handleWebsocketRequestWith(request, echoMessages()); else if (request.getUri().path().equals("/greeter")) - return Websocket.handleWebsocketRequestWith(request, greeter(), materializer); + return Websocket.handleWebsocketRequestWith(request, greeter()); else return JavaApiTestCases.handleRequest(request); } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/WebsocketDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/WebsocketDirectivesSpec.scala index 5331fd0620..5de239676f 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/WebsocketDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/WebsocketDirectivesSpec.scala @@ -40,7 +40,7 @@ class WebsocketDirectivesSpec extends RoutingSpec { new InternalCustomHeader("UpgradeToWebsocketMock") with UpgradeToWebsocket { def requestedProtocols: Seq[String] = Nil - def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse = + def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String]): HttpResponse = HttpResponse(StatusCodes.SwitchingProtocols) } } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/WebsocketDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/WebsocketDirectives.scala index ab02d28611..7ea1bd6ab6 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/WebsocketDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/WebsocketDirectives.scala @@ -18,10 +18,8 @@ trait WebsocketDirectives { * [[ExpectedWebsocketRequestRejection]]. */ def handleWebsocketMessages(handler: Flow[Message, Message, Any]): Route = - extractFlowMaterializer { implicit mat ⇒ - optionalHeaderValueByType[UpgradeToWebsocket]() { - case Some(upgrade) ⇒ complete(upgrade.handleMessages(handler)) - case None ⇒ reject(ExpectedWebsocketRequestRejection) - } + optionalHeaderValueByType[UpgradeToWebsocket]() { + case Some(upgrade) ⇒ complete(upgrade.handleMessages(handler)) + case None ⇒ reject(ExpectedWebsocketRequestRejection) } }