diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala index 77906bafc9..9d793c28a9 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala @@ -26,6 +26,8 @@ trait ByteStringSinkProbe { def expectComplete(): Unit def expectError(): Throwable def expectError(cause: Throwable): Unit + + def request(n: Long): Unit } object ByteStringSinkProbe { @@ -63,5 +65,7 @@ object ByteStringSinkProbe { def expectComplete(): Unit = probe.expectComplete() def expectError(): Throwable = probe.expectError() def expectError(cause: Throwable): Unit = probe.expectError(cause) + + def request(n: Long): Unit = probe.request(n) } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 3914fdf6fb..3a969d0b0f 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -4,7 +4,6 @@ package akka.http.impl.engine.ws -import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.Random import org.scalatest.{ Matchers, FreeSpec } @@ -228,7 +227,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "for a short strict message" in new ServerTestSetup { val data = ByteString("abcdef", "ASCII") val msg = BinaryMessage.Strict(data) - netOutSub.request(5) pushMessage(msg) expectFrameOnNetwork(Opcode.Binary, data, fin = true) @@ -238,7 +236,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val data = ByteString("abcdefg", "ASCII") val pub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source(pub)) - netOutSub.request(6) pushMessage(msg) val sub = pub.expectSubscription() @@ -261,7 +258,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val data = ByteString("abcdefg", "ASCII") val pub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source(pub)) - netOutSub.request(7) pushMessage(msg) val sub = pub.expectSubscription() @@ -288,7 +284,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "for a short strict message" in new ServerTestSetup { val text = "äbcdef" val msg = TextMessage.Strict(text) - netOutSub.request(5) pushMessage(msg) expectFrameOnNetwork(Opcode.Text, ByteString(text, "UTF-8"), fin = true) @@ -298,7 +293,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val text = "äbcd€fg" val pub = TestPublisher.manualProbe[String]() val msg = TextMessage(Source(pub)) - netOutSub.request(6) pushMessage(msg) val sub = pub.expectSubscription() @@ -328,7 +322,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val pub = TestPublisher.manualProbe[String]() val msg = TextMessage(Source(pub)) - netOutSub.request(6) pushMessage(msg) val sub = pub.expectSubscription() @@ -345,7 +338,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val text = "abcdefg" val pub = TestPublisher.manualProbe[String]() val msg = TextMessage(Source(pub)) - netOutSub.request(5) pushMessage(msg) val sub = pub.expectSubscription() @@ -377,14 +369,12 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val input = frameHeader(Opcode.Ping, 6, fin = true, mask = Some(mask)) ++ maskedASCII("abcdef", mask)._1 pushInput(input) - netOutSub.request(5) expectFrameOnNetwork(Opcode.Pong, ByteString("abcdef"), fin = true) } "respond to ping frames masking them on the client side" in new ClientTestSetup { val input = frameHeader(Opcode.Ping, 6, fin = true) ++ ByteString("abcdef") pushInput(input) - netOutSub.request(5) expectMaskedFrameOnNetwork(Opcode.Pong, ByteString("abcdef"), fin = true) } "respond to ping frames interleaved with data frames (without mixing frame data)" in new ServerTestSetup { @@ -404,7 +394,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val outPub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source(outPub)) - netOutSub.request(10) pushMessage(msg) expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false) @@ -439,44 +428,35 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "don't respond to unsolicited pong frames" in new ClientTestSetup { val data = frameHeader(Opcode.Pong, 6, fin = true) ++ ByteString("abcdef") pushInput(data) - netOutSub.request(5) expectNoNetworkData() } } "provide close behavior" - { "after receiving regular close frame when idle (user closes immediately)" in new ServerTestSetup { - netInSub.expectRequest() - netOutSub.request(20) - messageOutSub.request(20) - pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + expectComplete(messageIn) netIn.expectNoMsg(100.millis) // especially the cancellation not yet expectNoNetworkData() - messageOutSub.sendComplete() + messageOut.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "after receiving close frame without close code" in new ServerTestSetup { - netInSub.expectRequest() pushInput(frameHeader(Opcode.Close, 0, fin = true, mask = Some(Random.nextInt()))) - messageIn.expectComplete() + expectComplete(messageIn) - messageOutSub.sendComplete() + messageOut.sendComplete() // especially mustn't be Procotol.CloseCodes.NoCodePresent expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "after receiving regular close frame when idle (user still sends some data)" in new ServerTestSetup { - netOutSub.request(20) - messageOutSub.request(20) - pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + expectComplete(messageIn) // sending another message is allowed before closing (inherently racy) val pub = TestPublisher.manualProbe[ByteString]() @@ -492,82 +472,75 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { dataSub.sendComplete() expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) - messageOutSub.sendComplete() + messageOut.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() } - "after receiving regular close frame when fragmented message is still open" in { - new ServerTestSetup { - netOutSub.request(10) - messageInSub.request(10) + "after receiving regular close frame when fragmented message is still open" in new ServerTestSetup { + pushInput(frameHeader(Protocol.Opcode.Binary, 0, fin = false, mask = Some(Random.nextInt()))) + val dataSource = expectBinaryMessage().dataStream + val inSubscriber = TestSubscriber.manualProbe[ByteString]() + dataSource.runWith(Sink(inSubscriber)) + val inSub = inSubscriber.expectSubscription() - pushInput(frameHeader(Protocol.Opcode.Binary, 0, fin = false, mask = Some(Random.nextInt()))) - val dataSource = expectBinaryMessage().dataStream - val inSubscriber = TestSubscriber.manualProbe[ByteString]() - dataSource.runWith(Sink(inSubscriber)) - val inSub = inSubscriber.expectSubscription() + val outData = ByteString("def", "ASCII") + val mask = Random.nextInt() + pushInput(frameHeader(Protocol.Opcode.Continuation, 3, fin = false, mask = Some(mask)) ++ maskedBytes(outData, mask)._1) + inSub.request(5) + inSubscriber.expectNext(outData) - val outData = ByteString("def", "ASCII") - val mask = Random.nextInt() - pushInput(frameHeader(Protocol.Opcode.Continuation, 3, fin = false, mask = Some(mask)) ++ maskedBytes(outData, mask)._1) - inSub.request(5) - inSubscriber.expectNext(outData) + pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) + // This is arguable: we could also just fail the subStream but complete the main message stream regularly. + // However, truncating an ongoing message by closing without sending a `Continuation(fin = true)` first + // could be seen as something being amiss. + expectError(messageIn) + inSubscriber.expectError() + // truncation of open message - // This is arguable: we could also just fail the subStream but complete the main message stream regularly. - // However, truncating an ongoing message by closing without sending a `Continuation(fin = true)` first - // could be seen as something being amiss. - messageIn.expectError() - inSubscriber.expectError() - // truncation of open message + // sending another message is allowed before closing (inherently racy) - // sending another message is allowed before closing (inherently racy) + val pub = TestPublisher.manualProbe[ByteString]() + val msg = BinaryMessage(Source(pub)) + pushMessage(msg) + expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) - val pub = TestPublisher.manualProbe[ByteString]() - val msg = BinaryMessage(Source(pub)) - pushMessage(msg) - expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) + val data = ByteString("abc", "ASCII") + val dataSub = pub.expectSubscription() + dataSub.sendNext(data) + expectFrameOnNetwork(Opcode.Continuation, data, fin = false) - val data = ByteString("abc", "ASCII") - val dataSub = pub.expectSubscription() - dataSub.sendNext(data) - expectFrameOnNetwork(Opcode.Continuation, data, fin = false) + dataSub.sendComplete() + expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) - dataSub.sendComplete() - expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) + messageOut.sendComplete() + expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) + netOut.expectComplete() - messageOutSub.sendComplete() - expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) - netOut.expectComplete() - } } "after receiving error close frame" in pending "after peer closes connection without sending a close frame" in new ServerTestSetup { - netInSub.expectRequest() - netInSub.sendComplete() + netIn.expectRequest() + netIn.sendComplete() - messageIn.expectComplete() - messageOutSub.sendComplete() + expectComplete(messageIn) + messageOut.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() } "when user handler closes (simple)" in new ServerTestSetup { - messageOutSub.sendComplete() + messageOut.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) - netOut.expectNoMsg(100.millis) // wait for peer to close regularly + expectNoNetworkData() // wait for peer to close regularly pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + expectComplete(messageIn) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "when user handler closes main stream and substream only afterwards" in new ServerTestSetup { - netOutSub.request(10) - messageInSub.request(10) - // send half a message val pub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source(pub)) @@ -579,79 +552,73 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { dataSub.sendNext(data) expectFrameOnNetwork(Opcode.Continuation, data, fin = false) - messageOutSub.sendComplete() + messageOut.sendComplete() expectNoNetworkData() // need to wait for substream to close dataSub.sendComplete() expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) - netOut.expectNoMsg(100.millis) // wait for peer to close regularly + expectNoNetworkData() // wait for peer to close regularly val mask = Random.nextInt() pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + expectComplete(messageIn) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "if user handler fails" in pending "if peer closes with invalid close frame" - { "close code outside of the valid range" in new ServerTestSetup { - netInSub.expectRequest() pushInput(frameHeader(Opcode.Close, 1, mask = Some(Random.nextInt()), fin = true) ++ ByteString("x")) - val error = messageIn.expectError() + val error = expectError(messageIn) expectCloseCodeOnNetwork(Protocol.CloseCodes.ProtocolError) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "close data of size 1" in new ServerTestSetup { - netInSub.expectRequest() pushInput(frameHeader(Opcode.Close, 1, mask = Some(Random.nextInt()), fin = true) ++ ByteString("x")) - val error = messageIn.expectError() + val error = expectError(messageIn) expectCloseCodeOnNetwork(Protocol.CloseCodes.ProtocolError) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "reason is no valid utf8 data" in pending } "timeout if user handler closes and peer doesn't send a close frame" in new ServerTestSetup { override protected def closeTimeout: FiniteDuration = 100.millis - netInSub.expectRequest() - messageOutSub.sendComplete() + messageOut.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "timeout after we close after error and peer doesn't send a close frame" in new ServerTestSetup { override protected def closeTimeout: FiniteDuration = 100.millis - netInSub.expectRequest() - pushInput(frameHeader(Opcode.Binary, 0, fin = true, rsv1 = true)) expectProtocolErrorOnNetwork() - messageOutSub.sendComplete() + messageOut.sendComplete() netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } "ignore frames peer sends after close frame" in new ServerTestSetup { - netInSub.expectRequest() pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + expectComplete(messageIn) pushInput(frameHeader(Opcode.Binary, 0, fin = true)) - messageOutSub.sendComplete() + messageOut.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() - netInSub.expectCancellation() + netIn.expectCancellation() } } "reject unexpected frames" - { @@ -753,7 +720,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { pushInput(frameHeader(Opcode.Text, 0, fin = false)) pushInput(frameHeader(Opcode.Continuation, 3, fin = true) ++ data) - messageIn.expectError() + expectError(messageIn) expectCloseCodeOnNetwork(Protocol.CloseCodes.InconsistentData) } @@ -798,11 +765,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { protected def serverSide: Boolean protected def closeTimeout: FiniteDuration = 1.second - val netIn = TestPublisher.manualProbe[ByteString]() - val netOut = TestSubscriber.manualProbe[ByteString]() + val netIn = TestPublisher.probe[ByteString]() + val netOut = ByteStringSinkProbe() - val messageIn = TestSubscriber.manualProbe[Message] - val messageOut = TestPublisher.manualProbe[Message]() + val messageIn = TestSubscriber.probe[Message] + val messageOut = TestPublisher.probe[Message]() val messageHandler: Flow[Message, Message, Unit] = Flow.wrap { @@ -821,56 +788,20 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { .via(printEvent("frameRendererIn")) .transform(() ⇒ new FrameEventRenderer) .via(printEvent("frameRendererOut")) - .to(Sink(netOut)) + .to(netOut.sink) .run() - val netInSub = netIn.expectSubscription() - val netOutSub = netOut.expectSubscription() - val messageOutSub = messageOut.expectSubscription() - val messageInSub = messageIn.expectSubscription() + def pushInput(data: ByteString): Unit = netIn.sendNext(data) + def pushMessage(msg: Message): Unit = messageOut.sendNext(msg) + def expectMessage(message: Message): Unit = messageIn.requestNext(message) + def expectMessage(): Message = messageIn.requestNext() + def expectBinaryMessage(): BinaryMessage = expectMessage().asInstanceOf[BinaryMessage] + def expectBinaryMessage(message: BinaryMessage): Unit = expectBinaryMessage() shouldEqual message + def expectTextMessage(): TextMessage = expectMessage().asInstanceOf[TextMessage] + def expectTextMessage(message: TextMessage): Unit = expectTextMessage() shouldEqual message + final def expectNetworkData(bytes: Int): ByteString = netOut.expectBytes(bytes) - def pushInput(data: ByteString): Unit = { - // TODO: expect/handle request? - netInSub.sendNext(data) - } - def pushMessage(msg: Message): Unit = { - messageOutSub.sendNext(msg) - } - - def expectMessage(message: Message): Unit = { - messageInSub.request(1) - messageIn.expectNext(message) - } - def expectMessage(): Message = { - messageInSub.request(1) - messageIn.expectNext() - } - def expectBinaryMessage(): BinaryMessage = - expectMessage().asInstanceOf[BinaryMessage] - - def expectBinaryMessage(message: BinaryMessage): Unit = - expectBinaryMessage() shouldEqual message - - def expectTextMessage(): TextMessage = - expectMessage().asInstanceOf[TextMessage] - - def expectTextMessage(message: TextMessage): Unit = - expectTextMessage() shouldEqual message - - var inBuffer = ByteString.empty - @tailrec final def expectNetworkData(bytes: Int): ByteString = - if (inBuffer.size >= bytes) { - val res = inBuffer.take(bytes) - inBuffer = inBuffer.drop(bytes) - res - } else { - netOutSub.request(1) - inBuffer ++= netOut.expectNext() - expectNetworkData(bytes) - } - - def expectNetworkData(data: ByteString): Unit = - expectNetworkData(data.size) shouldEqual data + def expectNetworkData(data: ByteString): Unit = expectNetworkData(data.size) shouldEqual data def expectFrameOnNetwork(opcode: Opcode, data: ByteString, fin: Boolean): Unit = { expectFrameHeaderOnNetwork(opcode, data.size, fin) @@ -944,8 +875,16 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { code shouldEqual expectedCode } - def expectNoNetworkData(): Unit = - netOut.expectNoMsg(100.millis) + def expectNoNetworkData(): Unit = netOut.expectNoBytes(100.millis) + + def expectComplete[T](probe: TestSubscriber.Probe[T]): Unit = { + probe.ensureSubscription() + probe.expectComplete() + } + def expectError[T](probe: TestSubscriber.Probe[T]): Throwable = { + probe.ensureSubscription() + probe.expectError() + } } val trace = false // set to `true` for debugging purposes