=act #13861 report cause for Akka IO TCP CommandFailed events

To stay compatible this needs to be hacked into CommandFailed using a
mutable var.
This commit is contained in:
Johannes Rudolph 2017-05-16 15:22:11 +02:00
parent 2858118946
commit 38c9fff219
7 changed files with 73 additions and 28 deletions

View file

@ -6,16 +6,20 @@ package akka.io
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.Socket import java.net.Socket
import akka.io.Inet._ import akka.io.Inet._
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.util.{ Helpers, ByteString } import akka.util.{ ByteString, Helpers }
import akka.util.Helpers.Requiring import akka.util.Helpers.Requiring
import akka.actor._ import akka.actor._
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import akka.annotation.InternalApi
/** /**
* TCP Extension for Akkas IO layer. * TCP Extension for Akkas IO layer.
* *
@ -431,7 +435,25 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* Whenever a command cannot be completed, the queried actor will reply with * Whenever a command cannot be completed, the queried actor will reply with
* this message, wrapping the original command which failed. * this message, wrapping the original command which failed.
*/ */
final case class CommandFailed(cmd: Command) extends Event final case class CommandFailed(cmd: Command) extends Event {
@transient private var _cause: Option[Throwable] = None
/** Optionally contains the cause why the command failed. */
def cause: Option[Throwable] = _cause
// Needs to be added with a mutable var for compatibility reasons.
// The cause will be lost in the unlikely case that someone uses `copy` on an instance.
@InternalApi /** Creates a copy of this object with a new cause set. */
private[akka] def withCause(cause: Throwable): CommandFailed = {
val newInstance = copy()
newInstance._cause = Some(cause)
newInstance
}
@InternalApi
private[akka] def causedByString = _cause.map(c s" because of ${c.getMessage}").getOrElse("")
override def toString: String = s"CommandFailed($cmd)$causedByString"
}
/** /**
* When `useResumeWriting` is in effect as indicated in the [[Register]] message, * When `useResumeWriting` is in effect as indicated in the [[Register]] message,

View file

@ -4,21 +4,22 @@
package akka.io package akka.io
import java.net.{ SocketException, InetSocketAddress } import java.net.{ InetSocketAddress, SocketException }
import java.nio.channels.SelectionKey._ import java.nio.channels.SelectionKey._
import java.io.{ FileInputStream, IOException } import java.io.{ FileInputStream, IOException }
import java.nio.channels.{ FileChannel, SocketChannel } import java.nio.channels.{ FileChannel, SocketChannel }
import java.nio.ByteBuffer import java.nio.ByteBuffer
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.util.control.NonFatal import scala.util.control.{ NoStackTrace, NonFatal }
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor._ import akka.actor._
import akka.util.ByteString import akka.util.ByteString
import akka.io.Inet.SocketOption import akka.io.Inet.SocketOption
import akka.io.Tcp._ import akka.io.Tcp._
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import java.nio.file.Paths import java.nio.file.Paths
/** /**
@ -150,11 +151,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
case write: WriteCommand case write: WriteCommand
if (writingSuspended) { if (writingSuspended) {
if (TraceLogging) log.debug("Dropping write because writing is suspended") if (TraceLogging) log.debug("Dropping write because writing is suspended")
sender() ! write.failureMessage sender() ! write.failureMessage.withCause(DroppingWriteBecauseWritingIsSuspendedException)
} else if (writePending) { } else if (writePending) {
if (TraceLogging) log.debug("Dropping write because queue is full") if (TraceLogging) log.debug("Dropping write because queue is full")
sender() ! write.failureMessage sender() ! write.failureMessage.withCause(DroppingWriteBecauseQueueIsFullException)
if (info.useResumeWriting) writingSuspended = true if (info.useResumeWriting) writingSuspended = true
} else { } else {
@ -505,4 +506,10 @@ private[io] object TcpConnection {
} }
val doNothing: () Unit = () () val doNothing: () Unit = () ()
val DroppingWriteBecauseWritingIsSuspendedException =
new IOException("Dropping write because writing is suspended") with NoStackTrace
val DroppingWriteBecauseQueueIsFullException =
new IOException("Dropping write because queue is full") with NoStackTrace
} }

View file

@ -67,7 +67,7 @@ private[io] class TcpListener(
ret ret
} catch { } catch {
case NonFatal(e) case NonFatal(e)
bindCommander ! bind.failureMessage bindCommander ! bind.failureMessage.withCause(e)
log.error(e, "Bind failed for TCP channel on endpoint [{}]", bind.localAddress) log.error(e, "Bind failed for TCP channel on endpoint [{}]", bind.localAddress)
context.stop(self) context.stop(self)
} }

View file

@ -4,11 +4,13 @@
package akka.io package akka.io
import java.net.InetSocketAddress import java.net.{ ConnectException, InetSocketAddress }
import java.nio.channels.{ SelectionKey, SocketChannel } import java.nio.channels.{ SelectionKey, SocketChannel }
import scala.util.control.NonFatal
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.{ ReceiveTimeout, ActorRef } import akka.actor.{ ActorRef, ReceiveTimeout }
import akka.annotation.InternalApi
import akka.io.TcpConnection.CloseInformation import akka.io.TcpConnection.CloseInformation
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.io.Tcp._ import akka.io.Tcp._
@ -26,6 +28,7 @@ private[io] class TcpOutgoingConnection(
connect: Connect) connect: Connect)
extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel], connect.pullMode) { extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel], connect.pullMode) {
import TcpOutgoingConnection._
import context._ import context._
import connect._ import connect._
@ -36,7 +39,7 @@ private[io] class TcpOutgoingConnection(
channelRegistry.register(channel, 0) channelRegistry.register(channel, 0)
timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied
private def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage)) private def stop(cause: Throwable): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage.withCause(cause)))
private def reportConnectFailure(thunk: Unit): Unit = { private def reportConnectFailure(thunk: Unit): Unit = {
try { try {
@ -44,7 +47,7 @@ private[io] class TcpOutgoingConnection(
} catch { } catch {
case NonFatal(e) case NonFatal(e)
log.debug("Could not establish connection to [{}] due to {}", remoteAddress, e) log.debug("Could not establish connection to [{}] due to {}", remoteAddress, e)
stop() stop(e)
} }
} }
@ -101,7 +104,7 @@ private[io] class TcpOutgoingConnection(
} else { } else {
log.debug("Could not establish connection because finishConnect " + log.debug("Could not establish connection because finishConnect " +
"never returned true (consider increasing akka.io.tcp.finish-connect-retries)") "never returned true (consider increasing akka.io.tcp.finish-connect-retries)")
stop() stop(FinishConnectNeverReturnedTrueException)
} }
} }
} }
@ -109,7 +112,16 @@ private[io] class TcpOutgoingConnection(
case ReceiveTimeout case ReceiveTimeout
if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout
log.debug("Connect timeout expired, could not establish connection to [{}]", remoteAddress) log.debug("Connect timeout expired, could not establish connection to [{}]", remoteAddress)
stop() stop(connectTimeoutExpired(timeout))
} }
} }
} }
@InternalApi
private[io] object TcpOutgoingConnection {
val FinishConnectNeverReturnedTrueException =
new ConnectException("Could not establish connection because finishConnect never returned true") with NoStackTrace
def connectTimeoutExpired(timeout: Option[FiniteDuration]) =
new ConnectException(s"Connect timeout of $timeout expired") with NoStackTrace
}

View file

@ -496,7 +496,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
val probe2 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val probe2 = TestSubscriber.manualProbe[Tcp.IncomingConnection]()
val binding2F = bind.to(Sink.fromSubscriber(probe2)).run() val binding2F = bind.to(Sink.fromSubscriber(probe2)).run()
probe2.expectSubscriptionAndError(BindFailedException) probe2.expectSubscriptionAndError(signalDemand = true) shouldBe a[BindFailedException]
val probe3 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val probe3 = TestSubscriber.manualProbe[Tcp.IncomingConnection]()
val binding3F = bind.to(Sink.fromSubscriber(probe3)).run() val binding3F = bind.to(Sink.fromSubscriber(probe3)).run()

View file

@ -7,8 +7,9 @@ import scala.util.control.NoStackTrace
class StreamTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace class StreamTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace
abstract class BindFailedException extends StreamTcpException("bind failed") class BindFailedException extends StreamTcpException("bind failed")
@deprecated("BindFailedException object will never be thrown. Match on the class instead.")
case object BindFailedException extends BindFailedException case object BindFailedException extends BindFailedException
class ConnectionException(msg: String) extends StreamTcpException(msg) class ConnectionException(msg: String) extends StreamTcpException(msg)

View file

@ -25,7 +25,6 @@ import akka.util.ByteString
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.util.Try
/** /**
* INTERNAL API * INTERNAL API
@ -78,7 +77,11 @@ import scala.util.Try
unbindPromise.future unbindPromise.future
})) }))
case f: CommandFailed case f: CommandFailed
val ex = BindFailedException val ex = new BindFailedException {
// cannot modify the actual exception class for compatibility reasons
override def getMessage: String = s"Bind failed${f.causedByString}"
}
f.cause.foreach(ex.initCause)
bindingPromise.failure(ex) bindingPromise.failure(ex)
unbindPromise.success(() Future.successful(())) unbindPromise.success(() Future.successful(()))
failStage(ex) failStage(ex)
@ -219,8 +222,8 @@ private[stream] object ConnectionSourceStage {
val sender = evt._1 val sender = evt._1
val msg = evt._2 val msg = evt._2
msg match { msg match {
case Terminated(_) failStage(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now.")) case Terminated(_) failStage(new StreamTcpException("The IO manager actor (TCP) has terminated. Stopping now."))
case CommandFailed(cmd) failStage(new StreamTcpException(s"Tcp command [$cmd] failed")) case f @ CommandFailed(cmd) failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}"))
case c: Connected case c: Connected
role.asInstanceOf[Outbound].localAddressPromise.success(c.localAddress) role.asInstanceOf[Outbound].localAddressPromise.success(c.localAddress)
connection = sender connection = sender
@ -238,13 +241,13 @@ private[stream] object ConnectionSourceStage {
val sender = evt._1 val sender = evt._1
val msg = evt._2 val msg = evt._2
msg match { msg match {
case Terminated(_) failStage(new StreamTcpException("The connection actor has terminated. Stopping now.")) case Terminated(_) failStage(new StreamTcpException("The connection actor has terminated. Stopping now."))
case CommandFailed(cmd) failStage(new StreamTcpException(s"Tcp command [$cmd] failed")) case f @ CommandFailed(cmd) failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}"))
case ErrorClosed(cause) failStage(new StreamTcpException(s"The connection closed with error: $cause")) case ErrorClosed(cause) failStage(new StreamTcpException(s"The connection closed with error: $cause"))
case Aborted failStage(new StreamTcpException("The connection has been aborted")) case Aborted failStage(new StreamTcpException("The connection has been aborted"))
case Closed completeStage() case Closed completeStage()
case ConfirmedClosed completeStage() case ConfirmedClosed completeStage()
case PeerClosed complete(bytesOut) case PeerClosed complete(bytesOut)
case Received(data) case Received(data)
// Keep on reading even when closed. There is no "close-read-side" in TCP // Keep on reading even when closed. There is no "close-read-side" in TCP