=htc #16887 properly report truncated WS messages to user handler
This commit is contained in:
parent
5e459688d5
commit
70a147a581
2 changed files with 12 additions and 4 deletions
|
|
@ -28,13 +28,18 @@ private[http] object Websocket {
|
|||
closeTimeout: FiniteDuration = 3.seconds): Flow[FrameEvent, FrameEvent, Unit] = {
|
||||
/** Completes this branch of the flow if no more messages are expected and converts close codes into errors */
|
||||
class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] {
|
||||
var inMessage = false
|
||||
def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match {
|
||||
case PeerClosed(code, reason) ⇒
|
||||
if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Peer closed connection with code $code"))
|
||||
else if (inMessage) ctx.fail(new ProtocolException(s"Truncated message, peer closed connection in the middle of message."))
|
||||
else ctx.finish()
|
||||
case ActivelyCloseWithCode(code, reason) ⇒
|
||||
if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Closing connection with error code $code"))
|
||||
else ctx.finish()
|
||||
else ctx.fail(new IllegalStateException("Regular close from FrameHandler is unexpected"))
|
||||
case x: MessageDataPart ⇒
|
||||
inMessage = !x.last
|
||||
ctx.push(x)
|
||||
case x ⇒ ctx.push(x)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -477,8 +477,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
|||
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
||||
netOut.expectComplete()
|
||||
}
|
||||
"after receiving regular close frame when fragmented message is still open" in pendingUntilFixed {
|
||||
pending
|
||||
"after receiving regular close frame when fragmented message is still open" in {
|
||||
new ServerTestSetup {
|
||||
netOutSub.request(10)
|
||||
messageInSub.request(10)
|
||||
|
|
@ -496,7 +495,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
|||
inSubscriber.expectNext(outData)
|
||||
|
||||
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
||||
messageIn.expectComplete()
|
||||
|
||||
// This is arguable: we could also just fail the subStream but complete the main message stream regularly.
|
||||
// However, truncating an ongoing message by closing without sending a `Continuation(fin = true)` first
|
||||
// could be seen as something being amiss.
|
||||
messageIn.expectError()
|
||||
inSubscriber.expectError()
|
||||
// truncation of open message
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue