From e1922dad45690d43913e2c61eeeca316d1d98bdf Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 12 Jan 2016 20:28:48 +0100 Subject: [PATCH] #19417 fix empty.head error in WebSocket termination also fixes related issues in HTTP Server and OutgoingConnection and addresses one flaw in Websocket.BypassRouter --- .../client/OutgoingConnectionBlueprint.scala | 4 +- .../akka/http/impl/engine/ws/Websocket.scala | 4 +- .../impl/engine/ws/BypassRouterSpec.scala | 50 +++++++++++++++++++ 3 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 akka-http-core/src/test/scala/akka/http/impl/engine/ws/BypassRouterSpec.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 2bbd8df266..a597aec773 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -75,8 +75,8 @@ private[http] object OutgoingConnectionBlueprint { case (Seq(MessageEnd), remaining) ⇒ SubSource.kill(remaining) false - case _ ⇒ - true + case (seq, _) ⇒ + seq.nonEmpty } .map { case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, _)), entityParts) ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index 0c5874a292..a2d734471a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -62,7 +62,7 @@ private[http] object Websocket { */ def messageAPI(serverSide: Boolean, closeTimeout: FiniteDuration): BidiFlow[FrameHandler.Output, Message, Message, FrameOutHandler.Input, Unit] = { - /** Completes this branch of the flow if no more messages are expected and converts close codes into errors */ + /* Completes this branch of the flow if no more messages are expected and converts close codes into errors */ class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] { var inMessage = false def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match { @@ -172,7 +172,7 @@ private[http] object Websocket { } } }) - val pullIn = () ⇒ pull(in) + val pullIn = () ⇒ tryPull(in) setHandler(bypass, eagerTerminateOutput) setHandler(user, ignoreTerminateOutput) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/BypassRouterSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/BypassRouterSpec.scala new file mode 100644 index 0000000000..89be18a5da --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/BypassRouterSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.http.impl.engine.ws + +import akka.stream.testkit.AkkaSpec +import scala.concurrent.Await +import com.typesafe.config.ConfigFactory +import com.typesafe.config.Config +import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.ws._ +import akka.http.scaladsl._ +import akka.stream.scaladsl._ +import akka.stream._ +import scala.concurrent.duration._ +import org.scalatest.concurrent.ScalaFutures +import org.scalactic.ConversionCheckedTripleEquals +import akka.stream.testkit.Utils + +class BypassRouterSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode = off") with ScalaFutures with ConversionCheckedTripleEquals { + + implicit val patience = PatienceConfig(3.seconds) + import system.dispatcher + implicit val materializer = ActorMaterializer() + + "BypassRouter" must { + + "work without double pull-ing some ports" in Utils.assertAllStagesStopped { + val bindingFuture = Http().bindAndHandleSync({ + case HttpRequest(_, _, headers, _, _) ⇒ + val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get + upgrade.handleMessages(Flow.apply, None) + }, interface = "localhost", port = 8080) + val binding = Await.result(bindingFuture, 3.seconds) + + val N = 100 + val (response, count) = Http().singleWebsocketRequest( + WebsocketRequest("ws://127.0.0.1:8080"), + Flow.fromSinkAndSourceMat( + Sink.fold(0)((n, _: Message) ⇒ n + 1), + Source.repeat(TextMessage("hello")).take(N))(Keep.left)) + + count.futureValue should ===(N) + binding.unbind() + } + + } + +}