From 3588a98a4ba563729f315af05c661be4a1531a77 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 24 Apr 2015 14:40:27 +0200 Subject: [PATCH] +htc #16887 the Java side of the Websocket server-side API --- .../akka/http/javadsl/model/ws/Message.scala | 145 ++++++++++++++++++ .../javadsl/model/ws/UpgradeToWebsocket.scala | 38 +++++ .../http/javadsl/model/ws/Websocket.scala | 25 +++ .../http/model/ws/UpgradeToWebsocket.scala | 20 ++- .../akka/http/model/japi/JavaTestServer.java | 39 ++++- 5 files changed, 264 insertions(+), 3 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala create mode 100644 akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala create mode 100644 akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala new file mode 100644 index 0000000000..aada9e0f7b --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala @@ -0,0 +1,145 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl.model.ws + +import akka.http.model.ws +import akka.stream.javadsl.Source +import akka.util.ByteString + +/** + * Represents a Websocket message. A message can either be a binary message or a text message. + */ +sealed abstract class Message { + /** + * Is this message a text message? If true, [[asTextMessage]] will return this + * text message, if false, [[asBinaryMessage]] will return this binary message. + */ + def isText: Boolean + + /** + * Returns this TextMessage if it is a text message, throws otherwise. + */ + def asTextMessage: TextMessage + + /** + * Returns this BinaryMessage if it is a binary message, throws otherwise. + */ + def asBinaryMessage: BinaryMessage + + def asScala: ws.Message +} + +object Message { + def adapt(msg: ws.Message): Message = msg match { + case t: ws.TextMessage ⇒ TextMessage.adapt(t) + case b: ws.BinaryMessage ⇒ BinaryMessage.adapt(b) + } +} + +/** + * Represents a Websocket text message. A text message can either be strict in which case + * the complete data is already available or it can be streamed in which case [[getStreamedText]] + * will return a Source streaming the data as it comes in. + */ +abstract class TextMessage extends Message { + /** Is this message a strict one? */ + def isStrict: Boolean + + /** + * Returns the strict message text if this message is strict, throws otherwise. + */ + def getStrictText: String + + /** + * Returns a source of the text message data. + */ + def getStreamedText: Source[String, _] + + def isText: Boolean = true + def asTextMessage: TextMessage = this + def asBinaryMessage: BinaryMessage = throw new ClassCastException("This message is not a binary message.") + def asScala: ws.TextMessage +} + +object TextMessage { + /** + * Creates a strict text message. + */ + def create(text: String): TextMessage = + new TextMessage { + def isStrict: Boolean = true + def getStreamedText: Source[String, _] = Source.single(text) + def getStrictText: String = text + + def asScala: ws.TextMessage = ws.TextMessage.Strict(text) + } + /** + * Creates a streamed text message. + */ + def create(textStream: Source[String, _]): TextMessage = + new TextMessage { + def isStrict: Boolean = false + def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.") + def getStreamedText: Source[String, _] = textStream + + def asScala: ws.TextMessage = ws.TextMessage.Streamed(textStream.asScala) + } + + def adapt(msg: ws.TextMessage): TextMessage = msg match { + case ws.TextMessage.Strict(text) ⇒ create(text) + case ws.TextMessage.Streamed(stream) ⇒ create(stream.asJava) + } +} + +abstract class BinaryMessage extends Message { + /** Is this message a strict one? */ + def isStrict: Boolean + + /** + * Returns the strict message data if this message is strict, throws otherwise. + */ + def getStrictData: ByteString + + /** + * Returns a source of the binary message data. + */ + def getStreamedData: Source[ByteString, _] + + def isText: Boolean = false + def asTextMessage: TextMessage = throw new ClassCastException("This message is not a text message.") + def asBinaryMessage: BinaryMessage = this + def asScala: ws.BinaryMessage +} + +object BinaryMessage { + /** + * Creates a strict binary message. + */ + def create(data: ByteString): BinaryMessage = + new BinaryMessage { + def isStrict: Boolean = true + def getStreamedData: Source[ByteString, _] = Source.single(data) + def getStrictData: ByteString = data + + def asScala: ws.BinaryMessage = ws.BinaryMessage.Strict(data) + } + + /** + * Creates a streamed binary message. + */ + def create(dataStream: Source[ByteString, _]): BinaryMessage = + new BinaryMessage { + def isStrict: Boolean = false + def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.") + def getStreamedData: Source[ByteString, _] = dataStream + + def asScala: ws.BinaryMessage = ws.BinaryMessage.Streamed(dataStream.asScala) + } + + def adapt(msg: ws.BinaryMessage): BinaryMessage = msg match { + case ws.BinaryMessage.Strict(data) ⇒ create(data) + case ws.BinaryMessage.Streamed(stream) ⇒ create(stream.asJava) + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala new file mode 100644 index 0000000000..b865bdd724 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/UpgradeToWebsocket.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl.model.ws + +import java.lang.{ Iterable ⇒ JIterable } + +import akka.http.model +import akka.http.model.japi.HttpResponse +import akka.stream.FlowMaterializer +import akka.stream.javadsl.Flow + +/** + * A virtual header that Websocket requests will contain. Use [[UpgradeToWebsocket.handleMessagesWith]] to + * create a Websocket handshake response and handle the Websocket message stream with the given handler. + */ +trait UpgradeToWebsocket extends model.HttpHeader { + /** + * Returns the sequence of protocols the client accepts. + * + * See http://tools.ietf.org/html/rfc6455#section-1.9 + */ + def getRequestedProtocols(): JIterable[String] + + /** + * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards + * use the given handlerFlow to handle Websocket messages from the client. + */ + def handleMessagesWith(handlerFlow: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse + + /** + * Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards + * use the given handlerFlow to handle Websocket messages from the client. The given subprotocol must be one + * of the ones offered by the client. + */ + def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse +} diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala new file mode 100644 index 0000000000..b8a30dfa4a --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Websocket.scala @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl.model.ws + +import akka.stream.FlowMaterializer +import akka.stream.javadsl.Flow + +import akka.http.model.japi.JavaMapping.Implicits._ + +import akka.http.model.japi.{ StatusCodes, HttpResponse, HttpRequest } + +object Websocket { + /** + * If a given request is a Websocket request a response accepting the request is returned using the given handler to + * handle the Websocket message stream. If the request wasn't a Websocket request a response with status code 400 is + * returned. + */ + def handleWebsocketRequestWith(request: HttpRequest, handler: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse = + request.asScala.header[UpgradeToWebsocket] match { + case Some(header) ⇒ header.handleMessagesWith(handler, materializer) + case None ⇒ HttpResponse.create().withStatus(StatusCodes.BAD_REQUEST).withEntity("Expected websocket request") + } +} diff --git a/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala b/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala index 7325d46835..9dd4288f3d 100644 --- a/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala +++ b/akka-http-core/src/main/scala/akka/http/model/ws/UpgradeToWebsocket.scala @@ -4,18 +4,23 @@ package akka.http.model.ws +import java.lang.Iterable + +import akka.http.javadsl +import akka.stream + import scala.collection.immutable import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow -import akka.http.model.{ HttpHeader, HttpResponse } +import akka.http.model.HttpResponse /** * A custom header that will be added to an Websocket upgrade HttpRequest that * enables a request handler to upgrade this connection to a Websocket connection and * registers a Websocket handler. */ -trait UpgradeToWebsocket extends HttpHeader { +trait UpgradeToWebsocket extends javadsl.model.ws.UpgradeToWebsocket { /** * A sequence of protocols the client accepts. * @@ -32,4 +37,15 @@ trait UpgradeToWebsocket extends HttpHeader { * a subprotocol out of the ones requested by the client can be chosen. */ def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse + + import scala.collection.JavaConverters._ + def getRequestedProtocols(): Iterable[String] = requestedProtocols.asJava + def handleMessagesWith(handlerFlow: stream.javadsl.Flow[javadsl.model.ws.Message, javadsl.model.ws.Message, _], materializer: FlowMaterializer): HttpResponse = + handleMessages(adaptJavaFlow(handlerFlow))(materializer) + + def handleMessagesWith(handlerFlow: stream.javadsl.Flow[javadsl.model.ws.Message, javadsl.model.ws.Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse = + handleMessages(adaptJavaFlow(handlerFlow), subprotocol = Some(subprotocol))(materializer) + + private[this] def adaptJavaFlow(handlerFlow: stream.javadsl.Flow[javadsl.model.ws.Message, javadsl.model.ws.Message, _]): Flow[Message, Message, Any] = + Flow[Message].map(javadsl.model.ws.Message.adapt).via(handlerFlow.asScala).map(_.asScala) } diff --git a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java index 0e24caeb93..accdf03913 100644 --- a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java @@ -6,8 +6,13 @@ package akka.http.model.japi; import akka.actor.ActorSystem; import akka.http.engine.server.ServerSettings; +import akka.http.javadsl.model.ws.Message; +import akka.http.javadsl.model.ws.TextMessage; +import akka.http.javadsl.model.ws.Websocket; +import akka.japi.JavaPartialFunction; import akka.stream.ActorFlowMaterializer; import akka.stream.FlowMaterializer; +import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.stream.javadsl.japi.Function; @@ -15,6 +20,7 @@ import akka.stream.javadsl.japi.Procedure; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; import java.io.BufferedReader; import java.io.InputStreamReader; @@ -32,7 +38,13 @@ public class JavaTestServer { new Function() { public HttpResponse apply(HttpRequest request) throws Exception { System.out.println("Handling request to " + request.getUri()); - return JavaApiTestCases.handleRequest(request); + + if (request.getUri().path().equals("/")) + return Websocket.handleWebsocketRequestWith(request, echoMessages(), materializer); + else if (request.getUri().path().equals("/greeter")) + return Websocket.handleWebsocketRequestWith(request, greeter(), materializer); + else + return JavaApiTestCases.handleRequest(request); } }, "localhost", 8080, materializer); @@ -43,4 +55,29 @@ public class JavaTestServer { system.shutdown(); } } + + public static Flow echoMessages() { + return Flow.create(); // the identity operation + } + + public static Flow greeter() { + return + Flow.create() + .collect(new JavaPartialFunction() { + @Override + public Message apply(Message msg, boolean isCheck) throws Exception { + if (isCheck) + if (msg.isText()) return null; + else throw noMatch(); + else + return handleTextMessage(msg.asTextMessage()); + } + }); + } + public static TextMessage handleTextMessage(TextMessage msg) { + if (msg.isStrict()) + return TextMessage.create("Hello "+msg.getStrictText()); + else + return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText())); + } }