diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index a79095daa3..8292b861e7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -98,9 +98,7 @@ class TcpSpec extends StreamSpec(""" .toMat(Sink.ignore)(Keep.left) .run() - whenReady(future.failed) { ex ⇒ - ex.getMessage should ===("Connection failed.") - } + future.failed.futureValue shouldBe a[StreamTcpException] } "work when client closes write, then remote closes write" in assertAllStagesStopped { @@ -456,6 +454,19 @@ class TcpSpec extends StreamSpec(""" } } + "provide full exceptions when connection attempt fails because name cannot be resolved" in { + val unknownHostName = "abcdefghijklmnopkuh" + + val test = + Source.maybe + .viaMat(Tcp().outgoingConnection(unknownHostName, 12345))(Keep.right) + .to(Sink.ignore) + .run() + .failed + .futureValue + + test.getCause shouldBe a[UnknownHostException] + } } "TCP listen stream" must { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index a746337f06..b921f276cd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -235,8 +235,8 @@ private[stream] object ConnectionSourceStage { val sender = evt._1 val msg = evt._2 msg match { - case Terminated(_) ⇒ failStage(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now.")) - case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) + case Terminated(_) ⇒ fail(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now.")) + case f @ CommandFailed(cmd) ⇒ fail(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}").initCause(f.cause.orNull)) case c: Connected ⇒ role.asInstanceOf[Outbound].localAddressPromise.success(c.localAddress) connection = sender @@ -275,10 +275,10 @@ private[stream] object ConnectionSourceStage { if (!isClosed(bytesIn) && !hasBeenPulled(bytesIn)) pull(bytesIn) - case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now.")) - case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) - case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause")) - case Aborted ⇒ failStage(new StreamTcpException("The connection has been aborted")) + case Terminated(_) ⇒ fail(new StreamTcpException("The connection actor has terminated. Stopping now.")) + case f @ CommandFailed(cmd) ⇒ fail(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}").initCause(f.cause.orNull)) + case ErrorClosed(cause) ⇒ fail(new StreamTcpException(s"The connection closed with error: $cause")) + case Aborted ⇒ fail(new StreamTcpException("The connection has been aborted")) case Closed ⇒ completeStage() case ConfirmedClosed ⇒ completeStage() case PeerClosed ⇒ complete(bytesOut) @@ -346,16 +346,25 @@ private[stream] object ConnectionSourceStage { else interpreter.log.debug(msg + "\n{}", remoteAddress, ex, ex.getStackTrace.mkString("\n")) } connection ! Abort - } else failStage(ex) + } else fail(ex) } }) - override def postStop(): Unit = role match { - case Outbound(_, _, localAddressPromise, _, _) ⇒ - // Fail if has not been completed with an address earlier - localAddressPromise.tryFailure(new StreamTcpException("Connection failed.")) - case _ ⇒ // do nothing... + /** Fail stage and report to localAddressPromise if still possible */ + private def fail(ex: Throwable): Unit = { + reportExceptionToPromise(ex) + failStage(ex) } + private def reportExceptionToPromise(ex: Throwable): Unit = + role match { + case Outbound(_, _, localAddressPromise, _, _) ⇒ + // Fail if has not been completed with an address earlier + localAddressPromise.tryFailure(ex) + case _ ⇒ // do nothing... + } + + override def postStop(): Unit = reportExceptionToPromise(new StreamTcpException("Connection failed.")) + writeBuffer = ByteString.empty } }