diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 221a1d69c4..40e468915d 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -6,16 +6,20 @@ package akka.io import java.net.InetSocketAddress import java.net.Socket + import akka.io.Inet._ import com.typesafe.config.Config + import scala.concurrent.duration._ import scala.collection.immutable import scala.collection.JavaConverters._ -import akka.util.{ Helpers, ByteString } +import akka.util.{ ByteString, Helpers } import akka.util.Helpers.Requiring import akka.actor._ import java.lang.{ Iterable ⇒ JIterable } +import akka.annotation.InternalApi + /** * TCP Extension for Akka’s IO layer. * @@ -431,7 +435,25 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * Whenever a command cannot be completed, the queried actor will reply with * this message, wrapping the original command which failed. */ - final case class CommandFailed(cmd: Command) extends Event + final case class CommandFailed(cmd: Command) extends Event { + @transient private var _cause: Option[Throwable] = None + + /** Optionally contains the cause why the command failed. */ + def cause: Option[Throwable] = _cause + + // Needs to be added with a mutable var for compatibility reasons. + // The cause will be lost in the unlikely case that someone uses `copy` on an instance. + @InternalApi /** Creates a copy of this object with a new cause set. */ + private[akka] def withCause(cause: Throwable): CommandFailed = { + val newInstance = copy() + newInstance._cause = Some(cause) + newInstance + } + @InternalApi + private[akka] def causedByString = _cause.map(c ⇒ s" because of ${c.getMessage}").getOrElse("") + + override def toString: String = s"CommandFailed($cmd)$causedByString" + } /** * When `useResumeWriting` is in effect as indicated in the [[Register]] message, diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 258f97983a..2401fb6fa5 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -4,21 +4,22 @@ package akka.io -import java.net.{ SocketException, InetSocketAddress } +import java.net.{ InetSocketAddress, SocketException } import java.nio.channels.SelectionKey._ import java.io.{ FileInputStream, IOException } import java.nio.channels.{ FileChannel, SocketChannel } import java.nio.ByteBuffer + import scala.annotation.tailrec import scala.collection.immutable -import scala.util.control.NonFatal +import scala.util.control.{ NoStackTrace, NonFatal } import scala.concurrent.duration._ import akka.actor._ import akka.util.ByteString import akka.io.Inet.SocketOption import akka.io.Tcp._ import akka.io.SelectionHandler._ -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import java.nio.file.Paths /** @@ -150,11 +151,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha case write: WriteCommand ⇒ if (writingSuspended) { if (TraceLogging) log.debug("Dropping write because writing is suspended") - sender() ! write.failureMessage + sender() ! write.failureMessage.withCause(DroppingWriteBecauseWritingIsSuspendedException) } else if (writePending) { if (TraceLogging) log.debug("Dropping write because queue is full") - sender() ! write.failureMessage + sender() ! write.failureMessage.withCause(DroppingWriteBecauseQueueIsFullException) if (info.useResumeWriting) writingSuspended = true } else { @@ -505,4 +506,10 @@ private[io] object TcpConnection { } val doNothing: () ⇒ Unit = () ⇒ () + + val DroppingWriteBecauseWritingIsSuspendedException = + new IOException("Dropping write because writing is suspended") with NoStackTrace + + val DroppingWriteBecauseQueueIsFullException = + new IOException("Dropping write because queue is full") with NoStackTrace } diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 3d60f1bd7f..cdac03bdea 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -67,7 +67,7 @@ private[io] class TcpListener( ret } catch { case NonFatal(e) ⇒ - bindCommander ! bind.failureMessage + bindCommander ! bind.failureMessage.withCause(e) log.error(e, "Bind failed for TCP channel on endpoint [{}]", bind.localAddress) context.stop(self) } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 33b9ecd078..cd94a8d966 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -4,11 +4,13 @@ package akka.io -import java.net.InetSocketAddress +import java.net.{ ConnectException, InetSocketAddress } import java.nio.channels.{ SelectionKey, SocketChannel } -import scala.util.control.NonFatal + +import scala.util.control.{ NoStackTrace, NonFatal } import scala.concurrent.duration._ -import akka.actor.{ ReceiveTimeout, ActorRef } +import akka.actor.{ ActorRef, ReceiveTimeout } +import akka.annotation.InternalApi import akka.io.TcpConnection.CloseInformation import akka.io.SelectionHandler._ import akka.io.Tcp._ @@ -26,6 +28,7 @@ private[io] class TcpOutgoingConnection( connect: Connect) extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel], connect.pullMode) { + import TcpOutgoingConnection._ import context._ import connect._ @@ -36,7 +39,7 @@ private[io] class TcpOutgoingConnection( channelRegistry.register(channel, 0) timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied - private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage)) + private def stop(cause: Throwable): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage.withCause(cause))) private def reportConnectFailure(thunk: ⇒ Unit): Unit = { try { @@ -44,7 +47,7 @@ private[io] class TcpOutgoingConnection( } catch { case NonFatal(e) ⇒ log.debug("Could not establish connection to [{}] due to {}", remoteAddress, e) - stop() + stop(e) } } @@ -101,7 +104,7 @@ private[io] class TcpOutgoingConnection( } else { log.debug("Could not establish connection because finishConnect " + "never returned true (consider increasing akka.io.tcp.finish-connect-retries)") - stop() + stop(FinishConnectNeverReturnedTrueException) } } } @@ -109,7 +112,16 @@ private[io] class TcpOutgoingConnection( case ReceiveTimeout ⇒ if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout log.debug("Connect timeout expired, could not establish connection to [{}]", remoteAddress) - stop() + stop(connectTimeoutExpired(timeout)) } } } + +@InternalApi +private[io] object TcpOutgoingConnection { + val FinishConnectNeverReturnedTrueException = + new ConnectException("Could not establish connection because finishConnect never returned true") with NoStackTrace + + def connectTimeoutExpired(timeout: Option[FiniteDuration]) = + new ConnectException(s"Connect timeout of $timeout expired") with NoStackTrace +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 36f054acac..b7e314e05c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -496,7 +496,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. val probe2 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val binding2F = bind.to(Sink.fromSubscriber(probe2)).run() - probe2.expectSubscriptionAndError(BindFailedException) + probe2.expectSubscriptionAndError(signalDemand = true) shouldBe a[BindFailedException] val probe3 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val binding3F = bind.to(Sink.fromSubscriber(probe3)).run() diff --git a/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala b/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala index 880c3e06de..916bca3daf 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala @@ -7,8 +7,9 @@ import scala.util.control.NoStackTrace class StreamTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace -abstract class BindFailedException extends StreamTcpException("bind failed") +class BindFailedException extends StreamTcpException("bind failed") +@deprecated("BindFailedException object will never be thrown. Match on the class instead.") case object BindFailedException extends BindFailedException class ConnectionException(msg: String) extends StreamTcpException(msg) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index a202c77587..b58d3429de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -25,7 +25,6 @@ import akka.util.ByteString import scala.collection.immutable import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.{ Future, Promise } -import scala.util.Try /** * INTERNAL API @@ -78,7 +77,11 @@ import scala.util.Try unbindPromise.future })) case f: CommandFailed ⇒ - val ex = BindFailedException + val ex = new BindFailedException { + // cannot modify the actual exception class for compatibility reasons + override def getMessage: String = s"Bind failed${f.causedByString}" + } + f.cause.foreach(ex.initCause) bindingPromise.failure(ex) unbindPromise.success(() ⇒ Future.successful(())) failStage(ex) @@ -219,8 +222,8 @@ private[stream] object ConnectionSourceStage { val sender = evt._1 val msg = evt._2 msg match { - case Terminated(_) ⇒ failStage(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now.")) - case CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed")) + case Terminated(_) ⇒ failStage(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now.")) + case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) case c: Connected ⇒ role.asInstanceOf[Outbound].localAddressPromise.success(c.localAddress) connection = sender @@ -238,13 +241,13 @@ private[stream] object ConnectionSourceStage { val sender = evt._1 val msg = evt._2 msg match { - case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now.")) - case CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed")) - case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause")) - case Aborted ⇒ failStage(new StreamTcpException("The connection has been aborted")) - case Closed ⇒ completeStage() - case ConfirmedClosed ⇒ completeStage() - case PeerClosed ⇒ complete(bytesOut) + case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now.")) + case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) + case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause")) + case Aborted ⇒ failStage(new StreamTcpException("The connection has been aborted")) + case Closed ⇒ completeStage() + case ConfirmedClosed ⇒ completeStage() + case PeerClosed ⇒ complete(bytesOut) case Received(data) ⇒ // Keep on reading even when closed. There is no "close-read-side" in TCP