Merge pull request #17300 from spray/w/add-websocket-java-api

+htc #16887 the Java side of the Websocket server-side API
This commit is contained in:
drewhk 2015-04-24 15:31:39 +02:00
commit a892ce1f0f
5 changed files with 264 additions and 3 deletions

View file

@ -0,0 +1,145 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.model.ws
import akka.http.model.ws
import akka.stream.javadsl.Source
import akka.util.ByteString
/**
* Represents a Websocket message. A message can either be a binary message or a text message.
*/
sealed abstract class Message {
/**
* Is this message a text message? If true, [[asTextMessage]] will return this
* text message, if false, [[asBinaryMessage]] will return this binary message.
*/
def isText: Boolean
/**
* Returns this TextMessage if it is a text message, throws otherwise.
*/
def asTextMessage: TextMessage
/**
* Returns this BinaryMessage if it is a binary message, throws otherwise.
*/
def asBinaryMessage: BinaryMessage
def asScala: ws.Message
}
object Message {
def adapt(msg: ws.Message): Message = msg match {
case t: ws.TextMessage TextMessage.adapt(t)
case b: ws.BinaryMessage BinaryMessage.adapt(b)
}
}
/**
* Represents a Websocket text message. A text message can either be strict in which case
* the complete data is already available or it can be streamed in which case [[getStreamedText]]
* will return a Source streaming the data as it comes in.
*/
abstract class TextMessage extends Message {
/** Is this message a strict one? */
def isStrict: Boolean
/**
* Returns the strict message text if this message is strict, throws otherwise.
*/
def getStrictText: String
/**
* Returns a source of the text message data.
*/
def getStreamedText: Source[String, _]
def isText: Boolean = true
def asTextMessage: TextMessage = this
def asBinaryMessage: BinaryMessage = throw new ClassCastException("This message is not a binary message.")
def asScala: ws.TextMessage
}
object TextMessage {
/**
* Creates a strict text message.
*/
def create(text: String): TextMessage =
new TextMessage {
def isStrict: Boolean = true
def getStreamedText: Source[String, _] = Source.single(text)
def getStrictText: String = text
def asScala: ws.TextMessage = ws.TextMessage.Strict(text)
}
/**
* Creates a streamed text message.
*/
def create(textStream: Source[String, _]): TextMessage =
new TextMessage {
def isStrict: Boolean = false
def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.")
def getStreamedText: Source[String, _] = textStream
def asScala: ws.TextMessage = ws.TextMessage.Streamed(textStream.asScala)
}
def adapt(msg: ws.TextMessage): TextMessage = msg match {
case ws.TextMessage.Strict(text) create(text)
case ws.TextMessage.Streamed(stream) create(stream.asJava)
}
}
abstract class BinaryMessage extends Message {
/** Is this message a strict one? */
def isStrict: Boolean
/**
* Returns the strict message data if this message is strict, throws otherwise.
*/
def getStrictData: ByteString
/**
* Returns a source of the binary message data.
*/
def getStreamedData: Source[ByteString, _]
def isText: Boolean = false
def asTextMessage: TextMessage = throw new ClassCastException("This message is not a text message.")
def asBinaryMessage: BinaryMessage = this
def asScala: ws.BinaryMessage
}
object BinaryMessage {
/**
* Creates a strict binary message.
*/
def create(data: ByteString): BinaryMessage =
new BinaryMessage {
def isStrict: Boolean = true
def getStreamedData: Source[ByteString, _] = Source.single(data)
def getStrictData: ByteString = data
def asScala: ws.BinaryMessage = ws.BinaryMessage.Strict(data)
}
/**
* Creates a streamed binary message.
*/
def create(dataStream: Source[ByteString, _]): BinaryMessage =
new BinaryMessage {
def isStrict: Boolean = false
def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.")
def getStreamedData: Source[ByteString, _] = dataStream
def asScala: ws.BinaryMessage = ws.BinaryMessage.Streamed(dataStream.asScala)
}
def adapt(msg: ws.BinaryMessage): BinaryMessage = msg match {
case ws.BinaryMessage.Strict(data) create(data)
case ws.BinaryMessage.Streamed(stream) create(stream.asJava)
}
}

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.model.ws
import java.lang.{ Iterable JIterable }
import akka.http.model
import akka.http.model.japi.HttpResponse
import akka.stream.FlowMaterializer
import akka.stream.javadsl.Flow
/**
* A virtual header that Websocket requests will contain. Use [[UpgradeToWebsocket.handleMessagesWith]] to
* create a Websocket handshake response and handle the Websocket message stream with the given handler.
*/
trait UpgradeToWebsocket extends model.HttpHeader {
/**
* Returns the sequence of protocols the client accepts.
*
* See http://tools.ietf.org/html/rfc6455#section-1.9
*/
def getRequestedProtocols(): JIterable[String]
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given handlerFlow to handle Websocket messages from the client.
*/
def handleMessagesWith(handlerFlow: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse
/**
* Returns a response that can be used to answer a Websocket handshake request. The connection will afterwards
* use the given handlerFlow to handle Websocket messages from the client. The given subprotocol must be one
* of the ones offered by the client.
*/
def handleMessagesWith(handlerFlow: Flow[Message, Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.model.ws
import akka.stream.FlowMaterializer
import akka.stream.javadsl.Flow
import akka.http.model.japi.JavaMapping.Implicits._
import akka.http.model.japi.{ StatusCodes, HttpResponse, HttpRequest }
object Websocket {
/**
* If a given request is a Websocket request a response accepting the request is returned using the given handler to
* handle the Websocket message stream. If the request wasn't a Websocket request a response with status code 400 is
* returned.
*/
def handleWebsocketRequestWith(request: HttpRequest, handler: Flow[Message, Message, _], materializer: FlowMaterializer): HttpResponse =
request.asScala.header[UpgradeToWebsocket] match {
case Some(header) header.handleMessagesWith(handler, materializer)
case None HttpResponse.create().withStatus(StatusCodes.BAD_REQUEST).withEntity("Expected websocket request")
}
}

View file

@ -4,18 +4,23 @@
package akka.http.model.ws
import java.lang.Iterable
import akka.http.javadsl
import akka.stream
import scala.collection.immutable
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.http.model.{ HttpHeader, HttpResponse }
import akka.http.model.HttpResponse
/**
* A custom header that will be added to an Websocket upgrade HttpRequest that
* enables a request handler to upgrade this connection to a Websocket connection and
* registers a Websocket handler.
*/
trait UpgradeToWebsocket extends HttpHeader {
trait UpgradeToWebsocket extends javadsl.model.ws.UpgradeToWebsocket {
/**
* A sequence of protocols the client accepts.
*
@ -32,4 +37,15 @@ trait UpgradeToWebsocket extends HttpHeader {
* a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse
import scala.collection.JavaConverters._
def getRequestedProtocols(): Iterable[String] = requestedProtocols.asJava
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[javadsl.model.ws.Message, javadsl.model.ws.Message, _], materializer: FlowMaterializer): HttpResponse =
handleMessages(adaptJavaFlow(handlerFlow))(materializer)
def handleMessagesWith(handlerFlow: stream.javadsl.Flow[javadsl.model.ws.Message, javadsl.model.ws.Message, _], subprotocol: String, materializer: FlowMaterializer): HttpResponse =
handleMessages(adaptJavaFlow(handlerFlow), subprotocol = Some(subprotocol))(materializer)
private[this] def adaptJavaFlow(handlerFlow: stream.javadsl.Flow[javadsl.model.ws.Message, javadsl.model.ws.Message, _]): Flow[Message, Message, Any] =
Flow[Message].map(javadsl.model.ws.Message.adapt).via(handlerFlow.asScala).map(_.asScala)
}

View file

@ -8,13 +8,19 @@ import akka.actor.ActorSystem;
import akka.japi.function.Function;
import akka.japi.function.Procedure;
import akka.http.engine.server.ServerSettings;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.Websocket;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorFlowMaterializer;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import java.io.BufferedReader;
import java.io.InputStreamReader;
@ -32,7 +38,13 @@ public class JavaTestServer {
new Function<HttpRequest, HttpResponse>() {
public HttpResponse apply(HttpRequest request) throws Exception {
System.out.println("Handling request to " + request.getUri());
return JavaApiTestCases.handleRequest(request);
if (request.getUri().path().equals("/"))
return Websocket.handleWebsocketRequestWith(request, echoMessages(), materializer);
else if (request.getUri().path().equals("/greeter"))
return Websocket.handleWebsocketRequestWith(request, greeter(), materializer);
else
return JavaApiTestCases.handleRequest(request);
}
}, "localhost", 8080, materializer);
@ -43,4 +55,29 @@ public class JavaTestServer {
system.shutdown();
}
}
public static Flow<Message, Message, BoxedUnit> echoMessages() {
return Flow.create(); // the identity operation
}
public static Flow<Message, Message, BoxedUnit> greeter() {
return
Flow.<Message>create()
.collect(new JavaPartialFunction<Message, Message>() {
@Override
public Message apply(Message msg, boolean isCheck) throws Exception {
if (isCheck)
if (msg.isText()) return null;
else throw noMatch();
else
return handleTextMessage(msg.asTextMessage());
}
});
}
public static TextMessage handleTextMessage(TextMessage msg) {
if (msg.isStrict())
return TextMessage.create("Hello "+msg.getStrictText());
else
return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText()));
}
}