From b9a05aff9695278eac84f41dc2e40940a79acae3 Mon Sep 17 00:00:00 2001 From: Hawstein Date: Fri, 2 Sep 2016 19:28:49 +0800 Subject: [PATCH] add java api for websocket testkit #21184 And additionally adds unit test for WebSocketDirectives #20466 --- .../CircuitBreakerDocSpec.scala | 4 +- .../WebSocketDirectivesExamplesTest.java | 121 ++++++++++++++++++ .../handleWebSocketMessages.rst | 3 +- .../handleWebSocketMessagesForProtocol.rst | 3 +- .../model/headers/SecWebSocketProtocol.java | 20 +++ .../akka/http/javadsl/model/ws/Message.scala | 4 +- .../http/scaladsl/model/headers/headers.scala | 5 +- .../akka/http/scaladsl/model/ws/Message.scala | 32 ++++- .../akka/http/javadsl/testkit/RouteTest.scala | 2 +- .../akka/http/javadsl/testkit/WSProbe.scala | 108 ++++++++++++++++ .../testkit/WSTestRequestBuilding.scala | 34 +++++ .../testkit/WSTestRequestBuilding.scala | 8 +- .../directives/WebSocketDirectives.scala | 11 +- project/MiMa.scala | 10 +- 14 files changed, 342 insertions(+), 23 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java create mode 100644 akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java create mode 100644 akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala create mode 100644 akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala index 5f6c8c6d16..324ce19e04 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala +++ b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -8,7 +8,7 @@ package docs.circuitbreaker import scala.concurrent.duration._ import akka.pattern.CircuitBreaker import akka.pattern.pipe -import akka.actor.{Actor, ActorLogging, ActorRef} +import akka.actor.{ Actor, ActorLogging, ActorRef } import scala.concurrent.Future @@ -44,7 +44,7 @@ class DangerousActor extends Actor with ActorLogging { } -class TellPatternActor(recipient : ActorRef) extends Actor with ActorLogging { +class TellPatternActor(recipient: ActorRef) extends Actor with ActorLogging { import context.dispatcher val breaker = diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java new file mode 100644 index 0000000000..14f15fa1ae --- /dev/null +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ +package docs.http.javadsl.server.directives; + +import akka.NotUsed; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.model.Uri; +import akka.http.javadsl.model.headers.SecWebSocketProtocol; +import akka.http.javadsl.model.ws.BinaryMessage; +import akka.http.javadsl.model.ws.Message; +import akka.http.javadsl.model.ws.TextMessage; +import akka.http.javadsl.server.Route; +import akka.http.javadsl.testkit.JUnitRouteTest; +import akka.http.javadsl.testkit.WSProbe; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +public class WebSocketDirectivesExamplesTest extends JUnitRouteTest { + + @Test + public void testHandleWebSocketMessages() { + //#handleWebSocketMessages + final Flow greeter = Flow.of(Message.class).mapConcat(msg -> { + if (msg instanceof TextMessage) { + final TextMessage tm = (TextMessage) msg; + final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!"))); + return Collections.singletonList(ret); + } else if (msg instanceof BinaryMessage) { + final BinaryMessage bm = (BinaryMessage) msg; + bm.getStreamedData().runWith(Sink.ignore(), materializer()); + return Collections.emptyList(); + } else { + throw new IllegalArgumentException("Unsupported message type!"); + } + }); + + final Route websocketRoute = path("greeter", () -> + handleWebSocketMessages(greeter) + ); + + // create a testing probe representing the client-side + final WSProbe wsClient = WSProbe.create(system(), materializer()); + + // WS creates a WebSocket request for testing + testRoute(websocketRoute).run(WS(Uri.create("/greeter"), wsClient.flow(), materializer())) + .assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS); + + // manually run a WS conversation + wsClient.sendMessage("Peter"); + wsClient.expectMessage("Hello Peter!"); + + wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef"))); + wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + wsClient.sendMessage("John"); + wsClient.expectMessage("Hello John!"); + + wsClient.sendCompletion(); + wsClient.expectCompletion(); + //#handleWebSocketMessages + } + + @Test + public void testHandleWebSocketMessagesForProtocol() { + //#handleWebSocketMessagesForProtocol + final Flow greeterService = Flow.of(Message.class).mapConcat(msg -> { + if (msg instanceof TextMessage) { + final TextMessage tm = (TextMessage) msg; + final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!"))); + return Collections.singletonList(ret); + } else if (msg instanceof BinaryMessage) { + final BinaryMessage bm = (BinaryMessage) msg; + bm.getStreamedData().runWith(Sink.ignore(), materializer()); + return Collections.emptyList(); + } else { + throw new IllegalArgumentException("Unsupported message type!"); + } + }); + + final Flow echoService = Flow.of(Message.class).buffer(1, OverflowStrategy.backpressure()); + + final Route websocketMultipleProtocolRoute = path("services", () -> + route( + handleWebSocketMessagesForProtocol(greeterService, "greeter"), + handleWebSocketMessagesForProtocol(echoService, "echo") + ) + ); + + // create a testing probe representing the client-side + final WSProbe wsClient = WSProbe.create(system(), materializer()); + + // WS creates a WebSocket request for testing + testRoute(websocketMultipleProtocolRoute) + .run(WS(Uri.create("/services"), wsClient.flow(), materializer(), Arrays.asList("other", "echo"))) + .assertHeaderExists(SecWebSocketProtocol.create("echo")); + + wsClient.sendMessage("Peter"); + wsClient.expectMessage("Peter"); + + wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef"))); + wsClient.expectMessage(ByteString.fromString("abcdef")); + + wsClient.sendMessage("John"); + wsClient.expectMessage("John"); + + wsClient.sendCompletion(); + wsClient.expectCompletion(); + //#handleWebSocketMessagesForProtocol + } +} diff --git a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst index f63b3bb4c8..33d844bd52 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst @@ -16,4 +16,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. + +.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessages diff --git a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst index c4f981d96b..5f88c2c8d3 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst @@ -20,4 +20,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. + +.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessagesForProtocol diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java new file mode 100644 index 0000000000..2683e5a45b --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package akka.http.javadsl.model.headers; + +import akka.http.impl.util.Util; + +/** + * Model for the `Sec-WebSocket-Protocol` header. + */ +public abstract class SecWebSocketProtocol extends akka.http.scaladsl.model.HttpHeader { + public abstract Iterable getProtocols(); + + public static SecWebSocketProtocol create(String... protocols) { + return new akka.http.scaladsl.model.headers.Sec$minusWebSocket$minusProtocol(Util.convertArray(protocols)); + } + +} + diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala index 6ec86a849b..0524f02464 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala @@ -11,7 +11,7 @@ import akka.util.ByteString /** * Represents a WebSocket message. A message can either be a binary message or a text message. */ -sealed abstract class Message { +abstract class Message { /** * Is this message a text message? If true, [[asTextMessage]] will return this * text message, if false, [[asBinaryMessage]] will return this binary message. @@ -150,4 +150,4 @@ object BinaryMessage { case sm.ws.BinaryMessage.Strict(data) ⇒ create(data) case bm: sm.ws.BinaryMessage ⇒ create(bm.dataStream.asJava) } -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index 917243c58c..e70c530f00 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -766,11 +766,14 @@ private[http] object `Sec-WebSocket-Protocol` extends ModeledCompanion[`Sec-WebS * INTERNAL API */ private[http] final case class `Sec-WebSocket-Protocol`(protocols: immutable.Seq[String]) - extends RequestResponseHeader { + extends jm.headers.SecWebSocketProtocol with RequestResponseHeader { require(protocols.nonEmpty, "Sec-WebSocket-Protocol.protocols must not be empty") import `Sec-WebSocket-Protocol`.protocolsRenderer protected[http] def renderValue[R <: Rendering](r: R): r.type = r ~~ protocols protected def companion = `Sec-WebSocket-Protocol` + + /** Java API */ + override def getProtocols: Iterable[String] = protocols.asJava } // http://tools.ietf.org/html/rfc6455#section-4.3 diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala index 355e45a161..c19fbc8adc 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala @@ -4,6 +4,7 @@ package akka.http.scaladsl.model.ws +import akka.stream.javadsl import akka.stream.scaladsl.Source import akka.util.ByteString @@ -11,18 +12,22 @@ import akka.util.ByteString /** * The ADT for WebSocket messages. A message can either be a binary or a text message. */ -sealed trait Message // FIXME: Why don't we extend akka.http.javadsl.model.ws.Message here? +sealed trait Message extends akka.http.javadsl.model.ws.Message /** * Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case * the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream` * will return a Source streaming the data as it comes in. */ -sealed trait TextMessage extends Message { +sealed trait TextMessage extends akka.http.javadsl.model.ws.TextMessage with Message { /** * The contents of this message as a stream. */ def textStream: Source[String, _] + + /** Java API */ + override def getStreamedText: javadsl.Source[String, _] = textStream.asJava + override def asScala: TextMessage = this } //#message-model object TextMessage { @@ -36,9 +41,18 @@ object TextMessage { final case class Strict(text: String) extends TextMessage { def textStream: Source[String, _] = Source.single(text) override def toString: String = s"TextMessage.Strict($text)" + + /** Java API */ + override def getStrictText: String = text + override def isStrict: Boolean = true } + final case class Streamed(textStream: Source[String, _]) extends TextMessage { override def toString: String = s"TextMessage.Streamed($textStream)" + + /** Java API */ + override def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.") + override def isStrict: Boolean = false } } @@ -48,11 +62,15 @@ object TextMessage { * will return a Source streaming the data as it comes in. */ //#message-model -sealed trait BinaryMessage extends Message { +sealed trait BinaryMessage extends akka.http.javadsl.model.ws.BinaryMessage with Message { /** * The contents of this message as a stream. */ def dataStream: Source[ByteString, _] + + /** Java API */ + override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava + override def asScala: BinaryMessage = this } //#message-model object BinaryMessage { @@ -66,8 +84,16 @@ object BinaryMessage { final case class Strict(data: ByteString) extends BinaryMessage { def dataStream: Source[ByteString, _] = Source.single(data) override def toString: String = s"BinaryMessage.Strict($data)" + + /** Java API */ + override def getStrictData: ByteString = data + override def isStrict: Boolean = true } final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage { override def toString: String = s"BinaryMessage.Streamed($dataStream)" + + /** Java API */ + override def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.") + override def isStrict: Boolean = false } } diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala index e41ebed1b4..bb41a2ad8f 100644 --- a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala @@ -29,7 +29,7 @@ import akka.stream.Materializer * * See `JUnitRouteTest` for an example of a concrete implementation. */ -abstract class RouteTest extends AllDirectives { +abstract class RouteTest extends AllDirectives with WSTestRequestBuilding { implicit def system: ActorSystem implicit def materializer: Materializer implicit def executionContext: ExecutionContextExecutor = system.dispatcher diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala new file mode 100644 index 0000000000..d00e03c8d2 --- /dev/null +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package akka.http.javadsl.testkit + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.javadsl.model.ws.Message +import akka.stream.Materializer +import akka.stream.javadsl.Flow +import akka.stream.scaladsl +import akka.util.ByteString + +import akka.http.scaladsl.{ testkit ⇒ st } + +import akka.http.impl.util.JavaMapping.Implicits._ + +import scala.concurrent.duration._ + +/** + * A WSProbe is a probe that implements a `Flow[Message, Message, Unit]` for testing + * websocket code. + * + * Requesting elements is handled automatically. + */ +class WSProbe(delegate: st.WSProbe) { + + def flow: Flow[Message, Message, Any] = { + val underlying = scaladsl.Flow[Message].map(_.asScala).via(delegate.flow).map(_.asJava) + new Flow[Message, Message, NotUsed](underlying) + } + + /** + * Send the given messages out of the flow. + */ + def sendMessage(message: Message): Unit = delegate.sendMessage(message.asScala) + + /** + * Send a text message containing the given string out of the flow. + */ + def sendMessage(text: String): Unit = delegate.sendMessage(text) + + /** + * Send a binary message containing the given bytes out of the flow. + */ + def sendMessage(bytes: ByteString): Unit = delegate.sendMessage(bytes) + + /** + * Complete the output side of the flow. + */ + def sendCompletion(): Unit = delegate.sendCompletion() + + /** + * Expect a message on the input side of the flow. + */ + def expectMessage(): Message = delegate.expectMessage() + + /** + * Expect a text message on the input side of the flow and compares its payload with the given one. + * If the received message is streamed its contents are collected and then asserted against the given + * String. + */ + def expectMessage(text: String): Unit = delegate.expectMessage(text) + + /** + * Expect a binary message on the input side of the flow and compares its payload with the given one. + * If the received message is streamed its contents are collected and then asserted against the given + * ByteString. + */ + def expectMessage(bytes: ByteString): Unit = delegate.expectMessage(bytes) + + /** + * Expect no message on the input side of the flow. + */ + def expectNoMessage(): Unit = delegate.expectNoMessage() + + /** + * Expect no message on the input side of the flow for the given maximum duration. + */ + def expectNoMessage(max: FiniteDuration): Unit = delegate.expectNoMessage(max) + + /** + * Expect completion on the input side of the flow. + */ + def expectCompletion(): Unit = delegate.expectCompletion() + +} + +object WSProbe { + + // A convenient method to create WSProbe with default maxChunks and maxChunkCollectionMills + def create(system: ActorSystem, materializer: Materializer): WSProbe = { + create(system, materializer, 1000, 5000) + } + + /** + * Creates a WSProbe to use in tests against websocket handlers. + * + * @param maxChunks The maximum number of chunks to collect for streamed messages. + * @param maxChunkCollectionMills The maximum time in milliseconds to collect chunks for streamed messages. + */ + def create(system: ActorSystem, materializer: Materializer, maxChunks: Int, maxChunkCollectionMills: Long): WSProbe = { + val delegate = st.WSProbe(maxChunks, maxChunkCollectionMills)(system, materializer) + new WSProbe(delegate) + } + +} diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala new file mode 100644 index 0000000000..abd4340b19 --- /dev/null +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package akka.http.javadsl.testkit + +import akka.http.javadsl.model.ws.Message +import akka.http.javadsl.model.{ HttpRequest, Uri } +import akka.http.scaladsl.{ model ⇒ sm } +import akka.stream.javadsl.Flow + +import akka.http.scaladsl.{ testkit ⇒ st } + +import akka.http.impl.util.JavaMapping.Implicits._ +import scala.collection.JavaConverters._ +import akka.stream.{ Materializer, scaladsl } + +trait WSTestRequestBuilding { + + def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], materializer: Materializer): HttpRequest = { + WS(uri, clientSideHandler, materializer, java.util.Collections.emptyList()) + } + + def WS( + uri: Uri, + clientSideHandler: Flow[Message, Message, Any], + materializer: Materializer, + subprotocols: java.util.List[String]): HttpRequest = { + + val handler = scaladsl.Flow[sm.ws.Message].map(_.asJava).via(clientSideHandler).map(_.asScala) + st.WSTestRequestBuilding.WS(uri.asScala, handler, subprotocols.asScala)(materializer) + } + +} diff --git a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala index ea9e0a92ba..edf3ad041b 100644 --- a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala +++ b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala @@ -9,11 +9,11 @@ import akka.http.scaladsl.model.headers.{ UpgradeProtocol, Upgrade, `Sec-WebSock import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest, Uri } import akka.http.scaladsl.model.ws.{ UpgradeToWebSocket, Message } import scala.collection.immutable -import akka.stream.{ Graph, FlowShape } +import akka.stream.{ Materializer, Graph, FlowShape } import akka.stream.scaladsl.Flow -trait WSTestRequestBuilding { self: RouteTest ⇒ - def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(): HttpRequest = +trait WSTestRequestBuilding { + def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(implicit materializer: Materializer): HttpRequest = HttpRequest(uri = uri) .addHeader(new InternalCustomHeader("UpgradeToWebSocketTestHeader") with UpgradeToWebSocket { def requestedProtocols: immutable.Seq[String] = subprotocols.toList @@ -28,3 +28,5 @@ trait WSTestRequestBuilding { self: RouteTest ⇒ } }) } + +object WSTestRequestBuilding extends WSTestRequestBuilding diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala index 79b78f9071..c807744f0f 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala @@ -52,9 +52,8 @@ abstract class WebSocketDirectives extends SecurityDirectives { * Handles WebSocket requests with the given handler if the given subprotocol is offered in the request and * rejects other requests with an [[ExpectedWebSocketRequestRejection]] or an [[UnsupportedWebSocketSubprotocolRejection]]. */ - def handleWebSocketMessagesForProtocol(handler: Flow[Message, Message, Any], subprotocol: String): Route = RouteAdapter { - val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala) - D.handleWebSocketMessagesForProtocol(adapted, subprotocol) + def handleWebSocketMessagesForProtocol[T](handler: Flow[Message, Message, T], subprotocol: String): Route = RouteAdapter { + D.handleWebSocketMessagesForProtocol(adapt(handler), subprotocol) } /** @@ -68,12 +67,10 @@ abstract class WebSocketDirectives extends SecurityDirectives { * * To support several subprotocols you may chain several `handleWebSocketMessage` Routes. */ - def handleWebSocketMessagesForOptionalProtocol(handler: Flow[Message, Message, Any], subprotocol: Optional[String]): Route = RouteAdapter { - val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala) - D.handleWebSocketMessagesForOptionalProtocol(adapted, subprotocol.asScala) + def handleWebSocketMessagesForOptionalProtocol[T](handler: Flow[Message, Message, T], subprotocol: Optional[String]): Route = RouteAdapter { + D.handleWebSocketMessagesForOptionalProtocol(adapt(handler), subprotocol.asScala) } - // TODO this is because scala Message does not extend java Message - we could fix that, but http-core is stable private def adapt[T](handler: Flow[Message, Message, T]): scaladsl.Flow[s.Message, s.Message, NotUsed] = { scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala) } diff --git a/project/MiMa.scala b/project/MiMa.scala index 2744b0c362..db82d79c97 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -952,11 +952,17 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat"), - + // #21201 adding childActorOf to TestActor / TestKit / TestProbe ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$3"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$2"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf"), + + // #21184 add java api for ws testkit + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.asScala"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.getStreamedText"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.asScala"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.getStreamedData") ) ) }