diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala index 2565331e17..59b1e89b58 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -28,10 +28,10 @@ private[http] object MessageToFrameRenderer { Flow[Message] .map { - case BinaryMessage.Strict(data) ⇒ strictFrames(Opcode.Binary, data) - case BinaryMessage.Streamed(data) ⇒ streamedFrames(Opcode.Binary, data) - case TextMessage.Strict(text) ⇒ strictFrames(Opcode.Text, ByteString(text, "UTF-8")) - case TextMessage.Streamed(text) ⇒ streamedFrames(Opcode.Text, text.transform(() ⇒ new Utf8Encoder)) + case BinaryMessage.Strict(data) ⇒ strictFrames(Opcode.Binary, data) + case bm: BinaryMessage ⇒ streamedFrames(Opcode.Binary, bm.dataStream) + case TextMessage.Strict(text) ⇒ strictFrames(Opcode.Text, ByteString(text, "UTF-8")) + case tm: TextMessage ⇒ streamedFrames(Opcode.Text, tm.textStream.transform(() ⇒ new Utf8Encoder)) }.flatten(FlattenStrategy.Concat()) } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index bcc0448a77..20f1933cbd 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -52,7 +52,7 @@ private[http] object Websocket { case (TextMessagePart(text, true), remaining) ⇒ TextMessage.Strict(text) case (first @ TextMessagePart(text, false), remaining) ⇒ - TextMessage.Streamed( + TextMessage( (Source.single(first) ++ remaining) .collect { case t: TextMessagePart if t.data.nonEmpty ⇒ t.data @@ -60,7 +60,7 @@ private[http] object Websocket { case (BinaryMessagePart(data, true), remaining) ⇒ BinaryMessage.Strict(data) case (first @ BinaryMessagePart(data, false), remaining) ⇒ - BinaryMessage.Streamed( + BinaryMessage( (Source.single(first) ++ remaining) .collect { case t: BinaryMessagePart if t.data.nonEmpty ⇒ t.data diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala index 12e854ca03..4aaf4b6160 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala @@ -84,12 +84,12 @@ object TextMessage { def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.") def getStreamedText: Source[String, _] = textStream - def asScala: sm.ws.TextMessage = sm.ws.TextMessage.Streamed(textStream.asScala) + def asScala: sm.ws.TextMessage = sm.ws.TextMessage(textStream.asScala) } def adapt(msg: sm.ws.TextMessage): TextMessage = msg match { - case sm.ws.TextMessage.Strict(text) ⇒ create(text) - case sm.ws.TextMessage.Streamed(stream) ⇒ create(stream.asJava) + case sm.ws.TextMessage.Strict(text) ⇒ create(text) + case tm: sm.ws.TextMessage ⇒ create(tm.textStream.asJava) } } @@ -135,11 +135,11 @@ object BinaryMessage { def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.") def getStreamedData: Source[ByteString, _] = dataStream - def asScala: sm.ws.BinaryMessage = sm.ws.BinaryMessage.Streamed(dataStream.asScala) + def asScala: sm.ws.BinaryMessage = sm.ws.BinaryMessage(dataStream.asScala) } def adapt(msg: sm.ws.BinaryMessage): BinaryMessage = msg match { - case sm.ws.BinaryMessage.Strict(data) ⇒ create(data) - case sm.ws.BinaryMessage.Streamed(stream) ⇒ create(stream.asJava) + case sm.ws.BinaryMessage.Strict(data) ⇒ create(data) + case bm: sm.ws.BinaryMessage ⇒ create(bm.dataStream.asJava) } } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 08851017e0..33fc60fb45 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -575,7 +575,7 @@ case class HttpsContext(sslContext: SSLContext, enabledProtocols: Option[immutable.Seq[String]] = None, clientAuth: Option[ClientAuth] = None, sslParameters: Option[SSLParameters] = None) -//# + //# extends akka.http.javadsl.HttpsContext { def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala index 36808e4daf..d29c859a79 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala @@ -8,22 +8,50 @@ import akka.stream.scaladsl.Source import akka.util.ByteString /** - * The ADT for Websocket messages. A message can either be binary or a text message. Each of - * those can either be strict or streamed. + * The ADT for Websocket messages. A message can either be a binary or a text message. */ sealed trait Message -sealed trait TextMessage extends Message + +/** + * A binary + */ +trait TextMessage extends Message { + /** + * The contents of this message as a stream. + */ + def textStream: Source[String, _] +} object TextMessage { + def apply(text: String): Strict = Strict(text) + def apply(textStream: Source[String, Any]): TextMessage = + Streamed(textStream) + + /** + * A strict [[TextMessage]] that contains the complete data as a [[String]]. + */ final case class Strict(text: String) extends TextMessage { + def textStream: Source[String, _] = Source.single(text) override def toString: String = s"TextMessage.Strict($text)" } - final case class Streamed(textStream: Source[String, _]) extends TextMessage + final private case class Streamed(textStream: Source[String, _]) extends TextMessage +} +trait BinaryMessage extends Message { + /** + * The contents of this message as a stream. + */ + def dataStream: Source[ByteString, _] } - -sealed trait BinaryMessage extends Message object BinaryMessage { + def apply(data: ByteString): Strict = Strict(data) + def apply(dataStream: Source[ByteString, Any]): BinaryMessage = + Streamed(dataStream) + + /** + * A strict [[BinaryMessage]] that contains the complete data as a [[ByteString]]. + */ final case class Strict(data: ByteString) extends BinaryMessage { + def dataStream: Source[ByteString, _] = Source.single(data) override def toString: String = s"BinaryMessage.Strict($data)" } - final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage + final private case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage } \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 67804272ce..c23069af72 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -39,8 +39,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header = frameHeader(Opcode.Binary, 6, fin = true) pushInput(header ++ data1) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) @@ -51,8 +51,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header = frameHeader(Opcode.Binary, 6, fin = true) pushInput(header) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) @@ -70,8 +70,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header1 = frameHeader(Opcode.Binary, 3, fin = false) pushInput(header1 ++ data1) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) @@ -88,8 +88,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header1 = frameHeader(Opcode.Binary, 3, fin = false) pushInput(header1 ++ data1) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) @@ -103,8 +103,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { sub.expectNext(data2) sub.expectComplete() - val BinaryMessage.Streamed(dataSource2) = expectMessage() - val sub2 = TestSubscriber.manualProbe[ByteString] + val dataSource2 = expectBinaryMessage().dataStream + val sub2 = TestSubscriber.manualProbe[ByteString]() dataSource2.runWith(Sink(sub2)) val s2 = sub2.expectSubscription() s2.request(2) @@ -123,8 +123,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header = frameHeader(Opcode.Binary, 6, fin = true, mask = Some(mask)) pushInput(header ++ data1) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) @@ -159,8 +159,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val input = frameHeader(Opcode.Text, data.size, fin = true) ++ data0 pushInput(input) - val TextMessage.Streamed(parts) = expectMessage() - val sub = TestSubscriber.manualProbe[String] + val parts = expectTextMessage().textStream + val sub = TestSubscriber.manualProbe[String]() parts.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(4) @@ -177,8 +177,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header0 = frameHeader(Opcode.Text, data0.size, fin = false) pushInput(header0 ++ data0) - val TextMessage.Streamed(parts) = expectMessage() - val sub = TestSubscriber.manualProbe[String] + val parts = expectTextMessage().textStream + val sub = TestSubscriber.manualProbe[String]() parts.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(4) @@ -196,8 +196,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val header = frameHeader(Opcode.Binary, data.size, fin = true, mask = Some(mask)) pushInput(header ++ data1) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) @@ -223,7 +223,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "for a streamed message" in new ServerTestSetup { val data = ByteString("abcdefg", "ASCII") val pub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage.Streamed(Source(pub)) + val msg = BinaryMessage(Source(pub)) netOutSub.request(6) pushMessage(msg) val sub = pub.expectSubscription() @@ -246,7 +246,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "and mask input on the client side" in new ClientTestSetup { val data = ByteString("abcdefg", "ASCII") val pub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage.Streamed(Source(pub)) + val msg = BinaryMessage(Source(pub)) netOutSub.request(7) pushMessage(msg) val sub = pub.expectSubscription() @@ -279,7 +279,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "for a streamed message" in new ServerTestSetup { val text = "äbcd€fg" val pub = TestPublisher.manualProbe[String]() - val msg = TextMessage.Streamed(Source(pub)) + val msg = TextMessage(Source(pub)) netOutSub.request(6) pushMessage(msg) val sub = pub.expectSubscription() @@ -311,7 +311,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { println(half2(0).toInt.toHexString) val pub = TestPublisher.manualProbe[String]() - val msg = TextMessage.Streamed(Source(pub)) + val msg = TextMessage(Source(pub)) netOutSub.request(6) pushMessage(msg) @@ -328,7 +328,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "and mask input on the client side" in new ClientTestSetup { val text = "abcdefg" val pub = TestPublisher.manualProbe[String]() - val msg = TextMessage.Streamed(Source(pub)) + val msg = TextMessage(Source(pub)) netOutSub.request(5) pushMessage(msg) val sub = pub.expectSubscription() @@ -375,15 +375,15 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val input1 = frameHeader(Opcode.Binary, 3, fin = false, mask = Some(mask1)) ++ maskedASCII("123", mask1)._1 pushInput(input1) - val BinaryMessage.Streamed(dataSource) = expectMessage() - val sub = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val sub = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(sub)) val s = sub.expectSubscription() s.request(2) sub.expectNext(ByteString("123", "ASCII")) val outPub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage.Streamed(Source(outPub)) + val msg = BinaryMessage(Source(outPub)) netOutSub.request(10) pushMessage(msg) @@ -460,7 +460,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { // sending another message is allowed before closing (inherently racy) val pub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage.Streamed(Source(pub)) + val msg = BinaryMessage(Source(pub)) pushMessage(msg) expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) @@ -482,8 +482,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { messageInSub.request(10) pushInput(frameHeader(Protocol.Opcode.Binary, 0, fin = false)) - val BinaryMessage.Streamed(dataSource) = messageIn.expectNext() - val inSubscriber = TestSubscriber.manualProbe[ByteString] + val dataSource = expectBinaryMessage().dataStream + val inSubscriber = TestSubscriber.manualProbe[ByteString]() dataSource.runWith(Sink(inSubscriber)) val inSub = inSubscriber.expectSubscription() @@ -505,7 +505,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { // sending another message is allowed before closing (inherently racy) val pub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage.Streamed(Source(pub)) + val msg = BinaryMessage(Source(pub)) pushMessage(msg) expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) @@ -550,7 +550,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { // send half a message val pub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage.Streamed(Source(pub)) + val msg = BinaryMessage(Source(pub)) pushMessage(msg) expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) @@ -766,7 +766,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { protected def closeTimeout: FiniteDuration = 1.second val netIn = TestPublisher.manualProbe[ByteString]() - val netOut = TestSubscriber.manualProbe[ByteString] + val netOut = TestSubscriber.manualProbe[ByteString]() val messageIn = TestSubscriber.manualProbe[Message] val messageOut = TestPublisher.manualProbe[Message]() @@ -812,6 +812,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { messageInSub.request(1) messageIn.expectNext() } + def expectBinaryMessage(): BinaryMessage = + expectMessage().asInstanceOf[BinaryMessage] + + def expectTextMessage(): TextMessage = + expectMessage().asInstanceOf[TextMessage] var inBuffer = ByteString.empty @tailrec final def expectNetworkData(bytes: Int): ByteString = diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala index f9a4f3a996..4623b27c28 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala @@ -69,8 +69,8 @@ object TestServer extends App { def greeterWebsocketService: Flow[Message, Message, Unit] = Flow[Message] .collect { - case TextMessage.Strict(name) ⇒ TextMessage.Strict(s"Hello '$name'") - case TextMessage.Streamed(nameStream) ⇒ TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterializedValue (_ ⇒ ())) + case TextMessage.Strict(name) ⇒ TextMessage(s"Hello '$name'") + case tm: TextMessage ⇒ TextMessage(Source.single("Hello ") ++ tm.textStream) // ignore binary messages } }