add java api for websocket testkit #21184

And additionally adds unit test for WebSocketDirectives #20466
This commit is contained in:
Hawstein 2016-09-02 19:28:49 +08:00 committed by Johan Andrén
parent df4a6270e6
commit b9a05aff96
14 changed files with 342 additions and 23 deletions

View file

@ -8,7 +8,7 @@ package docs.circuitbreaker
import scala.concurrent.duration._
import akka.pattern.CircuitBreaker
import akka.pattern.pipe
import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.actor.{ Actor, ActorLogging, ActorRef }
import scala.concurrent.Future
@ -44,7 +44,7 @@ class DangerousActor extends Actor with ActorLogging {
}
class TellPatternActor(recipient : ActorRef) extends Actor with ActorLogging {
class TellPatternActor(recipient: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
val breaker =

View file

@ -0,0 +1,121 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.http.javadsl.server.directives;
import akka.NotUsed;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.headers.SecWebSocketProtocol;
import akka.http.javadsl.model.ws.BinaryMessage;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.javadsl.testkit.WSProbe;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class WebSocketDirectivesExamplesTest extends JUnitRouteTest {
@Test
public void testHandleWebSocketMessages() {
//#handleWebSocketMessages
final Flow<Message, Message, NotUsed> greeter = Flow.of(Message.class).mapConcat(msg -> {
if (msg instanceof TextMessage) {
final TextMessage tm = (TextMessage) msg;
final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
return Collections.singletonList(ret);
} else if (msg instanceof BinaryMessage) {
final BinaryMessage bm = (BinaryMessage) msg;
bm.getStreamedData().runWith(Sink.ignore(), materializer());
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Unsupported message type!");
}
});
final Route websocketRoute = path("greeter", () ->
handleWebSocketMessages(greeter)
);
// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());
// WS creates a WebSocket request for testing
testRoute(websocketRoute).run(WS(Uri.create("/greeter"), wsClient.flow(), materializer()))
.assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
// manually run a WS conversation
wsClient.sendMessage("Peter");
wsClient.expectMessage("Hello Peter!");
wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");
wsClient.sendCompletion();
wsClient.expectCompletion();
//#handleWebSocketMessages
}
@Test
public void testHandleWebSocketMessagesForProtocol() {
//#handleWebSocketMessagesForProtocol
final Flow<Message, Message, NotUsed> greeterService = Flow.of(Message.class).mapConcat(msg -> {
if (msg instanceof TextMessage) {
final TextMessage tm = (TextMessage) msg;
final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
return Collections.singletonList(ret);
} else if (msg instanceof BinaryMessage) {
final BinaryMessage bm = (BinaryMessage) msg;
bm.getStreamedData().runWith(Sink.ignore(), materializer());
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Unsupported message type!");
}
});
final Flow<Message, Message, NotUsed> echoService = Flow.of(Message.class).buffer(1, OverflowStrategy.backpressure());
final Route websocketMultipleProtocolRoute = path("services", () ->
route(
handleWebSocketMessagesForProtocol(greeterService, "greeter"),
handleWebSocketMessagesForProtocol(echoService, "echo")
)
);
// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());
// WS creates a WebSocket request for testing
testRoute(websocketMultipleProtocolRoute)
.run(WS(Uri.create("/services"), wsClient.flow(), materializer(), Arrays.asList("other", "echo")))
.assertHeaderExists(SecWebSocketProtocol.create("echo"));
wsClient.sendMessage("Peter");
wsClient.expectMessage("Peter");
wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectMessage(ByteString.fromString("abcdef"));
wsClient.sendMessage("John");
wsClient.expectMessage("John");
wsClient.sendCompletion();
wsClient.expectCompletion();
//#handleWebSocketMessagesForProtocol
}
}

View file

@ -16,4 +16,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke
Example
-------
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.
.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessages

View file

@ -20,4 +20,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke
Example
-------
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.
.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessagesForProtocol

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.model.headers;
import akka.http.impl.util.Util;
/**
* Model for the `Sec-WebSocket-Protocol` header.
*/
public abstract class SecWebSocketProtocol extends akka.http.scaladsl.model.HttpHeader {
public abstract Iterable<String> getProtocols();
public static SecWebSocketProtocol create(String... protocols) {
return new akka.http.scaladsl.model.headers.Sec$minusWebSocket$minusProtocol(Util.convertArray(protocols));
}
}

View file

@ -11,7 +11,7 @@ import akka.util.ByteString
/**
* Represents a WebSocket message. A message can either be a binary message or a text message.
*/
sealed abstract class Message {
abstract class Message {
/**
* Is this message a text message? If true, [[asTextMessage]] will return this
* text message, if false, [[asBinaryMessage]] will return this binary message.
@ -150,4 +150,4 @@ object BinaryMessage {
case sm.ws.BinaryMessage.Strict(data) create(data)
case bm: sm.ws.BinaryMessage create(bm.dataStream.asJava)
}
}
}

View file

@ -766,11 +766,14 @@ private[http] object `Sec-WebSocket-Protocol` extends ModeledCompanion[`Sec-WebS
* INTERNAL API
*/
private[http] final case class `Sec-WebSocket-Protocol`(protocols: immutable.Seq[String])
extends RequestResponseHeader {
extends jm.headers.SecWebSocketProtocol with RequestResponseHeader {
require(protocols.nonEmpty, "Sec-WebSocket-Protocol.protocols must not be empty")
import `Sec-WebSocket-Protocol`.protocolsRenderer
protected[http] def renderValue[R <: Rendering](r: R): r.type = r ~~ protocols
protected def companion = `Sec-WebSocket-Protocol`
/** Java API */
override def getProtocols: Iterable[String] = protocols.asJava
}
// http://tools.ietf.org/html/rfc6455#section-4.3

View file

@ -4,6 +4,7 @@
package akka.http.scaladsl.model.ws
import akka.stream.javadsl
import akka.stream.scaladsl.Source
import akka.util.ByteString
@ -11,18 +12,22 @@ import akka.util.ByteString
/**
* The ADT for WebSocket messages. A message can either be a binary or a text message.
*/
sealed trait Message // FIXME: Why don't we extend akka.http.javadsl.model.ws.Message here?
sealed trait Message extends akka.http.javadsl.model.ws.Message
/**
* Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case
* the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream`
* will return a Source streaming the data as it comes in.
*/
sealed trait TextMessage extends Message {
sealed trait TextMessage extends akka.http.javadsl.model.ws.TextMessage with Message {
/**
* The contents of this message as a stream.
*/
def textStream: Source[String, _]
/** Java API */
override def getStreamedText: javadsl.Source[String, _] = textStream.asJava
override def asScala: TextMessage = this
}
//#message-model
object TextMessage {
@ -36,9 +41,18 @@ object TextMessage {
final case class Strict(text: String) extends TextMessage {
def textStream: Source[String, _] = Source.single(text)
override def toString: String = s"TextMessage.Strict($text)"
/** Java API */
override def getStrictText: String = text
override def isStrict: Boolean = true
}
final case class Streamed(textStream: Source[String, _]) extends TextMessage {
override def toString: String = s"TextMessage.Streamed($textStream)"
/** Java API */
override def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.")
override def isStrict: Boolean = false
}
}
@ -48,11 +62,15 @@ object TextMessage {
* will return a Source streaming the data as it comes in.
*/
//#message-model
sealed trait BinaryMessage extends Message {
sealed trait BinaryMessage extends akka.http.javadsl.model.ws.BinaryMessage with Message {
/**
* The contents of this message as a stream.
*/
def dataStream: Source[ByteString, _]
/** Java API */
override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava
override def asScala: BinaryMessage = this
}
//#message-model
object BinaryMessage {
@ -66,8 +84,16 @@ object BinaryMessage {
final case class Strict(data: ByteString) extends BinaryMessage {
def dataStream: Source[ByteString, _] = Source.single(data)
override def toString: String = s"BinaryMessage.Strict($data)"
/** Java API */
override def getStrictData: ByteString = data
override def isStrict: Boolean = true
}
final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage {
override def toString: String = s"BinaryMessage.Streamed($dataStream)"
/** Java API */
override def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.")
override def isStrict: Boolean = false
}
}

View file

@ -29,7 +29,7 @@ import akka.stream.Materializer
*
* See `JUnitRouteTest` for an example of a concrete implementation.
*/
abstract class RouteTest extends AllDirectives {
abstract class RouteTest extends AllDirectives with WSTestRequestBuilding {
implicit def system: ActorSystem
implicit def materializer: Materializer
implicit def executionContext: ExecutionContextExecutor = system.dispatcher

View file

@ -0,0 +1,108 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.testkit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.javadsl.model.ws.Message
import akka.stream.Materializer
import akka.stream.javadsl.Flow
import akka.stream.scaladsl
import akka.util.ByteString
import akka.http.scaladsl.{ testkit st }
import akka.http.impl.util.JavaMapping.Implicits._
import scala.concurrent.duration._
/**
* A WSProbe is a probe that implements a `Flow[Message, Message, Unit]` for testing
* websocket code.
*
* Requesting elements is handled automatically.
*/
class WSProbe(delegate: st.WSProbe) {
def flow: Flow[Message, Message, Any] = {
val underlying = scaladsl.Flow[Message].map(_.asScala).via(delegate.flow).map(_.asJava)
new Flow[Message, Message, NotUsed](underlying)
}
/**
* Send the given messages out of the flow.
*/
def sendMessage(message: Message): Unit = delegate.sendMessage(message.asScala)
/**
* Send a text message containing the given string out of the flow.
*/
def sendMessage(text: String): Unit = delegate.sendMessage(text)
/**
* Send a binary message containing the given bytes out of the flow.
*/
def sendMessage(bytes: ByteString): Unit = delegate.sendMessage(bytes)
/**
* Complete the output side of the flow.
*/
def sendCompletion(): Unit = delegate.sendCompletion()
/**
* Expect a message on the input side of the flow.
*/
def expectMessage(): Message = delegate.expectMessage()
/**
* Expect a text message on the input side of the flow and compares its payload with the given one.
* If the received message is streamed its contents are collected and then asserted against the given
* String.
*/
def expectMessage(text: String): Unit = delegate.expectMessage(text)
/**
* Expect a binary message on the input side of the flow and compares its payload with the given one.
* If the received message is streamed its contents are collected and then asserted against the given
* ByteString.
*/
def expectMessage(bytes: ByteString): Unit = delegate.expectMessage(bytes)
/**
* Expect no message on the input side of the flow.
*/
def expectNoMessage(): Unit = delegate.expectNoMessage()
/**
* Expect no message on the input side of the flow for the given maximum duration.
*/
def expectNoMessage(max: FiniteDuration): Unit = delegate.expectNoMessage(max)
/**
* Expect completion on the input side of the flow.
*/
def expectCompletion(): Unit = delegate.expectCompletion()
}
object WSProbe {
// A convenient method to create WSProbe with default maxChunks and maxChunkCollectionMills
def create(system: ActorSystem, materializer: Materializer): WSProbe = {
create(system, materializer, 1000, 5000)
}
/**
* Creates a WSProbe to use in tests against websocket handlers.
*
* @param maxChunks The maximum number of chunks to collect for streamed messages.
* @param maxChunkCollectionMills The maximum time in milliseconds to collect chunks for streamed messages.
*/
def create(system: ActorSystem, materializer: Materializer, maxChunks: Int, maxChunkCollectionMills: Long): WSProbe = {
val delegate = st.WSProbe(maxChunks, maxChunkCollectionMills)(system, materializer)
new WSProbe(delegate)
}
}

View file

@ -0,0 +1,34 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.testkit
import akka.http.javadsl.model.ws.Message
import akka.http.javadsl.model.{ HttpRequest, Uri }
import akka.http.scaladsl.{ model sm }
import akka.stream.javadsl.Flow
import akka.http.scaladsl.{ testkit st }
import akka.http.impl.util.JavaMapping.Implicits._
import scala.collection.JavaConverters._
import akka.stream.{ Materializer, scaladsl }
trait WSTestRequestBuilding {
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], materializer: Materializer): HttpRequest = {
WS(uri, clientSideHandler, materializer, java.util.Collections.emptyList())
}
def WS(
uri: Uri,
clientSideHandler: Flow[Message, Message, Any],
materializer: Materializer,
subprotocols: java.util.List[String]): HttpRequest = {
val handler = scaladsl.Flow[sm.ws.Message].map(_.asJava).via(clientSideHandler).map(_.asScala)
st.WSTestRequestBuilding.WS(uri.asScala, handler, subprotocols.asScala)(materializer)
}
}

View file

@ -9,11 +9,11 @@ import akka.http.scaladsl.model.headers.{ UpgradeProtocol, Upgrade, `Sec-WebSock
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest, Uri }
import akka.http.scaladsl.model.ws.{ UpgradeToWebSocket, Message }
import scala.collection.immutable
import akka.stream.{ Graph, FlowShape }
import akka.stream.{ Materializer, Graph, FlowShape }
import akka.stream.scaladsl.Flow
trait WSTestRequestBuilding { self: RouteTest
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(): HttpRequest =
trait WSTestRequestBuilding {
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(implicit materializer: Materializer): HttpRequest =
HttpRequest(uri = uri)
.addHeader(new InternalCustomHeader("UpgradeToWebSocketTestHeader") with UpgradeToWebSocket {
def requestedProtocols: immutable.Seq[String] = subprotocols.toList
@ -28,3 +28,5 @@ trait WSTestRequestBuilding { self: RouteTest ⇒
}
})
}
object WSTestRequestBuilding extends WSTestRequestBuilding

View file

@ -52,9 +52,8 @@ abstract class WebSocketDirectives extends SecurityDirectives {
* Handles WebSocket requests with the given handler if the given subprotocol is offered in the request and
* rejects other requests with an [[ExpectedWebSocketRequestRejection]] or an [[UnsupportedWebSocketSubprotocolRejection]].
*/
def handleWebSocketMessagesForProtocol(handler: Flow[Message, Message, Any], subprotocol: String): Route = RouteAdapter {
val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala)
D.handleWebSocketMessagesForProtocol(adapted, subprotocol)
def handleWebSocketMessagesForProtocol[T](handler: Flow[Message, Message, T], subprotocol: String): Route = RouteAdapter {
D.handleWebSocketMessagesForProtocol(adapt(handler), subprotocol)
}
/**
@ -68,12 +67,10 @@ abstract class WebSocketDirectives extends SecurityDirectives {
*
* To support several subprotocols you may chain several `handleWebSocketMessage` Routes.
*/
def handleWebSocketMessagesForOptionalProtocol(handler: Flow[Message, Message, Any], subprotocol: Optional[String]): Route = RouteAdapter {
val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala)
D.handleWebSocketMessagesForOptionalProtocol(adapted, subprotocol.asScala)
def handleWebSocketMessagesForOptionalProtocol[T](handler: Flow[Message, Message, T], subprotocol: Optional[String]): Route = RouteAdapter {
D.handleWebSocketMessagesForOptionalProtocol(adapt(handler), subprotocol.asScala)
}
// TODO this is because scala Message does not extend java Message - we could fix that, but http-core is stable
private def adapt[T](handler: Flow[Message, Message, T]): scaladsl.Flow[s.Message, s.Message, NotUsed] = {
scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala)
}

View file

@ -952,11 +952,17 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat"),
// #21201 adding childActorOf to TestActor / TestKit / TestProbe
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$3"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$2"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf"),
// #21184 add java api for ws testkit
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.asScala"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.getStreamedText"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.asScala"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.getStreamedData")
)
)
}