diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index c3c4bce2fa..ee64d95c57 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -84,8 +84,11 @@ private[http] object Websocket { val collectMessage: Flow[MessageDataPart, Message, Unit] = Flow[MessageDataPart] .prefixAndTail(1) - .map { - case (seq, remaining) ⇒ seq.head match { + .mapConcat { + // happens if we get a MessageEnd first which creates a new substream but which is then + // filtered out by collect in `prepareMessages` below + case (Nil, _) ⇒ Nil + case (first +: Nil, remaining) ⇒ (first match { case TextMessagePart(text, true) ⇒ SubSource.kill(remaining) TextMessage.Strict(text) @@ -104,7 +107,7 @@ private[http] object Websocket { .collect { case t: BinaryMessagePart if t.data.nonEmpty ⇒ t.data }) - } + }) :: Nil } def prepareMessages: Flow[MessagePart, Message, Unit] = diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 0dd471c12e..9e5a88d596 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -463,7 +463,26 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { expectComplete(messageIn) messageOut.sendComplete() - // especially mustn't be Procotol.CloseCodes.NoCodePresent + // especially mustn't be Protocol.CloseCodes.NoCodePresent + expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) + netOut.expectComplete() + netIn.expectCancellation() + } + "after receiving regular close frame when idle (but some data was exchanged before)" in new ServerTestSetup { + val msg = "äbcdef€\uffff" + val input = frame(Opcode.Text, ByteString(msg, "UTF-8"), fin = true, mask = true) + + // send at least one regular frame to trigger #19340 afterwards + pushInput(input) + expectMessage(TextMessage.Strict(msg)) + + pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) + expectComplete(messageIn) + + netIn.expectNoMsg(100.millis) // especially the cancellation not yet + expectNoNetworkData() + messageOut.sendComplete() + expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() netIn.expectCancellation()