Merge pull request #17287 from spray/w/17129-provide-error

Improve error message if serverLayer is reused.
This commit is contained in:
Roland Kuhn 2015-04-24 11:44:11 +02:00
commit 995e659310
6 changed files with 36 additions and 29 deletions

View file

@ -120,12 +120,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Unit]
/**
* Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]].
* Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]]
* can only be materialized once.
*/
def serverLayer()(implicit mat: FlowMaterializer): ServerLayer = serverLayer(ServerSettings(system))
/**
* Constructs a [[ServerLayer]] stage using the given [[ServerSettings]].
* Constructs a [[ServerLayer]] stage using the given [[ServerSettings]]. The returned [[BidiFlow]] isn't reusable and
* can only be materialized once.
*/
def serverLayer(settings: ServerSettings,
log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): ServerLayer =

View file

@ -4,28 +4,30 @@
package akka.http.engine.server
import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic }
import akka.stream.scaladsl._
import akka.http.engine.ws._
import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic }
import org.reactivestreams.{ Subscriber, Publisher }
import scala.util.control.NonFatal
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.actor.{ ActorRef, Props }
import akka.stream.stage.PushPullStage
import akka.stream.OperationAttributes._
import akka.stream.scaladsl._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.PushPullStage
import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic }
import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic }
import akka.http.engine.parsing._
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.engine.TokenSourceActor
import akka.http.model._
import akka.http.util._
import ParserOutput._
import akka.http.engine.ws.Websocket.{ SwitchToWebsocketToken }
import akka.http.util._
import akka.http.engine.ws._
import Websocket.SwitchToWebsocketToken
/**
* INTERNAL API
@ -49,13 +51,13 @@ private[http] object HttpServerBluePrint {
val responseRendererFactory = new HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint, log, Some(ws))
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source.actorPublisher[OneHundredContinue.type] {
val oneHundredContinueSource = StreamUtils.oneTimeSource(Source.actorPublisher[OneHundredContinue.type] {
Props {
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)
actor
}
}
}, errorMsg = "Http.serverLayer is currently not reusable. You need to create a new instance for each materialization.")
val requestParsingFlow = Flow[ByteString].transform(()
// each connection uses a single (private) request parser instance for all its requests
@ -93,8 +95,8 @@ private[http] object HttpServerBluePrint {
.flatten(FlattenStrategy.concat)
.via(Flow[ByteString].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b
(requestParsing, renderer)
FlowGraph.partial(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ()) { implicit b
(requestParsing, renderer, oneHundreds)
import FlowGraph.Implicits._
val bypassFanout = b.add(Broadcast[RequestOutput](2).named("bypassFanout"))
@ -109,7 +111,7 @@ private[http] object HttpServerBluePrint {
val requestsIn = (bypassFanout.out(0) ~> requestPreparation).outlet
bypassFanout.out(1) ~> bypass ~> bypassInput
oneHundredContinueSource ~> bypassOneHundredContinueInput
oneHundreds ~> bypassOneHundredContinueInput
val http = FlowShape(requestParsing.inlet, renderer.outlet)
// Websocket pipeline

View file

@ -28,13 +28,18 @@ private[http] object Websocket {
closeTimeout: FiniteDuration = 3.seconds): Flow[FrameEvent, FrameEvent, Unit] = {
/** Completes this branch of the flow if no more messages are expected and converts close codes into errors */
class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] {
var inMessage = false
def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match {
case PeerClosed(code, reason)
if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Peer closed connection with code $code"))
else if (inMessage) ctx.fail(new ProtocolException(s"Truncated message, peer closed connection in the middle of message."))
else ctx.finish()
case ActivelyCloseWithCode(code, reason)
if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Closing connection with error code $code"))
else ctx.finish()
else ctx.fail(new IllegalStateException("Regular close from FrameHandler is unexpected"))
case x: MessageDataPart
inMessage = !x.last
ctx.push(x)
case x ctx.push(x)
}
}

View file

@ -186,11 +186,11 @@ private[http] object StreamUtils {
/**
* Returns a source that can only be used once for testing purposes.
*/
def oneTimeSource[T, Mat](other: Source[T, Mat]): Source[T, Mat] = {
def oneTimeSource[T, Mat](other: Source[T, Mat], errorMsg: String = "One time source can only be instantiated once"): Source[T, Mat] = {
val onlyOnceFlag = new AtomicBoolean(false)
other.map { elem
other.mapMaterialized { elem
if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true))
throw new IllegalStateException("One time source can only be instantiated once")
throw new IllegalStateException(errorMsg)
elem
}
}

View file

@ -30,11 +30,6 @@ object TestServer extends App {
case Some(upgrade) upgrade.handleMessages(echoWebsocketService) // needed for running the autobahn test suite
case None HttpResponse(400, entity = "Not a valid websocket request!")
}
case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _)
req.header[UpgradeToWebsocket] match {
case Some(upgrade) upgrade.handleMessages(greeterWebsocketService)
case None HttpResponse(400, entity = "Not a valid websocket request!")
}
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")

View file

@ -477,8 +477,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
netOut.expectComplete()
}
"after receiving regular close frame when fragmented message is still open" in pendingUntilFixed {
pending
"after receiving regular close frame when fragmented message is still open" in {
new ServerTestSetup {
netOutSub.request(10)
messageInSub.request(10)
@ -496,7 +495,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
inSubscriber.expectNext(outData)
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
messageIn.expectComplete()
// This is arguable: we could also just fail the subStream but complete the main message stream regularly.
// However, truncating an ongoing message by closing without sending a `Continuation(fin = true)` first
// could be seen as something being amiss.
messageIn.expectError()
inSubscriber.expectError()
// truncation of open message