diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 5c158e5ef7..76382318af 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -120,12 +120,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Unit] /** - * Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. + * Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]] + * can only be materialized once. */ def serverLayer()(implicit mat: FlowMaterializer): ServerLayer = serverLayer(ServerSettings(system)) /** - * Constructs a [[ServerLayer]] stage using the given [[ServerSettings]]. + * Constructs a [[ServerLayer]] stage using the given [[ServerSettings]]. The returned [[BidiFlow]] isn't reusable and + * can only be materialized once. */ def serverLayer(settings: ServerSettings, log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): ServerLayer = diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala index dad93cde5e..ce2d8239cf 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala @@ -4,28 +4,30 @@ package akka.http.engine.server -import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic } -import akka.stream.scaladsl._ - -import akka.http.engine.ws._ -import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic } import org.reactivestreams.{ Subscriber, Publisher } import scala.util.control.NonFatal import akka.util.ByteString import akka.event.LoggingAdapter import akka.actor.{ ActorRef, Props } -import akka.stream.stage.PushPullStage -import akka.stream.OperationAttributes._ -import akka.stream.scaladsl._ + import akka.stream._ +import akka.stream.scaladsl._ +import akka.stream.stage.PushPullStage + +import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic } +import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic } + import akka.http.engine.parsing._ import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.TokenSourceActor import akka.http.model._ -import akka.http.util._ import ParserOutput._ -import akka.http.engine.ws.Websocket.{ SwitchToWebsocketToken } + +import akka.http.util._ + +import akka.http.engine.ws._ +import Websocket.SwitchToWebsocketToken /** * INTERNAL API @@ -49,13 +51,13 @@ private[http] object HttpServerBluePrint { val responseRendererFactory = new HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint, log, Some(ws)) @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 - val oneHundredContinueSource = Source.actorPublisher[OneHundredContinue.type] { + val oneHundredContinueSource = StreamUtils.oneTimeSource(Source.actorPublisher[OneHundredContinue.type] { Props { val actor = new TokenSourceActor(OneHundredContinue) oneHundredContinueRef = Some(actor.context.self) actor } - } + }, errorMsg = "Http.serverLayer is currently not reusable. You need to create a new instance for each materialization.") val requestParsingFlow = Flow[ByteString].transform(() ⇒ // each connection uses a single (private) request parser instance for all its requests @@ -93,8 +95,8 @@ private[http] object HttpServerBluePrint { .flatten(FlattenStrategy.concat) .via(Flow[ByteString].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger")) - FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b ⇒ - (requestParsing, renderer) ⇒ + FlowGraph.partial(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒ + (requestParsing, renderer, oneHundreds) ⇒ import FlowGraph.Implicits._ val bypassFanout = b.add(Broadcast[RequestOutput](2).named("bypassFanout")) @@ -109,7 +111,7 @@ private[http] object HttpServerBluePrint { val requestsIn = (bypassFanout.out(0) ~> requestPreparation).outlet bypassFanout.out(1) ~> bypass ~> bypassInput - oneHundredContinueSource ~> bypassOneHundredContinueInput + oneHundreds ~> bypassOneHundredContinueInput val http = FlowShape(requestParsing.inlet, renderer.outlet) // Websocket pipeline diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index a8d642fe3b..1288680dca 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -186,11 +186,11 @@ private[http] object StreamUtils { /** * Returns a source that can only be used once for testing purposes. */ - def oneTimeSource[T, Mat](other: Source[T, Mat]): Source[T, Mat] = { + def oneTimeSource[T, Mat](other: Source[T, Mat], errorMsg: String = "One time source can only be instantiated once"): Source[T, Mat] = { val onlyOnceFlag = new AtomicBoolean(false) - other.map { elem ⇒ + other.mapMaterialized { elem ⇒ if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true)) - throw new IllegalStateException("One time source can only be instantiated once") + throw new IllegalStateException(errorMsg) elem } }