diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 873d59bf83..c35e60317d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -150,7 +150,7 @@ private[http] object HttpServerBluePrint { // protocol routing val protocolRouter = b.add(WebsocketSwitchRouter) - val protocolMerge = b.add(new WebsocketMerge(ws.installHandler, settings.websocketRandomFactory)) + val protocolMerge = b.add(new WebsocketMerge(ws.installHandler, settings.websocketRandomFactory, log)) protocolRouter.out0 ~> http ~> protocolMerge.in0 protocolRouter.out1 ~> websocket ~> protocolMerge.in1 @@ -360,7 +360,7 @@ private[http] object HttpServerBluePrint { } } - private class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] ⇒ Unit, websocketRandomFactory: () ⇒ Random) extends GraphStage[FanInShape2[ResponseRenderingOutput, ByteString, ByteString]] { + private class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] ⇒ Unit, websocketRandomFactory: () ⇒ Random, log: LoggingAdapter) extends GraphStage[FanInShape2[ResponseRenderingOutput, ByteString, ByteString]] { private val httpIn = Inlet[ResponseRenderingOutput]("httpIn") private val wsIn = Inlet[ByteString]("wsIn") private val out = Outlet[ByteString]("out") @@ -389,7 +389,7 @@ private[http] object HttpServerBluePrint { val frameHandler = handlerFlow match { case Left(frameHandler) ⇒ frameHandler case Right(messageHandler) ⇒ - Websocket.stack(serverSide = true, maskingRandomFactory = websocketRandomFactory).join(messageHandler) + Websocket.stack(serverSide = true, maskingRandomFactory = websocketRandomFactory, log = log).join(messageHandler) } installHandler(frameHandler) websocketHandlerWasInstalled = true diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala index 841d1fae21..d9d69474d1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala @@ -4,6 +4,7 @@ package akka.http.impl.engine.ws +import akka.event.LoggingAdapter import akka.stream.scaladsl.Flow import scala.concurrent.duration.FiniteDuration import akka.stream.stage._ @@ -17,7 +18,7 @@ import akka.http.impl.engine.ws.FrameHandler.UserHandlerErredOut * * INTERNAL API */ -private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration) extends StatefulStage[FrameOutHandler.Input, FrameStart] { +private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration, log: LoggingAdapter) extends StatefulStage[FrameOutHandler.Input, FrameStart] { def initial: StageState[AnyRef, FrameStart] = Idle def closeTimeout: Timestamp = Timestamp.now + _closeTimeout @@ -43,7 +44,8 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu case UserHandlerCompleted ⇒ become(new WaitingForPeerCloseFrame()) ctx.push(FrameEvent.closeFrame(Protocol.CloseCodes.Regular)) - case UserHandlerErredOut(_) ⇒ + case UserHandlerErredOut(e) ⇒ + log.error(e, s"Websocket handler failed with ${e.getMessage}") become(new WaitingForPeerCloseFrame()) ctx.push(FrameEvent.closeFrame(Protocol.CloseCodes.UnexpectedCondition, "internal error")) case Tick ⇒ ctx.pull() // ignore @@ -60,9 +62,12 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu */ private class WaitingForUserHandlerClosed(closeFrame: FrameStart) extends CompletionHandlingState { def onPush(elem: AnyRef, ctx: Context[FrameStart]): SyncDirective = elem match { - case UserHandlerCompleted | UserHandlerErredOut(_) ⇒ sendOutLastFrame(ctx) + case UserHandlerCompleted ⇒ sendOutLastFrame(ctx) + case UserHandlerErredOut(e) ⇒ + log.error(e, s"Websocket handler failed while waiting for handler completion with ${e.getMessage}") + sendOutLastFrame(ctx) case start: FrameStart ⇒ ctx.push(start) - case _ ⇒ ctx.pull() // ignore + case _ ⇒ ctx.pull() // ignore } def sendOutLastFrame(ctx: Context[FrameStart]): SyncDirective = @@ -140,6 +145,6 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu private[http] object FrameOutHandler { type Input = AnyRef - def create(serverSide: Boolean, closeTimeout: FiniteDuration): Flow[Input, FrameStart, Unit] = - Flow[Input].transform(() ⇒ new FrameOutHandler(serverSide, closeTimeout)) + def create(serverSide: Boolean, closeTimeout: FiniteDuration, log: LoggingAdapter): Flow[Input, FrameStart, Unit] = + Flow[Input].transform(() ⇒ new FrameOutHandler(serverSide, closeTimeout, log)) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index 25de84b934..3f74bf029f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -6,6 +6,7 @@ package akka.http.impl.engine.ws import java.util.Random +import akka.event.LoggingAdapter import akka.util.ByteString import scala.concurrent.duration._ @@ -30,9 +31,10 @@ private[http] object Websocket { */ def stack(serverSide: Boolean, maskingRandomFactory: () ⇒ Random, - closeTimeout: FiniteDuration = 3.seconds): BidiFlow[FrameEvent, Message, Message, FrameEvent, Unit] = + closeTimeout: FiniteDuration = 3.seconds, + log: LoggingAdapter): BidiFlow[FrameEvent, Message, Message, FrameEvent, Unit] = masking(serverSide, maskingRandomFactory) atop - frameHandling(serverSide, closeTimeout) atop + frameHandling(serverSide, closeTimeout, log) atop messageAPI(serverSide, closeTimeout) /** The lowest layer that implements the binary protocol */ @@ -52,10 +54,11 @@ private[http] object Websocket { * from frames, decoding text messages, close handling, etc. */ def frameHandling(serverSide: Boolean = true, - closeTimeout: FiniteDuration): BidiFlow[FrameEvent, FrameHandler.Output, FrameOutHandler.Input, FrameStart, Unit] = + closeTimeout: FiniteDuration, + log: LoggingAdapter): BidiFlow[FrameEvent, FrameHandler.Output, FrameOutHandler.Input, FrameStart, Unit] = BidiFlow.wrap( FrameHandler.create(server = serverSide), - FrameOutHandler.create(serverSide, closeTimeout))(Keep.none) + FrameOutHandler.create(serverSide, closeTimeout, log))(Keep.none) .named("ws-frame-handling") /** diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala index 4fa8836efe..b694829f15 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala @@ -37,7 +37,7 @@ object WebsocketClientBlueprint { log: LoggingAdapter): Http.WebsocketClientLayer = (simpleTls.atopMat(handshake(request, settings, log))(Keep.right) atop Websocket.framing atop - Websocket.stack(serverSide = false, maskingRandomFactory = settings.websocketRandomFactory)).reversed + Websocket.stack(serverSide = false, maskingRandomFactory = settings.websocketRandomFactory, log = log)).reversed /** * A bidi flow that injects and inspects the WS handshake and then goes out of the way. This BidiFlow diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 4004456ac1..7e8b4b3ff9 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -829,7 +829,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { Source(netIn) .via(printEvent("netIn")) .transform(() ⇒ new FrameEventParser) - .via(Websocket.stack(serverSide, maskingRandomFactory = Randoms.SecureRandomInstances, closeTimeout = closeTimeout).join(messageHandler)) + .via(Websocket.stack(serverSide, maskingRandomFactory = Randoms.SecureRandomInstances, closeTimeout = closeTimeout, log = system.log).join(messageHandler)) .via(printEvent("frameRendererIn")) .transform(() ⇒ new FrameEventRenderer) .via(printEvent("frameRendererOut"))