diff --git a/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala index e19827d0d1..fdf758a27b 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala @@ -28,13 +28,18 @@ private[http] object Websocket { closeTimeout: FiniteDuration = 3.seconds): Flow[FrameEvent, FrameEvent, Unit] = { /** Completes this branch of the flow if no more messages are expected and converts close codes into errors */ class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] { + var inMessage = false def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match { case PeerClosed(code, reason) ⇒ if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Peer closed connection with code $code")) + else if (inMessage) ctx.fail(new ProtocolException(s"Truncated message, peer closed connection in the middle of message.")) else ctx.finish() case ActivelyCloseWithCode(code, reason) ⇒ if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Closing connection with error code $code")) - else ctx.finish() + else ctx.fail(new IllegalStateException("Regular close from FrameHandler is unexpected")) + case x: MessageDataPart ⇒ + inMessage = !x.last + ctx.push(x) case x ⇒ ctx.push(x) } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala index c47ec4d5d9..bfd054f73a 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala @@ -477,8 +477,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() } - "after receiving regular close frame when fragmented message is still open" in pendingUntilFixed { - pending + "after receiving regular close frame when fragmented message is still open" in { new ServerTestSetup { netOutSub.request(10) messageInSub.request(10) @@ -496,7 +495,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { inSubscriber.expectNext(outData) pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + + // This is arguable: we could also just fail the subStream but complete the main message stream regularly. + // However, truncating an ongoing message by closing without sending a `Continuation(fin = true)` first + // could be seen as something being amiss. + messageIn.expectError() inSubscriber.expectError() // truncation of open message