diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 9f2e56d5cc..25b29accc8 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -345,6 +345,7 @@ private[http] object HttpServerBluePrint { class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] ⇒ Unit) extends FlexiMerge[ByteString, FanInShape2[ResponseRenderingOutput, ByteString, ByteString]](new FanInShape2("websocketMerge"), OperationAttributes.name("websocketMerge")) { def createMergeLogic(s: FanInShape2[ResponseRenderingOutput, ByteString, ByteString]): MergeLogic[ByteString] = new MergeLogic[ByteString] { + var websocketHandlerWasInstalled: Boolean = false def httpIn = s.in0 def wsIn = s.in1 @@ -358,23 +359,20 @@ private[http] object HttpServerBluePrint { ctx.emit(responseBytes) installHandler(handlerFlow) ctx.changeCompletionHandling(defaultCompletionHandling) + websocketHandlerWasInstalled = true websocket } } - // Completion handling that installs a dummy handler, to make sure no processors leak because they have - // never been subscribed to, see #17494. - override def initialCompletionHandling: CompletionHandling = - CompletionHandling( - onUpstreamFinish = { (ctx, in) ⇒ installDummyHandler(); defaultCompletionHandling.onUpstreamFinish(ctx, in) }, - onUpstreamFailure = { (ctx, in, t) ⇒ installDummyHandler(); defaultCompletionHandling.onUpstreamFailure(ctx, in, t) }) - - def installDummyHandler(): Unit = installHandler(Flow[FrameEvent]) - def websocket: State[_] = State[ByteString](Read(wsIn)) { (ctx, in, bytes) ⇒ ctx.emit(bytes) SameState } + + override def postStop(): Unit = if (!websocketHandlerWasInstalled) installDummyHandler() + // Install a dummy handler to make sure no processors leak because they have + // never been subscribed to, see #17494 and #17551. + def installDummyHandler(): Unit = installHandler(Flow[FrameEvent]) } } /** A merge for two streams that just forwards all elements and closes the connection when the first input closes. */