diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 5c158e5ef7..76382318af 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -120,12 +120,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Unit] /** - * Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. + * Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]] + * can only be materialized once. */ def serverLayer()(implicit mat: FlowMaterializer): ServerLayer = serverLayer(ServerSettings(system)) /** - * Constructs a [[ServerLayer]] stage using the given [[ServerSettings]]. + * Constructs a [[ServerLayer]] stage using the given [[ServerSettings]]. The returned [[BidiFlow]] isn't reusable and + * can only be materialized once. */ def serverLayer(settings: ServerSettings, log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): ServerLayer = diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala index dad93cde5e..ce2d8239cf 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala @@ -4,28 +4,30 @@ package akka.http.engine.server -import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic } -import akka.stream.scaladsl._ - -import akka.http.engine.ws._ -import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic } import org.reactivestreams.{ Subscriber, Publisher } import scala.util.control.NonFatal import akka.util.ByteString import akka.event.LoggingAdapter import akka.actor.{ ActorRef, Props } -import akka.stream.stage.PushPullStage -import akka.stream.OperationAttributes._ -import akka.stream.scaladsl._ + import akka.stream._ +import akka.stream.scaladsl._ +import akka.stream.stage.PushPullStage + +import akka.stream.scaladsl.FlexiMerge.{ ReadAny, MergeLogic } +import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic } + import akka.http.engine.parsing._ import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.TokenSourceActor import akka.http.model._ -import akka.http.util._ import ParserOutput._ -import akka.http.engine.ws.Websocket.{ SwitchToWebsocketToken } + +import akka.http.util._ + +import akka.http.engine.ws._ +import Websocket.SwitchToWebsocketToken /** * INTERNAL API @@ -49,13 +51,13 @@ private[http] object HttpServerBluePrint { val responseRendererFactory = new HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint, log, Some(ws)) @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 - val oneHundredContinueSource = Source.actorPublisher[OneHundredContinue.type] { + val oneHundredContinueSource = StreamUtils.oneTimeSource(Source.actorPublisher[OneHundredContinue.type] { Props { val actor = new TokenSourceActor(OneHundredContinue) oneHundredContinueRef = Some(actor.context.self) actor } - } + }, errorMsg = "Http.serverLayer is currently not reusable. You need to create a new instance for each materialization.") val requestParsingFlow = Flow[ByteString].transform(() ⇒ // each connection uses a single (private) request parser instance for all its requests @@ -93,8 +95,8 @@ private[http] object HttpServerBluePrint { .flatten(FlattenStrategy.concat) .via(Flow[ByteString].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger")) - FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b ⇒ - (requestParsing, renderer) ⇒ + FlowGraph.partial(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒ + (requestParsing, renderer, oneHundreds) ⇒ import FlowGraph.Implicits._ val bypassFanout = b.add(Broadcast[RequestOutput](2).named("bypassFanout")) @@ -109,7 +111,7 @@ private[http] object HttpServerBluePrint { val requestsIn = (bypassFanout.out(0) ~> requestPreparation).outlet bypassFanout.out(1) ~> bypass ~> bypassInput - oneHundredContinueSource ~> bypassOneHundredContinueInput + oneHundreds ~> bypassOneHundredContinueInput val http = FlowShape(requestParsing.inlet, renderer.outlet) // Websocket pipeline diff --git a/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala index e19827d0d1..fdf758a27b 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/ws/Websocket.scala @@ -28,13 +28,18 @@ private[http] object Websocket { closeTimeout: FiniteDuration = 3.seconds): Flow[FrameEvent, FrameEvent, Unit] = { /** Completes this branch of the flow if no more messages are expected and converts close codes into errors */ class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] { + var inMessage = false def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match { case PeerClosed(code, reason) ⇒ if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Peer closed connection with code $code")) + else if (inMessage) ctx.fail(new ProtocolException(s"Truncated message, peer closed connection in the middle of message.")) else ctx.finish() case ActivelyCloseWithCode(code, reason) ⇒ if (code.exists(Protocol.CloseCodes.isError)) ctx.fail(new ProtocolException(s"Closing connection with error code $code")) - else ctx.finish() + else ctx.fail(new IllegalStateException("Regular close from FrameHandler is unexpected")) + case x: MessageDataPart ⇒ + inMessage = !x.last + ctx.push(x) case x ⇒ ctx.push(x) } } diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index a8d642fe3b..1288680dca 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -186,11 +186,11 @@ private[http] object StreamUtils { /** * Returns a source that can only be used once for testing purposes. */ - def oneTimeSource[T, Mat](other: Source[T, Mat]): Source[T, Mat] = { + def oneTimeSource[T, Mat](other: Source[T, Mat], errorMsg: String = "One time source can only be instantiated once"): Source[T, Mat] = { val onlyOnceFlag = new AtomicBoolean(false) - other.map { elem ⇒ + other.mapMaterialized { elem ⇒ if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true)) - throw new IllegalStateException("One time source can only be instantiated once") + throw new IllegalStateException(errorMsg) elem } } diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index 07d29179b0..73aa641eb1 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -30,11 +30,6 @@ object TestServer extends App { case Some(upgrade) ⇒ upgrade.handleMessages(echoWebsocketService) // needed for running the autobahn test suite case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") } - case req @ HttpRequest(GET, Uri.Path("/ws-greeter"), _, _, _) ⇒ - req.header[UpgradeToWebsocket] match { - case Some(upgrade) ⇒ upgrade.handleMessages(greeterWebsocketService) - case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") - } case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ index case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!") case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!") diff --git a/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala index 0928ca0eda..0a97eb662c 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala @@ -477,8 +477,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) netOut.expectComplete() } - "after receiving regular close frame when fragmented message is still open" in pendingUntilFixed { - pending + "after receiving regular close frame when fragmented message is still open" in { new ServerTestSetup { netOutSub.request(10) messageInSub.request(10) @@ -496,7 +495,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { inSubscriber.expectNext(outData) pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) - messageIn.expectComplete() + + // This is arguable: we could also just fail the subStream but complete the main message stream regularly. + // However, truncating an ongoing message by closing without sending a `Continuation(fin = true)` first + // could be seen as something being amiss. + messageIn.expectError() inSubscriber.expectError() // truncation of open message