=htc log error if user handler fails

This commit is contained in:
Johannes Rudolph 2015-10-22 11:15:43 +02:00
parent abe0123a92
commit a283f43d1b
5 changed files with 23 additions and 15 deletions

View file

@ -150,7 +150,7 @@ private[http] object HttpServerBluePrint {
// protocol routing
val protocolRouter = b.add(WebsocketSwitchRouter)
val protocolMerge = b.add(new WebsocketMerge(ws.installHandler, settings.websocketRandomFactory))
val protocolMerge = b.add(new WebsocketMerge(ws.installHandler, settings.websocketRandomFactory, log))
protocolRouter.out0 ~> http ~> protocolMerge.in0
protocolRouter.out1 ~> websocket ~> protocolMerge.in1
@ -360,7 +360,7 @@ private[http] object HttpServerBluePrint {
}
}
private class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] Unit, websocketRandomFactory: () Random) extends GraphStage[FanInShape2[ResponseRenderingOutput, ByteString, ByteString]] {
private class WebsocketMerge(installHandler: Flow[FrameEvent, FrameEvent, Any] Unit, websocketRandomFactory: () Random, log: LoggingAdapter) extends GraphStage[FanInShape2[ResponseRenderingOutput, ByteString, ByteString]] {
private val httpIn = Inlet[ResponseRenderingOutput]("httpIn")
private val wsIn = Inlet[ByteString]("wsIn")
private val out = Outlet[ByteString]("out")
@ -389,7 +389,7 @@ private[http] object HttpServerBluePrint {
val frameHandler = handlerFlow match {
case Left(frameHandler) frameHandler
case Right(messageHandler)
Websocket.stack(serverSide = true, maskingRandomFactory = websocketRandomFactory).join(messageHandler)
Websocket.stack(serverSide = true, maskingRandomFactory = websocketRandomFactory, log = log).join(messageHandler)
}
installHandler(frameHandler)
websocketHandlerWasInstalled = true

View file

@ -4,6 +4,7 @@
package akka.http.impl.engine.ws
import akka.event.LoggingAdapter
import akka.stream.scaladsl.Flow
import scala.concurrent.duration.FiniteDuration
import akka.stream.stage._
@ -17,7 +18,7 @@ import akka.http.impl.engine.ws.FrameHandler.UserHandlerErredOut
*
* INTERNAL API
*/
private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration) extends StatefulStage[FrameOutHandler.Input, FrameStart] {
private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration, log: LoggingAdapter) extends StatefulStage[FrameOutHandler.Input, FrameStart] {
def initial: StageState[AnyRef, FrameStart] = Idle
def closeTimeout: Timestamp = Timestamp.now + _closeTimeout
@ -43,7 +44,8 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu
case UserHandlerCompleted
become(new WaitingForPeerCloseFrame())
ctx.push(FrameEvent.closeFrame(Protocol.CloseCodes.Regular))
case UserHandlerErredOut(_)
case UserHandlerErredOut(e)
log.error(e, s"Websocket handler failed with ${e.getMessage}")
become(new WaitingForPeerCloseFrame())
ctx.push(FrameEvent.closeFrame(Protocol.CloseCodes.UnexpectedCondition, "internal error"))
case Tick ctx.pull() // ignore
@ -60,9 +62,12 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu
*/
private class WaitingForUserHandlerClosed(closeFrame: FrameStart) extends CompletionHandlingState {
def onPush(elem: AnyRef, ctx: Context[FrameStart]): SyncDirective = elem match {
case UserHandlerCompleted | UserHandlerErredOut(_) sendOutLastFrame(ctx)
case UserHandlerCompleted sendOutLastFrame(ctx)
case UserHandlerErredOut(e)
log.error(e, s"Websocket handler failed while waiting for handler completion with ${e.getMessage}")
sendOutLastFrame(ctx)
case start: FrameStart ctx.push(start)
case _ ctx.pull() // ignore
case _ ctx.pull() // ignore
}
def sendOutLastFrame(ctx: Context[FrameStart]): SyncDirective =
@ -140,6 +145,6 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu
private[http] object FrameOutHandler {
type Input = AnyRef
def create(serverSide: Boolean, closeTimeout: FiniteDuration): Flow[Input, FrameStart, Unit] =
Flow[Input].transform(() new FrameOutHandler(serverSide, closeTimeout))
def create(serverSide: Boolean, closeTimeout: FiniteDuration, log: LoggingAdapter): Flow[Input, FrameStart, Unit] =
Flow[Input].transform(() new FrameOutHandler(serverSide, closeTimeout, log))
}

View file

@ -6,6 +6,7 @@ package akka.http.impl.engine.ws
import java.util.Random
import akka.event.LoggingAdapter
import akka.util.ByteString
import scala.concurrent.duration._
@ -30,9 +31,10 @@ private[http] object Websocket {
*/
def stack(serverSide: Boolean,
maskingRandomFactory: () Random,
closeTimeout: FiniteDuration = 3.seconds): BidiFlow[FrameEvent, Message, Message, FrameEvent, Unit] =
closeTimeout: FiniteDuration = 3.seconds,
log: LoggingAdapter): BidiFlow[FrameEvent, Message, Message, FrameEvent, Unit] =
masking(serverSide, maskingRandomFactory) atop
frameHandling(serverSide, closeTimeout) atop
frameHandling(serverSide, closeTimeout, log) atop
messageAPI(serverSide, closeTimeout)
/** The lowest layer that implements the binary protocol */
@ -52,10 +54,11 @@ private[http] object Websocket {
* from frames, decoding text messages, close handling, etc.
*/
def frameHandling(serverSide: Boolean = true,
closeTimeout: FiniteDuration): BidiFlow[FrameEvent, FrameHandler.Output, FrameOutHandler.Input, FrameStart, Unit] =
closeTimeout: FiniteDuration,
log: LoggingAdapter): BidiFlow[FrameEvent, FrameHandler.Output, FrameOutHandler.Input, FrameStart, Unit] =
BidiFlow.wrap(
FrameHandler.create(server = serverSide),
FrameOutHandler.create(serverSide, closeTimeout))(Keep.none)
FrameOutHandler.create(serverSide, closeTimeout, log))(Keep.none)
.named("ws-frame-handling")
/**

View file

@ -37,7 +37,7 @@ object WebsocketClientBlueprint {
log: LoggingAdapter): Http.WebsocketClientLayer =
(simpleTls.atopMat(handshake(request, settings, log))(Keep.right) atop
Websocket.framing atop
Websocket.stack(serverSide = false, maskingRandomFactory = settings.websocketRandomFactory)).reversed
Websocket.stack(serverSide = false, maskingRandomFactory = settings.websocketRandomFactory, log = log)).reversed
/**
* A bidi flow that injects and inspects the WS handshake and then goes out of the way. This BidiFlow

View file

@ -829,7 +829,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
Source(netIn)
.via(printEvent("netIn"))
.transform(() new FrameEventParser)
.via(Websocket.stack(serverSide, maskingRandomFactory = Randoms.SecureRandomInstances, closeTimeout = closeTimeout).join(messageHandler))
.via(Websocket.stack(serverSide, maskingRandomFactory = Randoms.SecureRandomInstances, closeTimeout = closeTimeout, log = system.log).join(messageHandler))
.via(printEvent("frameRendererIn"))
.transform(() new FrameEventRenderer)
.via(printEvent("frameRendererOut"))