Merge pull request #20089 from agolubev/19834-replace-PrepareForUserHandler-with-GraphStage-agolubev

19834 replace prepare for user handler with graph stage agolubev
This commit is contained in:
Patrik Nordwall 2016-03-31 11:30:41 +02:00
commit 23be2ee751

View file

@ -58,29 +58,38 @@ private[http] object WebSocket {
FrameOutHandler.create(serverSide, closeTimeout, log))
.named("ws-frame-handling")
/* Completes this branch of the flow if no more messages are expected and converts close codes into errors */
private object PrepareForUserHandler extends GraphStage[FlowShape[MessagePart, MessagePart]] {
val in = Inlet[MessagePart]("prepareForUserHandler.in")
val out = Outlet[MessagePart]("prepareForUserHandler.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = Attributes.name("PrepareForUserHandler")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
var inMessage = false
override def onPush():Unit = grab(in) match {
case PeerClosed(code, reason)
if (code.exists(Protocol.CloseCodes.isError)) failStage(new PeerClosedConnectionException(code.get, reason))
else if (inMessage) failStage(new ProtocolException(s"Truncated message, peer closed connection in the middle of message."))
else completeStage()
case ActivelyCloseWithCode(code, reason)
if (code.exists(Protocol.CloseCodes.isError)) failStage(new ProtocolException(s"Closing connection with error code $code"))
else failStage(new IllegalStateException("Regular close from FrameHandler is unexpected"))
case x: MessageDataPart
inMessage = !x.last
push(out,x)
case x push(out,x)
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**
* The layer that provides the high-level user facing API on top of frame handling.
*/
def messageAPI(serverSide: Boolean,
closeTimeout: FiniteDuration): BidiFlow[FrameHandler.Output, Message, Message, FrameOutHandler.Input, NotUsed] = {
/* Completes this branch of the flow if no more messages are expected and converts close codes into errors */
class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] {
var inMessage = false
def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match {
case PeerClosed(code, reason)
if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new PeerClosedConnectionException(code.get, reason))
else if (inMessage) ctx.fail(new ProtocolException(s"Truncated message, peer closed connection in the middle of message."))
else ctx.finish()
case ActivelyCloseWithCode(code, reason)
if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Closing connection with error code $code"))
else ctx.fail(new IllegalStateException("Regular close from FrameHandler is unexpected"))
case x: MessageDataPart
inMessage = !x.last
ctx.push(x)
case x ctx.push(x)
}
}
/* Collects user-level API messages from MessageDataParts */
val collectMessage: Flow[MessageDataPart, Message, NotUsed] =
Flow[MessageDataPart]
@ -113,7 +122,7 @@ private[http] object WebSocket {
def prepareMessages: Flow[MessagePart, Message, NotUsed] =
Flow[MessagePart]
.transform(() new PrepareForUserHandler)
.via(PrepareForUserHandler)
.splitWhen(_.isMessageEnd) // FIXME using splitAfter from #16885 would simplify protocol a lot
.collect {
case m: MessageDataPart m