!htc remove Streamed subclasses to signify that streamed is the default and add convenience Message constructors

This commit is contained in:
Johannes Rudolph 2015-06-22 08:58:45 +02:00 committed by Mathias
parent e7027b3974
commit da5ff4c560
7 changed files with 87 additions and 54 deletions

View file

@ -28,10 +28,10 @@ private[http] object MessageToFrameRenderer {
Flow[Message]
.map {
case BinaryMessage.Strict(data) strictFrames(Opcode.Binary, data)
case BinaryMessage.Streamed(data) streamedFrames(Opcode.Binary, data)
case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8"))
case TextMessage.Streamed(text) streamedFrames(Opcode.Text, text.transform(() new Utf8Encoder))
case BinaryMessage.Strict(data) strictFrames(Opcode.Binary, data)
case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream)
case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8"))
case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder))
}.flatten(FlattenStrategy.Concat())
}
}

View file

@ -52,7 +52,7 @@ private[http] object Websocket {
case (TextMessagePart(text, true), remaining)
TextMessage.Strict(text)
case (first @ TextMessagePart(text, false), remaining)
TextMessage.Streamed(
TextMessage(
(Source.single(first) ++ remaining)
.collect {
case t: TextMessagePart if t.data.nonEmpty t.data
@ -60,7 +60,7 @@ private[http] object Websocket {
case (BinaryMessagePart(data, true), remaining)
BinaryMessage.Strict(data)
case (first @ BinaryMessagePart(data, false), remaining)
BinaryMessage.Streamed(
BinaryMessage(
(Source.single(first) ++ remaining)
.collect {
case t: BinaryMessagePart if t.data.nonEmpty t.data

View file

@ -84,12 +84,12 @@ object TextMessage {
def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.")
def getStreamedText: Source[String, _] = textStream
def asScala: sm.ws.TextMessage = sm.ws.TextMessage.Streamed(textStream.asScala)
def asScala: sm.ws.TextMessage = sm.ws.TextMessage(textStream.asScala)
}
def adapt(msg: sm.ws.TextMessage): TextMessage = msg match {
case sm.ws.TextMessage.Strict(text) create(text)
case sm.ws.TextMessage.Streamed(stream) create(stream.asJava)
case sm.ws.TextMessage.Strict(text) create(text)
case tm: sm.ws.TextMessage create(tm.textStream.asJava)
}
}
@ -135,11 +135,11 @@ object BinaryMessage {
def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.")
def getStreamedData: Source[ByteString, _] = dataStream
def asScala: sm.ws.BinaryMessage = sm.ws.BinaryMessage.Streamed(dataStream.asScala)
def asScala: sm.ws.BinaryMessage = sm.ws.BinaryMessage(dataStream.asScala)
}
def adapt(msg: sm.ws.BinaryMessage): BinaryMessage = msg match {
case sm.ws.BinaryMessage.Strict(data) create(data)
case sm.ws.BinaryMessage.Streamed(stream) create(stream.asJava)
case sm.ws.BinaryMessage.Strict(data) create(data)
case bm: sm.ws.BinaryMessage create(bm.dataStream.asJava)
}
}

View file

@ -575,7 +575,7 @@ case class HttpsContext(sslContext: SSLContext,
enabledProtocols: Option[immutable.Seq[String]] = None,
clientAuth: Option[ClientAuth] = None,
sslParameters: Option[SSLParameters] = None)
//#
//#
extends akka.http.javadsl.HttpsContext {
def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters)

View file

@ -8,22 +8,50 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString
/**
* The ADT for Websocket messages. A message can either be binary or a text message. Each of
* those can either be strict or streamed.
* The ADT for Websocket messages. A message can either be a binary or a text message.
*/
sealed trait Message
sealed trait TextMessage extends Message
/**
* A binary
*/
trait TextMessage extends Message {
/**
* The contents of this message as a stream.
*/
def textStream: Source[String, _]
}
object TextMessage {
def apply(text: String): Strict = Strict(text)
def apply(textStream: Source[String, Any]): TextMessage =
Streamed(textStream)
/**
* A strict [[TextMessage]] that contains the complete data as a [[String]].
*/
final case class Strict(text: String) extends TextMessage {
def textStream: Source[String, _] = Source.single(text)
override def toString: String = s"TextMessage.Strict($text)"
}
final case class Streamed(textStream: Source[String, _]) extends TextMessage
final private case class Streamed(textStream: Source[String, _]) extends TextMessage
}
trait BinaryMessage extends Message {
/**
* The contents of this message as a stream.
*/
def dataStream: Source[ByteString, _]
}
sealed trait BinaryMessage extends Message
object BinaryMessage {
def apply(data: ByteString): Strict = Strict(data)
def apply(dataStream: Source[ByteString, Any]): BinaryMessage =
Streamed(dataStream)
/**
* A strict [[BinaryMessage]] that contains the complete data as a [[ByteString]].
*/
final case class Strict(data: ByteString) extends BinaryMessage {
def dataStream: Source[ByteString, _] = Source.single(data)
override def toString: String = s"BinaryMessage.Strict($data)"
}
final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage
final private case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage
}

View file

@ -39,8 +39,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header = frameHeader(Opcode.Binary, 6, fin = true)
pushInput(header ++ data1)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
@ -51,8 +51,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header = frameHeader(Opcode.Binary, 6, fin = true)
pushInput(header)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
@ -70,8 +70,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header1 = frameHeader(Opcode.Binary, 3, fin = false)
pushInput(header1 ++ data1)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
@ -88,8 +88,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header1 = frameHeader(Opcode.Binary, 3, fin = false)
pushInput(header1 ++ data1)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
@ -103,8 +103,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
sub.expectNext(data2)
sub.expectComplete()
val BinaryMessage.Streamed(dataSource2) = expectMessage()
val sub2 = TestSubscriber.manualProbe[ByteString]
val dataSource2 = expectBinaryMessage().dataStream
val sub2 = TestSubscriber.manualProbe[ByteString]()
dataSource2.runWith(Sink(sub2))
val s2 = sub2.expectSubscription()
s2.request(2)
@ -123,8 +123,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header = frameHeader(Opcode.Binary, 6, fin = true, mask = Some(mask))
pushInput(header ++ data1)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
@ -159,8 +159,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val input = frameHeader(Opcode.Text, data.size, fin = true) ++ data0
pushInput(input)
val TextMessage.Streamed(parts) = expectMessage()
val sub = TestSubscriber.manualProbe[String]
val parts = expectTextMessage().textStream
val sub = TestSubscriber.manualProbe[String]()
parts.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(4)
@ -177,8 +177,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header0 = frameHeader(Opcode.Text, data0.size, fin = false)
pushInput(header0 ++ data0)
val TextMessage.Streamed(parts) = expectMessage()
val sub = TestSubscriber.manualProbe[String]
val parts = expectTextMessage().textStream
val sub = TestSubscriber.manualProbe[String]()
parts.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(4)
@ -196,8 +196,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val header = frameHeader(Opcode.Binary, data.size, fin = true, mask = Some(mask))
pushInput(header ++ data1)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
@ -223,7 +223,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
"for a streamed message" in new ServerTestSetup {
val data = ByteString("abcdefg", "ASCII")
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage.Streamed(Source(pub))
val msg = BinaryMessage(Source(pub))
netOutSub.request(6)
pushMessage(msg)
val sub = pub.expectSubscription()
@ -246,7 +246,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
"and mask input on the client side" in new ClientTestSetup {
val data = ByteString("abcdefg", "ASCII")
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage.Streamed(Source(pub))
val msg = BinaryMessage(Source(pub))
netOutSub.request(7)
pushMessage(msg)
val sub = pub.expectSubscription()
@ -279,7 +279,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
"for a streamed message" in new ServerTestSetup {
val text = "äbcd€fg"
val pub = TestPublisher.manualProbe[String]()
val msg = TextMessage.Streamed(Source(pub))
val msg = TextMessage(Source(pub))
netOutSub.request(6)
pushMessage(msg)
val sub = pub.expectSubscription()
@ -311,7 +311,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
println(half2(0).toInt.toHexString)
val pub = TestPublisher.manualProbe[String]()
val msg = TextMessage.Streamed(Source(pub))
val msg = TextMessage(Source(pub))
netOutSub.request(6)
pushMessage(msg)
@ -328,7 +328,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
"and mask input on the client side" in new ClientTestSetup {
val text = "abcdefg"
val pub = TestPublisher.manualProbe[String]()
val msg = TextMessage.Streamed(Source(pub))
val msg = TextMessage(Source(pub))
netOutSub.request(5)
pushMessage(msg)
val sub = pub.expectSubscription()
@ -375,15 +375,15 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
val input1 = frameHeader(Opcode.Binary, 3, fin = false, mask = Some(mask1)) ++ maskedASCII("123", mask1)._1
pushInput(input1)
val BinaryMessage.Streamed(dataSource) = expectMessage()
val sub = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val sub = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(sub))
val s = sub.expectSubscription()
s.request(2)
sub.expectNext(ByteString("123", "ASCII"))
val outPub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage.Streamed(Source(outPub))
val msg = BinaryMessage(Source(outPub))
netOutSub.request(10)
pushMessage(msg)
@ -460,7 +460,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
// sending another message is allowed before closing (inherently racy)
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage.Streamed(Source(pub))
val msg = BinaryMessage(Source(pub))
pushMessage(msg)
expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false)
@ -482,8 +482,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
messageInSub.request(10)
pushInput(frameHeader(Protocol.Opcode.Binary, 0, fin = false))
val BinaryMessage.Streamed(dataSource) = messageIn.expectNext()
val inSubscriber = TestSubscriber.manualProbe[ByteString]
val dataSource = expectBinaryMessage().dataStream
val inSubscriber = TestSubscriber.manualProbe[ByteString]()
dataSource.runWith(Sink(inSubscriber))
val inSub = inSubscriber.expectSubscription()
@ -505,7 +505,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
// sending another message is allowed before closing (inherently racy)
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage.Streamed(Source(pub))
val msg = BinaryMessage(Source(pub))
pushMessage(msg)
expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false)
@ -550,7 +550,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
// send half a message
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage.Streamed(Source(pub))
val msg = BinaryMessage(Source(pub))
pushMessage(msg)
expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false)
@ -766,7 +766,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
protected def closeTimeout: FiniteDuration = 1.second
val netIn = TestPublisher.manualProbe[ByteString]()
val netOut = TestSubscriber.manualProbe[ByteString]
val netOut = TestSubscriber.manualProbe[ByteString]()
val messageIn = TestSubscriber.manualProbe[Message]
val messageOut = TestPublisher.manualProbe[Message]()
@ -812,6 +812,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
messageInSub.request(1)
messageIn.expectNext()
}
def expectBinaryMessage(): BinaryMessage =
expectMessage().asInstanceOf[BinaryMessage]
def expectTextMessage(): TextMessage =
expectMessage().asInstanceOf[TextMessage]
var inBuffer = ByteString.empty
@tailrec final def expectNetworkData(bytes: Int): ByteString =

View file

@ -69,8 +69,8 @@ object TestServer extends App {
def greeterWebsocketService: Flow[Message, Message, Unit] =
Flow[Message]
.collect {
case TextMessage.Strict(name) TextMessage.Strict(s"Hello '$name'")
case TextMessage.Streamed(nameStream) TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterializedValue (_ ()))
case TextMessage.Strict(name) TextMessage(s"Hello '$name'")
case tm: TextMessage TextMessage(Source.single("Hello ") ++ tm.textStream)
// ignore binary messages
}
}