#19417 fix empty.head error in WebSocket termination

also fixes related issues in HTTP Server and OutgoingConnection and
addresses one flaw in Websocket.BypassRouter
This commit is contained in:
Roland Kuhn 2016-01-12 20:28:48 +01:00
parent ba282d2e3b
commit e1922dad45
3 changed files with 54 additions and 4 deletions

View file

@ -75,8 +75,8 @@ private[http] object OutgoingConnectionBlueprint {
case (Seq(MessageEnd), remaining)
SubSource.kill(remaining)
false
case _
true
case (seq, _)
seq.nonEmpty
}
.map {
case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, _)), entityParts)

View file

@ -62,7 +62,7 @@ private[http] object Websocket {
*/
def messageAPI(serverSide: Boolean,
closeTimeout: FiniteDuration): BidiFlow[FrameHandler.Output, Message, Message, FrameOutHandler.Input, Unit] = {
/** Completes this branch of the flow if no more messages are expected and converts close codes into errors */
/* Completes this branch of the flow if no more messages are expected and converts close codes into errors */
class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] {
var inMessage = false
def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match {
@ -172,7 +172,7 @@ private[http] object Websocket {
}
}
})
val pullIn = () pull(in)
val pullIn = () tryPull(in)
setHandler(bypass, eagerTerminateOutput)
setHandler(user, ignoreTerminateOutput)

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.impl.engine.ws
import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import com.typesafe.config.ConfigFactory
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl._
import akka.stream.scaladsl._
import akka.stream._
import scala.concurrent.duration._
import org.scalatest.concurrent.ScalaFutures
import org.scalactic.ConversionCheckedTripleEquals
import akka.stream.testkit.Utils
class BypassRouterSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode = off") with ScalaFutures with ConversionCheckedTripleEquals {
implicit val patience = PatienceConfig(3.seconds)
import system.dispatcher
implicit val materializer = ActorMaterializer()
"BypassRouter" must {
"work without double pull-ing some ports" in Utils.assertAllStagesStopped {
val bindingFuture = Http().bindAndHandleSync({
case HttpRequest(_, _, headers, _, _)
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket u }.get
upgrade.handleMessages(Flow.apply, None)
}, interface = "localhost", port = 8080)
val binding = Await.result(bindingFuture, 3.seconds)
val N = 100
val (response, count) = Http().singleWebsocketRequest(
WebsocketRequest("ws://127.0.0.1:8080"),
Flow.fromSinkAndSourceMat(
Sink.fold(0)((n, _: Message) n + 1),
Source.repeat(TextMessage("hello")).take(N))(Keep.left))
count.futureValue should ===(N)
binding.unbind()
}
}
}