=htc #21051 WebsocketResponse future now fails if the connection failed (#21054)

* WebsocketResponse future now fails if the connection failed #21051
This commit is contained in:
Johan Andrén 2016-07-28 17:48:06 +02:00 committed by Konrad Malawski
parent d0bf45fe68
commit 9372087464
3 changed files with 41 additions and 3 deletions

View file

@ -111,6 +111,11 @@ object WebSocketClientBlueprint {
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
override def onUpstreamFailure(ex: Throwable): Unit = {
result.tryFailure(new RuntimeException("Connection failed.", ex))
super.onUpstreamFailure(ex)
}
}
override def toString = "UpgradeStage"

View file

@ -20,12 +20,15 @@ import org.scalatest.concurrent.Eventually
import java.net.InetSocketAddress
import akka.Done
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.impl.fusing.GraphStages
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
import akka.util.ByteString
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.{ AkkaSpec, EventFilter }
import scala.util.{ Failure, Success }
class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode=off")
with Eventually {
@ -196,4 +199,19 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
}
"A websocket client" should {
"fail the materialized future if the request fails" in {
val flow = Http().webSocketClientFlow(
WebSocketRequest("ws://127.0.0.1:65535/no/server/here"),
settings = ClientConnectionSettings(system).withConnectingTimeout(250.millis))
val future = Source.maybe[Message].viaMat(flow)(Keep.right).toMat(Sink.ignore)(Keep.left).run()
import system.dispatcher
whenReady(future.map(r Success(r)).recover { case ex Failure(ex) }) { resTry
resTry.isFailure should ===(true)
resTry.failed.get.getMessage should ===("Connection failed.")
}
}
}
}

View file

@ -4,20 +4,23 @@
package akka.stream.io
import akka.NotUsed
import akka.actor.{ ActorSystem, Kill }
import akka.actor.{ ActorSystem, Address, Kill }
import akka.io.Tcp._
import akka.stream.scaladsl.Tcp.IncomingConnection
import akka.stream.scaladsl.{ Flow, _ }
import akka.stream.testkit.TestUtils.temporaryServerAddress
import scala.util.control.NonFatal
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException }
import akka.util.{ ByteString, Helpers }
import scala.collection.immutable
import scala.concurrent.{ Promise, Await }
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import java.net.BindException
import java.net.{ BindException, InetSocketAddress }
import akka.testkit.EventFilter
class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.timeout = 2s") with TcpHelper {
@ -75,6 +78,18 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
}
"fail the materialized future when the connection fails" in assertAllStagesStopped {
val tcpWriteProbe = new TcpWriteProbe()
val future = Source.fromPublisher(tcpWriteProbe.publisherProbe)
.viaMat(Tcp().outgoingConnection(InetSocketAddress.createUnresolved("example.com", 666), connectTimeout = 1.second))(Keep.right)
.toMat(Sink.ignore)(Keep.left)
.run()
whenReady(future.failed) { ex =>
ex.getMessage should === ("Connection failed.")
}
}
"work when client closes write, then remote closes write" in assertAllStagesStopped {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()