From b311e9e700e4c85b4a04a3f9bacb6604b3b9854e Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 20 Jun 2013 10:22:47 +0200 Subject: [PATCH] io: change IOException treatment from "fatal error" to "expected during normal operation" Up to now IOExceptions during reading, writing or connecting were treated as fatal actor errors, crashing the connection actor and thus producing ERROR level log messages. This patch treats such exceptions as "expected" during normal operation and prevents them from crashing the actor. Rather, they are logged at DEBUG level and the actor is actively and cleanly stopped. --- .../scala/akka/io/TcpConnectionSpec.scala | 29 +++++++------------ .../main/scala/akka/io/TcpConnection.scala | 16 ++++++---- .../scala/akka/io/TcpOutgoingConnection.scala | 27 ++++++++--------- 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 782405832a..478fb7f0ee 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -457,12 +457,11 @@ class TcpConnectionSpec extends AkkaSpec(""" "report when peer aborted the connection" in new EstablishedConnectionTest() { run { - EventFilter[IOException](occurrences = 1) intercept { - abortClose(serverSideChannel) - selector.send(connectionActor, ChannelReadable) - val err = connectionHandler.expectMsgType[ErrorClosed] - err.cause must be(ConnectionResetByPeerMessage) - } + abortClose(serverSideChannel) + selector.send(connectionActor, ChannelReadable) + val err = connectionHandler.expectMsgType[ErrorClosed] + err.cause must be(ConnectionResetByPeerMessage) + // wait a while connectionHandler.expectNoMsg(200.millis) @@ -475,11 +474,9 @@ class TcpConnectionSpec extends AkkaSpec(""" val writer = TestProbe() abortClose(serverSideChannel) - EventFilter[IOException](occurrences = 1) intercept { - writer.send(connectionActor, Write(ByteString("testdata"))) - // bother writer and handler should get the message - writer.expectMsgType[ErrorClosed] - } + writer.send(connectionActor, Write(ByteString("testdata"))) + // bother writer and handler should get the message + writer.expectMsgType[ErrorClosed] connectionHandler.expectMsgType[ErrorClosed] assertThisConnectionActorTerminated() @@ -501,10 +498,8 @@ class TcpConnectionSpec extends AkkaSpec(""" key.isConnectable must be(true) val forceThisLazyVal = connectionActor.toString Thread.sleep(300) - EventFilter[ConnectException](occurrences = 1) intercept { - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) - } + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) verifyActorTermination(connectionActor) } finally sel.close() @@ -516,9 +511,7 @@ class TcpConnectionSpec extends AkkaSpec(""" override lazy val connectionActor = createConnectionActor(serverAddress = UnboundAddress, timeout = Option(100.millis)) run { connectionActor.toString must not be ("") - EventFilter[SocketTimeoutException](occurrences = 1) intercept { - userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis)))) - } + userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis)))) verifyActorTermination(connectionActor) } } diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 956b500c2c..710087a304 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -250,13 +250,12 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = { if (closedEvent == Aborted) abort() else channel.close() - closedMessage = CloseInformation(Set(handler) ++ closeCommander, closedEvent) - context.stop(self) + stopWith(CloseInformation(Set(handler) ++ closeCommander, closedEvent)) } - def handleError(handler: ActorRef, exception: IOException): Nothing = { - closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))) - throw exception + def handleError(handler: ActorRef, exception: IOException): Unit = { + log.debug("Closing connection due to IO error {}", exception) + stopWith(CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))) } @tailrec private[this] def extractMsg(t: Throwable): String = @@ -279,6 +278,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha channel.close() } + def stopWith(closeInfo: CloseInformation): Unit = { + closedMessage = closeInfo + context.stop(self) + } + override def postStop(): Unit = { if (channel.isOpen) abort() @@ -349,7 +353,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha } try innerWrite(this) - catch { case e: IOException ⇒ handleError(info.handler, e) } + catch { case e: IOException ⇒ handleError(info.handler, e); this } } def hasData = buffer.hasRemaining || remainingData.nonEmpty def consume(writtenBytes: Int): PendingBufferWrite = diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 51d6cde223..e39077d6ec 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -4,15 +4,16 @@ package akka.io -import akka.actor.{ ReceiveTimeout, ActorRef } -import akka.io.Inet.SocketOption -import akka.io.SelectionHandler._ -import akka.io.Tcp._ import java.io.IOException import java.nio.channels.{ SelectionKey, SocketChannel } +import java.net.ConnectException import scala.collection.immutable import scala.concurrent.duration.Duration -import java.net.{ ConnectException, SocketTimeoutException } +import akka.actor.{ ReceiveTimeout, ActorRef } +import akka.io.Inet.SocketOption +import akka.io.TcpConnection.CloseInformation +import akka.io.SelectionHandler._ +import akka.io.Tcp._ /** * An actor handling the connection state machine for an outgoing connection @@ -45,7 +46,9 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, } def connecting(registration: ChannelRegistration, commander: ActorRef, - options: immutable.Traversable[SocketOption]): Receive = + options: immutable.Traversable[SocketOption]): Receive = { + def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage)) + { case ChannelConnectable ⇒ if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout @@ -55,16 +58,14 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, completeConnect(registration, commander, options) } catch { case e: IOException ⇒ - if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", e) - closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage) - throw e + log.debug("Could not establish connection to [{}] due to {}", remoteAddress, e) + stop() } case ReceiveTimeout ⇒ if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout - val failure = new SocketTimeoutException(s"Connection to [$remoteAddress] timed out") - if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", failure) - closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage) - throw failure + log.debug("Connect timeout expired, could not establish connection to [{}]", remoteAddress) + stop() } + } }