Merge pull request #17728 from spray/w/17551-fix-new-ActorInterpreter-leak
=htc #17551 fix leak because of never subscribed Websocket handler if connection is closed by peer
This commit is contained in:
commit
52f08f035a
1 changed files with 7 additions and 9 deletions
|
|
@ -345,6 +345,7 @@ private[http] object HttpServerBluePrint {
|
||||||
class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] ⇒ Unit) extends FlexiMerge[ByteString, FanInShape2[ResponseRenderingOutput, ByteString, ByteString]](new FanInShape2("websocketMerge"), OperationAttributes.name("websocketMerge")) {
|
class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] ⇒ Unit) extends FlexiMerge[ByteString, FanInShape2[ResponseRenderingOutput, ByteString, ByteString]](new FanInShape2("websocketMerge"), OperationAttributes.name("websocketMerge")) {
|
||||||
def createMergeLogic(s: FanInShape2[ResponseRenderingOutput, ByteString, ByteString]): MergeLogic[ByteString] =
|
def createMergeLogic(s: FanInShape2[ResponseRenderingOutput, ByteString, ByteString]): MergeLogic[ByteString] =
|
||||||
new MergeLogic[ByteString] {
|
new MergeLogic[ByteString] {
|
||||||
|
var websocketHandlerWasInstalled: Boolean = false
|
||||||
def httpIn = s.in0
|
def httpIn = s.in0
|
||||||
def wsIn = s.in1
|
def wsIn = s.in1
|
||||||
|
|
||||||
|
|
@ -358,23 +359,20 @@ private[http] object HttpServerBluePrint {
|
||||||
ctx.emit(responseBytes)
|
ctx.emit(responseBytes)
|
||||||
installHandler(handlerFlow)
|
installHandler(handlerFlow)
|
||||||
ctx.changeCompletionHandling(defaultCompletionHandling)
|
ctx.changeCompletionHandling(defaultCompletionHandling)
|
||||||
|
websocketHandlerWasInstalled = true
|
||||||
websocket
|
websocket
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Completion handling that installs a dummy handler, to make sure no processors leak because they have
|
|
||||||
// never been subscribed to, see #17494.
|
|
||||||
override def initialCompletionHandling: CompletionHandling =
|
|
||||||
CompletionHandling(
|
|
||||||
onUpstreamFinish = { (ctx, in) ⇒ installDummyHandler(); defaultCompletionHandling.onUpstreamFinish(ctx, in) },
|
|
||||||
onUpstreamFailure = { (ctx, in, t) ⇒ installDummyHandler(); defaultCompletionHandling.onUpstreamFailure(ctx, in, t) })
|
|
||||||
|
|
||||||
def installDummyHandler(): Unit = installHandler(Flow[FrameEvent])
|
|
||||||
|
|
||||||
def websocket: State[_] = State[ByteString](Read(wsIn)) { (ctx, in, bytes) ⇒
|
def websocket: State[_] = State[ByteString](Read(wsIn)) { (ctx, in, bytes) ⇒
|
||||||
ctx.emit(bytes)
|
ctx.emit(bytes)
|
||||||
SameState
|
SameState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def postStop(): Unit = if (!websocketHandlerWasInstalled) installDummyHandler()
|
||||||
|
// Install a dummy handler to make sure no processors leak because they have
|
||||||
|
// never been subscribed to, see #17494 and #17551.
|
||||||
|
def installDummyHandler(): Unit = installHandler(Flow[FrameEvent])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/** A merge for two streams that just forwards all elements and closes the connection when the first input closes. */
|
/** A merge for two streams that just forwards all elements and closes the connection when the first input closes. */
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue