+htc #17609 overloads to handle websocket requests with sink and source, removed materializer param

This commit is contained in:
Johannes Rudolph 2015-06-05 17:47:38 +02:00
parent 0018bc2cda
commit 9b0aa86f15
9 changed files with 84 additions and 28 deletions

View file

@ -84,7 +84,7 @@ private[http] object Handshake {
val header = new UpgradeToWebsocketLowLevel {
def requestedProtocols: Seq[String] = supportedProtocols
def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse = {
def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String]): HttpResponse = {
require(subprotocol.forall(chosen supportedProtocols.contains(chosen)),
s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]")
buildResponse(key.get, handlerFlow, subprotocol)
@ -113,7 +113,7 @@ private[http] object Handshake {
concatenated value to obtain a 20-byte value and base64-
encoding (see Section 4 of [RFC4648]) this 20-byte hash.
*/
def buildResponse(key: `Sec-WebSocket-Key`, handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse =
def buildResponse(key: `Sec-WebSocket-Key`, handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String]): HttpResponse =
HttpResponse(
StatusCodes.SwitchingProtocols,
subprotocol.map(p `Sec-WebSocket-Protocol`(Seq(p))).toList :::

View file

@ -25,8 +25,8 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe
*
* INTERNAL API (for now)
*/
private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse
private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None): HttpResponse
override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse =
override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None): HttpResponse =
handleFrames(Websocket.handleMessages(handlerFlow), subprotocol)
}

View file

@ -8,7 +8,7 @@ import akka.http.scaladsl.model.headers.CustomHeader
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
private[http] case class UpgradeToWebsocketResponseHeader(handlerFlow: Flow[FrameEvent, FrameEvent, Any])(implicit val mat: FlowMaterializer)
private[http] case class UpgradeToWebsocketResponseHeader(handlerFlow: Flow[FrameEvent, FrameEvent, Any])
extends InternalCustomHeader("UpgradeToWebsocketResponseHeader") {
}

View file

@ -8,7 +8,7 @@ import java.lang.{ Iterable ⇒ JIterable }
import akka.http.scaladsl.{ model sm }
import akka.http.javadsl.model._
import akka.stream.FlowMaterializer
import akka.stream.javadsl.Flow
import akka.stream.javadsl.{ Sink, Source, Flow }
/**
* A virtual header that Websocket requests will contain. Use [[UpgradeToWebsocket.handleMessagesWith]] to
@ -26,12 +26,26 @@ trait UpgradeToWebsocket extends sm.HttpHeader {
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given handlerFlow to handle Websocket messages from the client.
*/
def handleMessagesWith(handlerFlow: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse
def handleMessagesWith(handlerFlow: Flow[Message, Message, _]): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given handlerFlow to handle Websocket messages from the client. The given subprotocol must be one
* of the ones offered by the client.
*/
def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse
def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given inSink to handle Websocket messages from the client and the given outSource to send messages to the client.
*/
def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _]): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given inSink to handle Websocket messages from the client and the given outSource to send messages to the client.
*
* The given subprotocol must be one of the ones offered by the client.
*/
def handleMessagesWith(inSink: Sink[Message, _], outSource: Source[Message, _], subprotocol: String): HttpResponse
}

View file

@ -15,9 +15,9 @@ object Websocket {
* handle the Websocket message stream. If the request wasn't a Websocket request a response with status code 400 is
* returned.
*/
def handleWebsocketRequestWith(request: HttpRequest, handler: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse =
def handleWebsocketRequestWith(request: HttpRequest, handler: Flow[Message, Message, _]): HttpResponse =
request.asScala.header[UpgradeToWebsocket] match {
case Some(header) header.handleMessagesWith(handler, materializer)
case Some(header) header.handleMessagesWith(handler)
case None HttpResponse.create().withStatus(StatusCodes.BAD_REQUEST).withEntity("Expected websocket request")
}
}

View file

@ -7,8 +7,8 @@ package akka.http.scaladsl.model.ws
import java.lang.Iterable
import scala.collection.immutable
import akka.stream
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.javadsl
import akka.stream.scaladsl.{ Sink, Source, Flow }
import akka.http.javadsl.{ model jm }
import akka.http.scaladsl.model.HttpResponse
@ -30,19 +30,63 @@ trait UpgradeToWebsocket extends jm.ws.UpgradeToWebsocket {
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to Websocket and
* use the supplied handler to handle incoming Websocket messages. Optionally,
* a subprotocol out of the ones requested by the client can be chosen.
* use the supplied handler to handle incoming Websocket messages.
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse
def handleMessages(handlerFlow: Flow[Message, Message, Any],
subprotocol: Option[String] = None): HttpResponse
/**
* The high-level interface to create a Websocket server based on "messages".
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to Websocket and
* use the supplied inSink to consume messages received from the client and
* the supplied outSource to produce message to sent to the client.
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessagesWithSinkSource(inSink: Sink[Message, Any],
outSource: Source[Message, Any],
subprotocol: Option[String] = None): HttpResponse =
handleMessages(Flow.wrap(inSink, outSource)((_, _) ()), subprotocol)
import scala.collection.JavaConverters._
def getRequestedProtocols(): Iterable[String] = requestedProtocols.asJava
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], materializer: FlowMaterializer): HttpResponse =
handleMessages(adaptJavaFlow(handlerFlow))(materializer)
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse =
handleMessages(adaptJavaFlow(handlerFlow), subprotocol = Some(subprotocol))(materializer)
/**
* Java API
*/
def getRequestedProtocols(): Iterable[String] = requestedProtocols.asJava
/**
* Java API
*/
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _]): HttpResponse =
handleMessages(adaptJavaFlow(handlerFlow))
/**
* Java API
*/
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _], subprotocol: String): HttpResponse =
handleMessages(adaptJavaFlow(handlerFlow), subprotocol = Some(subprotocol))
/**
* Java API
*/
def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: javadsl.Source[jm.ws.Message, _]): HttpResponse =
handleMessages(createScalaFlow(inSink, outSource))
/**
* Java API
*/
def handleMessagesWith(inSink: stream.javadsl.Sink[jm.ws.Message, _],
outSource: javadsl.Source[jm.ws.Message, _],
subprotocol: String): HttpResponse =
handleMessages(createScalaFlow(inSink, outSource), subprotocol = Some(subprotocol))
private[this] def adaptJavaFlow(handlerFlow: stream.javadsl.Flow[jm.ws.Message, jm.ws.Message, _]): Flow[Message, Message, Any] =
Flow[Message].map(jm.ws.Message.adapt).via(handlerFlow.asScala).map(_.asScala)
private[this] def createScalaFlow(inSink: stream.javadsl.Sink[jm.ws.Message, _], outSource: stream.javadsl.Source[jm.ws.Message, _]): Flow[Message, Message, Any] =
adaptJavaFlow(Flow.wrap(inSink.asScala, outSource.asScala)((_, _) ()).asJava)
}

View file

@ -39,9 +39,9 @@ public class JavaTestServer {
System.out.println("Handling request to " + request.getUri());
if (request.getUri().path().equals("/"))
return Websocket.handleWebsocketRequestWith(request, echoMessages(), materializer);
return Websocket.handleWebsocketRequestWith(request, echoMessages());
else if (request.getUri().path().equals("/greeter"))
return Websocket.handleWebsocketRequestWith(request, greeter(), materializer);
return Websocket.handleWebsocketRequestWith(request, greeter());
else
return JavaApiTestCases.handleRequest(request);
}

View file

@ -40,7 +40,7 @@ class WebsocketDirectivesSpec extends RoutingSpec {
new InternalCustomHeader("UpgradeToWebsocketMock") with UpgradeToWebsocket {
def requestedProtocols: Seq[String] = Nil
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String])(implicit mat: FlowMaterializer): HttpResponse =
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String]): HttpResponse =
HttpResponse(StatusCodes.SwitchingProtocols)
}
}

View file

@ -18,10 +18,8 @@ trait WebsocketDirectives {
* [[ExpectedWebsocketRequestRejection]].
*/
def handleWebsocketMessages(handler: Flow[Message, Message, Any]): Route =
extractFlowMaterializer { implicit mat
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) complete(upgrade.handleMessages(handler))
case None reject(ExpectedWebsocketRequestRejection)
}
}
}