Don't throw away error causes in TcpStreamLogic (#25617)

* =str ensure TcpStreamLogic sets cause of StreamTcpException

* =str make sure TcpStreamLogic will report connection errors to localAddressPromise if possible
This commit is contained in:
Johannes Rudolph 2018-12-04 16:27:15 +01:00 committed by Christopher Batey
parent 9e739ea2f1
commit b649b4e2d0
2 changed files with 35 additions and 15 deletions

View file

@ -98,9 +98,7 @@ class TcpSpec extends StreamSpec("""
.toMat(Sink.ignore)(Keep.left) .toMat(Sink.ignore)(Keep.left)
.run() .run()
whenReady(future.failed) { ex future.failed.futureValue shouldBe a[StreamTcpException]
ex.getMessage should ===("Connection failed.")
}
} }
"work when client closes write, then remote closes write" in assertAllStagesStopped { "work when client closes write, then remote closes write" in assertAllStagesStopped {
@ -456,6 +454,19 @@ class TcpSpec extends StreamSpec("""
} }
} }
"provide full exceptions when connection attempt fails because name cannot be resolved" in {
val unknownHostName = "abcdefghijklmnopkuh"
val test =
Source.maybe
.viaMat(Tcp().outgoingConnection(unknownHostName, 12345))(Keep.right)
.to(Sink.ignore)
.run()
.failed
.futureValue
test.getCause shouldBe a[UnknownHostException]
}
} }
"TCP listen stream" must { "TCP listen stream" must {

View file

@ -235,8 +235,8 @@ private[stream] object ConnectionSourceStage {
val sender = evt._1 val sender = evt._1
val msg = evt._2 val msg = evt._2
msg match { msg match {
case Terminated(_) failStage(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now.")) case Terminated(_) fail(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now."))
case f @ CommandFailed(cmd) failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) case f @ CommandFailed(cmd) fail(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}").initCause(f.cause.orNull))
case c: Connected case c: Connected
role.asInstanceOf[Outbound].localAddressPromise.success(c.localAddress) role.asInstanceOf[Outbound].localAddressPromise.success(c.localAddress)
connection = sender connection = sender
@ -275,10 +275,10 @@ private[stream] object ConnectionSourceStage {
if (!isClosed(bytesIn) && !hasBeenPulled(bytesIn)) if (!isClosed(bytesIn) && !hasBeenPulled(bytesIn))
pull(bytesIn) pull(bytesIn)
case Terminated(_) failStage(new StreamTcpException("The connection actor has terminated. Stopping now.")) case Terminated(_) fail(new StreamTcpException("The connection actor has terminated. Stopping now."))
case f @ CommandFailed(cmd) failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) case f @ CommandFailed(cmd) fail(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}").initCause(f.cause.orNull))
case ErrorClosed(cause) failStage(new StreamTcpException(s"The connection closed with error: $cause")) case ErrorClosed(cause) fail(new StreamTcpException(s"The connection closed with error: $cause"))
case Aborted failStage(new StreamTcpException("The connection has been aborted")) case Aborted fail(new StreamTcpException("The connection has been aborted"))
case Closed completeStage() case Closed completeStage()
case ConfirmedClosed completeStage() case ConfirmedClosed completeStage()
case PeerClosed complete(bytesOut) case PeerClosed complete(bytesOut)
@ -346,16 +346,25 @@ private[stream] object ConnectionSourceStage {
else interpreter.log.debug(msg + "\n{}", remoteAddress, ex, ex.getStackTrace.mkString("\n")) else interpreter.log.debug(msg + "\n{}", remoteAddress, ex, ex.getStackTrace.mkString("\n"))
} }
connection ! Abort connection ! Abort
} else failStage(ex) } else fail(ex)
} }
}) })
override def postStop(): Unit = role match { /** Fail stage and report to localAddressPromise if still possible */
private def fail(ex: Throwable): Unit = {
reportExceptionToPromise(ex)
failStage(ex)
}
private def reportExceptionToPromise(ex: Throwable): Unit =
role match {
case Outbound(_, _, localAddressPromise, _, _) case Outbound(_, _, localAddressPromise, _, _)
// Fail if has not been completed with an address earlier // Fail if has not been completed with an address earlier
localAddressPromise.tryFailure(new StreamTcpException("Connection failed.")) localAddressPromise.tryFailure(ex)
case _ // do nothing... case _ // do nothing...
} }
override def postStop(): Unit = reportExceptionToPromise(new StreamTcpException("Connection failed."))
writeBuffer = ByteString.empty writeBuffer = ByteString.empty
} }
} }