=htc #17129 improve error message that serverLayer cannot be reused

This commit is contained in:
Johannes Rudolph 2015-04-24 08:26:22 +02:00
parent 48b50b6a84
commit 5e459688d5
3 changed files with 24 additions and 20 deletions

View file

@ -120,12 +120,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Unit]
/**
* Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]].
* Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]]
* can only be materialized once.
*/
def serverLayer()(implicit mat: FlowMaterializer): ServerLayer = serverLayer(ServerSettings(system))
/**
* Constructs a [[ServerLayer]] stage using the given [[ServerSettings]].
* Constructs a [[ServerLayer]] stage using the given [[ServerSettings]]. The returned [[BidiFlow]] isn't reusable and
* can only be materialized once.
*/
def serverLayer(settings: ServerSettings,
log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): ServerLayer =

View file

@ -4,28 +4,30 @@
package akka.http.engine.server
import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic }
import akka.stream.scaladsl._
import akka.http.engine.ws._
import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic }
import org.reactivestreams.{ Subscriber, Publisher }
import scala.util.control.NonFatal
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.actor.{ ActorRef, Props }
import akka.stream.stage.PushPullStage
import akka.stream.OperationAttributes._
import akka.stream.scaladsl._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.PushPullStage
import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic }
import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic }
import akka.http.engine.parsing._
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.engine.TokenSourceActor
import akka.http.model._
import akka.http.util._
import ParserOutput._
import akka.http.engine.ws.Websocket.{ SwitchToWebsocketToken }
import akka.http.util._
import akka.http.engine.ws._
import Websocket.SwitchToWebsocketToken
/**
* INTERNAL API
@ -49,13 +51,13 @@ private[http] object HttpServerBluePrint {
val responseRendererFactory = new HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint, log, Some(ws))
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source.actorPublisher[OneHundredContinue.type] {
val oneHundredContinueSource = StreamUtils.oneTimeSource(Source.actorPublisher[OneHundredContinue.type] {
Props {
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)
actor
}
}
}, errorMsg = "Http.serverLayer is currently not reusable. You need to create a new instance for each materialization.")
val requestParsingFlow = Flow[ByteString].transform(()
// each connection uses a single (private) request parser instance for all its requests
@ -93,8 +95,8 @@ private[http] object HttpServerBluePrint {
.flatten(FlattenStrategy.concat)
.via(Flow[ByteString].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b
(requestParsing, renderer)
FlowGraph.partial(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ()) { implicit b
(requestParsing, renderer, oneHundreds)
import FlowGraph.Implicits._
val bypassFanout = b.add(Broadcast[RequestOutput](2).named("bypassFanout"))
@ -109,7 +111,7 @@ private[http] object HttpServerBluePrint {
val requestsIn = (bypassFanout.out(0) ~> requestPreparation).outlet
bypassFanout.out(1) ~> bypass ~> bypassInput
oneHundredContinueSource ~> bypassOneHundredContinueInput
oneHundreds ~> bypassOneHundredContinueInput
val http = FlowShape(requestParsing.inlet, renderer.outlet)
// Websocket pipeline

View file

@ -186,11 +186,11 @@ private[http] object StreamUtils {
/**
* Returns a source that can only be used once for testing purposes.
*/
def oneTimeSource[T, Mat](other: Source[T, Mat]): Source[T, Mat] = {
def oneTimeSource[T, Mat](other: Source[T, Mat], errorMsg: String = "One time source can only be instantiated once"): Source[T, Mat] = {
val onlyOnceFlag = new AtomicBoolean(false)
other.map { elem
other.mapMaterialized { elem
if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true))
throw new IllegalStateException("One time source can only be instantiated once")
throw new IllegalStateException(errorMsg)
elem
}
}