From 93720874644d6ec002705690165ef38e536af674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 28 Jul 2016 17:48:06 +0200 Subject: [PATCH] =htc #21051 WebsocketResponse future now fails if the connection failed (#21054) * WebsocketResponse future now fails if the connection failed #21051 --- .../engine/ws/WebSocketClientBlueprint.scala | 5 +++++ .../engine/ws/WebSocketIntegrationSpec.scala | 18 ++++++++++++++++ .../test/scala/akka/stream/io/TcpSpec.scala | 21 ++++++++++++++++--- 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala index c95ef042eb..7cdfda7fc1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala @@ -111,6 +111,11 @@ object WebSocketClientBlueprint { override def onPull(): Unit = pull(in) setHandlers(in, out, this) + + override def onUpstreamFailure(ex: Throwable): Unit = { + result.tryFailure(new RuntimeException("Connection failed.", ex)) + super.onUpstreamFailure(ex) + } } override def toString = "UpgradeStage" diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala index 5fcecd8f2f..3c7c646b32 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala @@ -20,12 +20,15 @@ import org.scalatest.concurrent.Eventually import java.net.InetSocketAddress import akka.Done +import akka.http.scaladsl.settings.ClientConnectionSettings import akka.stream.impl.fusing.GraphStages import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } import akka.util.ByteString import akka.stream.testkit.scaladsl.TestSink import akka.testkit.{ AkkaSpec, EventFilter } +import scala.util.{ Failure, Success } + class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode=off") with Eventually { @@ -196,4 +199,19 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug. } + "A websocket client" should { + "fail the materialized future if the request fails" in { + val flow = Http().webSocketClientFlow( + WebSocketRequest("ws://127.0.0.1:65535/no/server/here"), + settings = ClientConnectionSettings(system).withConnectingTimeout(250.millis)) + + val future = Source.maybe[Message].viaMat(flow)(Keep.right).toMat(Sink.ignore)(Keep.left).run() + import system.dispatcher + whenReady(future.map(r ⇒ Success(r)).recover { case ex ⇒ Failure(ex) }) { resTry ⇒ + resTry.isFailure should ===(true) + resTry.failed.get.getMessage should ===("Connection failed.") + } + } + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 93b23d7ccd..1cd224c488 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -4,20 +4,23 @@ package akka.stream.io import akka.NotUsed -import akka.actor.{ ActorSystem, Kill } +import akka.actor.{ ActorSystem, Address, Kill } import akka.io.Tcp._ import akka.stream.scaladsl.Tcp.IncomingConnection import akka.stream.scaladsl.{ Flow, _ } import akka.stream.testkit.TestUtils.temporaryServerAddress + import scala.util.control.NonFatal import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException } import akka.util.{ ByteString, Helpers } + import scala.collection.immutable -import scala.concurrent.{ Promise, Await } +import scala.concurrent.{ Await, Promise } import scala.concurrent.duration._ -import java.net.BindException +import java.net.{ BindException, InetSocketAddress } + import akka.testkit.EventFilter class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.timeout = 2s") with TcpHelper { @@ -75,6 +78,18 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. } + "fail the materialized future when the connection fails" in assertAllStagesStopped { + val tcpWriteProbe = new TcpWriteProbe() + val future = Source.fromPublisher(tcpWriteProbe.publisherProbe) + .viaMat(Tcp().outgoingConnection(InetSocketAddress.createUnresolved("example.com", 666), connectTimeout = 1.second))(Keep.right) + .toMat(Sink.ignore)(Keep.left) + .run() + + whenReady(future.failed) { ex => + ex.getMessage should === ("Connection failed.") + } + } + "work when client closes write, then remote closes write" in assertAllStagesStopped { val testData = ByteString(1, 2, 3, 4, 5) val server = new Server()